/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Compacting memstore with flat immutable segment (CellFlatMap index) test cases.
 */
@Category({ RegionServerTests.class, LargeTests.class })
@RunWith(Parameterized.class)
public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCompactingToCellFlatMapMemStore.class);

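  // Each parameterized run flattens the immutable segment index differently: CHUNK_MAP keeps the
  // index entries inside MSLAB chunks (CellChunkMap), while ARRAY_MAP keeps them in an on-heap
  // array (CellArrayMap).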
  @Parameterized.Parameters
  public static Object[] data() {
    return new Object[] { "CHUNK_MAP", "ARRAY_MAP" }; // test different immutable indexes
  }

  private static final Logger LOG =
    LoggerFactory.getLogger(TestCompactingToCellFlatMapMemStore.class);
  public final boolean toCellChunkMap;
  Configuration conf;

  //////////////////////////////////////////////////////////////////////////////
  // Helpers
  //////////////////////////////////////////////////////////////////////////////
  public TestCompactingToCellFlatMapMemStore(String type) {
    this.toCellChunkMap = "CHUNK_MAP".equals(type);
  }

  @Override
  public void tearDown() throws Exception {
    chunkCreator.clearChunksInPool();
  }

  @Override
  public void setUp() throws Exception {
    compactingSetUp();
    this.conf = HBaseConfiguration.create();

    // set memstore to do data compaction
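    // EAGER compaction eliminates duplicate cells on in-memory flush; the low flush threshold
    // factor below makes those in-memory flushes trigger after only a small amount of data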
    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(MemoryCompactionPolicy.EAGER));
    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.02);
    this.memstore = new MyCompactingMemStore(conf, CellComparatorImpl.COMPARATOR, store,
      regionServicesForStores, MemoryCompactionPolicy.EAGER);
  }

  //////////////////////////////////////////////////////////////////////////////
  // Compaction tests
  //////////////////////////////////////////////////////////////////////////////
  @Override
  public void testCompaction1Bucket() throws IOException {
    int counter = 0;
    String[] keys1 = { "A", "A", "B", "C" }; // A1, A2, B3, C4
    if (toCellChunkMap) {
      // set memstore to flatten into CellChunkMap
      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    } else {
      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
    }

    // test 1 bucket
    long totalCellsLen = addRowsByKeysDataSize(memstore, keys1);
    long cellBeforeFlushSize = cellBeforeFlushSize();
    long cellAfterFlushSize = cellAfterFlushSize();
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;

    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    assertEquals(4, memstore.getActive().getCellsCount());
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
    // so adjust totalCellsLen accordingly
    totalCellsLen = (totalCellsLen * 3) / 4;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());

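    // after the in-memory flush the cells are indexed by a flat map, so the per-cell overhead
    // changes from cellBeforeFlushSize (CSLM entry) to cellAfterFlushSize, and the new immutable
    // segment adds its own deep overhead depending on the index type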
    totalHeapSize = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
      + (toCellChunkMap
        ? CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
        : CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
    for (Segment s : memstore.getSegments()) {
      counter += s.getCellsCount();
    }
    assertEquals(3, counter);
    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    region.decrMemStoreSize(mss); // simulate flusher
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(3, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  @Override
  public void testCompaction2Buckets() throws IOException {
    if (toCellChunkMap) {
      // set memstore to flatten into CellChunkMap
      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    } else {
      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
    }
    String[] keys1 = { "A", "A", "B", "C" };
    String[] keys2 = { "A", "B", "D" };

    long totalCellsLen1 = addRowsByKeysDataSize(memstore, keys1); // INSERT 4
    long cellBeforeFlushSize = cellBeforeFlushSize();
    long cellAfterFlushSize = cellAfterFlushSize();
    long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    int counter = 0; // COMPACT 4->3
    for (Segment s : memstore.getSegments()) {
      counter += s.getCellsCount();
    }
    assertEquals(3, counter);
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
    // so adjust totalCellsLen accordingly
    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
    totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
      + (toCellChunkMap
        ? CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
        : CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());

    long totalCellsLen2 = addRowsByKeysDataSize(memstore, keys2); // INSERT 3 (3+3=6)
    long totalHeapSize2 = 3 * cellBeforeFlushSize;
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    assertEquals(0, memstore.getSnapshot().getCellsCount()); // COMPACT 6->4
    counter = 0;
    for (Segment s : memstore.getSegments()) {
      counter += s.getCellsCount();
    }
    assertEquals(4, counter);
    totalCellsLen2 = totalCellsLen2 / 3; // 2 cells duplicated in set 2
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
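    // the pipeline now holds 4 unique cells in a single flat segment; totalHeapSize1 already
    // accounts for the segment overheads and 3 of those cells, so only one more cell of heap
    // is added here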
    totalHeapSize2 = 1 * cellAfterFlushSize;
    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(4, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  @Override
  public void testCompaction3Buckets() throws IOException {
    if (toCellChunkMap) {
      // set memstore to flatten into CellChunkMap
      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    } else {
      // set to CellArrayMap as CCM is configured by default due to MSLAB usage
      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
    }
    String[] keys1 = { "A", "A", "B", "C" };
    String[] keys2 = { "A", "B", "D" };
    String[] keys3 = { "D", "B", "B" };

    long totalCellsLen1 = addRowsByKeysDataSize(memstore, keys1);
    long cellBeforeFlushSize = cellBeforeFlushSize();
    long cellAfterFlushSize = cellAfterFlushSize();
    long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact

    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
    // so adjust totalCellsLen accordingly
    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
    totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
      + (toCellChunkMap
        ? CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
        : CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());

    long totalCellsLen2 = addRowsByKeysDataSize(memstore, keys2);
    long totalHeapSize2 = 3 * cellBeforeFlushSize;

    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());

    ((MyCompactingMemStore) memstore).disableCompaction();
    mss = memstore.getFlushableSize();
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
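    // with compaction disabled the flushed segment stays a CSLM-based immutable segment
    // (no flattening), so only its deep overhead is added on top of the per-cell CSLM sizes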
    totalHeapSize2 = totalHeapSize2 + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());

    long totalCellsLen3 = addRowsByKeysDataSize(memstore, keys3);
    long totalHeapSize3 = 3 * cellBeforeFlushSize;
    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
      regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1 + totalHeapSize2 + totalHeapSize3,
      ((CompactingMemStore) memstore).heapSize());

    ((MyCompactingMemStore) memstore).enableCompaction();
    mss = memstore.getFlushableSize();
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
    // Out of total 10, only 4 cells are unique
    totalCellsLen2 = totalCellsLen2 / 3; // 2 out of 3 cells are duplicated
    totalCellsLen3 = 0; // All duplicated cells.
    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
      regionServicesForStores.getMemStoreSize());
    // Only 4 unique cells left
    long totalHeapSize4 = 4 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
      + (toCellChunkMap
        ? CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
        : CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
    assertEquals(totalHeapSize4, ((CompactingMemStore) memstore).heapSize());

    mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(4, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  //////////////////////////////////////////////////////////////////////////////
  // Merging tests
  //////////////////////////////////////////////////////////////////////////////
  @Test
  public void testMerging() throws IOException {
    if (toCellChunkMap) {
      // set memstore to flatten into CellChunkMap
      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    }
    String[] keys1 = { "A", "A", "B", "C", "F", "H" };
    String[] keys2 = { "A", "B", "D", "G", "I", "J" };
    String[] keys3 = { "D", "B", "B", "E" };

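    // BASIC policy only flattens and merges segment indexes; unlike EAGER it does not eliminate
    // duplicate cells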
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
    addRowsByKeysDataSize(memstore, keys1);

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact

    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    addRowsByKeysDataSize(memstore, keys2); // also should only flatten

    int counter2 = 0;
    for (Segment s : memstore.getSegments()) {
      counter2 += s.getCellsCount();
    }
    assertEquals(12, counter2);

    ((MyCompactingMemStore) memstore).disableCompaction();

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    int counter3 = 0;
    for (Segment s : memstore.getSegments()) {
      counter3 += s.getCellsCount();
    }
    assertEquals(12, counter3);

    addRowsByKeysDataSize(memstore, keys3);

    int counter4 = 0;
    for (Segment s : memstore.getSegments()) {
      counter4 += s.getCellsCount();
    }
    assertEquals(16, counter4);

    ((MyCompactingMemStore) memstore).enableCompaction();

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    assertEquals(0, memstore.getSnapshot().getCellsCount());

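    // the merge joins the segment indexes without removing duplicates, so all 16 cells remain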
    int counter = 0;
    for (Segment s : memstore.getSegments()) {
      counter += s.getCellsCount();
    }
    assertEquals(16, counter);

    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    ImmutableSegment s = memstore.getSnapshot();
    memstore.clearSnapshot(snapshot.getId());
  }

  @Test
  public void testTimeRangeAfterCompaction() throws IOException {
    if (toCellChunkMap) {
      // set memstore to flatten into CellChunkMap
      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    }
    testTimeRange(true);
  }

  @Test
  public void testTimeRangeAfterMerge() throws IOException {
    if (toCellChunkMap) {
      // set memstore to flatten into CellChunkMap
      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    }
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
    testTimeRange(false);
  }

  private void testTimeRange(boolean isCompaction) throws IOException {
    final long initTs = 100;
    long currentTs = initTs;
    byte[] row = Bytes.toBytes("row");
    byte[] family = Bytes.toBytes("family");
    byte[] qf1 = Bytes.toBytes("qf1");

    // first segment in pipeline
    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
    long minTs = currentTs;
    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);

    long numberOfCell = 2;
    assertEquals(numberOfCell,
      memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum());
    assertEquals(minTs, memstore.getSegments().stream()
      .mapToLong(m -> m.getTimeRangeTracker().getMin()).min().getAsLong());
    assertEquals(currentTs, memstore.getSegments().stream()
      .mapToLong(m -> m.getTimeRangeTracker().getMax()).max().getAsLong());

    ((CompactingMemStore) memstore).flushInMemory();

    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    if (isCompaction) {
      // max version = 1, so one cell will be dropped.
      numberOfCell = 1;
      minTs = currentTs;
    }
    // second segment in pipeline
    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
    numberOfCell += 2;
    assertEquals(numberOfCell,
      memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum());
    assertEquals(minTs, memstore.getSegments().stream()
      .mapToLong(m -> m.getTimeRangeTracker().getMin()).min().getAsLong());
    assertEquals(currentTs, memstore.getSegments().stream()
      .mapToLong(m -> m.getTimeRangeTracker().getMax()).max().getAsLong());

    ((CompactingMemStore) memstore).flushInMemory(); // trigger the merge

    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    if (isCompaction) {
      // max version = 1, so one cell will be dropped.
      numberOfCell = 1;
      minTs = currentTs;
    }

    assertEquals(numberOfCell,
      memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum());
    assertEquals(minTs, memstore.getSegments().stream()
      .mapToLong(m -> m.getTimeRangeTracker().getMin()).min().getAsLong());
    assertEquals(currentTs, memstore.getSegments().stream()
      .mapToLong(m -> m.getTimeRangeTracker().getMax()).max().getAsLong());
  }

  @Test
  public void testCountOfCellsAfterFlatteningByScan() throws IOException {
    String[] keys1 = { "A", "B", "C" }; // A, B, C
    addRowsByKeysWith50Cols(memstore, keys1);
    // this should only flatten as there are no duplicates
    ((CompactingMemStore) memstore).flushInMemory();
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
    // seek
    int count = 0;
    for (int i = 0; i < scanners.size(); i++) {
      scanners.get(i).seek(KeyValue.LOWESTKEY);
      while (scanners.get(i).next() != null) {
        count++;
      }
    }
    assertEquals("the count should be 150", 150, count);
    for (int i = 0; i < scanners.size(); i++) {
      scanners.get(i).close();
    }
  }

  @Test
  public void testCountOfCellsAfterFlatteningByIterator() throws IOException {
    String[] keys1 = { "A", "B", "C" }; // A, B, C
    addRowsByKeysWith50Cols(memstore, keys1);
    // this should only flatten as there are no duplicates
    ((CompactingMemStore) memstore).flushInMemory();
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    // Just doing the cnt operation here
    MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator(
      ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(),
      CellComparatorImpl.COMPARATOR, 10);
    int cnt = 0;
    try {
      while (itr.next() != null) {
        cnt++;
      }
    } finally {
      itr.close();
    }
    assertEquals("the count should be 150", 150, cnt);
  }

  private void addRowsByKeysWith50Cols(AbstractMemStore hmc, String[] keys) {
    byte[] fam = Bytes.toBytes("testfamily");
    for (int i = 0; i < keys.length; i++) {
      long timestamp = EnvironmentEdgeManager.currentTime();
      Threads.sleep(1); // to make sure each kv gets a different ts
      byte[] row = Bytes.toBytes(keys[i]);
      for (int j = 0; j < 50; j++) {
        byte[] qf = Bytes.toBytes("testqualifier" + j);
        byte[] val = Bytes.toBytes(keys[i] + j);
        KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
        hmc.add(kv, null);
      }
    }
  }

  @Override
  @Test
  public void testPuttingBackChunksWithOpeningScanner() throws IOException {
    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] qf4 = Bytes.toBytes("testqualifier4");
    byte[] qf5 = Bytes.toBytes("testqualifier5");
    byte[] qf6 = Bytes.toBytes("testqualifier6");
    byte[] qf7 = Bytes.toBytes("testqualifier7");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, val), null);
    memstore.add(new KeyValue(row, fam, qf3, val), null);

    // Creating a snapshot
    MemStoreSnapshot snapshot = memstore.snapshot();
    assertEquals(3, memstore.getSnapshot().getCellsCount());

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf4, val), null);
    memstore.add(new KeyValue(row, fam, qf5, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());

    // opening scanner before clearing the snapshot
    List<KeyValueScanner> scanners = memstore.getScanners(0);
    // The chunks should not be put back into the pool, since some scanners are still open
    // on their data
    // close the scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());

    assertTrue(chunkCreator.getPoolSize() == 0);

    // Chunks will be put back into the pool after the scanners are closed
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    assertTrue(chunkCreator.getPoolSize() > 0);

    // clear chunks
    chunkCreator.clearChunksInPool();

    // Creating another snapshot
    snapshot = memstore.snapshot();
    // Adding more value
    memstore.add(new KeyValue(row, fam, qf6, val), null);
    memstore.add(new KeyValue(row, fam, qf7, val), null);
    // opening scanners
    scanners = memstore.getScanners(0);
    // close scanners before clearing the snapshot
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    // Since there are no open scanners, the chunks of the snapshot should be put back into
    // the pool
    // close the scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());
    assertTrue(chunkCreator.getPoolSize() > 0);
  }

  @Override
  @Test
  public void testPuttingBackChunksAfterFlushing() throws IOException {
    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] qf4 = Bytes.toBytes("testqualifier4");
    byte[] qf5 = Bytes.toBytes("testqualifier5");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, val), null);
    memstore.add(new KeyValue(row, fam, qf3, val), null);

    // Creating a snapshot
    MemStoreSnapshot snapshot = memstore.snapshot();
    assertEquals(3, memstore.getSnapshot().getCellsCount());

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf4, val), null);
    memstore.add(new KeyValue(row, fam, qf5, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());
    // close the scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());

    int chunkCount = chunkCreator.getPoolSize();
    assertTrue(chunkCount > 0);
  }

  @Test
  public void testFlatteningToCellChunkMap() throws IOException {
    if (!toCellChunkMap) {
      return;
    }
    // set memstore to flatten into CellChunkMap
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    int numOfCells = 8;
    String[] keys1 = { "A", "A", "B", "C", "D", "D", "E", "F" }; // A1, A2, B3, C4, D5, D6, E7, F8

    // make one cell
    byte[] row = Bytes.toBytes(keys1[0]);
    byte[] val = Bytes.toBytes(keys1[0] + 0);
    KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
      EnvironmentEdgeManager.currentTime(), val);

    // test 1 bucket
    int totalCellsLen = addRowsByKeys(memstore, keys1);
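    // heap occupancy of one cell in the CSLM-based active segment: a skip-list entry plus the
    // KeyValue object overhead plus the cell's serialized data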
    long oneCellOnCSLMHeapSize = ClassSize.align(
      ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + kv.getSerializedSize());

    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and flatten
    assertEquals(0, memstore.getSnapshot().getCellsCount());
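    // after flattening to CellChunkMap each cell costs a fixed-size index entry plus its
    // (aligned) serialized data, which now resides in an MSLAB chunk; the KeyValue object
    // overhead counted before the flush no longer applies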
    long oneCellOnCCMHeapSize =
      ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize());
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
      + numOfCells * oneCellOnCCMHeapSize;

    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(numOfCells, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  /**
   * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks. Even
   * though MSLAB is enabled, cells bigger than maxAlloc (even if smaller than the size of a chunk)
   * are not written in the MSLAB Chunks. If such cells are found in the process of flattening into
   * CellChunkMap (in-memory-flush) they need to be copied into MSLAB.
   * testFlatteningToBigCellChunkMap checks that the process of flattening into CellChunkMap
   * succeeds, even when such big cells are allocated.
   */
  @Test
  public void testFlatteningToBigCellChunkMap() throws IOException {
    if (!toCellChunkMap) {
      return;
    }
    // set memstore to flatten into CellChunkMap
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    int numOfCells = 4;
    char[] chars = new char[MemStoreLAB.MAX_ALLOC_DEFAULT];
    for (int i = 0; i < chars.length; i++) {
      chars[i] = 'A';
    }
    String bigVal = new String(chars);
    String[] keys1 = { "A", "B", "C", "D" };

    // make one cell
    byte[] row = Bytes.toBytes(keys1[0]);
    byte[] val = Bytes.toBytes(bigVal);
    KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
      EnvironmentEdgeManager.currentTime(), val);

    // test 1 bucket
    int totalCellsLen = addRowsByKeys(memstore, keys1, val);

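    // for these big cells the CSLM estimate uses kv.heapSize(), which accounts for the KeyValue
    // object together with its backing byte array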
    long oneCellOnCSLMHeapSize =
      ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());

    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and flatten
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode.
    // totalCellsLen should remain the same
    long oneCellOnCCMHeapSize =
      ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize());
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
      + numOfCells * oneCellOnCCMHeapSize;

    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(numOfCells, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  /**
   * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks. Even
   * though MSLAB is enabled, cells bigger than the size of a chunk are not written in the MSLAB
   * Chunks. If such cells are found in the process of flattening into CellChunkMap
   * (in-memory-flush) they need to be copied into MSLAB. testFlatteningToJumboCellChunkMap checks
   * that the process of flattening into CellChunkMap succeeds, even when such big cells are
   * allocated.
   */
  @Test
  public void testFlatteningToJumboCellChunkMap() throws IOException {
    if (!toCellChunkMap) {
      return;
    }
    // set memstore to flatten into CellChunkMap
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);

    int numOfCells = 1;
    char[] chars = new char[MemStoreLAB.CHUNK_SIZE_DEFAULT];
    for (int i = 0; i < chars.length; i++) {
      chars[i] = 'A';
    }
    String bigVal = new String(chars);
    String[] keys1 = { "A" };

    // make one cell
    byte[] row = Bytes.toBytes(keys1[0]);
    byte[] val = Bytes.toBytes(bigVal);
    KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
      EnvironmentEdgeManager.currentTime(), val);

    // test 1 bucket
    int totalCellsLen = addRowsByKeys(memstore, keys1, val);

    long oneCellOnCSLMHeapSize =
      ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());

    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and flatten
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    // One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode.
    // totalCellsLen should remain the same
    long oneCellOnCCMHeapSize =
      (long) ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize());
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
      + numOfCells * oneCellOnCCMHeapSize;

    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());

    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(numOfCells, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());

    // Allocating two big cells (too big for being copied into a regular chunk).
    String[] keys2 = { "C", "D" };
    addRowsByKeys(memstore, keys2, val);
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }

    // The in-memory flush size is bigger than the size of a single cell,
    // but smaller than the size of two cells.
    // Therefore, the two created cells are flushed together as a single CSLMImmutableSegment and
    // flattened.
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
      + 2 * oneCellOnCCMHeapSize;
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
  }

  /**
   * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks. Even
   * though MSLAB is enabled, cells bigger than the size of a chunk are not written in the MSLAB
   * Chunks. If such cells are found in the process of a merge they need to be copied into MSLAB.
   * testForceCopyOfBigCellIntoImmutableSegment checks that the ImmutableMemStoreLAB's
   * forceCopyOfBigCellInto does what it's supposed to do.
   */
  @org.junit.Ignore
  @Test // Flakey. Disabled by HBASE-24128. HBASE-24129 is for reenable.
  // TestCompactingToCellFlatMapMemStore.testForceCopyOfBigCellIntoImmutableSegment:902 i=1
  // expected:<8389924> but was:<8389992>
  public void testForceCopyOfBigCellIntoImmutableSegment() throws IOException {
    if (!toCellChunkMap) {
      return;
    }

    // set memstore to flatten into CellChunkMap
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY,
      4);
    memstore.getConfiguration().setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY,
      0.014);
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);

    char[] chars = new char[MemStoreLAB.CHUNK_SIZE_DEFAULT];
    for (int i = 0; i < chars.length; i++) {
      chars[i] = 'A';
    }
    String bigVal = new String(chars);
    byte[] val = Bytes.toBytes(bigVal);

    // We need to add two cells, three times, in order to guarantee a merge
    List<String[]> keysList = new ArrayList<>();
    keysList.add(new String[] { "A", "B" });
    keysList.add(new String[] { "C", "D" });
    keysList.add(new String[] { "E", "F" });
    keysList.add(new String[] { "G", "H" });

    // Measuring the size of a single kv
    KeyValue kv = new KeyValue(Bytes.toBytes("A"), Bytes.toBytes("testfamily"),
      Bytes.toBytes("testqualifier"), EnvironmentEdgeManager.currentTime(), val);
    long oneCellOnCCMHeapSize =
      (long) ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize());
    long oneCellOnCSLMHeapSize =
      ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD;
    for (int i = 0; i < keysList.size(); i++) {
      addRowsByKeys(memstore, keysList.get(i), val);
      while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
        Threads.sleep(10);
      }

      if (i == 0) {
        totalHeapSize += CellChunkImmutableSegment.DEEP_OVERHEAD_CCM + oneCellOnCCMHeapSize
          + oneCellOnCSLMHeapSize;
      } else {
        // The in-memory flush size is bigger than the size of a single cell,
        // but smaller than the size of two cells.
        // Therefore, the two created cells are flattened in a separate segment.
        totalHeapSize += 2 * (CellChunkImmutableSegment.DEEP_OVERHEAD_CCM + oneCellOnCCMHeapSize);
      }
      if (i == 2) {
        // Four out of the five segments are merged into one
        totalHeapSize -= (4 * CellChunkImmutableSegment.DEEP_OVERHEAD_CCM);
        totalHeapSize = ClassSize.align(totalHeapSize);
      }
      assertEquals("i=" + i, totalHeapSize, ((CompactingMemStore) memstore).heapSize());
    }
  }

  /**
   * Test big cell size after in memory compaction. (HBASE-26467)
   */
  @Test
  public void testBigCellSizeAfterInMemoryCompaction() throws IOException {
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY,
      1);
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());

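    // the value alone fills a whole default-sized chunk, so the resulting cell is too big to be
    // copied into a regular MSLAB chunk (the HBASE-26467 scenario)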
    byte[] val = new byte[MemStoreLAB.CHUNK_SIZE_DEFAULT];

    long size = addRowsByKeys(memstore, new String[] { "A" }, val);
    ((MyCompactingMemStore) memstore).flushInMemory();

    for (KeyValueScanner scanner : memstore.getScanners(Long.MAX_VALUE)) {
      Cell cell;
      while ((cell = scanner.next()) != null) {
        assertEquals(size, cell.getSerializedSize());
      }
    }
  }

  private long addRowsByKeysDataSize(final AbstractMemStore hmc, String[] keys) {
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf = Bytes.toBytes("testqualifier");
    MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing();
    for (int i = 0; i < keys.length; i++) {
      long timestamp = EnvironmentEdgeManager.currentTime();
      Threads.sleep(1); // to make sure each kv gets a different ts
      byte[] row = Bytes.toBytes(keys[i]);
      byte[] val = Bytes.toBytes(keys[i] + i);
      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
      hmc.add(kv, memstoreSizing);
      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp=" + kv.getTimestamp());
    }
    MemStoreSize mss = memstoreSizing.getMemStoreSize();
    regionServicesForStores.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(),
      mss.getOffHeapSize(), mss.getCellsCount());
    return mss.getDataSize();
  }

  private long cellBeforeFlushSize() {
    // make one cell
    byte[] row = Bytes.toBytes("A");
    byte[] val = Bytes.toBytes("A" + 0);
    KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
      EnvironmentEdgeManager.currentTime(), val);
    return ClassSize.align(
      ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + kv.getSerializedSize());
  }

  private long cellAfterFlushSize() {
    // make one cell
    byte[] row = Bytes.toBytes("A");
    byte[] val = Bytes.toBytes("A" + 0);
    KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
      EnvironmentEdgeManager.currentTime(), val);

    return toCellChunkMap
      ? ClassSize.align(ClassSize.CELL_CHUNK_MAP_ENTRY + kv.getSerializedSize())
      : ClassSize
        .align(ClassSize.CELL_ARRAY_MAP_ENTRY + KeyValue.FIXED_OVERHEAD + kv.getSerializedSize());
  }
}