/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Compacting memstore test case that exercises the flat immutable segment indexes
 * (CellChunkMap and CellArrayMap).
 */
@Category({RegionServerTests.class, MediumTests.class})
@RunWith(Parameterized.class)
public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestCompactingToCellFlatMapMemStore.class);

  @Parameterized.Parameters
  public static Object[] data() {
    return new Object[] { "CHUNK_MAP", "ARRAY_MAP" }; // test different immutable indexes
  }
  private static final Logger LOG =
      LoggerFactory.getLogger(TestCompactingToCellFlatMapMemStore.class);
  public final boolean toCellChunkMap;
  Configuration conf;
  //////////////////////////////////////////////////////////////////////////////
  // Helpers
  //////////////////////////////////////////////////////////////////////////////
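  /**
   * @param type the immutable segment index type to exercise, either "CHUNK_MAP" or "ARRAY_MAP"
   *             (see {@link #data()})
   */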
  public TestCompactingToCellFlatMapMemStore(String type) {
    if ("CHUNK_MAP".equals(type)) {
      toCellChunkMap = true;
    } else {
      toCellChunkMap = false;
    }
  }

  @Override public void tearDown() throws Exception {
    chunkCreator.clearChunksInPool();
  }

  @Override public void setUp() throws Exception {

    compactingSetUp();
    this.conf = HBaseConfiguration.create();

    // set memstore to do data compaction
    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(MemoryCompactionPolicy.EAGER));
    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.02);
    this.memstore =
        new MyCompactingMemStore(conf, CellComparatorImpl.COMPARATOR, store,
            regionServicesForStores, MemoryCompactionPolicy.EAGER);
  }

  //////////////////////////////////////////////////////////////////////////////
  // Compaction tests
  //////////////////////////////////////////////////////////////////////////////
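  /**
   * Adds four cells (two of them for row "A") and verifies that the in-memory EAGER compaction
   * into the flat index removes the duplicate, leaving three cells and the expected data and
   * heap sizes.
   */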
  @Override
  public void testCompaction1Bucket() throws IOException {
    int counter = 0;
    String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
    if (toCellChunkMap) {
      // set memstore to flat into CellChunkMap
      ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    } else {
      ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
    }

    // test 1 bucket
    long totalCellsLen = addRowsByKeysDataSize(memstore, keys1);
    long cellBeforeFlushSize = cellBeforeFlushSize();
    long cellAfterFlushSize = cellAfterFlushSize();
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;

    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());

    assertEquals(4, memstore.getActive().getCellsCount());
    ((CompactingMemStore) memstore).flushInMemory();    // push keys to pipeline and compact
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
    // so adjust totalCellsLen accordingly.
    totalCellsLen = (totalCellsLen * 3) / 4;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());

    totalHeapSize =
        3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
            + (toCellChunkMap ?
            CellChunkImmutableSegment.DEEP_OVERHEAD_CCM :
            CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
    for (Segment s : memstore.getSegments()) {
      counter += s.getCellsCount();
    }
    assertEquals(3, counter);
    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    region.decrMemStoreSize(mss);  // simulate flusher
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(3, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  @Override
  public void testCompaction2Buckets() throws IOException {
    if (toCellChunkMap) {
      // set memstore to flat into CellChunkMap
      ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    } else {
      ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
    }
    String[] keys1 = { "A", "A", "B", "C" };
    String[] keys2 = { "A", "B", "D" };

    long totalCellsLen1 = addRowsByKeysDataSize(memstore, keys1);     // INSERT 4
    long cellBeforeFlushSize = cellBeforeFlushSize();
    long cellAfterFlushSize = cellAfterFlushSize();
    long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    int counter = 0;                                          // COMPACT 4->3
    for (Segment s : memstore.getSegments()) {
      counter += s.getCellsCount();
    }
    assertEquals(3, counter);
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
    // so adjust totalCellsLen accordingly.
    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
    totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
        + (toCellChunkMap ?
        CellChunkImmutableSegment.DEEP_OVERHEAD_CCM :
        CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());

    long totalCellsLen2 = addRowsByKeysDataSize(memstore, keys2);   // INSERT 3 (3+3=6)
    long totalHeapSize2 = 3 * cellBeforeFlushSize;
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    assertEquals(0, memstore.getSnapshot().getCellsCount());// COMPACT 6->4
    counter = 0;
    for (Segment s : memstore.getSegments()) {
      counter += s.getCellsCount();
    }
    assertEquals(4, counter);
    totalCellsLen2 = totalCellsLen2 / 3;// 2 cells duplicated in set 2
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    totalHeapSize2 = 1 * cellAfterFlushSize;
    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(4, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  @Override
  public void testCompaction3Buckets() throws IOException {
    if (toCellChunkMap) {
      // set memstore to flat into CellChunkMap
      ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    } else {
      // set to CellArrayMap as CCM is configured by default due to MSLAB usage
      ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
    }
    String[] keys1 = { "A", "A", "B", "C" };
    String[] keys2 = { "A", "B", "D" };
    String[] keys3 = { "D", "B", "B" };

    long totalCellsLen1 = addRowsByKeysDataSize(memstore, keys1);
    long cellBeforeFlushSize = cellBeforeFlushSize();
    long cellAfterFlushSize = cellAfterFlushSize();
    long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact

    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
    // so adjust totalCellsLen accordingly.
    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
    totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
        + (toCellChunkMap ?
        CellChunkImmutableSegment.DEEP_OVERHEAD_CCM :
        CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());

    long totalCellsLen2 = addRowsByKeysDataSize(memstore, keys2);
    long totalHeapSize2 = 3 * cellBeforeFlushSize;

    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());

    ((MyCompactingMemStore) memstore).disableCompaction();
    mss = memstore.getFlushableSize();
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
    totalHeapSize2 = totalHeapSize2 + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());

    long totalCellsLen3 = addRowsByKeysDataSize(memstore, keys3);
    long totalHeapSize3 = 3 * cellBeforeFlushSize;
    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
        regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1 + totalHeapSize2 + totalHeapSize3,
        ((CompactingMemStore) memstore).heapSize());

    ((MyCompactingMemStore) memstore).enableCompaction();
    mss = memstore.getFlushableSize();
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
    // Out of total 10, only 4 cells are unique
    totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated
    totalCellsLen3 = 0;// All duplicated cells.
    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
        regionServicesForStores.getMemStoreSize());
    // Only 4 unique cells left
    long totalHeapSize4 = 4 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
        + (toCellChunkMap ?
        CellChunkImmutableSegment.DEEP_OVERHEAD_CCM :
        CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
    assertEquals(totalHeapSize4, ((CompactingMemStore) memstore).heapSize());

    mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(),
      mss.getCellsCount());
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(4, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  //////////////////////////////////////////////////////////////////////////////
  // Merging tests
  //////////////////////////////////////////////////////////////////////////////
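  /**
   * With the BASIC in-memory compaction policy the segments are only flattened and merged, so no
   * duplicate cells are removed; the total cell count (16 after all inserts) is unchanged by the
   * in-memory merge.
   */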
  @Test
  public void testMerging() throws IOException {
    if (toCellChunkMap) {
      // set memstore to flat into CellChunkMap
      ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    }
    String[] keys1 = { "A", "A", "B", "C", "F", "H"};
    String[] keys2 = { "A", "B", "D", "G", "I", "J"};
    String[] keys3 = { "D", "B", "B", "E" };

    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(compactionType));
    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
    addRowsByKeysDataSize(memstore, keys1);

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline; should not compact

    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    addRowsByKeysDataSize(memstore, keys2); // also should only flatten

    int counter2 = 0;
    for (Segment s : memstore.getSegments()) {
      counter2 += s.getCellsCount();
    }
    assertEquals(12, counter2);

    ((MyCompactingMemStore) memstore).disableCompaction();

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    int counter3 = 0;
    for (Segment s : memstore.getSegments()) {
      counter3 += s.getCellsCount();
    }
    assertEquals(12, counter3);

    addRowsByKeysDataSize(memstore, keys3);

    int counter4 = 0;
    for (Segment s : memstore.getSegments()) {
      counter4 += s.getCellsCount();
    }
    assertEquals(16, counter4);

    ((MyCompactingMemStore) memstore).enableCompaction();

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and merge
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    int counter = 0;
    for (Segment s : memstore.getSegments()) {
      counter += s.getCellsCount();
    }
    assertEquals(16, counter);

    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    ImmutableSegment s = memstore.getSnapshot();
    memstore.clearSnapshot(snapshot.getId());
  }

  @Test
  public void testTimeRangeAfterCompaction() throws IOException {
    if (toCellChunkMap) {
      // set memstore to flat into CellChunkMap
      ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    }
    testTimeRange(true);
  }

  @Test
  public void testTimeRangeAfterMerge() throws IOException {
    if (toCellChunkMap) {
      // set memstore to flat into CellChunkMap
      ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    }
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(compactionType));
    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
    testTimeRange(false);
  }

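  /**
   * Adds cells with increasing timestamps, triggers in-memory flushes, and verifies that the
   * segments' TimeRangeTracker min/max follow the inserted timestamps. When in-memory compaction
   * is used (isCompaction == true) only the newest version survives, so the expected minimum
   * timestamp advances as well.
   */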
  private void testTimeRange(boolean isCompaction) throws IOException {
    final long initTs = 100;
    long currentTs = initTs;
    byte[] row = Bytes.toBytes("row");
    byte[] family = Bytes.toBytes("family");
    byte[] qf1 = Bytes.toBytes("qf1");

    // first segment in pipeline
    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null);
    long minTs = currentTs;
    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null);

    long numberOfCell = 2;
    assertEquals(numberOfCell,
        memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum());
    assertEquals(minTs, memstore.getSegments().stream().mapToLong(
        m -> m.getTimeRangeTracker().getMin()).min().getAsLong());
    assertEquals(currentTs, memstore.getSegments().stream().mapToLong(
        m -> m.getTimeRangeTracker().getMax()).max().getAsLong());

    ((CompactingMemStore) memstore).flushInMemory();

    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    if (isCompaction) {
      // max version = 1, so one cell will be dropped.
      numberOfCell = 1;
      minTs = currentTs;
    }
    // second segment in pipeline
    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null);
    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null);
    numberOfCell += 2;
    assertEquals(numberOfCell,
        memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum());
    assertEquals(minTs, memstore.getSegments().stream().mapToLong(
        m -> m.getTimeRangeTracker().getMin()).min().getAsLong());
    assertEquals(currentTs, memstore.getSegments().stream().mapToLong(
        m -> m.getTimeRangeTracker().getMax()).max().getAsLong());

    ((CompactingMemStore) memstore).flushInMemory(); // trigger the merge

    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    if (isCompaction) {
      // max version = 1, so one cell will be dropped.
      numberOfCell = 1;
      minTs = currentTs;
    }

    assertEquals(numberOfCell,
        memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum());
    assertEquals(minTs, memstore.getSegments().stream().mapToLong(
        m -> m.getTimeRangeTracker().getMin()).min().getAsLong());
    assertEquals(currentTs, memstore.getSegments().stream().mapToLong(
        m -> m.getTimeRangeTracker().getMax()).max().getAsLong());
  }

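  /**
   * Flattens a segment holding 3 rows x 50 columns (no duplicates) and verifies that a scan over
   * the memstore scanners still sees all 150 cells.
   */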
  @Test
  public void testCountOfCellsAfterFlatteningByScan() throws IOException {
    String[] keys1 = { "A", "B", "C" }; // A, B, C
    addRowsByKeysWith50Cols(memstore, keys1);
    // this should only flatten as there are no duplicates
    ((CompactingMemStore) memstore).flushInMemory();
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
    // seek each scanner and count the cells it returns
    int count = 0;
    for (int i = 0; i < scanners.size(); i++) {
      scanners.get(i).seek(KeyValue.LOWESTKEY);
      while (scanners.get(i).next() != null) {
        count++;
      }
    }
    assertEquals("wrong count of cells", 150, count);
    for (int i = 0; i < scanners.size(); i++) {
      scanners.get(i).close();
    }
  }

  @Test
  public void testCountOfCellsAfterFlatteningByIterator() throws IOException {
    String[] keys1 = { "A", "B", "C" }; // A, B, C
    addRowsByKeysWith50Cols(memstore, keys1);
    // this should only flatten as there are no duplicates
    ((CompactingMemStore) memstore).flushInMemory();
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    // Just count the cells exposed by the merging iterator over the immutable segments
    MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator(
        ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(),
        CellComparatorImpl.COMPARATOR, 10);
    int cnt = 0;
    try {
      while (itr.next() != null) {
        cnt++;
      }
    } finally {
      itr.close();
    }
    assertEquals("wrong count of cells", 150, cnt);
  }

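  /**
   * Adds 50 cells (columns testqualifier0..testqualifier49) per given row key, each row with its
   * own timestamp.
   */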
  private void addRowsByKeysWith50Cols(AbstractMemStore hmc, String[] keys) {
    byte[] fam = Bytes.toBytes("testfamily");
    for (int i = 0; i < keys.length; i++) {
      long timestamp = System.currentTimeMillis();
      Threads.sleep(1); // to make sure each kv gets a different ts
      byte[] row = Bytes.toBytes(keys[i]);
      for (int j = 0; j < 50; j++) {
        byte[] qf = Bytes.toBytes("testqualifier" + j);
        byte[] val = Bytes.toBytes(keys[i] + j);
        KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
        hmc.add(kv, null);
      }
    }
  }

  @Override
  @Test
  public void testPuttingBackChunksWithOpeningScanner() throws IOException {
    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] qf4 = Bytes.toBytes("testqualifier4");
    byte[] qf5 = Bytes.toBytes("testqualifier5");
    byte[] qf6 = Bytes.toBytes("testqualifier6");
    byte[] qf7 = Bytes.toBytes("testqualifier7");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, val), null);
    memstore.add(new KeyValue(row, fam, qf3, val), null);

    // Creating a snapshot
    MemStoreSnapshot snapshot = memstore.snapshot();
    assertEquals(3, memstore.getSnapshot().getCellsCount());

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf4, val), null);
    memstore.add(new KeyValue(row, fam, qf5, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());

    // open scanners before clearing the snapshot
    List<KeyValueScanner> scanners = memstore.getScanners(0);
    // The chunks should not be put back into the pool, since scanners are still open
    // on their data.
    // close the snapshot scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());

    assertTrue(chunkCreator.getPoolSize() == 0);

    // Chunks will be put back into the pool after the scanners are closed
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    assertTrue(chunkCreator.getPoolSize() > 0);

    // clear chunks
    chunkCreator.clearChunksInPool();

    // Creating another snapshot
    snapshot = memstore.snapshot();
    // Adding more value
    memstore.add(new KeyValue(row, fam, qf6, val), null);
    memstore.add(new KeyValue(row, fam, qf7, val), null);
    // opening scanners
    scanners = memstore.getScanners(0);
    // close scanners before clearing the snapshot
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    // Since no scanner is open, the snapshot's chunks should be put back into the pool
    // close the snapshot scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());
    assertTrue(chunkCreator.getPoolSize() > 0);
  }

  @Override
  @Test
  public void testPuttingBackChunksAfterFlushing() throws IOException {
    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] qf4 = Bytes.toBytes("testqualifier4");
    byte[] qf5 = Bytes.toBytes("testqualifier5");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, val), null);
    memstore.add(new KeyValue(row, fam, qf3, val), null);

    // Creating a snapshot
    MemStoreSnapshot snapshot = memstore.snapshot();
    assertEquals(3, memstore.getSnapshot().getCellsCount());

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf4, val), null);
    memstore.add(new KeyValue(row, fam, qf5, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());
    // close the snapshot scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());

    int chunkCount = chunkCreator.getPoolSize();
    assertTrue(chunkCount > 0);
  }

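  /**
   * Verifies data and heap size accounting when the active CSLM segment is flattened (BASIC
   * policy) into an immutable segment indexed by CellChunkMap. Only meaningful for the CHUNK_MAP
   * parameter; returns immediately otherwise.
   */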
  @Test
  public void testFlatteningToCellChunkMap() throws IOException {
    if (!toCellChunkMap) {
      return;
    }
    // set memstore to flat into CellChunkMap
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(compactionType));
    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
    ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    int numOfCells = 8;
    String[] keys1 = { "A", "A", "B", "C", "D", "D", "E", "F" }; //A1, A2, B3, C4, D5, D6, E7, F8

    // make one cell
    byte[] row = Bytes.toBytes(keys1[0]);
    byte[] val = Bytes.toBytes(keys1[0] + 0);
    KeyValue kv =
        new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
            System.currentTimeMillis(), val);

    // test 1 bucket
    int totalCellsLen = addRowsByKeys(memstore, keys1);
    long oneCellOnCSLMHeapSize =
        ClassSize.align(
            ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil
                .length(kv));

    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and flatten
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    long oneCellOnCCMHeapSize =
        ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
        + numOfCells * oneCellOnCCMHeapSize;

    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(numOfCells, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  /**
   * CellChunkMap segment index requires all cell data to be written in the MSLAB chunks.
   * Even though MSLAB is enabled, cells bigger than maxAlloc
   * (even if smaller than the size of a chunk) are not written in the MSLAB chunks.
   * If such cells are found in the process of flattening into CellChunkMap
   * (in-memory-flush) they need to be copied into MSLAB.
   * testFlatteningToBigCellChunkMap checks that the process of flattening into
   * CellChunkMap succeeds, even when such big cells are allocated.
   */
  @Test
  public void testFlatteningToBigCellChunkMap() throws IOException {
    if (!toCellChunkMap) {
      return;
    }
    // set memstore to flat into CellChunkMap
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(compactionType));
    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
    ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    int numOfCells = 4;
    char[] chars = new char[MemStoreLAB.MAX_ALLOC_DEFAULT];
    for (int i = 0; i < chars.length; i++) {
      chars[i] = 'A';
    }
    String bigVal = new String(chars);
    String[] keys1 = { "A", "B", "C", "D"};

    // make one cell
    byte[] row = Bytes.toBytes(keys1[0]);
    byte[] val = Bytes.toBytes(bigVal);
    KeyValue kv =
        new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
            System.currentTimeMillis(), val);

    // test 1 bucket
    int totalCellsLen = addRowsByKeys(memstore, keys1, val);

    long oneCellOnCSLMHeapSize =
        ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());

    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and flatten
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // There are no duplicate cells and BASIC mode only flattens without removing data,
    // so totalCellsLen should remain the same.
    long oneCellOnCCMHeapSize =
        ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
        + numOfCells * oneCellOnCCMHeapSize;

    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(numOfCells, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  /**
   * CellChunkMap segment index requires all cell data to be written in the MSLAB chunks.
   * Even though MSLAB is enabled, cells bigger than the size of a chunk are not
   * written in the MSLAB chunks.
   * If such cells are found in the process of flattening into CellChunkMap
   * (in-memory-flush) they need to be copied into MSLAB.
   * testFlatteningToJumboCellChunkMap checks that the process of flattening
   * into CellChunkMap succeeds, even when such big cells are allocated.
   */
  @Test
  public void testFlatteningToJumboCellChunkMap() throws IOException {
    if (!toCellChunkMap) {
      return;
    }
    // set memstore to flat into CellChunkMap
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);

    int numOfCells = 1;
    char[] chars = new char[MemStoreLAB.CHUNK_SIZE_DEFAULT];
    for (int i = 0; i < chars.length; i++) {
      chars[i] = 'A';
    }
    String bigVal = new String(chars);
    String[] keys1 = {"A"};

    // make one cell
    byte[] row = Bytes.toBytes(keys1[0]);
    byte[] val = Bytes.toBytes(bigVal);
    KeyValue kv =
        new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
            System.currentTimeMillis(), val);

    // test 1 bucket
    int totalCellsLen = addRowsByKeys(memstore, keys1, val);

    long oneCellOnCSLMHeapSize =
        ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());

    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and flatten
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    // The single cell is only flattened, not removed, because we are in BASIC mode.
    // totalCellsLen should remain the same.
    long oneCellOnCCMHeapSize =
        ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
        + numOfCells * oneCellOnCCMHeapSize;

    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(numOfCells, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());

    // Allocating two big cells (too big for being copied into a regular chunk).
    String[] keys2 = {"C", "D"};
    addRowsByKeys(memstore, keys2, val);
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }

    // The in-memory flush size is bigger than the size of a single cell,
    // but smaller than the size of two cells.
    // Therefore, the two created cells are flattened together.
    totalHeapSize = MutableSegment.DEEP_OVERHEAD
        + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
        + 2 * oneCellOnCCMHeapSize;
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
  }

  /**
   * CellChunkMap segment index requires all cell data to be written in the MSLAB chunks.
   * Even though MSLAB is enabled, cells bigger than the size of a chunk are not
   * written in the MSLAB chunks.
   * If such cells are found in the process of a merge they need to be copied into MSLAB.
   * testForceCopyOfBigCellIntoImmutableSegment checks that the
   * ImmutableMemStoreLAB's forceCopyOfBigCellInto does what it's supposed to do.
   */
  @Test
  public void testForceCopyOfBigCellIntoImmutableSegment() throws IOException {
    if (!toCellChunkMap) {
      return;
    }

    // set memstore to flat into CellChunkMap
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().setInt(MemStoreCompactionStrategy
        .COMPACTING_MEMSTORE_THRESHOLD_KEY, 4);
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);

    char[] chars = new char[MemStoreLAB.CHUNK_SIZE_DEFAULT];
    for (int i = 0; i < chars.length; i++) {
      chars[i] = 'A';
    }
    String bigVal = new String(chars);
    byte[] val = Bytes.toBytes(bigVal);

    // We need to add two cells, five times, in order to guarantee a merge
    List<String[]> keysList = new ArrayList<>();
    keysList.add(new String[]{"A", "B"});
    keysList.add(new String[]{"C", "D"});
    keysList.add(new String[]{"E", "F"});
    keysList.add(new String[]{"G", "H"});
    keysList.add(new String[]{"I", "J"});

    // Measuring the size of a single kv
    KeyValue kv = new KeyValue(Bytes.toBytes("A"), Bytes.toBytes("testfamily"),
        Bytes.toBytes("testqualifier"), System.currentTimeMillis(), val);
    long oneCellOnCCMHeapSize =
        ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(KeyValueUtil.length(kv));

    long totalHeapSize = MutableSegment.DEEP_OVERHEAD;
    for (int i = 0; i < 5; i++) {
      addRowsByKeys(memstore, keysList.get(i), val);
      while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
        Threads.sleep(10);
      }

      // The in-memory flush size is bigger than the size of a single cell,
      // but smaller than the size of two cells.
      // Therefore, the two created cells are flattened together.
      totalHeapSize += CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
          + 2 * oneCellOnCCMHeapSize;
      if (i == 4) {
        // Four out of the five segments are merged into one,
        // and the merged segment becomes immutable
        totalHeapSize -= (3 * CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
            + MutableSegment.DEEP_OVERHEAD);
      }
      assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
    }
  }
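  /**
   * Adds one cell per key (family "testfamily", qualifier "testqualifier", value key+index),
   * accounts the added size to the region services, and returns the data size that was added.
   */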
  private long addRowsByKeysDataSize(final AbstractMemStore hmc, String[] keys) {
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf = Bytes.toBytes("testqualifier");
    MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing();
    for (int i = 0; i < keys.length; i++) {
      long timestamp = System.currentTimeMillis();
      Threads.sleep(1); // to make sure each kv gets a different ts
      byte[] row = Bytes.toBytes(keys[i]);
      byte[] val = Bytes.toBytes(keys[i] + i);
      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
      hmc.add(kv, memstoreSizing);
      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp: " + kv.getTimestamp());
    }
    MemStoreSize mss = memstoreSizing.getMemStoreSize();
    regionServicesForStores.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(),
      mss.getOffHeapSize(), mss.getCellsCount());
    return mss.getDataSize();
  }

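  // Heap size of a single cell while it resides in the mutable (ConcurrentSkipListMap based)
  // segment, before the in-memory flush.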
  private long cellBeforeFlushSize() {
    // make one cell
    byte[] row = Bytes.toBytes("A");
    byte[] val = Bytes.toBytes("A" + 0);
    KeyValue kv =
        new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
            System.currentTimeMillis(), val);
    return ClassSize.align(
        ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil.length(kv));
  }

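  // Heap size of a single cell after flattening, depending on the immutable index type
  // (CellChunkMap vs. CellArrayMap).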
  private long cellAfterFlushSize() {
    // make one cell
    byte[] row = Bytes.toBytes("A");
    byte[] val = Bytes.toBytes("A" + 0);
    KeyValue kv =
        new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
            System.currentTimeMillis(), val);

    return toCellChunkMap ?
        ClassSize.align(
        ClassSize.CELL_CHUNK_MAP_ENTRY + KeyValueUtil.length(kv)) :
        ClassSize.align(
        ClassSize.CELL_ARRAY_MAP_ENTRY + KeyValue.FIXED_OVERHEAD + KeyValueUtil.length(kv));
  }
}