001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNotEquals;
022import static org.junit.jupiter.api.Assertions.assertNotNull;
023import static org.junit.jupiter.api.Assertions.assertNull;
024import static org.junit.jupiter.api.Assertions.assertTrue;
025
026import java.io.IOException;
027import java.lang.management.ManagementFactory;
028import java.util.ArrayList;
029import java.util.List;
030import java.util.concurrent.Executors;
031import java.util.concurrent.ThreadPoolExecutor;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellComparator;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.ExtendedCell;
037import org.apache.hadoop.hbase.HBaseConfiguration;
038import org.apache.hadoop.hbase.HBaseTestingUtil;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.KeepDeletedCells;
041import org.apache.hadoop.hbase.KeyValue;
042import org.apache.hadoop.hbase.KeyValueTestUtil;
043import org.apache.hadoop.hbase.MemoryCompactionPolicy;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
047import org.apache.hadoop.hbase.client.RegionInfo;
048import org.apache.hadoop.hbase.client.RegionInfoBuilder;
049import org.apache.hadoop.hbase.client.Scan;
050import org.apache.hadoop.hbase.client.TableDescriptor;
051import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
052import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
053import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
054import org.apache.hadoop.hbase.testclassification.MediumTests;
055import org.apache.hadoop.hbase.testclassification.RegionServerTests;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.apache.hadoop.hbase.util.EnvironmentEdge;
058import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
059import org.apache.hadoop.hbase.util.Threads;
060import org.apache.hadoop.hbase.wal.WAL;
061import org.junit.jupiter.api.Tag;
062import org.junit.jupiter.api.Test;
063import org.mockito.Mockito;
064import org.slf4j.Logger;
065import org.slf4j.LoggerFactory;
066
/**
 * Test cases for the compacting memstore ({@link CompactingMemStore}).
 */
070@Tag(RegionServerTests.TAG)
071@Tag(MediumTests.TAG)
072public class TestCompactingMemStore extends TestDefaultMemStore {
073
  private static final Logger LOG = LoggerFactory.getLogger(TestCompactingMemStore.class);
  // Chunk creator shared (static) across tests; assigned in createChunkCreator().
  protected static ChunkCreator chunkCreator;
  // Region created in internalSetUp(); also used by tests to simulate the flusher's accounting.
  protected HRegion region;
  // Mockito spy of the region's store services; its in-memory compaction pool is stubbed.
  protected RegionServicesForStores regionServicesForStores;
  // Store created in internalSetUp() and handed to the memstores the tests build.
  protected HStore store;
  // Configuration used for the region/store and for sizing the chunk creator.
  private Configuration conf;
080
081  @Override
082  protected void internalTearDown() throws Exception {
083    chunkCreator.clearChunksInPool();
084  }
085
086  @Override
087  protected void internalSetUp() throws Exception {
088    super.internalSetUp();
089    conf = new Configuration();
090    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
091    conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
092    conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000);
093    HBaseTestingUtil hbaseUtility = new HBaseTestingUtil(conf);
094    ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.of(FAMILY);
095    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf("foobar"))
096      .setColumnFamily(familyDescriptor).build();
097    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf("foobar")).build();
098    WAL wal = HBaseTestingUtil.createWal(conf, hbaseUtility.getDataTestDir(), info);
099    this.region =
100      HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, tableDescriptor, wal, true);
101    this.regionServicesForStores = Mockito.spy(region.getRegionServicesForStores());
102    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
103    Mockito.when(regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
104    this.store = new HStore(region, familyDescriptor, conf, false);
105  }
106
107  @Override
108  protected void createChunkCreator() {
109    long globalMemStoreLimit =
110      (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()
111        * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
112    chunkCreator = ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
113      globalMemStoreLimit, 0.4f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null,
114      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
115    assertNotNull(chunkCreator);
116  }
117
118  @Override
119  protected void createMemStore() throws IOException {
120    this.memstore = new MyCompactingMemStore(HBaseConfiguration.create(),
121      CellComparator.getInstance(), store, regionServicesForStores, MemoryCompactionPolicy.EAGER);
122    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
123  }
124
125  /**
126   * A simple test which flush in memory affect timeOfOldestEdit
127   */
128  @Test
129  public void testTimeOfOldestEdit() {
130    assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit());
131    final byte[] r = Bytes.toBytes("r");
132    final byte[] f = Bytes.toBytes("f");
133    final byte[] q = Bytes.toBytes("q");
134    final byte[] v = Bytes.toBytes("v");
135    final KeyValue kv = new KeyValue(r, f, q, v);
136    memstore.add(kv, null);
137    long timeOfOldestEdit = memstore.timeOfOldestEdit();
138    assertNotEquals(Long.MAX_VALUE, timeOfOldestEdit);
139
140    ((CompactingMemStore) memstore).flushInMemory();
141    assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit());
142    memstore.add(kv, null);
143    assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit());
144    memstore.snapshot();
145    assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit());
146  }
147
148  /**
149   * A simple test which verifies the 3 possible states when scanning across snapshot.
150   */
151  @Override
152  @Test
153  public void testScanAcrossSnapshot2() throws IOException, CloneNotSupportedException {
154    // we are going to the scanning across snapshot with two kvs
155    // kv1 should always be returned before kv2
156    final byte[] one = Bytes.toBytes(1);
157    final byte[] two = Bytes.toBytes(2);
158    final byte[] f = Bytes.toBytes("f");
159    final byte[] q = Bytes.toBytes("q");
160    final byte[] v = Bytes.toBytes(3);
161
162    final KeyValue kv1 = new KeyValue(one, f, q, 10, v);
163    final KeyValue kv2 = new KeyValue(two, f, q, 10, v);
164
165    // use case 1: both kvs in kvset
166    this.memstore.add(kv1.clone(), null);
167    this.memstore.add(kv2.clone(), null);
168    // snapshot is empty,active segment is not empty,
169    // empty segment is skipped.
170    verifyOneScanAcrossSnapshot2(kv1, kv2);
171
172    // use case 2: both kvs in snapshot
173    this.memstore.snapshot();
174    // active segment is empty,snapshot is not empty,
175    // empty segment is skipped.
176    verifyOneScanAcrossSnapshot2(kv1, kv2);
177
178    // use case 3: first in snapshot second in kvset
179    this.memstore = new CompactingMemStore(HBaseConfiguration.create(),
180      CellComparator.getInstance(), store, regionServicesForStores, MemoryCompactionPolicy.EAGER);
181    this.memstore.add(kv1.clone(), null);
182    // As compaction is starting in the background the repetition
183    // of the k1 might be removed BUT the scanners created earlier
184    // should look on the OLD MutableCellSetSegment, so this should be OK...
185    this.memstore.snapshot();
186    this.memstore.add(kv2.clone(), null);
187    verifyScanAcrossSnapshot2(kv1, kv2);
188  }
189
190  /**
191   * Test memstore snapshots
192   */
193  @Override
194  @Test
195  public void testSnapshotting() throws IOException {
196    final int snapshotCount = 5;
197    // Add some rows, run a snapshot. Do it a few times.
198    for (int i = 0; i < snapshotCount; i++) {
199      addRows(this.memstore);
200      runSnapshot(this.memstore, true);
201      assertEquals(0, this.memstore.getSnapshot().getCellsCount(), "History not being cleared");
202    }
203  }
204
205  //////////////////////////////////////////////////////////////////////////////
206  // Get tests
207  //////////////////////////////////////////////////////////////////////////////
208
209  /**
210   * Test getNextRow from memstore
211   */
212  @Override
213  @Test
214  public void testGetNextRow() throws Exception {
215    addRows(this.memstore);
216    // Add more versions to make it a little more interesting.
217    Thread.sleep(1);
218    addRows(this.memstore);
219    Cell closestToEmpty = ((CompactingMemStore) this.memstore).getNextRow(KeyValue.LOWESTKEY);
220    assertEquals(0, CellComparator.getInstance().compareRows(closestToEmpty,
221      new KeyValue(Bytes.toBytes(0), EnvironmentEdgeManager.currentTime())));
222    for (int i = 0; i < ROW_COUNT; i++) {
223      Cell nr = ((CompactingMemStore) this.memstore)
224        .getNextRow(new KeyValue(Bytes.toBytes(i), EnvironmentEdgeManager.currentTime()));
225      if (i + 1 == ROW_COUNT) {
226        assertNull(nr);
227      } else {
228        assertEquals(0, CellComparator.getInstance().compareRows(nr,
229          new KeyValue(Bytes.toBytes(i + 1), EnvironmentEdgeManager.currentTime())));
230      }
231    }
232    // starting from each row, validate results should contain the starting row
233    Configuration conf = HBaseConfiguration.create();
234    for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
235      ScanInfo scanInfo =
236        new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, KeepDeletedCells.FALSE,
237          HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false);
238      try (InternalScanner scanner =
239        new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null,
240          memstore.getScanners(0))) {
241        List<Cell> results = new ArrayList<>();
242        for (int i = 0; scanner.next(results); i++) {
243          int rowId = startRowId + i;
244          Cell left = results.get(0);
245          byte[] row1 = Bytes.toBytes(rowId);
246          assertEquals(0, CellComparator.getInstance().compareRows(left, row1, 0, row1.length),
247            "Row name");
248          assertEquals(QUALIFIER_COUNT, results.size(), "Count of columns");
249          List<Cell> row = new ArrayList<>();
250          for (Cell kv : results) {
251            row.add(kv);
252          }
253          isExpectedRowWithoutTimestamps(rowId, row);
254          // Clear out set. Otherwise row results accumulate.
255          results.clear();
256        }
257      }
258    }
259  }
260
261  @Override
262  @Test
263  public void testGet_memstoreAndSnapShot() throws IOException {
264    byte[] row = Bytes.toBytes("testrow");
265    byte[] fam = Bytes.toBytes("testfamily");
266    byte[] qf1 = Bytes.toBytes("testqualifier1");
267    byte[] qf2 = Bytes.toBytes("testqualifier2");
268    byte[] qf3 = Bytes.toBytes("testqualifier3");
269    byte[] qf4 = Bytes.toBytes("testqualifier4");
270    byte[] qf5 = Bytes.toBytes("testqualifier5");
271    byte[] val = Bytes.toBytes("testval");
272
273    // Setting up memstore
274    memstore.add(new KeyValue(row, fam, qf1, val), null);
275    memstore.add(new KeyValue(row, fam, qf2, val), null);
276    memstore.add(new KeyValue(row, fam, qf3, val), null);
277    // Pushing to pipeline
278    ((CompactingMemStore) memstore).flushInMemory();
279    assertEquals(0, memstore.getSnapshot().getCellsCount());
280    // Creating a snapshot
281    memstore.snapshot();
282    assertEquals(3, memstore.getSnapshot().getCellsCount());
283    // Adding value to "new" memstore
284    assertEquals(0, memstore.getActive().getCellsCount());
285    memstore.add(new KeyValue(row, fam, qf4, val), null);
286    memstore.add(new KeyValue(row, fam, qf5, val), null);
287    assertEquals(2, memstore.getActive().getCellsCount());
288  }
289
290  ////////////////////////////////////
291  // Test for periodic memstore flushes
292  // based on time of oldest edit
293  ////////////////////////////////////
294
295  /**
296   * Add keyvalues with a fixed memstoreTs, and checks that memstore size is decreased as older
297   * keyvalues are deleted from the memstore.
298   */
299  @Override
300  @Test
301  public void testUpsertMemstoreSize() throws Exception {
302    MemStoreSize oldSize = memstore.size();
303
304    List<ExtendedCell> l = new ArrayList<>();
305    KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
306    KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v");
307    KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v");
308
309    kv1.setSequenceId(1);
310    kv2.setSequenceId(1);
311    kv3.setSequenceId(1);
312    l.add(kv1);
313    l.add(kv2);
314    l.add(kv3);
315
316    this.memstore.upsert(l, 2, null);// readpoint is 2
317    MemStoreSize newSize = this.memstore.size();
318    assert (newSize.getDataSize() > oldSize.getDataSize());
319    // The kv1 should be removed.
320    assert (memstore.getActive().getCellsCount() == 2);
321
322    KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
323    kv4.setSequenceId(1);
324    l.clear();
325    l.add(kv4);
326    this.memstore.upsert(l, 3, null);
327    assertEquals(newSize, this.memstore.size());
328    // The kv2 should be removed.
329    assert (memstore.getActive().getCellsCount() == 2);
330    // this.memstore = null;
331  }
332
333  /**
334   * Tests that the timeOfOldestEdit is updated correctly for the various edit operations in
335   * memstore.
336   */
337  @Override
338  @Test
339  public void testUpdateToTimeOfOldestEdit() throws Exception {
340    try {
341      EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
342      EnvironmentEdgeManager.injectEdge(edge);
343      long t = memstore.timeOfOldestEdit();
344      assertEquals(Long.MAX_VALUE, t);
345
346      // test the case that the timeOfOldestEdit is updated after a KV add
347      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"), null);
348      t = memstore.timeOfOldestEdit();
349      assertEquals(1234, t);
350      // The method will also assert
351      // the value is reset to Long.MAX_VALUE
352      t = runSnapshot(memstore, true);
353
354      // test the case that the timeOfOldestEdit is updated after a KV delete
355      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, KeyValue.Type.Delete, "v"), null);
356      t = memstore.timeOfOldestEdit();
357      assertEquals(1234, t);
358      t = runSnapshot(memstore, true);
359
360      // test the case that the timeOfOldestEdit is updated after a KV upsert
361      List<ExtendedCell> l = new ArrayList<>();
362      KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
363      kv1.setSequenceId(100);
364      l.add(kv1);
365      memstore.upsert(l, 1000, null);
366      t = memstore.timeOfOldestEdit();
367      assertEquals(1234, t);
368    } finally {
369      EnvironmentEdgeManager.reset();
370    }
371  }
372
373  private long runSnapshot(final AbstractMemStore hmc, boolean useForce) throws IOException {
374    // Save off old state.
375    long oldHistorySize = hmc.getSnapshot().getDataSize();
376    long prevTimeStamp = hmc.timeOfOldestEdit();
377
378    hmc.snapshot();
379    MemStoreSnapshot snapshot = hmc.snapshot();
380    if (useForce) {
381      // Make some assertions about what just happened.
382      assertTrue(oldHistorySize < snapshot.getDataSize(), "History size has not increased");
383      long t = hmc.timeOfOldestEdit();
384      assertEquals(Long.MAX_VALUE, t, "Time of oldest edit is not Long.MAX_VALUE");
385      hmc.clearSnapshot(snapshot.getId());
386    } else {
387      long t = hmc.timeOfOldestEdit();
388      assertEquals(t, prevTimeStamp, "Time of oldest edit didn't remain the same");
389    }
390    return prevTimeStamp;
391  }
392
393  private void isExpectedRowWithoutTimestamps(final int rowIndex, List<Cell> kvs) {
394    int i = 0;
395    for (Cell kv : kvs) {
396      byte[] expectedColname = makeQualifier(rowIndex, i++);
397      assertTrue(CellUtil.matchingQualifier(kv, expectedColname), "Column name");
398      // Value is column name as bytes. Usually result is
399      // 100 bytes in size at least. This is the default size
400      // for BytesWriteable. For comparison, convert bytes to
401      // String and trim to remove trailing null bytes.
402      assertTrue(CellUtil.matchingValue(kv, expectedColname), "Content");
403    }
404  }
405
406  @Test
407  public void testPuttingBackChunksAfterFlushing() throws IOException {
408    byte[] row = Bytes.toBytes("testrow");
409    byte[] fam = Bytes.toBytes("testfamily");
410    byte[] qf1 = Bytes.toBytes("testqualifier1");
411    byte[] qf2 = Bytes.toBytes("testqualifier2");
412    byte[] qf3 = Bytes.toBytes("testqualifier3");
413    byte[] qf4 = Bytes.toBytes("testqualifier4");
414    byte[] qf5 = Bytes.toBytes("testqualifier5");
415    byte[] val = Bytes.toBytes("testval");
416
417    // Setting up memstore
418    memstore.add(new KeyValue(row, fam, qf1, val), null);
419    memstore.add(new KeyValue(row, fam, qf2, val), null);
420    memstore.add(new KeyValue(row, fam, qf3, val), null);
421
422    // Creating a snapshot
423    MemStoreSnapshot snapshot = memstore.snapshot();
424    assertEquals(3, memstore.getSnapshot().getCellsCount());
425
426    // Adding value to "new" memstore
427    assertEquals(0, memstore.getActive().getCellsCount());
428    memstore.add(new KeyValue(row, fam, qf4, val), null);
429    memstore.add(new KeyValue(row, fam, qf5, val), null);
430    assertEquals(2, memstore.getActive().getCellsCount());
431    // close the scanners
432    for (KeyValueScanner scanner : snapshot.getScanners()) {
433      scanner.close();
434    }
435    memstore.clearSnapshot(snapshot.getId());
436
437    int chunkCount = chunkCreator.getPoolSize();
438    assertTrue(chunkCount > 0);
439
440  }
441
442  @Test
443  public void testPuttingBackChunksWithOpeningScanner() throws IOException {
444    byte[] row = Bytes.toBytes("testrow");
445    byte[] fam = Bytes.toBytes("testfamily");
446    byte[] qf1 = Bytes.toBytes("testqualifier1");
447    byte[] qf2 = Bytes.toBytes("testqualifier2");
448    byte[] qf3 = Bytes.toBytes("testqualifier3");
449    byte[] qf4 = Bytes.toBytes("testqualifier4");
450    byte[] qf5 = Bytes.toBytes("testqualifier5");
451    byte[] qf6 = Bytes.toBytes("testqualifier6");
452    byte[] qf7 = Bytes.toBytes("testqualifier7");
453    byte[] val = Bytes.toBytes("testval");
454
455    // Setting up memstore
456    memstore.add(new KeyValue(row, fam, qf1, val), null);
457    memstore.add(new KeyValue(row, fam, qf2, val), null);
458    memstore.add(new KeyValue(row, fam, qf3, val), null);
459
460    // Creating a snapshot
461    MemStoreSnapshot snapshot = memstore.snapshot();
462    assertEquals(3, memstore.getSnapshot().getCellsCount());
463
464    // Adding value to "new" memstore
465    assertEquals(0, memstore.getActive().getCellsCount());
466    memstore.add(new KeyValue(row, fam, qf4, val), null);
467    memstore.add(new KeyValue(row, fam, qf5, val), null);
468    assertEquals(2, memstore.getActive().getCellsCount());
469
470    // opening scanner before clear the snapshot
471    List<KeyValueScanner> scanners = memstore.getScanners(0);
472    // Shouldn't putting back the chunks to pool,since some scanners are opening
473    // based on their data
474    // close the scanners
475    for (KeyValueScanner scanner : snapshot.getScanners()) {
476      scanner.close();
477    }
478    memstore.clearSnapshot(snapshot.getId());
479
480    assertEquals(0, chunkCreator.getPoolSize());
481
482    // Chunks will be put back to pool after close scanners;
483    for (KeyValueScanner scanner : scanners) {
484      scanner.close();
485    }
486    assertTrue(chunkCreator.getPoolSize() > 0);
487
488    // clear chunks
489    chunkCreator.clearChunksInPool();
490
491    // Creating another snapshot
492
493    snapshot = memstore.snapshot();
494    // Adding more value
495    memstore.add(new KeyValue(row, fam, qf6, val), null);
496    memstore.add(new KeyValue(row, fam, qf7, val), null);
497    // opening scanners
498    scanners = memstore.getScanners(0);
499    // close scanners before clear the snapshot
500    for (KeyValueScanner scanner : scanners) {
501      scanner.close();
502    }
503    // Since no opening scanner, the chunks of snapshot should be put back to
504    // pool
505    // close the scanners
506    for (KeyValueScanner scanner : snapshot.getScanners()) {
507      scanner.close();
508    }
509    memstore.clearSnapshot(snapshot.getId());
510    assertTrue(chunkCreator.getPoolSize() > 0);
511  }
512
513  @Test
514  public void testPuttingBackChunksWithOpeningPipelineScanner() throws IOException {
515
516    // set memstore to do data compaction and not to use the speculative scan
517    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
518    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
519      String.valueOf(compactionType));
520    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
521
522    byte[] row = Bytes.toBytes("testrow");
523    byte[] fam = Bytes.toBytes("testfamily");
524    byte[] qf1 = Bytes.toBytes("testqualifier1");
525    byte[] qf2 = Bytes.toBytes("testqualifier2");
526    byte[] qf3 = Bytes.toBytes("testqualifier3");
527    byte[] val = Bytes.toBytes("testval");
528
529    // Setting up memstore
530    memstore.add(new KeyValue(row, fam, qf1, 1, val), null);
531    memstore.add(new KeyValue(row, fam, qf2, 1, val), null);
532    memstore.add(new KeyValue(row, fam, qf3, 1, val), null);
533
534    // Creating a pipeline
535    ((MyCompactingMemStore) memstore).disableCompaction();
536    ((CompactingMemStore) memstore).flushInMemory();
537
538    // Adding value to "new" memstore
539    assertEquals(0, memstore.getActive().getCellsCount());
540    memstore.add(new KeyValue(row, fam, qf1, 2, val), null);
541    memstore.add(new KeyValue(row, fam, qf2, 2, val), null);
542    assertEquals(2, memstore.getActive().getCellsCount());
543
544    // pipeline bucket 2
545    ((CompactingMemStore) memstore).flushInMemory();
546    // opening scanner before force flushing
547    List<KeyValueScanner> scanners = memstore.getScanners(0);
548    // Shouldn't putting back the chunks to pool,since some scanners are opening
549    // based on their data
550    ((MyCompactingMemStore) memstore).enableCompaction();
551    // trigger compaction
552    ((CompactingMemStore) memstore).flushInMemory();
553
554    // Adding value to "new" memstore
555    assertEquals(0, memstore.getActive().getCellsCount());
556    memstore.add(new KeyValue(row, fam, qf3, 3, val), null);
557    memstore.add(new KeyValue(row, fam, qf2, 3, val), null);
558    memstore.add(new KeyValue(row, fam, qf1, 3, val), null);
559    assertEquals(3, memstore.getActive().getCellsCount());
560
561    assertEquals(0, chunkCreator.getPoolSize());
562
563    // Chunks will be put back to pool after close scanners;
564    for (KeyValueScanner scanner : scanners) {
565      scanner.close();
566    }
567    assertTrue(chunkCreator.getPoolSize() > 0);
568
569    // clear chunks
570    chunkCreator.clearChunksInPool();
571
572    // Creating another snapshot
573
574    MemStoreSnapshot snapshot = memstore.snapshot();
575    // close the scanners
576    for (KeyValueScanner scanner : snapshot.getScanners()) {
577      scanner.close();
578    }
579    memstore.clearSnapshot(snapshot.getId());
580
581    snapshot = memstore.snapshot();
582    // Adding more value
583    memstore.add(new KeyValue(row, fam, qf2, 4, val), null);
584    memstore.add(new KeyValue(row, fam, qf3, 4, val), null);
585    // opening scanners
586    scanners = memstore.getScanners(0);
587    // close scanners before clear the snapshot
588    for (KeyValueScanner scanner : scanners) {
589      scanner.close();
590    }
591    // Since no opening scanner, the chunks of snapshot should be put back to
592    // pool
593    // close the scanners
594    for (KeyValueScanner scanner : snapshot.getScanners()) {
595      scanner.close();
596    }
597    memstore.clearSnapshot(snapshot.getId());
598    assertTrue(chunkCreator.getPoolSize() > 0);
599  }
600
601  //////////////////////////////////////////////////////////////////////////////
602  // Compaction tests
603  //////////////////////////////////////////////////////////////////////////////
604  @Test
605  public void testCompaction1Bucket() throws IOException {
606
607    // set memstore to do basic structure flattening, the "eager" option is tested in
608    // TestCompactingToCellFlatMapMemStore
609    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
610    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
611      String.valueOf(compactionType));
612    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
613
614    String[] keys1 = { "A", "A", "B", "C" }; // A1, A2, B3, C4
615
616    // test 1 bucket
617    int totalCellsLen = addRowsByKeys(memstore, keys1);
618    int oneCellOnCSLMHeapSize = 120;
619    int oneCellOnCAHeapSize = 88;
620    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
621    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
622    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
623
624    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
625    assertEquals(0, memstore.getSnapshot().getCellsCount());
626    // There is no compaction, as the compacting memstore type is basic.
627    // totalCellsLen remains the same
628    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
629      + 4 * oneCellOnCAHeapSize;
630    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
631    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
632
633    MemStoreSize mss = memstore.getFlushableSize();
634    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
635    // simulate flusher
636    region.decrMemStoreSize(mss);
637    ImmutableSegment s = memstore.getSnapshot();
638    assertEquals(4, s.getCellsCount());
639    assertEquals(0, regionServicesForStores.getMemStoreSize());
640
641    memstore.clearSnapshot(snapshot.getId());
642  }
643
644  @Test
645  public void testCompaction2Buckets() throws IOException {
646
647    // set memstore to do basic structure flattening, the "eager" option is tested in
648    // TestCompactingToCellFlatMapMemStore
649    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
650    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
651      String.valueOf(compactionType));
652    memstore.getConfiguration().set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY,
653      String.valueOf(1));
654    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
655    String[] keys1 = { "A", "A", "B", "C" };
656    String[] keys2 = { "A", "B", "D" };
657
658    int totalCellsLen1 = addRowsByKeys(memstore, keys1);
659    int oneCellOnCSLMHeapSize = 120;
660    int oneCellOnCAHeapSize = 88;
661    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
662
663    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
664    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
665
666    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
667    int counter = 0;
668    for (Segment s : memstore.getSegments()) {
669      counter += s.getCellsCount();
670    }
671    assertEquals(4, counter);
672    assertEquals(0, memstore.getSnapshot().getCellsCount());
673    // There is no compaction, as the compacting memstore type is basic.
674    // totalCellsLen remains the same
675    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
676    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
677      + 4 * oneCellOnCAHeapSize;
678    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
679
680    int totalCellsLen2 = addRowsByKeys(memstore, keys2);
681    totalHeapSize += 3 * oneCellOnCSLMHeapSize;
682    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
683    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
684
685    MemStoreSize mss = memstore.getFlushableSize();
686    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
687    assertEquals(0, memstore.getSnapshot().getCellsCount());
688    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
689    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
690      + 7 * oneCellOnCAHeapSize;
691    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
692
693    mss = memstore.getFlushableSize();
694    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
695    // simulate flusher
696    region.decrMemStoreSize(mss);
697    ImmutableSegment s = memstore.getSnapshot();
698    assertEquals(7, s.getCellsCount());
699    assertEquals(0, regionServicesForStores.getMemStoreSize());
700
701    memstore.clearSnapshot(snapshot.getId());
702  }
703
704  @Test
705  public void testCompaction3Buckets() throws IOException {
706
707    // set memstore to do data compaction and not to use the speculative scan
708    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
709    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
710      String.valueOf(compactionType));
711    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
712    String[] keys1 = { "A", "A", "B", "C" };
713    String[] keys2 = { "A", "B", "D" };
714    String[] keys3 = { "D", "B", "B" };
715
716    int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 4 cells.
717    int oneCellOnCSLMHeapSize = 120;
718    int oneCellOnCAHeapSize = 88;
719    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
720    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
721    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
722    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
723
724    assertEquals(0, memstore.getSnapshot().getCellsCount());
725    // One cell is duplicated and the compaction will remove it. All cells of same time so adjusting
726    // totalCellsLen
727    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
728    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
729    // In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff.
730    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
731      + 3 * oneCellOnCAHeapSize;
732    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
733
734    int totalCellsLen2 = addRowsByKeys(memstore, keys2);// Adding 3 more cells.
735    long totalHeapSize2 = totalHeapSize + 3 * oneCellOnCSLMHeapSize;
736
737    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
738    assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
739
740    ((MyCompactingMemStore) memstore).disableCompaction();
741    MemStoreSize mss = memstore.getFlushableSize();
742    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
743    assertEquals(0, memstore.getSnapshot().getCellsCount());
744    // No change in the cells data size. ie. memstore size. as there is no compaction.
745    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
746    assertEquals(totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM,
747      ((CompactingMemStore) memstore).heapSize());
748
749    int totalCellsLen3 = addRowsByKeys(memstore, keys3);// 3 more cells added
750    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
751      regionServicesForStores.getMemStoreSize());
752    long totalHeapSize3 =
753      totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM + 3 * oneCellOnCSLMHeapSize;
754    assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize());
755
756    ((MyCompactingMemStore) memstore).enableCompaction();
757    mss = memstore.getFlushableSize();
758    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
759    assertEquals(0, memstore.getSnapshot().getCellsCount());
760    // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
761    // Out of total 10, only 4 cells are unique
762    totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated
763    totalCellsLen3 = 0;// All duplicated cells.
764    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
765      regionServicesForStores.getMemStoreSize());
766    // Only 4 unique cells left
767    assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD
768      + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize());
769
770    mss = memstore.getFlushableSize();
771    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
772    // simulate flusher
773    region.decrMemStoreSize(mss);
774    ImmutableSegment s = memstore.getSnapshot();
775    assertEquals(4, s.getCellsCount());
776    assertEquals(0, regionServicesForStores.getMemStoreSize());
777
778    memstore.clearSnapshot(snapshot.getId());
779  }
780
781  @Test
782  public void testMagicCompaction3Buckets() throws IOException {
783
784    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.ADAPTIVE;
785    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
786      String.valueOf(compactionType));
787    memstore.getConfiguration()
788      .setDouble(AdaptiveMemStoreCompactionStrategy.ADAPTIVE_COMPACTION_THRESHOLD_KEY, 0.45);
789    memstore.getConfiguration()
790      .setInt(AdaptiveMemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 2);
791    memstore.getConfiguration().setInt(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 1);
792    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
793
794    String[] keys1 = { "A", "B", "D" };
795    String[] keys2 = { "A" };
796    String[] keys3 = { "A", "A", "B", "C" };
797    String[] keys4 = { "D", "B", "B" };
798
799    int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 3 cells.
800    int oneCellOnCSLMHeapSize = 120;
801    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
802    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 3 * oneCellOnCSLMHeapSize;
803    assertEquals(totalHeapSize, memstore.heapSize());
804
805    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline - flatten
806    assertEquals(3, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
807    assertEquals(1.0,
808      ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
809    assertEquals(0, memstore.getSnapshot().getCellsCount());
810
811    addRowsByKeys(memstore, keys2);// Adding 1 more cell - flatten.
812    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
813    assertEquals(4, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
814    assertEquals(1.0,
815      ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
816    assertEquals(0, memstore.getSnapshot().getCellsCount());
817
818    addRowsByKeys(memstore, keys3);// Adding 4 more cells - merge.
819    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
820    assertEquals(8, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
821    assertEquals((4.0 / 8.0),
822      ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
823    assertEquals(0, memstore.getSnapshot().getCellsCount());
824
825    addRowsByKeys(memstore, keys4);// 3 more cells added - compact (or not)
826    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
827    int numCells = ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells();
828    assertTrue(4 == numCells || 11 == numCells);
829    assertEquals(0, memstore.getSnapshot().getCellsCount());
830
831    MemStoreSize mss = memstore.getFlushableSize();
832    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
833    // simulate flusher
834    region.decrMemStoreSize(mss);
835    ImmutableSegment s = memstore.getSnapshot();
836    numCells = s.getCellsCount();
837    assertTrue(4 == numCells || 11 == numCells);
838    assertEquals(0, regionServicesForStores.getMemStoreSize());
839
840    memstore.clearSnapshot(snapshot.getId());
841  }
842
  /**
   * Overrides the inherited scan test so that {@code scanMemStore} is run against this class's
   * {@code memstore} with {@code 6635} as its second argument.
   */
  @Override
  @Test
  public void testScan() throws IOException {
    // NOTE(review): 6635 is presumably the expected size/count for this memstore type; confirm
    // against the definition of scanMemStore, which is not visible here.
    scanMemStore(memstore, 6635);
  }
848
849  protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
850    byte[] fam = Bytes.toBytes("testfamily");
851    byte[] qf = Bytes.toBytes("testqualifier");
852    long size = hmc.getActive().getDataSize();
853    long heapOverhead = hmc.getActive().getHeapSize();
854    int cellsCount = hmc.getActive().getCellsCount();
855    int totalLen = 0;
856    for (int i = 0; i < keys.length; i++) {
857      long timestamp = EnvironmentEdgeManager.currentTime();
858      Threads.sleep(1); // to make sure each kv gets a different ts
859      byte[] row = Bytes.toBytes(keys[i]);
860      byte[] val = Bytes.toBytes(keys[i] + i);
861      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
862      totalLen += Segment.getCellLength(kv);
863      hmc.add(kv, null);
864      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
865    }
866    regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
867      hmc.getActive().getHeapSize() - heapOverhead, 0,
868      hmc.getActive().getCellsCount() - cellsCount);
869    return totalLen;
870  }
871
872  // for controlling the val size when adding a new cell
873  protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] val) {
874    byte[] fam = Bytes.toBytes("testfamily");
875    byte[] qf = Bytes.toBytes("testqualifier");
876    long size = hmc.getActive().getDataSize();
877    long heapOverhead = hmc.getActive().getHeapSize();
878    int cellsCount = hmc.getActive().getCellsCount();
879    int totalLen = 0;
880    for (int i = 0; i < keys.length; i++) {
881      long timestamp = EnvironmentEdgeManager.currentTime();
882      Threads.sleep(1); // to make sure each kv gets a different ts
883      byte[] row = Bytes.toBytes(keys[i]);
884      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
885      totalLen += Segment.getCellLength(kv);
886      hmc.add(kv, null);
887      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
888    }
889    regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
890      hmc.getActive().getHeapSize() - heapOverhead, 0, cellsCount);
891    return totalLen;
892  }
893
  /**
   * {@link EnvironmentEdge} implementation for tests whose {@link #currentTime()} does not
   * consult a clock but returns whatever is stored in the field {@code t}.
   */
  private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {
    // The value handed out as the current time; starts at 1234.
    long t = 1234;

    /**
     * @return the current value of {@code t}
     */
    @Override
    public long currentTime() {
      return t;
    }
  }
902
903  static protected class MyCompactingMemStore extends CompactingMemStore {
904
905    public MyCompactingMemStore(Configuration conf, CellComparator c, HStore store,
906      RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
907      throws IOException {
908      super(conf, c, store, regionServices, compactionPolicy);
909    }
910
911    void disableCompaction() {
912      allowCompaction.set(false);
913    }
914
915    void enableCompaction() {
916      allowCompaction.set(true);
917    }
918
919    void initiateType(MemoryCompactionPolicy compactionType, Configuration conf)
920      throws IllegalArgumentIOException {
921      compactor.initiateCompactionStrategy(compactionType, conf, "CF_TEST");
922    }
923  }
924}