/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Compacting memstore test case that exercises the flat immutable segment indexes
 * (CellChunkMap and CellArrayMap).
 */
@Category({RegionServerTests.class, LargeTests.class})
@RunWith(Parameterized.class)
public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestCompactingToCellFlatMapMemStore.class);

  @Parameterized.Parameters
  public static Object[] data() {
    return new Object[] { "CHUNK_MAP", "ARRAY_MAP" }; // test different immutable indexes
  }
  private static final Logger LOG =
      LoggerFactory.getLogger(TestCompactingToCellFlatMapMemStore.class);
  public final boolean toCellChunkMap;
  Configuration conf;
  //////////////////////////////////////////////////////////////////////////////
  // Helpers
  //////////////////////////////////////////////////////////////////////////////
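  /**
   * Parameterized on the immutable segment index type: "CHUNK_MAP" flattens into a CellChunkMap
   * index, any other value into a CellArrayMap index.
   */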
  public TestCompactingToCellFlatMapMemStore(String type) {
    toCellChunkMap = "CHUNK_MAP".equals(type);
  }

  @Override public void tearDown() throws Exception {
    chunkCreator.clearChunksInPool();
  }

  @Override public void setUp() throws Exception {
    compactingSetUp();
    this.conf = HBaseConfiguration.create();

    // set memstore to do data compaction
    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(MemoryCompactionPolicy.EAGER));
    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.02);
    this.memstore =
        new MyCompactingMemStore(conf, CellComparatorImpl.COMPARATOR, store,
            regionServicesForStores, MemoryCompactionPolicy.EAGER);
  }

  //////////////////////////////////////////////////////////////////////////////
  // Compaction tests
  //////////////////////////////////////////////////////////////////////////////
  @Override
  public void testCompaction1Bucket() throws IOException {
    int counter = 0;
    String[] keys1 = { "A", "A", "B", "C" }; //A1, A2, B3, C4
    if (toCellChunkMap) {
      // set memstore to flatten into CellChunkMap
      ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    } else {
      ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
    }

    // test 1 bucket
    long totalCellsLen = addRowsByKeysDataSize(memstore, keys1);
    long cellBeforeFlushSize = cellBeforeFlushSize();
    long cellAfterFlushSize = cellAfterFlushSize();
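    // Heap overhead of the active (CSLM-based) segment: the fixed mutable segment overhead plus
    // one ConcurrentSkipListMap entry per inserted cell.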
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;

    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());

    assertEquals(4, memstore.getActive().getCellsCount());
    ((CompactingMemStore) memstore).flushInMemory();    // push keys to pipeline and compact
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
    // so adjust totalCellsLen accordingly.
    totalCellsLen = (totalCellsLen * 3) / 4;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());

    totalHeapSize =
        3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
            + (toCellChunkMap ?
            CellChunkImmutableSegment.DEEP_OVERHEAD_CCM :
            CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
    assertEquals(totalHeapSize, ((CompactingMemStore)memstore).heapSize());
    for ( Segment s : memstore.getSegments()) {
      counter += s.getCellsCount();
    }
    assertEquals(3, counter);
    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    region.decrMemStoreSize(mss);  // simulate flusher
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(3, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  @Override
  public void testCompaction2Buckets() throws IOException {
    if (toCellChunkMap) {
      // set memstore to flatten into CellChunkMap
      ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    } else {
      ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
    }
    String[] keys1 = { "A", "A", "B", "C" };
    String[] keys2 = { "A", "B", "D" };

    long totalCellsLen1 = addRowsByKeysDataSize(memstore, keys1);     // INSERT 4
    long cellBeforeFlushSize = cellBeforeFlushSize();
    long cellAfterFlushSize = cellAfterFlushSize();
    long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    int counter = 0;                                          // COMPACT 4->3
    for ( Segment s : memstore.getSegments()) {
      counter += s.getCellsCount();
    }
    assertEquals(3, counter);
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
    // so adjust totalCellsLen accordingly.
    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
    totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
        + (toCellChunkMap ?
        CellChunkImmutableSegment.DEEP_OVERHEAD_CCM :
        CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());

    long totalCellsLen2 = addRowsByKeysDataSize(memstore, keys2);   // INSERT 3 (3+3=6)
    long totalHeapSize2 = 3 * cellBeforeFlushSize;
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    assertEquals(0, memstore.getSnapshot().getCellsCount());// COMPACT 6->4
    counter = 0;
    for ( Segment s : memstore.getSegments()) {
      counter += s.getCellsCount();
    }
    assertEquals(4, counter);
    totalCellsLen2 = totalCellsLen2 / 3;// 2 cells duplicated in set 2
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    totalHeapSize2 = 1 * cellAfterFlushSize;
    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(4, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  @Override
  public void testCompaction3Buckets() throws IOException {
    if (toCellChunkMap) {
      // set memstore to flatten into CellChunkMap
      ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    } else {
      // set to CellArrayMap as CCM is configured by default due to MSLAB usage
      ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
    }
    String[] keys1 = { "A", "A", "B", "C" };
    String[] keys2 = { "A", "B", "D" };
    String[] keys3 = { "D", "B", "B" };

    long totalCellsLen1 = addRowsByKeysDataSize(memstore, keys1);
    long cellBeforeFlushSize = cellBeforeFlushSize();
    long cellAfterFlushSize = cellAfterFlushSize();
    long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact

    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
    // so adjust totalCellsLen accordingly.
    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
    totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
        + (toCellChunkMap ?
        CellChunkImmutableSegment.DEEP_OVERHEAD_CCM :
        CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());

    long totalCellsLen2 = addRowsByKeysDataSize(memstore, keys2);
    long totalHeapSize2 = 3 * cellBeforeFlushSize;

    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());

    ((MyCompactingMemStore) memstore).disableCompaction();
    mss = memstore.getFlushableSize();
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
    totalHeapSize2 = totalHeapSize2 + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());

    long totalCellsLen3 = addRowsByKeysDataSize(memstore, keys3);
    long totalHeapSize3 = 3 * cellBeforeFlushSize;
    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
        regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1 + totalHeapSize2 + totalHeapSize3,
        ((CompactingMemStore) memstore).heapSize());

    ((MyCompactingMemStore) memstore).enableCompaction();
    mss = memstore.getFlushableSize();
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
    // Out of total 10, only 4 cells are unique
    totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated
    totalCellsLen3 = 0;// All duplicated cells.
    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
        regionServicesForStores.getMemStoreSize());
    // Only 4 unique cells left
    long totalHeapSize4 = 4 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
        + (toCellChunkMap ?
        CellChunkImmutableSegment.DEEP_OVERHEAD_CCM :
        CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
    assertEquals(totalHeapSize4, ((CompactingMemStore) memstore).heapSize());

    mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(),
      mss.getCellsCount());
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(4, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  //////////////////////////////////////////////////////////////////////////////
  // Merging tests
  //////////////////////////////////////////////////////////////////////////////
  @Test
  public void testMerging() throws IOException {
    if (toCellChunkMap) {
      // set memstore to flatten into CellChunkMap
      ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    }
    String[] keys1 = { "A", "A", "B", "C", "F", "H"};
    String[] keys2 = { "A", "B", "D", "G", "I", "J"};
    String[] keys3 = { "D", "B", "B", "E" };

    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(compactionType));
    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
    addRowsByKeysDataSize(memstore, keys1);

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline; should not compact

    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    addRowsByKeysDataSize(memstore, keys2); // also should only flatten

    int counter2 = 0;
    for ( Segment s : memstore.getSegments()) {
      counter2 += s.getCellsCount();
    }
    assertEquals(12, counter2);

    ((MyCompactingMemStore) memstore).disableCompaction();

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    int counter3 = 0;
    for ( Segment s : memstore.getSegments()) {
      counter3 += s.getCellsCount();
    }
    assertEquals(12, counter3);

    addRowsByKeysDataSize(memstore, keys3);

    int counter4 = 0;
    for ( Segment s : memstore.getSegments()) {
      counter4 += s.getCellsCount();
    }
    assertEquals(16, counter4);

    ((MyCompactingMemStore) memstore).enableCompaction();

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    int counter = 0;
    for ( Segment s : memstore.getSegments()) {
      counter += s.getCellsCount();
    }
    assertEquals(16, counter);

    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    ImmutableSegment s = memstore.getSnapshot();
    memstore.clearSnapshot(snapshot.getId());
  }

  @Test
  public void testTimeRangeAfterCompaction() throws IOException {
    if (toCellChunkMap) {
      // set memstore to flatten into CellChunkMap
      ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    }
    testTimeRange(true);
  }

  @Test
  public void testTimeRangeAfterMerge() throws IOException {
    if (toCellChunkMap) {
      // set memstore to flatten into CellChunkMap
      ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    }
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(compactionType));
    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
    testTimeRange(false);
  }

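  /**
   * Adds cells in two batches with increasing timestamps, triggering an in-memory flush after
   * each batch, and verifies the total cell count as well as the min/max values of the segments'
   * TimeRangeTrackers. With compaction (EAGER, max versions = 1) duplicate versions are dropped
   * and the minimum timestamp advances; with merge only (BASIC) all cells and timestamps remain.
   */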
  private void testTimeRange(boolean isCompaction) throws IOException {
    final long initTs = 100;
    long currentTs = initTs;
    byte[] row = Bytes.toBytes("row");
    byte[] family = Bytes.toBytes("family");
    byte[] qf1 = Bytes.toBytes("qf1");

    // first segment in pipeline
    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null);
    long minTs = currentTs;
    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null);

    long numberOfCell = 2;
    assertEquals(numberOfCell,
        memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum());
    assertEquals(minTs, memstore.getSegments().stream().mapToLong(
        m -> m.getTimeRangeTracker().getMin()).min().getAsLong());
    assertEquals(currentTs, memstore.getSegments().stream().mapToLong(
        m -> m.getTimeRangeTracker().getMax()).max().getAsLong());

    ((CompactingMemStore) memstore).flushInMemory();

    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    if (isCompaction) {
      // max version = 1, so one cell will be dropped.
      numberOfCell = 1;
      minTs = currentTs;
    }
    // second segment in pipeline
    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null);
    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[])null), null);
    numberOfCell += 2;
    assertEquals(numberOfCell,
        memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum());
    assertEquals(minTs, memstore.getSegments().stream().mapToLong(
        m -> m.getTimeRangeTracker().getMin()).min().getAsLong());
    assertEquals(currentTs, memstore.getSegments().stream().mapToLong(
        m -> m.getTimeRangeTracker().getMax()).max().getAsLong());

    ((CompactingMemStore) memstore).flushInMemory(); // trigger the merge

    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    if (isCompaction) {
      // max version = 1, so one cell will be dropped.
      numberOfCell = 1;
      minTs = currentTs;
    }

    assertEquals(numberOfCell,
        memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum());
    assertEquals(minTs, memstore.getSegments().stream().mapToLong(
        m -> m.getTimeRangeTracker().getMin()).min().getAsLong());
    assertEquals(currentTs, memstore.getSegments().stream().mapToLong(
        m -> m.getTimeRangeTracker().getMax()).max().getAsLong());
  }

  @Test
  public void testCountOfCellsAfterFlatteningByScan() throws IOException {
    String[] keys1 = { "A", "B", "C" }; // A, B, C
    addRowsByKeysWith50Cols(memstore, keys1);
    // this should only flatten as there are no duplicates
    ((CompactingMemStore) memstore).flushInMemory();
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
    // seek
    int count = 0;
    for (int i = 0; i < scanners.size(); i++) {
      scanners.get(i).seek(KeyValue.LOWESTKEY);
      while (scanners.get(i).next() != null) {
        count++;
      }
    }
    assertEquals("the count should be 150", 150, count);
    for (int i = 0; i < scanners.size(); i++) {
      scanners.get(i).close();
    }
  }

  @Test
  public void testCountOfCellsAfterFlatteningByIterator() throws IOException {
    String[] keys1 = { "A", "B", "C" }; // A, B, C
    addRowsByKeysWith50Cols(memstore, keys1);
    // this should only flatten as there are no duplicates
    ((CompactingMemStore) memstore).flushInMemory();
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    // Just count the cells by iterating over the immutable segments with the merger iterator
    MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator(
        ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(),
        CellComparatorImpl.COMPARATOR, 10);
    int cnt = 0;
    try {
      while (itr.next() != null) {
        cnt++;
      }
    } finally {
      itr.close();
    }
    assertEquals("the count should be 150", 150, cnt);
  }

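  /**
   * Adds, for each key, 50 cells (qualifiers testqualifier0..testqualifier49) that share the
   * row's timestamp, so three keys produce 150 cells in total.
   */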
  private void addRowsByKeysWith50Cols(AbstractMemStore hmc, String[] keys) {
    byte[] fam = Bytes.toBytes("testfamily");
    for (int i = 0; i < keys.length; i++) {
      long timestamp = System.currentTimeMillis();
      Threads.sleep(1); // to make sure each kv gets a different ts
      byte[] row = Bytes.toBytes(keys[i]);
      for (int j = 0; j < 50; j++) {
        byte[] qf = Bytes.toBytes("testqualifier" + j);
        byte[] val = Bytes.toBytes(keys[i] + j);
        KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
        hmc.add(kv, null);
      }
    }
  }

  @Override
  @Test
  public void testPuttingBackChunksWithOpeningScanner() throws IOException {
    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] qf4 = Bytes.toBytes("testqualifier4");
    byte[] qf5 = Bytes.toBytes("testqualifier5");
    byte[] qf6 = Bytes.toBytes("testqualifier6");
    byte[] qf7 = Bytes.toBytes("testqualifier7");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, val), null);
    memstore.add(new KeyValue(row, fam, qf3, val), null);

    // Creating a snapshot
    MemStoreSnapshot snapshot = memstore.snapshot();
    assertEquals(3, memstore.getSnapshot().getCellsCount());

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf4, val), null);
    memstore.add(new KeyValue(row, fam, qf5, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());

    // open scanners before clearing the snapshot
    List<KeyValueScanner> scanners = memstore.getScanners(0);
    // The chunks shouldn't be put back to the pool yet, since some scanners are still open
    // on their data.
    // close the snapshot scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());

    assertTrue(chunkCreator.getPoolSize() == 0);

    // Chunks will be put back to the pool after the scanners are closed
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    assertTrue(chunkCreator.getPoolSize() > 0);

    // clear chunks
    chunkCreator.clearChunksInPool();

    // Creating another snapshot
    snapshot = memstore.snapshot();
    // Adding more value
    memstore.add(new KeyValue(row, fam, qf6, val), null);
    memstore.add(new KeyValue(row, fam, qf7, val), null);
    // opening scanners
    scanners = memstore.getScanners(0);
    // close scanners before clearing the snapshot
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    // Since no scanner is open, the chunks of the snapshot should be put back to the pool
    // close the snapshot scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());
    assertTrue(chunkCreator.getPoolSize() > 0);
  }

  @Override
  @Test
  public void testPuttingBackChunksAfterFlushing() throws IOException {
    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] qf4 = Bytes.toBytes("testqualifier4");
    byte[] qf5 = Bytes.toBytes("testqualifier5");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, val), null);
    memstore.add(new KeyValue(row, fam, qf3, val), null);

    // Creating a snapshot
    MemStoreSnapshot snapshot = memstore.snapshot();
    assertEquals(3, memstore.getSnapshot().getCellsCount());

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf4, val), null);
    memstore.add(new KeyValue(row, fam, qf5, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());
    // close the snapshot scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());

    int chunkCount = chunkCreator.getPoolSize();
    assertTrue(chunkCount > 0);
  }

  @Test
  public void testFlatteningToCellChunkMap() throws IOException {
    if (!toCellChunkMap) {
      return;
    }
    // set memstore to flatten into CellChunkMap
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(compactionType));
    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
    ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    int numOfCells = 8;
    String[] keys1 = { "A", "A", "B", "C", "D", "D", "E", "F" }; //A1, A2, B3, C4, D5, D6, E7, F8

    // make one cell
    byte[] row = Bytes.toBytes(keys1[0]);
    byte[] val = Bytes.toBytes(keys1[0] + 0);
    KeyValue kv =
        new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
            System.currentTimeMillis(), val);

    // test 1 bucket
    int totalCellsLen = addRowsByKeys(memstore, keys1);
    long oneCellOnCSLMHeapSize = ClassSize.align(
      ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + kv.getSerializedSize());

    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and flatten
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    long oneCellOnCCMHeapSize =
        ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize());
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
        + numOfCells * oneCellOnCCMHeapSize;

    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(numOfCells, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  /**
   * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks.
   * Even though MSLAB is enabled, cells bigger than maxAlloc
   * (even if smaller than the size of a chunk) are not written in the MSLAB Chunks.
   * If such cells are found in the process of flattening into CellChunkMap
   * (in-memory-flush) they need to be copied into MSLAB.
   * testFlatteningToBigCellChunkMap checks that the process of flattening into
   * CellChunkMap succeeds, even when such big cells are allocated.
   */
  @Test
  public void testFlatteningToBigCellChunkMap() throws IOException {
    if (!toCellChunkMap) {
      return;
    }
    // set memstore to flatten into CellChunkMap
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
            String.valueOf(compactionType));
    ((MyCompactingMemStore)memstore).initiateType(compactionType, memstore.getConfiguration());
    ((CompactingMemStore)memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    int numOfCells = 4;
    char[] chars = new char[MemStoreLAB.MAX_ALLOC_DEFAULT];
    for (int i = 0; i < chars.length; i++) {
      chars[i] = 'A';
    }
    String bigVal = new String(chars);
    String[] keys1 = { "A", "B", "C", "D"};

    // make one cell
    byte[] row = Bytes.toBytes(keys1[0]);
    byte[] val = Bytes.toBytes(bigVal);
    KeyValue kv =
            new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
                    System.currentTimeMillis(), val);

    // test 1 bucket
    int totalCellsLen = addRowsByKeys(memstore, keys1, val);

    long oneCellOnCSLMHeapSize =
            ClassSize.align(
                    ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());

    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore)memstore).flushInMemory(); // push keys to pipeline and flatten
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // No cell is duplicated and nothing is compacted in BASIC mode anyway,
    // so totalCellsLen should remain the same
    long oneCellOnCCMHeapSize =
            ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize());
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
            + numOfCells * oneCellOnCCMHeapSize;

    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(numOfCells, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  /**
   * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks.
   * Even though MSLAB is enabled, cells bigger than the size of a chunk are not
   * written in the MSLAB Chunks.
   * If such cells are found in the process of flattening into CellChunkMap
   * (in-memory-flush) they need to be copied into MSLAB.
   * testFlatteningToJumboCellChunkMap checks that the process of flattening
   * into CellChunkMap succeeds, even when such big cells are allocated.
   */
  @Test
  public void testFlatteningToJumboCellChunkMap() throws IOException {
    if (!toCellChunkMap) {
      return;
    }
    // set memstore to flatten into CellChunkMap
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);

    int numOfCells = 1;
    char[] chars = new char[MemStoreLAB.CHUNK_SIZE_DEFAULT];
    for (int i = 0; i < chars.length; i++) {
      chars[i] = 'A';
    }
    String bigVal = new String(chars);
    String[] keys1 = {"A"};

    // make one cell
    byte[] row = Bytes.toBytes(keys1[0]);
    byte[] val = Bytes.toBytes(bigVal);
    KeyValue kv =
            new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
                    System.currentTimeMillis(), val);

    // test 1 bucket
    int totalCellsLen = addRowsByKeys(memstore, keys1, val);

    long oneCellOnCSLMHeapSize =
            ClassSize.align(
                    ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());

    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and flatten
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    // There is only one cell and we are in BASIC mode, so nothing is compacted away.
    long oneCellOnCCMHeapSize =
        (long) ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize());
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
            + numOfCells * oneCellOnCCMHeapSize;

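    // The jumbo cell was copied into a dedicated chunk during flattening, so the chunk header
    // is now accounted for in the data size as well.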
    assertEquals(totalCellsLen + ChunkCreator.SIZEOF_CHUNK_HEADER,
        regionServicesForStores.getMemStoreSize());

    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(numOfCells, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());

    // Allocating two big cells (too big for being copied into a regular chunk).
    String[] keys2 = {"C", "D"};
    addRowsByKeys(memstore, keys2, val);
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }

    // The in-memory flush size is bigger than the size of a single cell, but smaller than
    // the size of two cells. Therefore, adding the second cell triggers an in-memory flush:
    // one cell is flattened into a CellChunkMap segment while the other remains in the
    // CSLM-based active segment.
    totalHeapSize = MutableSegment.DEEP_OVERHEAD
        + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
        + 1 * oneCellOnCSLMHeapSize
        + 1 * oneCellOnCCMHeapSize;
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
  }

  /**
   * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks.
   * Even though MSLAB is enabled, cells bigger than the size of a chunk are not
   * written in the MSLAB Chunks.
   * If such cells are found in the process of a merge they need to be copied into MSLAB.
   * testForceCopyOfBigCellIntoImmutableSegment checks that the
   * ImmutableMemStoreLAB's forceCopyOfBigCellInto does what it's supposed to do.
   */
  @org.junit.Ignore @Test // Flakey. Disabled by HBASE-24128. HBASE-24129 is for reenable.
  // TestCompactingToCellFlatMapMemStore.testForceCopyOfBigCellIntoImmutableSegment:902 i=1
  //   expected:<8389924> but was:<8389992>
  public void testForceCopyOfBigCellIntoImmutableSegment() throws IOException {
    if (!toCellChunkMap) {
      return;
    }

    // set memstore to flatten into CellChunkMap
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().setInt(MemStoreCompactionStrategy
        .COMPACTING_MEMSTORE_THRESHOLD_KEY, 4);
    memstore.getConfiguration()
        .setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.014);
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
        String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);

    char[] chars = new char[MemStoreLAB.CHUNK_SIZE_DEFAULT];
    for (int i = 0; i < chars.length; i++) {
      chars[i] = 'A';
    }
    String bigVal = new String(chars);
    byte[] val = Bytes.toBytes(bigVal);

    // We need to add two cells, three times, in order to guarantee a merge
    List<String[]> keysList = new ArrayList<>();
    keysList.add(new String[]{"A", "B"});
    keysList.add(new String[]{"C", "D"});
    keysList.add(new String[]{"E", "F"});
    keysList.add(new String[]{"G", "H"});

    // Measuring the size of a single kv
    KeyValue kv = new KeyValue(Bytes.toBytes("A"), Bytes.toBytes("testfamily"),
            Bytes.toBytes("testqualifier"), System.currentTimeMillis(), val);
    long oneCellOnCCMHeapSize =
        (long) ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize());
    long oneCellOnCSLMHeapSize =
        ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD;
    for (int i = 0; i < keysList.size(); i++) {
      addRowsByKeys(memstore, keysList.get(i), val);
      while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
        Threads.sleep(10);
      }

      if (i == 0) {
        totalHeapSize += CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
            + oneCellOnCCMHeapSize + oneCellOnCSLMHeapSize;
      } else {
        // The in-memory flush size is bigger than the size of a single cell,
        // but smaller than the size of two cells.
        // Therefore, the two created cells are flattened in a separate segment.
        totalHeapSize += 2 * (CellChunkImmutableSegment.DEEP_OVERHEAD_CCM + oneCellOnCCMHeapSize);
      }
      if (i == 2) {
        // Four out of the five segments are merged into one
        totalHeapSize -= (4 * CellChunkImmutableSegment.DEEP_OVERHEAD_CCM);
        totalHeapSize = ClassSize.align(totalHeapSize);
      }
      assertEquals("i=" + i, totalHeapSize, ((CompactingMemStore) memstore).heapSize());
    }
  }

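  /**
   * Adds one small cell per key (family "testfamily", qualifier "testqualifier", each with its
   * own timestamp), registers the resulting sizing with the region services, and returns the
   * data size of the inserted cells.
   */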
  private long addRowsByKeysDataSize(final AbstractMemStore hmc, String[] keys) {
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf = Bytes.toBytes("testqualifier");
    MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing();
    for (int i = 0; i < keys.length; i++) {
      long timestamp = System.currentTimeMillis();
      Threads.sleep(1); // to make sure each kv gets a different ts
      byte[] row = Bytes.toBytes(keys[i]);
      byte[] val = Bytes.toBytes(keys[i] + i);
      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
      hmc.add(kv, memstoreSizing);
      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp: " + kv.getTimestamp());
    }
    MemStoreSize mss = memstoreSizing.getMemStoreSize();
    regionServicesForStores.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(),
      mss.getOffHeapSize(), mss.getCellsCount());
    return mss.getDataSize();
  }

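  /**
   * Heap size of one small test cell while it still resides in the CSLM-backed mutable segment:
   * a ConcurrentSkipListMap entry plus the KeyValue overhead and its serialized size.
   */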
  private long cellBeforeFlushSize() {
    // make one cell
    byte[] row = Bytes.toBytes("A");
    byte[] val = Bytes.toBytes("A" + 0);
    KeyValue kv =
        new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
            System.currentTimeMillis(), val);
    return ClassSize.align(
        ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + kv.getSerializedSize());
  }

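  /**
   * Heap size of one small test cell after flattening: a CellChunkMap entry plus the serialized
   * size for CHUNK_MAP, or a CellArrayMap entry plus the KeyValue overhead and the serialized
   * size for ARRAY_MAP.
   */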
  private long cellAfterFlushSize() {
    // make one cell
    byte[] row = Bytes.toBytes("A");
    byte[] val = Bytes.toBytes("A" + 0);
    KeyValue kv =
        new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
            System.currentTimeMillis(), val);

    return toCellChunkMap ?
        ClassSize.align(
        ClassSize.CELL_CHUNK_MAP_ENTRY + kv.getSerializedSize()) :
        ClassSize.align(
        ClassSize.CELL_ARRAY_MAP_ENTRY + KeyValue.FIXED_OVERHEAD + kv.getSerializedSize());
  }
}