001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.CellComparatorImpl;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseConfiguration;
031import org.apache.hadoop.hbase.KeyValue;
032import org.apache.hadoop.hbase.MemoryCompactionPolicy;
033import org.apache.hadoop.hbase.testclassification.LargeTests;
034import org.apache.hadoop.hbase.testclassification.RegionServerTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.hbase.util.ClassSize;
037import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
038import org.apache.hadoop.hbase.util.Threads;
039import org.junit.ClassRule;
040import org.junit.Test;
041import org.junit.experimental.categories.Category;
042import org.junit.runner.RunWith;
043import org.junit.runners.Parameterized;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
048 * compacted memstore test case
049 */
050@Category({ RegionServerTests.class, LargeTests.class })
051@RunWith(Parameterized.class)
052public class TestCompactingToCellFlatMapMemStore extends TestCompactingMemStore {
053
054  @ClassRule
055  public static final HBaseClassTestRule CLASS_RULE =
056    HBaseClassTestRule.forClass(TestCompactingToCellFlatMapMemStore.class);
057
058  @Parameterized.Parameters
059  public static Object[] data() {
060    return new Object[] { "CHUNK_MAP", "ARRAY_MAP" }; // test different immutable indexes
061  }
062
063  private static final Logger LOG =
064    LoggerFactory.getLogger(TestCompactingToCellFlatMapMemStore.class);
065  public final boolean toCellChunkMap;
066  Configuration conf;
067
068  //////////////////////////////////////////////////////////////////////////////
069  // Helpers
070  //////////////////////////////////////////////////////////////////////////////
071  public TestCompactingToCellFlatMapMemStore(String type) {
072    toCellChunkMap = "CHUNK_MAP".equals(type);
073  }
074
075  @Override
076  public void tearDown() throws Exception {
077    chunkCreator.clearChunksInPool();
078    super.tearDown();
079  }
080
081  @Override
082  public void setUp() throws Exception {
083    compactingSetUp();
084    this.conf = HBaseConfiguration.create();
085
086    // set memstore to do data compaction
087    conf.set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
088      String.valueOf(MemoryCompactionPolicy.EAGER));
089    conf.setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.02);
090    this.memstore = new MyCompactingMemStore(conf, CellComparatorImpl.COMPARATOR, store,
091      regionServicesForStores, MemoryCompactionPolicy.EAGER);
092  }
093
094  //////////////////////////////////////////////////////////////////////////////
095  // Compaction tests
096  //////////////////////////////////////////////////////////////////////////////
097  @Override
098  public void testCompaction1Bucket() throws IOException {
099    int counter = 0;
100    String[] keys1 = { "A", "A", "B", "C" }; // A1, A2, B3, C4
101    if (toCellChunkMap) {
102      // set memstore to flat into CellChunkMap
103      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
104    } else {
105      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
106    }
107
108    // test 1 bucket
109    long totalCellsLen = addRowsByKeysDataSize(memstore, keys1);
110    long cellBeforeFlushSize = cellBeforeFlushSize();
111    long cellAfterFlushSize = cellAfterFlushSize();
112    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
113
114    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
115    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
116
117    assertEquals(4, memstore.getActive().getCellsCount());
118    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
119    assertEquals(0, memstore.getSnapshot().getCellsCount());
120    // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
121    // totalCellsLen
122    totalCellsLen = (totalCellsLen * 3) / 4;
123    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
124
125    totalHeapSize = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
126      + (toCellChunkMap
127        ? CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
128        : CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
129    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
130    for (Segment s : memstore.getSegments()) {
131      counter += s.getCellsCount();
132    }
133    assertEquals(3, counter);
134    MemStoreSize mss = memstore.getFlushableSize();
135    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
136    region.decrMemStoreSize(mss); // simulate flusher
137    ImmutableSegment s = memstore.getSnapshot();
138    assertEquals(3, s.getCellsCount());
139    assertEquals(0, regionServicesForStores.getMemStoreSize());
140
141    memstore.clearSnapshot(snapshot.getId());
142  }
143
144  @Override
145  public void testCompaction2Buckets() throws IOException {
146    if (toCellChunkMap) {
147      // set memstore to flat into CellChunkMap
148      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
149    } else {
150      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
151    }
152    String[] keys1 = { "A", "A", "B", "C" };
153    String[] keys2 = { "A", "B", "D" };
154
155    long totalCellsLen1 = addRowsByKeysDataSize(memstore, keys1); // INSERT 4
156    long cellBeforeFlushSize = cellBeforeFlushSize();
157    long cellAfterFlushSize = cellAfterFlushSize();
158    long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
159    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
160    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
161
162    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
163    int counter = 0; // COMPACT 4->3
164    for (Segment s : memstore.getSegments()) {
165      counter += s.getCellsCount();
166    }
167    assertEquals(3, counter);
168    assertEquals(0, memstore.getSnapshot().getCellsCount());
169    // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
170    // totalCellsLen
171    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
172    totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
173      + (toCellChunkMap
174        ? CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
175        : CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
176    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
177    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
178
179    long totalCellsLen2 = addRowsByKeysDataSize(memstore, keys2); // INSERT 3 (3+3=6)
180    long totalHeapSize2 = 3 * cellBeforeFlushSize;
181    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
182    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
183
184    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
185    assertEquals(0, memstore.getSnapshot().getCellsCount());// COMPACT 6->4
186    counter = 0;
187    for (Segment s : memstore.getSegments()) {
188      counter += s.getCellsCount();
189    }
190    assertEquals(4, counter);
191    totalCellsLen2 = totalCellsLen2 / 3;// 2 cells duplicated in set 2
192    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
193    totalHeapSize2 = 1 * cellAfterFlushSize;
194    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
195
196    MemStoreSize mss = memstore.getFlushableSize();
197    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
198    // simulate flusher
199    region.decrMemStoreSize(mss);
200    ImmutableSegment s = memstore.getSnapshot();
201    assertEquals(4, s.getCellsCount());
202    assertEquals(0, regionServicesForStores.getMemStoreSize());
203
204    memstore.clearSnapshot(snapshot.getId());
205  }
206
207  @Override
208  public void testCompaction3Buckets() throws IOException {
209    if (toCellChunkMap) {
210      // set memstore to flat into CellChunkMap
211      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
212    } else {
213      // set to CellArrayMap as CCM is configured by default due to MSLAB usage
214      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
215    }
216    String[] keys1 = { "A", "A", "B", "C" };
217    String[] keys2 = { "A", "B", "D" };
218    String[] keys3 = { "D", "B", "B" };
219
220    long totalCellsLen1 = addRowsByKeysDataSize(memstore, keys1);
221    long cellBeforeFlushSize = cellBeforeFlushSize();
222    long cellAfterFlushSize = cellAfterFlushSize();
223    long totalHeapSize1 = MutableSegment.DEEP_OVERHEAD + 4 * cellBeforeFlushSize;
224    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
225    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
226
227    MemStoreSize mss = memstore.getFlushableSize();
228    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
229
230    assertEquals(0, memstore.getSnapshot().getCellsCount());
231    // One cell is duplicated and the compaction will remove it. All cells of same size so adjusting
232    // totalCellsLen
233    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
234    totalHeapSize1 = 3 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
235      + (toCellChunkMap
236        ? CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
237        : CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
238    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
239    assertEquals(totalHeapSize1, ((CompactingMemStore) memstore).heapSize());
240
241    long totalCellsLen2 = addRowsByKeysDataSize(memstore, keys2);
242    long totalHeapSize2 = 3 * cellBeforeFlushSize;
243
244    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
245    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
246
247    ((MyCompactingMemStore) memstore).disableCompaction();
248    mss = memstore.getFlushableSize();
249    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
250    totalHeapSize2 = totalHeapSize2 + CSLMImmutableSegment.DEEP_OVERHEAD_CSLM;
251    assertEquals(0, memstore.getSnapshot().getCellsCount());
252    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
253    assertEquals(totalHeapSize1 + totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
254
255    long totalCellsLen3 = addRowsByKeysDataSize(memstore, keys3);
256    long totalHeapSize3 = 3 * cellBeforeFlushSize;
257    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
258      regionServicesForStores.getMemStoreSize());
259    assertEquals(totalHeapSize1 + totalHeapSize2 + totalHeapSize3,
260      ((CompactingMemStore) memstore).heapSize());
261
262    ((MyCompactingMemStore) memstore).enableCompaction();
263    mss = memstore.getFlushableSize();
264    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
265    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
266      Threads.sleep(10);
267    }
268    assertEquals(0, memstore.getSnapshot().getCellsCount());
269    // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
270    // Out of total 10, only 4 cells are unique
271    totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated
272    totalCellsLen3 = 0;// All duplicated cells.
273    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
274      regionServicesForStores.getMemStoreSize());
275    // Only 4 unique cells left
276    long totalHeapSize4 = 4 * cellAfterFlushSize + MutableSegment.DEEP_OVERHEAD
277      + (toCellChunkMap
278        ? CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
279        : CellArrayImmutableSegment.DEEP_OVERHEAD_CAM);
280    assertEquals(totalHeapSize4, ((CompactingMemStore) memstore).heapSize());
281
282    mss = memstore.getFlushableSize();
283    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
284    // simulate flusher
285    region.decrMemStoreSize(mss);
286    ImmutableSegment s = memstore.getSnapshot();
287    assertEquals(4, s.getCellsCount());
288    assertEquals(0, regionServicesForStores.getMemStoreSize());
289
290    memstore.clearSnapshot(snapshot.getId());
291  }
292
293  //////////////////////////////////////////////////////////////////////////////
294  // Merging tests
295  //////////////////////////////////////////////////////////////////////////////
296  @Test
297  public void testMerging() throws IOException {
298    if (toCellChunkMap) {
299      // set memstore to flat into CellChunkMap
300      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
301    }
302    String[] keys1 = { "A", "A", "B", "C", "F", "H" };
303    String[] keys2 = { "A", "B", "D", "G", "I", "J" };
304    String[] keys3 = { "D", "B", "B", "E" };
305
306    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
307    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
308      String.valueOf(compactionType));
309    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
310    addRowsByKeysDataSize(memstore, keys1);
311
312    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline should not compact
313
314    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
315      Threads.sleep(10);
316    }
317    assertEquals(0, memstore.getSnapshot().getCellsCount());
318
319    addRowsByKeysDataSize(memstore, keys2); // also should only flatten
320
321    int counter2 = 0;
322    for (Segment s : memstore.getSegments()) {
323      counter2 += s.getCellsCount();
324    }
325    assertEquals(12, counter2);
326
327    ((MyCompactingMemStore) memstore).disableCompaction();
328
329    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without flattening
330    assertEquals(0, memstore.getSnapshot().getCellsCount());
331
332    int counter3 = 0;
333    for (Segment s : memstore.getSegments()) {
334      counter3 += s.getCellsCount();
335    }
336    assertEquals(12, counter3);
337
338    addRowsByKeysDataSize(memstore, keys3);
339
340    int counter4 = 0;
341    for (Segment s : memstore.getSegments()) {
342      counter4 += s.getCellsCount();
343    }
344    assertEquals(16, counter4);
345
346    ((MyCompactingMemStore) memstore).enableCompaction();
347
348    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
349    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
350      Threads.sleep(10);
351    }
352    assertEquals(0, memstore.getSnapshot().getCellsCount());
353
354    int counter = 0;
355    for (Segment s : memstore.getSegments()) {
356      counter += s.getCellsCount();
357    }
358    assertEquals(16, counter);
359
360    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
361    ImmutableSegment s = memstore.getSnapshot();
362    memstore.clearSnapshot(snapshot.getId());
363  }
364
365  @Test
366  public void testTimeRangeAfterCompaction() throws IOException {
367    if (toCellChunkMap) {
368      // set memstore to flat into CellChunkMap
369      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
370    }
371    testTimeRange(true);
372  }
373
374  @Test
375  public void testTimeRangeAfterMerge() throws IOException {
376    if (toCellChunkMap) {
377      // set memstore to flat into CellChunkMap
378      ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
379    }
380    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
381    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
382      String.valueOf(compactionType));
383    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
384    testTimeRange(false);
385  }
386
387  private void testTimeRange(boolean isCompaction) throws IOException {
388    final long initTs = 100;
389    long currentTs = initTs;
390    byte[] row = Bytes.toBytes("row");
391    byte[] family = Bytes.toBytes("family");
392    byte[] qf1 = Bytes.toBytes("qf1");
393
394    // first segment in pipeline
395    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
396    long minTs = currentTs;
397    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
398
399    long numberOfCell = 2;
400    assertEquals(numberOfCell,
401      memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum());
402    assertEquals(minTs, memstore.getSegments().stream()
403      .mapToLong(m -> m.getTimeRangeTracker().getMin()).min().getAsLong());
404    assertEquals(currentTs, memstore.getSegments().stream()
405      .mapToLong(m -> m.getTimeRangeTracker().getMax()).max().getAsLong());
406
407    ((CompactingMemStore) memstore).flushInMemory();
408
409    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
410      Threads.sleep(10);
411    }
412    if (isCompaction) {
413      // max version = 1, so one cell will be dropped.
414      numberOfCell = 1;
415      minTs = currentTs;
416    }
417    // second segment in pipeline
418    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
419    this.memstore.add(new KeyValue(row, family, qf1, ++currentTs, (byte[]) null), null);
420    numberOfCell += 2;
421    assertEquals(numberOfCell,
422      memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum());
423    assertEquals(minTs, memstore.getSegments().stream()
424      .mapToLong(m -> m.getTimeRangeTracker().getMin()).min().getAsLong());
425    assertEquals(currentTs, memstore.getSegments().stream()
426      .mapToLong(m -> m.getTimeRangeTracker().getMax()).max().getAsLong());
427
428    ((CompactingMemStore) memstore).flushInMemory(); // trigger the merge
429
430    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
431      Threads.sleep(10);
432    }
433    if (isCompaction) {
434      // max version = 1, so one cell will be dropped.
435      numberOfCell = 1;
436      minTs = currentTs;
437    }
438
439    assertEquals(numberOfCell,
440      memstore.getSegments().stream().mapToInt(Segment::getCellsCount).sum());
441    assertEquals(minTs, memstore.getSegments().stream()
442      .mapToLong(m -> m.getTimeRangeTracker().getMin()).min().getAsLong());
443    assertEquals(currentTs, memstore.getSegments().stream()
444      .mapToLong(m -> m.getTimeRangeTracker().getMax()).max().getAsLong());
445  }
446
447  @Test
448  public void testCountOfCellsAfterFlatteningByScan() throws IOException {
449    String[] keys1 = { "A", "B", "C" }; // A, B, C
450    addRowsByKeysWith50Cols(memstore, keys1);
451    // this should only flatten as there are no duplicates
452    ((CompactingMemStore) memstore).flushInMemory();
453    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
454      Threads.sleep(10);
455    }
456    List<KeyValueScanner> scanners = memstore.getScanners(Long.MAX_VALUE);
457    // seek
458    int count = 0;
459    for (int i = 0; i < scanners.size(); i++) {
460      scanners.get(i).seek(KeyValue.LOWESTKEY);
461      while (scanners.get(i).next() != null) {
462        count++;
463      }
464    }
465    assertEquals("the count should be ", 150, count);
466    for (int i = 0; i < scanners.size(); i++) {
467      scanners.get(i).close();
468    }
469  }
470
471  @Test
472  public void testCountOfCellsAfterFlatteningByIterator() throws IOException {
473    String[] keys1 = { "A", "B", "C" }; // A, B, C
474    addRowsByKeysWith50Cols(memstore, keys1);
475    // this should only flatten as there are no duplicates
476    ((CompactingMemStore) memstore).flushInMemory();
477    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
478      Threads.sleep(10);
479    }
480    // Just doing the cnt operation here
481    MemStoreSegmentsIterator itr = new MemStoreMergerSegmentsIterator(
482      ((CompactingMemStore) memstore).getImmutableSegments().getStoreSegments(),
483      CellComparatorImpl.COMPARATOR, 10);
484    int cnt = 0;
485    try {
486      while (itr.next() != null) {
487        cnt++;
488      }
489    } finally {
490      itr.close();
491    }
492    assertEquals("the count should be ", 150, cnt);
493  }
494
495  private void addRowsByKeysWith50Cols(AbstractMemStore hmc, String[] keys) {
496    byte[] fam = Bytes.toBytes("testfamily");
497    for (int i = 0; i < keys.length; i++) {
498      long timestamp = EnvironmentEdgeManager.currentTime();
499      Threads.sleep(1); // to make sure each kv gets a different ts
500      byte[] row = Bytes.toBytes(keys[i]);
501      for (int j = 0; j < 50; j++) {
502        byte[] qf = Bytes.toBytes("testqualifier" + j);
503        byte[] val = Bytes.toBytes(keys[i] + j);
504        KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
505        hmc.add(kv, null);
506      }
507    }
508  }
509
510  @Override
511  @Test
512  public void testPuttingBackChunksWithOpeningScanner() throws IOException {
513    byte[] row = Bytes.toBytes("testrow");
514    byte[] fam = Bytes.toBytes("testfamily");
515    byte[] qf1 = Bytes.toBytes("testqualifier1");
516    byte[] qf2 = Bytes.toBytes("testqualifier2");
517    byte[] qf3 = Bytes.toBytes("testqualifier3");
518    byte[] qf4 = Bytes.toBytes("testqualifier4");
519    byte[] qf5 = Bytes.toBytes("testqualifier5");
520    byte[] qf6 = Bytes.toBytes("testqualifier6");
521    byte[] qf7 = Bytes.toBytes("testqualifier7");
522    byte[] val = Bytes.toBytes("testval");
523
524    // Setting up memstore
525    memstore.add(new KeyValue(row, fam, qf1, val), null);
526    memstore.add(new KeyValue(row, fam, qf2, val), null);
527    memstore.add(new KeyValue(row, fam, qf3, val), null);
528
529    // Creating a snapshot
530    MemStoreSnapshot snapshot = memstore.snapshot();
531    assertEquals(3, memstore.getSnapshot().getCellsCount());
532
533    // Adding value to "new" memstore
534    assertEquals(0, memstore.getActive().getCellsCount());
535    memstore.add(new KeyValue(row, fam, qf4, val), null);
536    memstore.add(new KeyValue(row, fam, qf5, val), null);
537    assertEquals(2, memstore.getActive().getCellsCount());
538
539    // opening scanner before clear the snapshot
540    List<KeyValueScanner> scanners = memstore.getScanners(0);
541    // Shouldn't putting back the chunks to pool,since some scanners are opening
542    // based on their data
543    // close the scanners
544    for (KeyValueScanner scanner : snapshot.getScanners()) {
545      scanner.close();
546    }
547    memstore.clearSnapshot(snapshot.getId());
548
549    assertTrue(chunkCreator.getPoolSize() == 0);
550
551    // Chunks will be put back to pool after close scanners;
552    for (KeyValueScanner scanner : scanners) {
553      scanner.close();
554    }
555    assertTrue(chunkCreator.getPoolSize() > 0);
556
557    // clear chunks
558    chunkCreator.clearChunksInPool();
559
560    // Creating another snapshot
561
562    snapshot = memstore.snapshot();
563    // Adding more value
564    memstore.add(new KeyValue(row, fam, qf6, val), null);
565    memstore.add(new KeyValue(row, fam, qf7, val), null);
566    // opening scanners
567    scanners = memstore.getScanners(0);
568    // close scanners before clear the snapshot
569    for (KeyValueScanner scanner : scanners) {
570      scanner.close();
571    }
572    // Since no opening scanner, the chunks of snapshot should be put back to
573    // pool
574    // close the scanners
575    for (KeyValueScanner scanner : snapshot.getScanners()) {
576      scanner.close();
577    }
578    memstore.clearSnapshot(snapshot.getId());
579    assertTrue(chunkCreator.getPoolSize() > 0);
580  }
581
582  @Override
583  @Test
584  public void testPuttingBackChunksAfterFlushing() throws IOException {
585    byte[] row = Bytes.toBytes("testrow");
586    byte[] fam = Bytes.toBytes("testfamily");
587    byte[] qf1 = Bytes.toBytes("testqualifier1");
588    byte[] qf2 = Bytes.toBytes("testqualifier2");
589    byte[] qf3 = Bytes.toBytes("testqualifier3");
590    byte[] qf4 = Bytes.toBytes("testqualifier4");
591    byte[] qf5 = Bytes.toBytes("testqualifier5");
592    byte[] val = Bytes.toBytes("testval");
593
594    // Setting up memstore
595    memstore.add(new KeyValue(row, fam, qf1, val), null);
596    memstore.add(new KeyValue(row, fam, qf2, val), null);
597    memstore.add(new KeyValue(row, fam, qf3, val), null);
598
599    // Creating a snapshot
600    MemStoreSnapshot snapshot = memstore.snapshot();
601    assertEquals(3, memstore.getSnapshot().getCellsCount());
602
603    // Adding value to "new" memstore
604    assertEquals(0, memstore.getActive().getCellsCount());
605    memstore.add(new KeyValue(row, fam, qf4, val), null);
606    memstore.add(new KeyValue(row, fam, qf5, val), null);
607    assertEquals(2, memstore.getActive().getCellsCount());
608    // close the scanners
609    for (KeyValueScanner scanner : snapshot.getScanners()) {
610      scanner.close();
611    }
612    memstore.clearSnapshot(snapshot.getId());
613
614    int chunkCount = chunkCreator.getPoolSize();
615    assertTrue(chunkCount > 0);
616  }
617
618  @Test
619  public void testFlatteningToCellChunkMap() throws IOException {
620
621    // set memstore to flat into CellChunkMap
622    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
623    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
624      String.valueOf(compactionType));
625    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
626    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
627    int numOfCells = 8;
628    String[] keys1 = { "A", "A", "B", "C", "D", "D", "E", "F" }; // A1, A2, B3, C4, D5, D6, E7, F8
629
630    // make one cell
631    byte[] row = Bytes.toBytes(keys1[0]);
632    byte[] val = Bytes.toBytes(keys1[0] + 0);
633    KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
634      EnvironmentEdgeManager.currentTime(), val);
635
636    // test 1 bucket
637    int totalCellsLen = addRowsByKeys(memstore, keys1);
638    long oneCellOnCSLMHeapSize = ClassSize.align(
639      ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + kv.getSerializedSize());
640
641    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
642    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
643    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
644
645    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and flatten
646    assertEquals(0, memstore.getSnapshot().getCellsCount());
647    long oneCellOnCCMHeapSize =
648      ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize());
649    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
650      + numOfCells * oneCellOnCCMHeapSize;
651
652    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
653    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
654
655    MemStoreSize mss = memstore.getFlushableSize();
656    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
657    // simulate flusher
658    region.decrMemStoreSize(mss);
659    ImmutableSegment s = memstore.getSnapshot();
660    assertEquals(numOfCells, s.getCellsCount());
661    assertEquals(0, regionServicesForStores.getMemStoreSize());
662
663    memstore.clearSnapshot(snapshot.getId());
664  }
665
666  /**
667   * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks. Even
668   * though MSLAB is enabled, cells bigger than maxAlloc (even if smaller than the size of a chunk)
669   * are not written in the MSLAB Chunks. If such cells are found in the process of flattening into
670   * CellChunkMap (in-memory-flush) they need to be copied into MSLAB.
671   * testFlatteningToBigCellChunkMap checks that the process of flattening into CellChunkMap
672   * succeeds, even when such big cells are allocated.
673   */
674  @Test
675  public void testFlatteningToBigCellChunkMap() throws IOException {
676
677    if (toCellChunkMap == false) {
678      return;
679    }
680    // set memstore to flat into CellChunkMap
681    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
682    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
683      String.valueOf(compactionType));
684    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
685    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
686    int numOfCells = 4;
687    char[] chars = new char[MemStoreLAB.MAX_ALLOC_DEFAULT];
688    for (int i = 0; i < chars.length; i++) {
689      chars[i] = 'A';
690    }
691    String bigVal = new String(chars);
692    String[] keys1 = { "A", "B", "C", "D" };
693
694    // make one cell
695    byte[] row = Bytes.toBytes(keys1[0]);
696    byte[] val = Bytes.toBytes(bigVal);
697    KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
698      EnvironmentEdgeManager.currentTime(), val);
699
700    // test 1 bucket
701    int totalCellsLen = addRowsByKeys(memstore, keys1, val);
702
703    long oneCellOnCSLMHeapSize =
704      ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());
705
706    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
707    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
708    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
709
710    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and flatten
711    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
712      Threads.sleep(10);
713    }
714    assertEquals(0, memstore.getSnapshot().getCellsCount());
715    // One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode.
716    // totalCellsLen should remain the same
717    long oneCellOnCCMHeapSize =
718      ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize());
719    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
720      + numOfCells * oneCellOnCCMHeapSize;
721
722    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
723    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
724
725    MemStoreSize mss = memstore.getFlushableSize();
726    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
727    // simulate flusher
728    region.decrMemStoreSize(mss);
729    ImmutableSegment s = memstore.getSnapshot();
730    assertEquals(numOfCells, s.getCellsCount());
731    assertEquals(0, regionServicesForStores.getMemStoreSize());
732
733    memstore.clearSnapshot(snapshot.getId());
734  }
735
736  /**
737   * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks. Even
738   * though MSLAB is enabled, cells bigger than the size of a chunk are not written in the MSLAB
739   * Chunks. If such cells are found in the process of flattening into CellChunkMap
740   * (in-memory-flush) they need to be copied into MSLAB. testFlatteningToJumboCellChunkMap checks
741   * that the process of flattening into CellChunkMap succeeds, even when such big cells are
742   * allocated.
743   */
744  @Test
745  public void testFlatteningToJumboCellChunkMap() throws IOException {
746
747    if (toCellChunkMap == false) {
748      return;
749    }
750    // set memstore to flat into CellChunkMap
751    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
752    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
753      String.valueOf(compactionType));
754    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
755    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
756
757    int numOfCells = 1;
758    char[] chars = new char[MemStoreLAB.CHUNK_SIZE_DEFAULT];
759    for (int i = 0; i < chars.length; i++) {
760      chars[i] = 'A';
761    }
762    String bigVal = new String(chars);
763    String[] keys1 = { "A" };
764
765    // make one cell
766    byte[] row = Bytes.toBytes(keys1[0]);
767    byte[] val = Bytes.toBytes(bigVal);
768    KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
769      EnvironmentEdgeManager.currentTime(), val);
770
771    // test 1 bucket
772    int totalCellsLen = addRowsByKeys(memstore, keys1, val);
773
774    long oneCellOnCSLMHeapSize =
775      ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());
776
777    long totalHeapSize = numOfCells * oneCellOnCSLMHeapSize + MutableSegment.DEEP_OVERHEAD;
778    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
779    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
780
781    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and flatten
782    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
783      Threads.sleep(10);
784    }
785    assertEquals(0, memstore.getSnapshot().getCellsCount());
786
787    // One cell is duplicated, but it shouldn't be compacted because we are in BASIC mode.
788    // totalCellsLen should remain the same
789    long oneCellOnCCMHeapSize =
790      (long) ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize());
791    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
792      + numOfCells * oneCellOnCCMHeapSize;
793
794    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
795
796    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
797
798    MemStoreSize mss = memstore.getFlushableSize();
799    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
800    // simulate flusher
801    region.decrMemStoreSize(mss);
802    ImmutableSegment s = memstore.getSnapshot();
803    assertEquals(numOfCells, s.getCellsCount());
804    assertEquals(0, regionServicesForStores.getMemStoreSize());
805
806    memstore.clearSnapshot(snapshot.getId());
807
808    // Allocating two big cells (too big for being copied into a regular chunk).
809    String[] keys2 = { "C", "D" };
810    addRowsByKeys(memstore, keys2, val);
811    while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
812      Threads.sleep(10);
813    }
814
815    // The in-memory flush size is bigger than the size of a single cell,
816    // but smaller than the size of two cells.
817    // Therefore, the two created cells are flushed together as a single CSLMImmutableSegment and
818    // flattened.
819    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
820      + 2 * oneCellOnCCMHeapSize;
821    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
822  }
823
824  /**
825   * CellChunkMap Segment index requires all cell data to be written in the MSLAB Chunks. Even
826   * though MSLAB is enabled, cells bigger than the size of a chunk are not written in the MSLAB
827   * Chunks. If such cells are found in the process of a merge they need to be copied into MSLAB.
828   * testForceCopyOfBigCellIntoImmutableSegment checks that the ImmutableMemStoreLAB's
829   * forceCopyOfBigCellInto does what it's supposed to do.
830   */
831  @org.junit.Ignore
832  @Test // Flakey. Disabled by HBASE-24128. HBASE-24129 is for reenable.
833  // TestCompactingToCellFlatMapMemStore.testForceCopyOfBigCellIntoImmutableSegment:902 i=1
834  // expected:<8389924> but was:<8389992>
835  public void testForceCopyOfBigCellIntoImmutableSegment() throws IOException {
836
837    if (toCellChunkMap == false) {
838      return;
839    }
840
841    // set memstore to flat into CellChunkMap
842    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
843    memstore.getConfiguration().setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY,
844      4);
845    memstore.getConfiguration().setDouble(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY,
846      0.014);
847    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
848      String.valueOf(compactionType));
849    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
850    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.CHUNK_MAP);
851
852    char[] chars = new char[MemStoreLAB.CHUNK_SIZE_DEFAULT];
853    for (int i = 0; i < chars.length; i++) {
854      chars[i] = 'A';
855    }
856    String bigVal = new String(chars);
857    byte[] val = Bytes.toBytes(bigVal);
858
859    // We need to add two cells, three times, in order to guarantee a merge
860    List<String[]> keysList = new ArrayList<>();
861    keysList.add(new String[] { "A", "B" });
862    keysList.add(new String[] { "C", "D" });
863    keysList.add(new String[] { "E", "F" });
864    keysList.add(new String[] { "G", "H" });
865
866    // Measuring the size of a single kv
867    KeyValue kv = new KeyValue(Bytes.toBytes("A"), Bytes.toBytes("testfamily"),
868      Bytes.toBytes("testqualifier"), EnvironmentEdgeManager.currentTime(), val);
869    long oneCellOnCCMHeapSize =
870      (long) ClassSize.CELL_CHUNK_MAP_ENTRY + ClassSize.align(kv.getSerializedSize());
871    long oneCellOnCSLMHeapSize =
872      ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + kv.heapSize());
873    long totalHeapSize = MutableSegment.DEEP_OVERHEAD;
874    for (int i = 0; i < keysList.size(); i++) {
875      addRowsByKeys(memstore, keysList.get(i), val);
876      while (((CompactingMemStore) memstore).isMemStoreFlushingInMemory()) {
877        Threads.sleep(10);
878      }
879
880      if (i == 0) {
881        totalHeapSize += CellChunkImmutableSegment.DEEP_OVERHEAD_CCM + oneCellOnCCMHeapSize
882          + oneCellOnCSLMHeapSize;
883      } else {
884        // The in-memory flush size is bigger than the size of a single cell,
885        // but smaller than the size of two cells.
886        // Therefore, the two created cells are flattened in a seperate segment.
887        totalHeapSize += 2 * (CellChunkImmutableSegment.DEEP_OVERHEAD_CCM + oneCellOnCCMHeapSize);
888      }
889      if (i == 2) {
890        // Four out of the five segments are merged into one
891        totalHeapSize -= (4 * CellChunkImmutableSegment.DEEP_OVERHEAD_CCM);
892        totalHeapSize = ClassSize.align(totalHeapSize);
893      }
894      assertEquals("i=" + i, totalHeapSize, ((CompactingMemStore) memstore).heapSize());
895    }
896  }
897
898  /**
899   * Test big cell size after in memory compaction. (HBASE-26467)
900   */
901  @Test
902  public void testBigCellSizeAfterInMemoryCompaction() throws IOException {
903    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
904    memstore.getConfiguration().setInt(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY,
905      1);
906    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
907      String.valueOf(compactionType));
908    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
909
910    byte[] val = new byte[MemStoreLAB.CHUNK_SIZE_DEFAULT];
911
912    long size = addRowsByKeys(memstore, new String[] { "A" }, val);
913    ((MyCompactingMemStore) memstore).flushInMemory();
914
915    for (KeyValueScanner scanner : memstore.getScanners(Long.MAX_VALUE)) {
916      Cell cell;
917      while ((cell = scanner.next()) != null) {
918        assertEquals(size, cell.getSerializedSize());
919      }
920    }
921  }
922
923  @Override
924  @Test
925  public void testScan() throws IOException {
926    scanMemStore(memstore, 915);
927  }
928
929  private long addRowsByKeysDataSize(final AbstractMemStore hmc, String[] keys) {
930    byte[] fam = Bytes.toBytes("testfamily");
931    byte[] qf = Bytes.toBytes("testqualifier");
932    MemStoreSizing memstoreSizing = new NonThreadSafeMemStoreSizing();
933    for (int i = 0; i < keys.length; i++) {
934      long timestamp = EnvironmentEdgeManager.currentTime();
935      Threads.sleep(1); // to make sure each kv gets a different ts
936      byte[] row = Bytes.toBytes(keys[i]);
937      byte[] val = Bytes.toBytes(keys[i] + i);
938      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
939      hmc.add(kv, memstoreSizing);
940      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp" + kv.getTimestamp());
941    }
942    MemStoreSize mss = memstoreSizing.getMemStoreSize();
943    regionServicesForStores.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(),
944      mss.getOffHeapSize(), mss.getCellsCount());
945    return mss.getDataSize();
946  }
947
948  private long cellBeforeFlushSize() {
949    // make one cell
950    byte[] row = Bytes.toBytes("A");
951    byte[] val = Bytes.toBytes("A" + 0);
952    KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
953      EnvironmentEdgeManager.currentTime(), val);
954    return ClassSize.align(
955      ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + KeyValue.FIXED_OVERHEAD + kv.getSerializedSize());
956  }
957
958  private long cellAfterFlushSize() {
959    // make one cell
960    byte[] row = Bytes.toBytes("A");
961    byte[] val = Bytes.toBytes("A" + 0);
962    KeyValue kv = new KeyValue(row, Bytes.toBytes("testfamily"), Bytes.toBytes("testqualifier"),
963      EnvironmentEdgeManager.currentTime(), val);
964
965    return toCellChunkMap
966      ? ClassSize.align(ClassSize.CELL_CHUNK_MAP_ENTRY + kv.getSerializedSize())
967      : ClassSize
968        .align(ClassSize.CELL_ARRAY_MAP_ENTRY + KeyValue.FIXED_OVERHEAD + kv.getSerializedSize());
969  }
970}