001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.stream.Stream;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellComparatorImpl;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.KeyValue;
032import org.apache.hadoop.hbase.MemoryCompactionPolicy;
033import org.apache.hadoop.hbase.testclassification.MediumTests;
034import org.apache.hadoop.hbase.testclassification.RegionServerTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.ClassSize;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.Threads;
039import org.junit.jupiter.api.Disabled;
040import org.junit.jupiter.api.Tag;
041import org.junit.jupiter.api.Test;
042import org.junit.jupiter.params.ParameterizedClass;
043import org.junit.jupiter.params.provider.Arguments;
044import org.junit.jupiter.params.provider.MethodSource;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
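/**
 * Tests in-memory flush, compaction and merge of a {@link CompactingMemStore} whose immutable
 * segments are flattened into either a CellArrayMap or a CellChunkMap index. The index type is
 * the class-level test parameter.
 */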
051@Tag(RegionServerTests.TAG)
052@Tag(MediumTests.TAG)
053@ParameterizedClass(name = "{index}: type={0}")
054@MethodSource("parameters")
055public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore {
056
  // SLF4J logger scoped to this test class.
  private static final Logger LOG =
    LoggerFactory.getLogger(TestCompactingToCellFlatMapMemStore.class);
059
060  public static Stream<Arguments> parameters() {
061    // test different immutable indexes
062    return Stream.of(Arguments.of("CHUNK_MAP"), Arguments.of("ARRAY_MAP"));
063  }
064
  // True when this run was parameterized with "CHUNK_MAP" (assigned in the constructor).
  public final boolean toCellChunkMap;
  // Raw parameter value; also used to build the parameterized test name suffix.
  private final String type;
  // Configuration created in createMemStore() and passed to the memstore under test.
  Configuration conf;
068
069  //////////////////////////////////////////////////////////////////////////////
070  // Helpers
071  //////////////////////////////////////////////////////////////////////////////
072  public TestCompactingToCellFlatMapMemStore(String type) {
073    this.type = type;
074    toCellChunkMap = "CHUNK_MAP".equals(type);
075  }
076
077  @Override
078  protected String getParameterizedTestNameSuffix() {
079    return "_" + type;
080  }
081
082  @Override
083  protected void createMemStore() throws IOException {
084    this.conf = HBaseConfiguration.create();
085
086    // set memstore to do data compaction
087    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
088      String.valueOf(MemoryCompactionPolicy.EAGER));
089    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.02);
090    this.memstore = new MyCompactingMemStore(conf, CellComparatorImpl.COMPARATOR, store,
091      regionServicesForStores, MemoryCompactionPolicy.EAGER);
092  }
093
094  //////////////////////////////////////////////////////////////////////////////
095  // Compaction tests
096  //////////////////////////////////////////////////////////////////////////////
097  @Override
098  @Test
099  public void testCompaction1Bucket() throws IOException {
100    int counter = 0;
101    String[] keys1 = { "A", "A", "B", "C" }; // A1, A2, B3, C4
102    if (toCellChunkMap) {
103      // set memstore to flat into CellChunkMap
104      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
105    } else {
106      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
107    }
108
109    // test 1 bucket
110    long totalCellsLen = addRowsByKeysDataSize(memstore, keys1);
111    long cellBeforeFlushSize = cellBeforeFlushSize();
112    long cellAfterFlushSize = cellAfterFlushSize();
113    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
114
115    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
116    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
117
118    assertEquals(4, memstore.getActive().getCellsCount());
119    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
120    assertEquals(0, memstore.getSnapshot().getCellsCount());
121    // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
122    // totalCellsLen
123    totalCellsLen = (totalCellsLen * 3) / 4;
124    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
125
126    totalHeapSize = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
127      + (toCellChunkMap
128        ? CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
129        : CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
130    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
131    for (Segment s : memstore.getSegments()) {
132      counter += s.getCellsCount();
133    }
134    assertEquals(3, counter);
135    MemStoreSize mss = memstore.getFlushableSize();
136    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
137    region.decrMemStoreSize(mss); // simulate flusher
138    ImmutableSegment s = memstore.getSnapshot();
139    assertEquals(3, s.getCellsCount());
140    assertEquals(0, regionServicesForStores.getMemStoreSize());
141
142    memstore.clearSnapshot(snapshot.getId());
143  }
144
145  @Override
146  @Test
147  public void testCompaction2Buckets() throws IOException {
148    if (toCellChunkMap) {
149      // set memstore to flat into CellChunkMap
150      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
151    } else {
152      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
153    }
154    String[] keys1 = { "A", "A", "B", "C" };
155    String[] keys2 = { "A", "B", "D" };
156
157    long totalCellsLen1 = addRowsByKeysDataSize(memstore, keys1); // INSERT 4
158    long cellBeforeFlushSize = cellBeforeFlushSize();
159    long cellAfterFlushSize = cellAfterFlushSize();
160    long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
161    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
162    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
163
164    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
165    int counter = 0; // COMPACT 4->3
166    for (Segment s : memstore.getSegments()) {
167      counter += s.getCellsCount();
168    }
169    assertEquals(3, counter);
170    assertEquals(0, memstore.getSnapshot().getCellsCount());
171    // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
172    // totalCellsLen
173    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
174    totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
175      + (toCellChunkMap
176        ? CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
177        : CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
178    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
179    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
180
181    long totalCellsLen2 = addRowsByKeysDataSize(memstore, keys2); // INSERT 3 (3+3=6)
182    long totalHeapSize2 = 3 * cellBeforeFlushSize;
183    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
184    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
185
186    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
187    assertEquals(0, memstore.getSnapshot().getCellsCount());// COMPACT 6->4
188    counter = 0;
189    for (Segment s : memstore.getSegments()) {
190      counter += s.getCellsCount();
191    }
192    assertEquals(4, counter);
193    totalCellsLen2 = totalCellsLen2 / 3;// 2 cells duplicated in set 2
194    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
195    totalHeapSize2 = 1 * cellAfterFlushSize;
196    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
197
198    MemStoreSize mss = memstore.getFlushableSize();
199    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
200    // simulate flusher
201    region.decrMemStoreSize(mss);
202    ImmutableSegment s = memstore.getSnapshot();
203    assertEquals(4, s.getCellsCount());
204    assertEquals(0, regionServicesForStores.getMemStoreSize());
205
206    memstore.clearSnapshot(snapshot.getId());
207  }
208
209  @Override
210  @Test
211  public void testCompaction3Buckets() throws IOException {
212    if (toCellChunkMap) {
213      // set memstore to flat into CellChunkMap
214      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
215    } else {
216      // set to CellArrayMap as CCM is configured by default due to MSLAB usage
217      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
218    }
219    String[] keys1 = { "A", "A", "B", "C" };
220    String[] keys2 = { "A", "B", "D" };
221    String[] keys3 = { "D", "B", "B" };
222
223    long totalCellsLen1 = addRowsByKeysDataSize(memstore, keys1);
224    long cellBeforeFlushSize = cellBeforeFlushSize();
225    long cellAfterFlushSize = cellAfterFlushSize();
226    long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
227    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
228    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
229
230    MemStoreSize mss = memstore.getFlushableSize();
231    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
232
233    assertEquals(0, memstore.getSnapshot().getCellsCount());
234    // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
235    // totalCellsLen
236    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
237    totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
238      + (toCellChunkMap
239        ? CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
240        : CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
241    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
242    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
243
244    long totalCellsLen2 = addRowsByKeysDataSize(memstore, keys2);
245    long totalHeapSize2 = 3 * cellBeforeFlushSize;
246
247    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
248    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
249
250    ((MyCompactingMemStore) memstore).disableCompaction();
251    mss = memstore.getFlushableSize();
252    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
253    totalHeapSize2 = totalHeapSize2 + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
254    assertEquals(0, memstore.getSnapshot().getCellsCount());
255    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
256    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
257
258    long totalCellsLen3 = addRowsByKeysDataSize(memstore, keys3);
259    long totalHeapSize3 = 3 * cellBeforeFlushSize;
260    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
261      regionServicesForStores.getMemStoreSize());
262    assertEquals(totalHeapSize1 + totalHeapSize2 + totalHeapSize3,
263      ((CompactingMemStore) memstore).heapSize());
264
265    ((MyCompactingMemStore) memstore).enableCompaction();
266    mss = memstore.getFlushableSize();
267    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
268    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
269      Threads.sleep(10);
270    }
271    assertEquals(0, memstore.getSnapshot().getCellsCount());
272    // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
273    // Out of total 10, only 4 cells are unique
274    totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated
275    totalCellsLen3 = 0;// All duplicated cells.
276    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
277      regionServicesForStores.getMemStoreSize());
278    // Only 4 unique cells left
279    long totalHeapSize4 = 4 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
280      + (toCellChunkMap
281        ? CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
282        : CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
283    assertEquals(totalHeapSize4, ((CompactingMemStore) memstore).heapSize());
284
285    mss = memstore.getFlushableSize();
286    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
287    // simulate flusher
288    region.decrMemStoreSize(mss);
289    ImmutableSegment s = memstore.getSnapshot();
290    assertEquals(4, s.getCellsCount());
291    assertEquals(0, regionServicesForStores.getMemStoreSize());
292
293    memstore.clearSnapshot(snapshot.getId());
294  }
295
296  //////////////////////////////////////////////////////////////////////////////
297  // Merging tests
298  //////////////////////////////////////////////////////////////////////////////
299  @Test
300  public void testMerging() throws IOException {
301    if (toCellChunkMap) {
302      // set memstore to flat into CellChunkMap
303      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
304    }
305    String[] keys1 = { "A", "A", "B", "C", "F", "H" };
306    String[] keys2 = { "A", "B", "D", "G", "I", "J" };
307    String[] keys3 = { "D", "B", "B", "E" };
308
309    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
310    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
311      String.valueOf(compactionType));
312    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
313    addRowsByKeysDataSize(memstore, keys1);
314
315    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact
316
317    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
318      Threads.sleep(10);
319    }
320    assertEquals(0, memstore.getSnapshot().getCellsCount());
321
322    addRowsByKeysDataSize(memstore, keys2); // also should only flatten
323
324    int counter2 = 0;
325    for (Segment s : memstore.getSegments()) {
326      counter2 += s.getCellsCount();
327    }
328    assertEquals(12, counter2);
329
330    ((MyCompactingMemStore) memstore).disableCompaction();
331
332    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening
333    assertEquals(0, memstore.getSnapshot().getCellsCount());
334
335    int counter3 = 0;
336    for (Segment s : memstore.getSegments()) {
337      counter3 += s.getCellsCount();
338    }
339    assertEquals(12, counter3);
340
341    addRowsByKeysDataSize(memstore, keys3);
342
343    int counter4 = 0;
344    for (Segment s : memstore.getSegments()) {
345      counter4 += s.getCellsCount();
346    }
347    assertEquals(16, counter4);
348
349    ((MyCompactingMemStore) memstore).enableCompaction();
350
351    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
352    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
353      Threads.sleep(10);
354    }
355    assertEquals(0, memstore.getSnapshot().getCellsCount());
356
357    int counter = 0;
358    for (Segment s : memstore.getSegments()) {
359      counter += s.getCellsCount();
360    }
361    assertEquals(16, counter);
362
363    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
364    ImmutableSegment s = memstore.getSnapshot();
365    memstore.clearSnapshot(snapshot.getId());
366  }
367
368  @Test
369  public void testTimeRangeAfterCompaction() throws IOException {
370    if (toCellChunkMap) {
371      // set memstore to flat into CellChunkMap
372      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
373    }
374    testTimeRange(true);
375  }
376
377  @Test
378  public void testTimeRangeAfterMerge() throws IOException {
379    if (toCellChunkMap) {
380      // set memstore to flat into CellChunkMap
381      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
382    }
383    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
384    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
385      String.valueOf(compactionType));
386    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
387    testTimeRange(false);
388  }
389
390  private void testTimeRange(boolean isCompaction) throws IOException {
391    final long initTs = 100;
392    long currentTs = initTs;
393    byte[] row = Bytes.toBytes("row");
394    byte[] family = Bytes.toBytes("family");
395    byte[] qf1 = Bytes.toBytes("qf1");
396
397    // first segment in pipeline
398    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
399    long minTs = currentTs;
400    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
401
402    long numberOfCell = 2;
403    assertEquals(numberOfCell,
404      memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum());
405    assertEquals(minTs, memstore.getSegments().stream()
406      .mapToLong(m -> m.getTimeRangeTracker().getMin()).min().getAsLong());
407    assertEquals(currentTs, memstore.getSegments().stream()
408      .mapToLong(m -> m.getTimeRangeTracker().getMax()).max().getAsLong());
409
410    ((CompactingMemStore) memstore).flushInMemory();
411
412    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
413      Threads.sleep(10);
414    }
415    if (isCompaction) {
416      // max version = 1, so one cell will be dropped.
417      numberOfCell = 1;
418      minTs = currentTs;
419    }
420    // second segment in pipeline
421    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
422    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
423    numberOfCell += 2;
424    assertEquals(numberOfCell,
425      memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum());
426    assertEquals(minTs, memstore.getSegments().stream()
427      .mapToLong(m -> m.getTimeRangeTracker().getMin()).min().getAsLong());
428    assertEquals(currentTs, memstore.getSegments().stream()
429      .mapToLong(m -> m.getTimeRangeTracker().getMax()).max().getAsLong());
430
431    ((CompactingMemStore) memstore).flushInMemory(); // trigger the merge
432
433    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
434      Threads.sleep(10);
435    }
436    if (isCompaction) {
437      // max version = 1, so one cell will be dropped.
438      numberOfCell = 1;
439      minTs = currentTs;
440    }
441
442    assertEquals(numberOfCell,
443      memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum());
444    assertEquals(minTs, memstore.getSegments().stream()
445      .mapToLong(m -> m.getTimeRangeTracker().getMin()).min().getAsLong());
446    assertEquals(currentTs, memstore.getSegments().stream()
447      .mapToLong(m -> m.getTimeRangeTracker().getMax()).max().getAsLong());
448  }
449
450  @Test
451  public void testCountOfCellsAfterFlatteningByScan() throws IOException {
452    String[] keys1 = { "A", "B", "C" }; // A, B, C
453    addRowsByKeysWith50Cols(memstore, keys1);
454    // this should only flatten as there are no duplicates
455    ((CompactingMemStore) memstore).flushInMemory();
456    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
457      Threads.sleep(10);
458    }
459    List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
460    // seek
461    int count = 0;
462    for (int i = 0; i < scanners.size(); i++) {
463      scanners.get(i).seek(KeyValue.LOWESTKEY);
464      while (scanners.get(i).next() != null) {
465        count++;
466      }
467    }
468    assertEquals(150, count, "the count should be ");
469    for (int i = 0; i < scanners.size(); i++) {
470      scanners.get(i).close();
471    }
472  }
473
474  @Test
475  public void testCountOfCellsAfterFlatteningByIterator() throws IOException {
476    String[] keys1 = { "A", "B", "C" }; // A, B, C
477    addRowsByKeysWith50Cols(memstore, keys1);
478    // this should only flatten as there are no duplicates
479    ((CompactingMemStore) memstore).flushInMemory();
480    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
481      Threads.sleep(10);
482    }
483    // Just doing the cnt operation here
484    MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator(
485      ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(),
486      CellComparatorImpl.COMPARATOR, 10);
487    int cnt = 0;
488    try {
489      while (itr.next() != null) {
490        cnt++;
491      }
492    } finally {
493      itr.close();
494    }
495    assertEquals(150, cnt, "the count should be ");
496  }
497
498  private void addRowsByKeysWith50Cols(AbstractMemStore hmc, String[] keys) {
499    byte[] fam = Bytes.toBytes("testfamily");
500    for (int i = 0; i < keys.length; i++) {
501      long timestamp = EnvironmentEdgeManager.currentTime();
502      Threads.sleep(1); // to make sure each kv gets a different ts
503      byte[] row = Bytes.toBytes(keys[i]);
504      for (int j = 0; j < 50; j++) {
505        byte[] qf = Bytes.toBytes("testqualifier" + j);
506        byte[] val = Bytes.toBytes(keys[i] + j);
507        KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
508        hmc.add(kv, null);
509      }
510    }
511  }
512
513  @Override
514  @Test
515  public void testPuttingBackChunksWithOpeningScanner() throws IOException {
516    byte[] row = Bytes.toBytes("testrow");
517    byte[] fam = Bytes.toBytes("testfamily");
518    byte[] qf1 = Bytes.toBytes("testqualifier1");
519    byte[] qf2 = Bytes.toBytes("testqualifier2");
520    byte[] qf3 = Bytes.toBytes("testqualifier3");
521    byte[] qf4 = Bytes.toBytes("testqualifier4");
522    byte[] qf5 = Bytes.toBytes("testqualifier5");
523    byte[] qf6 = Bytes.toBytes("testqualifier6");
524    byte[] qf7 = Bytes.toBytes("testqualifier7");
525    byte[] val = Bytes.toBytes("testval");
526
527    // Setting up memstore
528    memstore.add(new KeyValue(row, fam, qf1, val), null);
529    memstore.add(new KeyValue(row, fam, qf2, val), null);
530    memstore.add(new KeyValue(row, fam, qf3, val), null);
531
532    // Creating a snapshot
533    MemStoreSnapshot snapshot = memstore.snapshot();
534    assertEquals(3, memstore.getSnapshot().getCellsCount());
535
536    // Adding value to "new" memstore
537    assertEquals(0, memstore.getActive().getCellsCount());
538    memstore.add(new KeyValue(row, fam, qf4, val), null);
539    memstore.add(new KeyValue(row, fam, qf5, val), null);
540    assertEquals(2, memstore.getActive().getCellsCount());
541
542    // opening scanner before clear the snapshot
543    List<KeyValueScanner> scanners = memstore.getScanners(0);
544    // Shouldn't putting back the chunks to pool,since some scanners are opening
545    // based on their data
546    // close the scanners
547    for (KeyValueScanner scanner : snapshot.getScanners()) {
548      scanner.close();
549    }
550    memstore.clearSnapshot(snapshot.getId());
551
552    assertTrue(chunkCreator.getPoolSize() == 0);
553
554    // Chunks will be put back to pool after close scanners;
555    for (KeyValueScanner scanner : scanners) {
556      scanner.close();
557    }
558    assertTrue(chunkCreator.getPoolSize() > 0);
559
560    // clear chunks
561    chunkCreator.clearChunksInPool();
562
563    // Creating another snapshot
564
565    snapshot = memstore.snapshot();
566    // Adding more value
567    memstore.add(new KeyValue(row, fam, qf6, val), null);
568    memstore.add(new KeyValue(row, fam, qf7, val), null);
569    // opening scanners
570    scanners = memstore.getScanners(0);
571    // close scanners before clear the snapshot
572    for (KeyValueScanner scanner : scanners) {
573      scanner.close();
574    }
575    // Since no opening scanner, the chunks of snapshot should be put back to
576    // pool
577    // close the scanners
578    for (KeyValueScanner scanner : snapshot.getScanners()) {
579      scanner.close();
580    }
581    memstore.clearSnapshot(snapshot.getId());
582    assertTrue(chunkCreator.getPoolSize() > 0);
583  }
584
585  @Override
586  @Test
587  public void testPuttingBackChunksAfterFlushing() throws IOException {
588    byte[] row = Bytes.toBytes("testrow");
589    byte[] fam = Bytes.toBytes("testfamily");
590    byte[] qf1 = Bytes.toBytes("testqualifier1");
591    byte[] qf2 = Bytes.toBytes("testqualifier2");
592    byte[] qf3 = Bytes.toBytes("testqualifier3");
593    byte[] qf4 = Bytes.toBytes("testqualifier4");
594    byte[] qf5 = Bytes.toBytes("testqualifier5");
595    byte[] val = Bytes.toBytes("testval");
596
597    // Setting up memstore
598    memstore.add(new KeyValue(row, fam, qf1, val), null);
599    memstore.add(new KeyValue(row, fam, qf2, val), null);
600    memstore.add(new KeyValue(row, fam, qf3, val), null);
601
602    // Creating a snapshot
603    MemStoreSnapshot snapshot = memstore.snapshot();
604    assertEquals(3, memstore.getSnapshot().getCellsCount());
605
606    // Adding value to "new" memstore
607    assertEquals(0, memstore.getActive().getCellsCount());
608    memstore.add(new KeyValue(row, fam, qf4, val), null);
609    memstore.add(new KeyValue(row, fam, qf5, val), null);
610    assertEquals(2, memstore.getActive().getCellsCount());
611    // close the scanners
612    for (KeyValueScanner scanner : snapshot.getScanners()) {
613      scanner.close();
614    }
615    memstore.clearSnapshot(snapshot.getId());
616
617    int chunkCount = chunkCreator.getPoolSize();
618    assertTrue(chunkCount > 0);
619  }
620
621  @Test
622  public void testFlatteningToCellChunkMap() throws IOException {
623
624    // set memstore to flat into CellChunkMap
625    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
626    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
627      String.valueOf(compactionType));
628    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
629    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
630    int numOfCells = 8;
631    String[] keys1 = { "A", "A", "B", "C", "D", "D", "E", "F" }; // A1, A2, B3, C4, D5, D6, E7, F8
632
633    // make one cell
634    byte[] row = Bytes.toBytes(keys1[0]);
635    byte[] val = Bytes.toBytes(keys1[0] + 0);
636    KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
637      EnvironmentEdgeManager.currentTime(), val);
638
639    // test 1 bucket
640    int totalCellsLen = addRowsByKeys(memstore, keys1);
641    long oneCellOnCSLMHeapSize = ClassSize.align(
642      ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + kv.getSerializedSize());
643
644    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
645    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
646    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
647
648    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and flatten
649    assertEquals(0, memstore.getSnapshot().getCellsCount());
650    long oneCellOnCCMHeapSize =
651      ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize());
652    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
653      + numOfCells * oneCellOnCCMHeapSize;
654
655    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
656    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
657
658    MemStoreSize mss = memstore.getFlushableSize();
659    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
660    // simulate flusher
661    region.decrMemStoreSize(mss);
662    ImmutableSegment s = memstore.getSnapshot();
663    assertEquals(numOfCells, s.getCellsCount());
664    assertEquals(0, regionServicesForStores.getMemStoreSize());
665
666    memstore.clearSnapshot(snapshot.getId());
667  }
668
669  /**
670   * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks. Even
671   * though MSLAB is enabled, cells bigger than maxAlloc (even if smaller than the size of a chunk)
672   * are not written in the MSLAB Chunks. If such cells are found in the process of flattening into
673   * CellChunkMap (in-memory-flush) they need to be copied into MSLAB.
674   * testFlatteningToBigCellChunkMap checks that the process of flattening into CellChunkMap
675   * succeeds, even when such big cells are allocated.
676   */
677  @Test
678  public void testFlatteningToBigCellChunkMap() throws IOException {
679
680    if (toCellChunkMap == false) {
681      return;
682    }
683    // set memstore to flat into CellChunkMap
684    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
685    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
686      String.valueOf(compactionType));
687    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
688    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
689    int numOfCells = 4;
690    char[] chars = new char[MemStoreLAB.MAX_ALLOC_DEFAULT];
691    for (int i = 0; i < chars.length; i++) {
692      chars[i] = 'A';
693    }
694    String bigVal = new String(chars);
695    String[] keys1 = { "A", "B", "C", "D" };
696
697    // make one cell
698    byte[] row = Bytes.toBytes(keys1[0]);
699    byte[] val = Bytes.toBytes(bigVal);
700    KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
701      EnvironmentEdgeManager.currentTime(), val);
702
703    // test 1 bucket
704    int totalCellsLen = addRowsByKeys(memstore, keys1, val);
705
706    long oneCellOnCSLMHeapSize =
707      ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());
708
709    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
710    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
711    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
712
713    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and flatten
714    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
715      Threads.sleep(10);
716    }
717    assertEquals(0, memstore.getSnapshot().getCellsCount());
718    // One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode.
719    // totalCellsLen should remain the same
720    long oneCellOnCCMHeapSize =
721      ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize());
722    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
723      + numOfCells * oneCellOnCCMHeapSize;
724
725    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
726    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
727
728    MemStoreSize mss = memstore.getFlushableSize();
729    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
730    // simulate flusher
731    region.decrMemStoreSize(mss);
732    ImmutableSegment s = memstore.getSnapshot();
733    assertEquals(numOfCells, s.getCellsCount());
734    assertEquals(0, regionServicesForStores.getMemStoreSize());
735
736    memstore.clearSnapshot(snapshot.getId());
737  }
738
739  /**
740   * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks. Even
741   * though MSLAB is enabled, cells bigger than the size of a chunk are not written in the MSLAB
742   * Chunks. If such cells are found in the process of flattening into CellChunkMap
743   * (in-memory-flush) they need to be copied into MSLAB. testFlatteningToJumboCellChunkMap checks
744   * that the process of flattening into CellChunkMap succeeds, even when such big cells are
745   * allocated.
746   */
747  @Test
748  public void testFlatteningToJumboCellChunkMap() throws IOException {
749
750    if (toCellChunkMap == false) {
751      return;
752    }
753    // set memstore to flat into CellChunkMap
754    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
755    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
756      String.valueOf(compactionType));
757    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
758    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
759
760    int numOfCells = 1;
761    char[] chars = new char[MemStoreLAB.CHUNK_SIZE_DEFAULT];
762    for (int i = 0; i < chars.length; i++) {
763      chars[i] = 'A';
764    }
765    String bigVal = new String(chars);
766    String[] keys1 = { "A" };
767
768    // make one cell
769    byte[] row = Bytes.toBytes(keys1[0]);
770    byte[] val = Bytes.toBytes(bigVal);
771    KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
772      EnvironmentEdgeManager.currentTime(), val);
773
774    // test 1 bucket
775    int totalCellsLen = addRowsByKeys(memstore, keys1, val);
776
777    long oneCellOnCSLMHeapSize =
778      ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());
779
780    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
781    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
782    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
783
784    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and flatten
785    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
786      Threads.sleep(10);
787    }
788    assertEquals(0, memstore.getSnapshot().getCellsCount());
789
790    // One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode.
791    // totalCellsLen should remain the same
792    long oneCellOnCCMHeapSize =
793      (long) ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize());
794    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
795      + numOfCells * oneCellOnCCMHeapSize;
796
797    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
798
799    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
800
801    MemStoreSize mss = memstore.getFlushableSize();
802    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
803    // simulate flusher
804    region.decrMemStoreSize(mss);
805    ImmutableSegment s = memstore.getSnapshot();
806    assertEquals(numOfCells, s.getCellsCount());
807    assertEquals(0, regionServicesForStores.getMemStoreSize());
808
809    memstore.clearSnapshot(snapshot.getId());
810
811    // Allocating two big cells (too big for being copied into a regular chunk).
812    String[] keys2 = { "C", "D" };
813    addRowsByKeys(memstore, keys2, val);
814    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
815      Threads.sleep(10);
816    }
817
818    // The in-memory flush size is bigger than the size of a single cell,
819    // but smaller than the size of two cells.
820    // Therefore, the two created cells are flushed together as a single CSLMImmutableSegment and
821    // flattened.
822    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
823      + 2 * oneCellOnCCMHeapSize;
824    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
825  }
826
827  /**
828   * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks. Even
829   * though MSLAB is enabled, cells bigger than the size of a chunk are not written in the MSLAB
830   * Chunks. If such cells are found in the process of a merge they need to be copied into MSLAB.
831   * testForceCopyOfBigCellIntoImmutableSegment checks that the ImmutableMemStoreLAB's
832   * forceCopyOfBigCellInto does what it's supposed to do.
833   */
834  @Disabled
835  @Test // Flakey. Disabled by HBASE-24128. HBASE-24129 is for reenable.
836  // TestCompactingToCellFlatMapMemStore.testForceCopyOfBigCellIntoImmutableSegment:902 i=1
837  // expected:<8389924> but was:<8389992>
838  public void testForceCopyOfBigCellIntoImmutableSegment() throws IOException {
839
840    if (toCellChunkMap == false) {
841      return;
842    }
843
844    // set memstore to flat into CellChunkMap
845    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
846    memstore.getConfiguration().setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY,
847      4);
848    memstore.getConfiguration().setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY,
849      0.014);
850    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
851      String.valueOf(compactionType));
852    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
853    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
854
855    char[] chars = new char[MemStoreLAB.CHUNK_SIZE_DEFAULT];
856    for (int i = 0; i < chars.length; i++) {
857      chars[i] = 'A';
858    }
859    String bigVal = new String(chars);
860    byte[] val = Bytes.toBytes(bigVal);
861
862    // We need to add two cells, three times, in order to guarantee a merge
863    List<String[]> keysList = new ArrayList<>();
864    keysList.add(new String[] { "A", "B" });
865    keysList.add(new String[] { "C", "D" });
866    keysList.add(new String[] { "E", "F" });
867    keysList.add(new String[] { "G", "H" });
868
869    // Measuring the size of a single kv
870    KeyValue kv = new KeyValue(Bytes.toBytes("A"), Bytes.toBytes("testfamily"),
871      Bytes.toBytes("testqualifier"), EnvironmentEdgeManager.currentTime(), val);
872    long oneCellOnCCMHeapSize =
873      (long) ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize());
874    long oneCellOnCSLMHeapSize =
875      ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());
876    long totalHeapSize = MutableSegment.DEEP_OVERHEAD;
877    for (int i = 0; i < keysList.size(); i++) {
878      addRowsByKeys(memstore, keysList.get(i), val);
879      while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
880        Threads.sleep(10);
881      }
882
883      if (i == 0) {
884        totalHeapSize += CellChunkImmutableSegment.DEEP_OVERHEAD_CCM + oneCellOnCCMHeapSize
885          + oneCellOnCSLMHeapSize;
886      } else {
887        // The in-memory flush size is bigger than the size of a single cell,
888        // but smaller than the size of two cells.
889        // Therefore, the two created cells are flattened in a seperate segment.
890        totalHeapSize += 2 * (CellChunkImmutableSegment.DEEP_OVERHEAD_CCM + oneCellOnCCMHeapSize);
891      }
892      if (i == 2) {
893        // Four out of the five segments are merged into one
894        totalHeapSize -= (4 * CellChunkImmutableSegment.DEEP_OVERHEAD_CCM);
895        totalHeapSize = ClassSize.align(totalHeapSize);
896      }
897      assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize(), "i=" + i);
898    }
899  }
900
901  /**
902   * Test big cell size after in memory compaction. (HBASE-26467)
903   */
904  @Test
905  public void testBigCellSizeAfterInMemoryCompaction() throws IOException {
906    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
907    memstore.getConfiguration().setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY,
908      1);
909    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
910      String.valueOf(compactionType));
911    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
912
913    byte[] val = new byte[MemStoreLAB.CHUNK_SIZE_DEFAULT];
914
915    long size = addRowsByKeys(memstore, new String[] { "A" }, val);
916    ((MyCompactingMemStore) memstore).flushInMemory();
917
918    for (KeyValueScanner scanner : memstore.getScanners(Long.MAX_VALUE)) {
919      Cell cell;
920      while ((cell = scanner.next()) != null) {
921        assertEquals(size, cell.getSerializedSize());
922      }
923    }
924  }
925
926  @Override
927  @Test
928  public void testScan() throws IOException {
929    scanMemStore(memstore, 915);
930  }
931
932  private long addRowsByKeysDataSize(final AbstractMemStore hmc, String[] keys) {
933    byte[] fam = Bytes.toBytes("testfamily");
934    byte[] qf = Bytes.toBytes("testqualifier");
935    MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing();
936    for (int i = 0; i < keys.length; i++) {
937      long timestamp = EnvironmentEdgeManager.currentTime();
938      Threads.sleep(1); // to make sure each kv gets a different ts
939      byte[] row = Bytes.toBytes(keys[i]);
940      byte[] val = Bytes.toBytes(keys[i] + i);
941      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
942      hmc.add(kv, memstoreSizing);
943      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp());
944    }
945    MemStoreSize mss = memstoreSizing.getMemStoreSize();
946    regionServicesForStores.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(),
947      mss.getOffHeapSize(), mss.getCellsCount());
948    return mss.getDataSize();
949  }
950
951  private long cellBeforeFlushSize() {
952    // make one cell
953    byte[] row = Bytes.toBytes("A");
954    byte[] val = Bytes.toBytes("A" + 0);
955    KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
956      EnvironmentEdgeManager.currentTime(), val);
957    return ClassSize.align(
958      ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + kv.getSerializedSize());
959  }
960
961  private long cellAfterFlushSize() {
962    // make one cell
963    byte[] row = Bytes.toBytes("A");
964    byte[] val = Bytes.toBytes("A" + 0);
965    KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
966      EnvironmentEdgeManager.currentTime(), val);
967
968    return toCellChunkMap
969      ? ClassSize.align(ClassSize.CELL_CHUNK_MAP_ENTRY + kv.getSerializedSize())
970      : ClassSize
971        .align(ClassSize.CELL_ARRAY_MAP_ENTRY + KeyValue.FIXED_OVERHEAD + kv.getSerializedSize());
972  }
973}