/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Compacting memstore test cases with flat immutable segment indexes (CellArrayMap and
 * CellChunkMap).
 */
@Category({ RegionServerTests.class, LargeTests.class })
@RunWith(Parameterized.class)
public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCompactingToCellFlatMapMemStore.class);

  @Parameterized.Parameters
  public static Object[] data() {
    return new Object[] { "CHUNK_MAP", "ARRAY_MAP" }; // test different immutable indexes
  }

  private static final Logger LOG =
    LoggerFactory.getLogger(TestCompactingToCellFlatMapMemStore.class);
  public final boolean toCellChunkMap;
  Configuration conf;

  //////////////////////////////////////////////////////////////////////////////
  // Helpers
  //////////////////////////////////////////////////////////////////////////////
  public TestCompactingToCellFlatMapMemStore(String type) {
    // compare the parameter by value, not by reference
    if ("CHUNK_MAP".equals(type)) {
      toCellChunkMap = true;
    } else {
      toCellChunkMap = false;
    }
  }

  @Override
  public void tearDown() throws Exception {
    chunkCreator.clearChunksInPool();
  }

  @Override
  public void setUp() throws Exception {

    compactingSetUp();
    this.conf = HBaseConfiguration.create();

    // set memstore to do data compaction
    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(MemoryCompactionPolicy.EAGER));
    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.02);
    this.memstore = new MyCompactingMemStore(conf, CellComparatorImpl.COMPARATOR, store,
      regionServicesForStores, MemoryCompactionPolicy.EAGER);
  }

  //////////////////////////////////////////////////////////////////////////////
  // Compaction tests
  //////////////////////////////////////////////////////////////////////////////
  @Override
  public void testCompaction1Bucket() throws IOException {
    int counter = 0;
    String[] keys1 = { "A", "A", "B", "C" }; // A1, A2, B3, C4
    if (toCellChunkMap) {
      // set memstore to flat into CellChunkMap
      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    } else {
      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
    }

    // test 1 bucket
    long totalCellsLen = addRowsByKeysDataSize(memstore, keys1);
    long cellBeforeFlushSize = cellBeforeFlushSize();
    long cellAfterFlushSize = cellAfterFlushSize();
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;

    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    assertEquals(4, memstore.getActive().getCellsCount());
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
    // so adjust totalCellsLen accordingly.
    totalCellsLen = (totalCellsLen * 3) / 4;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());

    totalHeapSize = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
      + (toCellChunkMap
        ? CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
        : CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
    for (Segment s : memstore.getSegments()) {
      counter += s.getCellsCount();
    }
    assertEquals(3, counter);
    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    region.decrMemStoreSize(mss); // simulate flusher
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(3, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  @Override
  public void testCompaction2Buckets() throws IOException {
    if (toCellChunkMap) {
      // set memstore to flat into CellChunkMap
      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    } else {
      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
    }
    String[] keys1 = { "A", "A", "B", "C" };
    String[] keys2 = { "A", "B", "D" };

    long totalCellsLen1 = addRowsByKeysDataSize(memstore, keys1); // INSERT 4
    long cellBeforeFlushSize = cellBeforeFlushSize();
    long cellAfterFlushSize = cellAfterFlushSize();
    long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    int counter = 0; // COMPACT 4->3
    for (Segment s : memstore.getSegments()) {
      counter += s.getCellsCount();
    }
    assertEquals(3, counter);
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
    // so adjust totalCellsLen accordingly.
    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
    totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
      + (toCellChunkMap
        ? CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
        : CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());

    long totalCellsLen2 = addRowsByKeysDataSize(memstore, keys2); // INSERT 3 (3+3=6)
    long totalHeapSize2 = 3 * cellBeforeFlushSize;
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    assertEquals(0, memstore.getSnapshot().getCellsCount()); // COMPACT 6->4
    counter = 0;
    for (Segment s : memstore.getSegments()) {
      counter += s.getCellsCount();
    }
    assertEquals(4, counter);
    totalCellsLen2 = totalCellsLen2 / 3; // 2 cells duplicated in set 2
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    totalHeapSize2 = 1 * cellAfterFlushSize;
    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(4, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  @Override
  public void testCompaction3Buckets() throws IOException {
    if (toCellChunkMap) {
      // set memstore to flat into CellChunkMap
      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    } else {
      // set to CellArrayMap as CCM is configured by default due to MSLAB usage
      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
    }
    String[] keys1 = { "A", "A", "B", "C" };
    String[] keys2 = { "A", "B", "D" };
    String[] keys3 = { "D", "B", "B" };

    long totalCellsLen1 = addRowsByKeysDataSize(memstore, keys1);
    long cellBeforeFlushSize = cellBeforeFlushSize();
    long cellAfterFlushSize = cellAfterFlushSize();
    long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact

    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // One cell is duplicated and the compaction will remove it. All cells are of the same size,
    // so adjust totalCellsLen accordingly.
    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
    totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
      + (toCellChunkMap
        ? CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
        : CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());

    long totalCellsLen2 = addRowsByKeysDataSize(memstore, keys2);
    long totalHeapSize2 = 3 * cellBeforeFlushSize;

    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());

    ((MyCompactingMemStore) memstore).disableCompaction();
    mss = memstore.getFlushableSize();
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
    totalHeapSize2 = totalHeapSize2 + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());

    long totalCellsLen3 = addRowsByKeysDataSize(memstore, keys3);
    long totalHeapSize3 = 3 * cellBeforeFlushSize;
    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
      regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize1 + totalHeapSize2 + totalHeapSize3,
      ((CompactingMemStore) memstore).heapSize());

    ((MyCompactingMemStore) memstore).enableCompaction();
    mss = memstore.getFlushableSize();
    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // The active segment was flushed to the pipeline and all 3 segments were compacted, which
    // gets rid of the duplicated cells. Out of the total 10 cells, only 4 are unique.
    totalCellsLen2 = totalCellsLen2 / 3; // 2 out of 3 cells are duplicated
    totalCellsLen3 = 0; // All duplicated cells.
    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
      regionServicesForStores.getMemStoreSize());
    // Only 4 unique cells left
    long totalHeapSize4 = 4 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
      + (toCellChunkMap
        ? CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
        : CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
    assertEquals(totalHeapSize4, ((CompactingMemStore) memstore).heapSize());

    mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(4, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  //////////////////////////////////////////////////////////////////////////////
  // Merging tests
  //////////////////////////////////////////////////////////////////////////////
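  // Under MemoryCompactionPolicy.BASIC the pipeline segments are only flattened and merged,
  // never compacted, so duplicate cells are kept and the total cell count stays constant.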
  @Test
  public void testMerging() throws IOException {
    if (toCellChunkMap) {
      // set memstore to flat into CellChunkMap
      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    }
    String[] keys1 = { "A", "A", "B", "C", "F", "H" };
    String[] keys2 = { "A", "B", "D", "G", "I", "J" };
    String[] keys3 = { "D", "B", "B", "E" };

    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
    addRowsByKeysDataSize(memstore, keys1);

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact

    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    addRowsByKeysDataSize(memstore, keys2); // also should only flatten

    int counter2 = 0;
    for (Segment s : memstore.getSegments()) {
      counter2 += s.getCellsCount();
    }
    assertEquals(12, counter2);

    ((MyCompactingMemStore) memstore).disableCompaction();

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    int counter3 = 0;
    for (Segment s : memstore.getSegments()) {
      counter3 += s.getCellsCount();
    }
    assertEquals(12, counter3);

    addRowsByKeysDataSize(memstore, keys3);

    int counter4 = 0;
    for (Segment s : memstore.getSegments()) {
      counter4 += s.getCellsCount();
    }
    assertEquals(16, counter4);

    ((MyCompactingMemStore) memstore).enableCompaction();

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    int counter = 0;
    for (Segment s : memstore.getSegments()) {
      counter += s.getCellsCount();
    }
    assertEquals(16, counter);

    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    ImmutableSegment s = memstore.getSnapshot();
    memstore.clearSnapshot(snapshot.getId());
  }

  @Test
  public void testTimeRangeAfterCompaction() throws IOException {
    if (toCellChunkMap) {
      // set memstore to flat into CellChunkMap
      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    }
    testTimeRange(true);
  }

  @Test
  public void testTimeRangeAfterMerge() throws IOException {
    if (toCellChunkMap) {
      // set memstore to flat into CellChunkMap
      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    }
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
    testTimeRange(false);
  }

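  // Verifies that the min/max timestamps tracked by the segments' TimeRangeTracker stay correct
  // across in-memory flushes, and across compaction (EAGER) or merge (BASIC) of pipeline segments.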
  private void testTimeRange(boolean isCompaction) throws IOException {
    final long initTs = 100;
    long currentTs = initTs;
    byte[] row = Bytes.toBytes("row");
    byte[] family = Bytes.toBytes("family");
    byte[] qf1 = Bytes.toBytes("qf1");

    // first segment in pipeline
    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
    long minTs = currentTs;
    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);

    long numberOfCell = 2;
    assertEquals(numberOfCell,
      memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum());
    assertEquals(minTs, memstore.getSegments().stream()
      .mapToLong(m -> m.getTimeRangeTracker().getMin()).min().getAsLong());
    assertEquals(currentTs, memstore.getSegments().stream()
      .mapToLong(m -> m.getTimeRangeTracker().getMax()).max().getAsLong());

    ((CompactingMemStore) memstore).flushInMemory();

    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    if (isCompaction) {
      // max version = 1, so one cell will be dropped.
      numberOfCell = 1;
      minTs = currentTs;
    }
    // second segment in pipeline
    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
    numberOfCell += 2;
    assertEquals(numberOfCell,
      memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum());
    assertEquals(minTs, memstore.getSegments().stream()
      .mapToLong(m -> m.getTimeRangeTracker().getMin()).min().getAsLong());
    assertEquals(currentTs, memstore.getSegments().stream()
      .mapToLong(m -> m.getTimeRangeTracker().getMax()).max().getAsLong());

    ((CompactingMemStore) memstore).flushInMemory(); // trigger the merge

    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    if (isCompaction) {
      // max version = 1, so one cell will be dropped.
      numberOfCell = 1;
      minTs = currentTs;
    }

    assertEquals(numberOfCell,
      memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum());
    assertEquals(minTs, memstore.getSegments().stream()
      .mapToLong(m -> m.getTimeRangeTracker().getMin()).min().getAsLong());
    assertEquals(currentTs, memstore.getSegments().stream()
      .mapToLong(m -> m.getTimeRangeTracker().getMax()).max().getAsLong());
  }

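  // Three distinct rows with 50 columns each are added, so flattening has no duplicates to drop
  // and all 150 cells must still be visible through the scanners.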
  @Test
  public void testCountOfCellsAfterFlatteningByScan() throws IOException {
    String[] keys1 = { "A", "B", "C" }; // A, B, C
    addRowsByKeysWith50Cols(memstore, keys1);
    // this should only flatten as there are no duplicates
    ((CompactingMemStore) memstore).flushInMemory();
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
    // seek
    int count = 0;
    for (int i = 0; i < scanners.size(); i++) {
      scanners.get(i).seek(KeyValue.LOWESTKEY);
      while (scanners.get(i).next() != null) {
        count++;
      }
    }
    assertEquals("the count should be 150", 150, count);
    for (int i = 0; i < scanners.size(); i++) {
      scanners.get(i).close();
    }
  }

  @Test
  public void testCountOfCellsAfterFlatteningByIterator() throws IOException {
    String[] keys1 = { "A", "B", "C" }; // A, B, C
    addRowsByKeysWith50Cols(memstore, keys1);
    // this should only flatten as there are no duplicates
    ((CompactingMemStore) memstore).flushInMemory();
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    // Just counting the cells here
    MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator(
      ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(),
      CellComparatorImpl.COMPARATOR, 10);
    int cnt = 0;
    try {
      while (itr.next() != null) {
        cnt++;
      }
    } finally {
      itr.close();
    }
    assertEquals("the count should be 150", 150, cnt);
  }

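  // Adds one row per key with 50 qualifiers per row, so each call inserts keys.length * 50 cells.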
  private void addRowsByKeysWith50Cols(AbstractMemStore hmc, String[] keys) {
    byte[] fam = Bytes.toBytes("testfamily");
    for (int i = 0; i < keys.length; i++) {
      long timestamp = EnvironmentEdgeManager.currentTime();
      Threads.sleep(1); // to make sure each kv gets a different ts
      byte[] row = Bytes.toBytes(keys[i]);
      for (int j = 0; j < 50; j++) {
        byte[] qf = Bytes.toBytes("testqualifier" + j);
        byte[] val = Bytes.toBytes(keys[i] + j);
        KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
        hmc.add(kv, null);
      }
    }
  }

  @Override
  @Test
  public void testPuttingBackChunksWithOpeningScanner() throws IOException {
    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] qf4 = Bytes.toBytes("testqualifier4");
    byte[] qf5 = Bytes.toBytes("testqualifier5");
    byte[] qf6 = Bytes.toBytes("testqualifier6");
    byte[] qf7 = Bytes.toBytes("testqualifier7");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, val), null);
    memstore.add(new KeyValue(row, fam, qf3, val), null);

    // Creating a snapshot
    MemStoreSnapshot snapshot = memstore.snapshot();
    assertEquals(3, memstore.getSnapshot().getCellsCount());

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf4, val), null);
    memstore.add(new KeyValue(row, fam, qf5, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());

    // open a scanner before clearing the snapshot
    List<KeyValueScanner> scanners = memstore.getScanners(0);
    // The chunks should not be put back to the pool, since some scanners are still open
    // on their data
    // close the scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());

    assertTrue(chunkCreator.getPoolSize() == 0);

    // Chunks will be put back to the pool after the scanners are closed
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    assertTrue(chunkCreator.getPoolSize() > 0);

    // clear chunks
    chunkCreator.clearChunksInPool();

    // Creating another snapshot

    snapshot = memstore.snapshot();
    // Adding more values
    memstore.add(new KeyValue(row, fam, qf6, val), null);
    memstore.add(new KeyValue(row, fam, qf7, val), null);
    // opening scanners
    scanners = memstore.getScanners(0);
    // close scanners before clearing the snapshot
    for (KeyValueScanner scanner : scanners) {
      scanner.close();
    }
    // Since no scanner is open any more, the chunks of the snapshot should be put back to the
    // pool
    // close the scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());
    assertTrue(chunkCreator.getPoolSize() > 0);
  }

  @Override
  @Test
  public void testPuttingBackChunksAfterFlushing() throws IOException {
    byte[] row = Bytes.toBytes("testrow");
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf1 = Bytes.toBytes("testqualifier1");
    byte[] qf2 = Bytes.toBytes("testqualifier2");
    byte[] qf3 = Bytes.toBytes("testqualifier3");
    byte[] qf4 = Bytes.toBytes("testqualifier4");
    byte[] qf5 = Bytes.toBytes("testqualifier5");
    byte[] val = Bytes.toBytes("testval");

    // Setting up memstore
    memstore.add(new KeyValue(row, fam, qf1, val), null);
    memstore.add(new KeyValue(row, fam, qf2, val), null);
    memstore.add(new KeyValue(row, fam, qf3, val), null);

    // Creating a snapshot
    MemStoreSnapshot snapshot = memstore.snapshot();
    assertEquals(3, memstore.getSnapshot().getCellsCount());

    // Adding value to "new" memstore
    assertEquals(0, memstore.getActive().getCellsCount());
    memstore.add(new KeyValue(row, fam, qf4, val), null);
    memstore.add(new KeyValue(row, fam, qf5, val), null);
    assertEquals(2, memstore.getActive().getCellsCount());
    // close the scanners
    for (KeyValueScanner scanner : snapshot.getScanners()) {
      scanner.close();
    }
    memstore.clearSnapshot(snapshot.getId());

    int chunkCount = chunkCreator.getPoolSize();
    assertTrue(chunkCount > 0);
  }

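  // Verifies the heap-size accounting of the cells before flattening (CSLM entry overhead in the
  // active segment) and after flattening the segment into a CellChunkMap index.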
  @Test
  public void testFlatteningToCellChunkMap() throws IOException {

    // set memstore to flat into CellChunkMap
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    int numOfCells = 8;
    String[] keys1 = { "A", "A", "B", "C", "D", "D", "E", "F" }; // A1, A2, B3, C4, D5, D6, E7, F8

    // make one cell
    byte[] row = Bytes.toBytes(keys1[0]);
    byte[] val = Bytes.toBytes(keys1[0] + 0);
    KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
      EnvironmentEdgeManager.currentTime(), val);

    // test 1 bucket
    int totalCellsLen = addRowsByKeys(memstore, keys1);
    long oneCellOnCSLMHeapSize = ClassSize.align(
      ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + kv.getSerializedSize());

    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and flatten
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    long oneCellOnCCMHeapSize =
      ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize());
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
      + numOfCells * oneCellOnCCMHeapSize;

    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(numOfCells, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  /**
   * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks. Even
   * though MSLAB is enabled, cells bigger than maxAlloc (even if smaller than the size of a chunk)
   * are not written in the MSLAB Chunks. If such cells are found in the process of flattening into
   * CellChunkMap (in-memory-flush) they need to be copied into MSLAB.
   * testFlatteningToBigCellChunkMap checks that the process of flattening into CellChunkMap
   * succeeds, even when such big cells are allocated.
   */
  @Test
  public void testFlatteningToBigCellChunkMap() throws IOException {

    if (!toCellChunkMap) {
      return;
    }
    // set memstore to flat into CellChunkMap
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
    int numOfCells = 4;
    char[] chars = new char[MemStoreLAB.MAX_ALLOC_DEFAULT];
    for (int i = 0; i < chars.length; i++) {
      chars[i] = 'A';
    }
    String bigVal = new String(chars);
    String[] keys1 = { "A", "B", "C", "D" };

    // make one cell
    byte[] row = Bytes.toBytes(keys1[0]);
    byte[] val = Bytes.toBytes(bigVal);
    KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
      EnvironmentEdgeManager.currentTime(), val);

    // test 1 bucket
    int totalCellsLen = addRowsByKeys(memstore, keys1, val);

    long oneCellOnCSLMHeapSize =
      ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());

    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and flatten
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    assertEquals(0, memstore.getSnapshot().getCellsCount());
    // One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode.
    // totalCellsLen should remain the same
    long oneCellOnCCMHeapSize =
      ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize());
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
      + numOfCells * oneCellOnCCMHeapSize;

    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(numOfCells, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());
  }

  /**
   * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks. Even
   * though MSLAB is enabled, cells bigger than the size of a chunk are not written in the MSLAB
   * Chunks. If such cells are found in the process of flattening into CellChunkMap
   * (in-memory-flush) they need to be copied into MSLAB. testFlatteningToJumboCellChunkMap checks
   * that the process of flattening into CellChunkMap succeeds, even when such big cells are
   * allocated.
   */
  @Test
  public void testFlatteningToJumboCellChunkMap() throws IOException {

    if (!toCellChunkMap) {
      return;
    }
    // set memstore to flat into CellChunkMap
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);

    int numOfCells = 1;
    char[] chars = new char[MemStoreLAB.CHUNK_SIZE_DEFAULT];
    for (int i = 0; i < chars.length; i++) {
      chars[i] = 'A';
    }
    String bigVal = new String(chars);
    String[] keys1 = { "A" };

    // make one cell
    byte[] row = Bytes.toBytes(keys1[0]);
    byte[] val = Bytes.toBytes(bigVal);
    KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
      EnvironmentEdgeManager.currentTime(), val);

    // test 1 bucket
    int totalCellsLen = addRowsByKeys(memstore, keys1, val);

    long oneCellOnCSLMHeapSize =
      ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());

    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and flatten
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }
    assertEquals(0, memstore.getSnapshot().getCellsCount());

    // One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode.
    // totalCellsLen should remain the same
    long oneCellOnCCMHeapSize =
      (long) ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize());
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
      + numOfCells * oneCellOnCCMHeapSize;

    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());

    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());

    MemStoreSize mss = memstore.getFlushableSize();
    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
    // simulate flusher
    region.decrMemStoreSize(mss);
    ImmutableSegment s = memstore.getSnapshot();
    assertEquals(numOfCells, s.getCellsCount());
    assertEquals(0, regionServicesForStores.getMemStoreSize());

    memstore.clearSnapshot(snapshot.getId());

    // Allocating two big cells (too big for being copied into a regular chunk).
    String[] keys2 = { "C", "D" };
    addRowsByKeys(memstore, keys2, val);
    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
      Threads.sleep(10);
    }

    // The in-memory flush size is bigger than the size of a single cell,
    // but smaller than the size of two cells.
    // Therefore, the two created cells are flushed together as a single CSLMImmutableSegment and
    // flattened.
    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
      + 2 * oneCellOnCCMHeapSize;
    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
  }

  /**
   * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks. Even
   * though MSLAB is enabled, cells bigger than the size of a chunk are not written in the MSLAB
   * Chunks. If such cells are found in the process of a merge they need to be copied into MSLAB.
   * testForceCopyOfBigCellIntoImmutableSegment checks that the ImmutableMemStoreLAB's
   * forceCopyOfBigCellInto does what it's supposed to do.
   */
  @org.junit.Ignore
  @Test // Flakey. Disabled by HBASE-24128. HBASE-24129 is for reenable.
  // TestCompactingToCellFlatMapMemStore.testForceCopyOfBigCellIntoImmutableSegment:902 i=1
  // expected:<8389924> but was:<8389992>
  public void testForceCopyOfBigCellIntoImmutableSegment() throws IOException {

    if (!toCellChunkMap) {
      return;
    }

    // set memstore to flat into CellChunkMap
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY,
      4);
    memstore.getConfiguration().setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY,
      0.014);
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);

    char[] chars = new char[MemStoreLAB.CHUNK_SIZE_DEFAULT];
    for (int i = 0; i < chars.length; i++) {
      chars[i] = 'A';
    }
    String bigVal = new String(chars);
    byte[] val = Bytes.toBytes(bigVal);

    // We need to add two cells, three times, in order to guarantee a merge
    List<String[]> keysList = new ArrayList<>();
    keysList.add(new String[] { "A", "B" });
    keysList.add(new String[] { "C", "D" });
    keysList.add(new String[] { "E", "F" });
    keysList.add(new String[] { "G", "H" });

    // Measuring the size of a single kv
    KeyValue kv = new KeyValue(Bytes.toBytes("A"), Bytes.toBytes("testfamily"),
      Bytes.toBytes("testqualifier"), EnvironmentEdgeManager.currentTime(), val);
    long oneCellOnCCMHeapSize =
      (long) ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize());
    long oneCellOnCSLMHeapSize =
      ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());
    long totalHeapSize = MutableSegment.DEEP_OVERHEAD;
    for (int i = 0; i < keysList.size(); i++) {
      addRowsByKeys(memstore, keysList.get(i), val);
      while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
        Threads.sleep(10);
      }

      if (i == 0) {
        totalHeapSize += CellChunkImmutableSegment.DEEP_OVERHEAD_CCM + oneCellOnCCMHeapSize
          + oneCellOnCSLMHeapSize;
      } else {
        // The in-memory flush size is bigger than the size of a single cell,
        // but smaller than the size of two cells.
        // Therefore, the two created cells are flattened in a separate segment.
        totalHeapSize += 2 * (CellChunkImmutableSegment.DEEP_OVERHEAD_CCM + oneCellOnCCMHeapSize);
      }
      if (i == 2) {
        // Four out of the five segments are merged into one
        totalHeapSize -= (4 * CellChunkImmutableSegment.DEEP_OVERHEAD_CCM);
        totalHeapSize = ClassSize.align(totalHeapSize);
      }
      assertEquals("i=" + i, totalHeapSize, ((CompactingMemStore) memstore).heapSize());
    }
  }

  /**
   * Test big cell size after in memory compaction. (HBASE-26467)
   */
  @Test
  public void testBigCellSizeAfterInMemoryCompaction() throws IOException {
    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
    memstore.getConfiguration().setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY,
      1);
    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
      String.valueOf(compactionType));
    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());

    byte[] val = new byte[MemStoreLAB.CHUNK_SIZE_DEFAULT];

    long size = addRowsByKeys(memstore, new String[] { "A" }, val);
    ((MyCompactingMemStore) memstore).flushInMemory();

    for (KeyValueScanner scanner : memstore.getScanners(Long.MAX_VALUE)) {
      Cell cell;
      while ((cell = scanner.next()) != null) {
        assertEquals(size, cell.getSerializedSize());
      }
    }
  }

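  // Adds one cell per key (duplicate keys become duplicate cells), accounts the added size in
  // regionServicesForStores, and returns the total data size that was inserted.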
  private long addRowsByKeysDataSize(final AbstractMemStore hmc, String[] keys) {
    byte[] fam = Bytes.toBytes("testfamily");
    byte[] qf = Bytes.toBytes("testqualifier");
    MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing();
    for (int i = 0; i < keys.length; i++) {
      long timestamp = EnvironmentEdgeManager.currentTime();
      Threads.sleep(1); // to make sure each kv gets a different ts
      byte[] row = Bytes.toBytes(keys[i]);
      byte[] val = Bytes.toBytes(keys[i] + i);
      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
      hmc.add(kv, memstoreSizing);
      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp=" + kv.getTimestamp());
    }
    MemStoreSize mss = memstoreSizing.getMemStoreSize();
    regionServicesForStores.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(),
      mss.getOffHeapSize(), mss.getCellsCount());
    return mss.getDataSize();
  }

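  // Heap size of one cell while it still lives in the active, ConcurrentSkipListMap-backed segment.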
  private long cellBeforeFlushSize() {
    // make one cell
    byte[] row = Bytes.toBytes("A");
    byte[] val = Bytes.toBytes("A" + 0);
    KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
      EnvironmentEdgeManager.currentTime(), val);
    return ClassSize.align(
      ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + kv.getSerializedSize());
  }

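  // Heap size of one cell after flattening: a CellChunkMap entry (cell data lives in the chunk),
  // or a CellArrayMap entry plus the referenced KeyValue object, depending on the test parameter.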
  private long cellAfterFlushSize() {
    // make one cell
    byte[] row = Bytes.toBytes("A");
    byte[] val = Bytes.toBytes("A" + 0);
    KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
      EnvironmentEdgeManager.currentTime(), val);

    return toCellChunkMap
      ? ClassSize.align(ClassSize.CELL_CHUNK_MAP_ENTRY + kv.getSerializedSize())
      : ClassSize
        .align(ClassSize.CELL_ARRAY_MAP_ENTRY + KeyValue.FIXED_OVERHEAD + kv.getSerializedSize());
  }
}