001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotEquals;
022import static org.junit.Assert.assertNotNull;
023import static org.junit.Assert.assertNull;
024import static org.junit.Assert.assertTrue;
025
026import java.io.IOException;
027import java.lang.management.ManagementFactory;
028import java.util.ArrayList;
029import java.util.List;
030import java.util.concurrent.Executors;
031import java.util.concurrent.ThreadPoolExecutor;
032import org.apache.hadoop.conf.Configuration;
033import org.apache.hadoop.hbase.Cell;
034import org.apache.hadoop.hbase.CellComparator;
035import org.apache.hadoop.hbase.CellUtil;
036import org.apache.hadoop.hbase.ExtendedCell;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseConfiguration;
039import org.apache.hadoop.hbase.HBaseTestingUtil;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.KeepDeletedCells;
042import org.apache.hadoop.hbase.KeyValue;
043import org.apache.hadoop.hbase.KeyValueTestUtil;
044import org.apache.hadoop.hbase.MemoryCompactionPolicy;
045import org.apache.hadoop.hbase.TableName;
046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
048import org.apache.hadoop.hbase.client.RegionInfo;
049import org.apache.hadoop.hbase.client.RegionInfoBuilder;
050import org.apache.hadoop.hbase.client.Scan;
051import org.apache.hadoop.hbase.client.TableDescriptor;
052import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
053import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
054import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
055import org.apache.hadoop.hbase.testclassification.MediumTests;
056import org.apache.hadoop.hbase.testclassification.RegionServerTests;
057import org.apache.hadoop.hbase.util.Bytes;
058import org.apache.hadoop.hbase.util.EnvironmentEdge;
059import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
060import org.apache.hadoop.hbase.util.Threads;
061import org.apache.hadoop.hbase.wal.WAL;
062import org.junit.After;
063import org.junit.Before;
064import org.junit.ClassRule;
065import org.junit.Test;
066import org.junit.experimental.categories.Category;
067import org.mockito.Mockito;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070
/**
 * Test cases for the compacting memstore ({@link CompactingMemStore}).
 */
074@Category({ RegionServerTests.class, MediumTests.class })
075public class TestCompactingMemStore extends TestDefaultMemStore {
076
  // Class-level JUnit rule built for this test class via HBaseClassTestRule.forClass.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCompactingMemStore.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestCompactingMemStore.class);
  // Chunk creator (re)initialized by compactingSetUp(); its pooled chunks are cleared in tearDown().
  protected static ChunkCreator chunkCreator;
  // Region created by compactingSetUp() for the "foobar" table.
  protected HRegion region;
  // Mockito spy over the region's store services; its in-memory compaction pool is stubbed.
  protected RegionServicesForStores regionServicesForStores;
  // Store built on the region above and handed to the memstores the tests construct.
  protected HStore store;
086
087  @After
088  public void tearDown() throws Exception {
089    chunkCreator.clearChunksInPool();
090    super.tearDown();
091  }
092
093  @Override
094  @Before
095  public void setUp() throws Exception {
096    compactingSetUp();
097    this.memstore = new MyCompactingMemStore(HBaseConfiguration.create(),
098      CellComparator.getInstance(), store, regionServicesForStores, MemoryCompactionPolicy.EAGER);
099    ((CompactingMemStore) memstore).setIndexType(CompactingMemStore.IndexType.ARRAY_MAP);
100  }
101
102  protected void compactingSetUp() throws Exception {
103    super.internalSetUp();
104    Configuration conf = new Configuration();
105    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
106    conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
107    conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, 1000);
108    HBaseTestingUtil hbaseUtility = new HBaseTestingUtil(conf);
109    ColumnFamilyDescriptor familyDescriptor = ColumnFamilyDescriptorBuilder.of(FAMILY);
110    TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf("foobar"))
111      .setColumnFamily(familyDescriptor).build();
112    RegionInfo info = RegionInfoBuilder.newBuilder(TableName.valueOf("foobar")).build();
113    WAL wal = HBaseTestingUtil.createWal(conf, hbaseUtility.getDataTestDir(), info);
114    this.region =
115      HRegion.createHRegion(info, hbaseUtility.getDataTestDir(), conf, tableDescriptor, wal, true);
116    this.regionServicesForStores = Mockito.spy(region.getRegionServicesForStores());
117    ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
118    Mockito.when(regionServicesForStores.getInMemoryCompactionPool()).thenReturn(pool);
119    this.store = new HStore(region, familyDescriptor, conf, false);
120
121    long globalMemStoreLimit =
122      (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()
123        * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
124    chunkCreator = ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
125      globalMemStoreLimit, 0.4f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null,
126      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
127    assertNotNull(chunkCreator);
128  }
129
130  /**
131   * A simple test which flush in memory affect timeOfOldestEdit
132   */
133  @Test
134  public void testTimeOfOldestEdit() {
135    assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit());
136    final byte[] r = Bytes.toBytes("r");
137    final byte[] f = Bytes.toBytes("f");
138    final byte[] q = Bytes.toBytes("q");
139    final byte[] v = Bytes.toBytes("v");
140    final KeyValue kv = new KeyValue(r, f, q, v);
141    memstore.add(kv, null);
142    long timeOfOldestEdit = memstore.timeOfOldestEdit();
143    assertNotEquals(Long.MAX_VALUE, timeOfOldestEdit);
144
145    ((CompactingMemStore) memstore).flushInMemory();
146    assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit());
147    memstore.add(kv, null);
148    assertEquals(timeOfOldestEdit, memstore.timeOfOldestEdit());
149    memstore.snapshot();
150    assertEquals(Long.MAX_VALUE, memstore.timeOfOldestEdit());
151  }
152
153  /**
154   * A simple test which verifies the 3 possible states when scanning across snapshot.
155   */
156  @Override
157  @Test
158  public void testScanAcrossSnapshot2() throws IOException, CloneNotSupportedException {
159    // we are going to the scanning across snapshot with two kvs
160    // kv1 should always be returned before kv2
161    final byte[] one = Bytes.toBytes(1);
162    final byte[] two = Bytes.toBytes(2);
163    final byte[] f = Bytes.toBytes("f");
164    final byte[] q = Bytes.toBytes("q");
165    final byte[] v = Bytes.toBytes(3);
166
167    final KeyValue kv1 = new KeyValue(one, f, q, 10, v);
168    final KeyValue kv2 = new KeyValue(two, f, q, 10, v);
169
170    // use case 1: both kvs in kvset
171    this.memstore.add(kv1.clone(), null);
172    this.memstore.add(kv2.clone(), null);
173    // snapshot is empty,active segment is not empty,
174    // empty segment is skipped.
175    verifyOneScanAcrossSnapshot2(kv1, kv2);
176
177    // use case 2: both kvs in snapshot
178    this.memstore.snapshot();
179    // active segment is empty,snapshot is not empty,
180    // empty segment is skipped.
181    verifyOneScanAcrossSnapshot2(kv1, kv2);
182
183    // use case 3: first in snapshot second in kvset
184    this.memstore = new CompactingMemStore(HBaseConfiguration.create(),
185      CellComparator.getInstance(), store, regionServicesForStores, MemoryCompactionPolicy.EAGER);
186    this.memstore.add(kv1.clone(), null);
187    // As compaction is starting in the background the repetition
188    // of the k1 might be removed BUT the scanners created earlier
189    // should look on the OLD MutableCellSetSegment, so this should be OK...
190    this.memstore.snapshot();
191    this.memstore.add(kv2.clone(), null);
192    verifyScanAcrossSnapshot2(kv1, kv2);
193  }
194
195  /**
196   * Test memstore snapshots
197   */
198  @Override
199  @Test
200  public void testSnapshotting() throws IOException {
201    final int snapshotCount = 5;
202    // Add some rows, run a snapshot. Do it a few times.
203    for (int i = 0; i < snapshotCount; i++) {
204      addRows(this.memstore);
205      runSnapshot(this.memstore, true);
206      assertEquals("History not being cleared", 0, this.memstore.getSnapshot().getCellsCount());
207    }
208  }
209
210  //////////////////////////////////////////////////////////////////////////////
211  // Get tests
212  //////////////////////////////////////////////////////////////////////////////
213
214  /**
215   * Test getNextRow from memstore
216   */
217  @Override
218  @Test
219  public void testGetNextRow() throws Exception {
220    addRows(this.memstore);
221    // Add more versions to make it a little more interesting.
222    Thread.sleep(1);
223    addRows(this.memstore);
224    Cell closestToEmpty = ((CompactingMemStore) this.memstore).getNextRow(KeyValue.LOWESTKEY);
225    assertTrue(CellComparator.getInstance().compareRows(closestToEmpty,
226      new KeyValue(Bytes.toBytes(0), EnvironmentEdgeManager.currentTime())) == 0);
227    for (int i = 0; i < ROW_COUNT; i++) {
228      Cell nr = ((CompactingMemStore) this.memstore)
229        .getNextRow(new KeyValue(Bytes.toBytes(i), EnvironmentEdgeManager.currentTime()));
230      if (i + 1 == ROW_COUNT) {
231        assertNull(nr);
232      } else {
233        assertTrue(CellComparator.getInstance().compareRows(nr,
234          new KeyValue(Bytes.toBytes(i + 1), EnvironmentEdgeManager.currentTime())) == 0);
235      }
236    }
237    // starting from each row, validate results should contain the starting row
238    Configuration conf = HBaseConfiguration.create();
239    for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
240      ScanInfo scanInfo =
241        new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, KeepDeletedCells.FALSE,
242          HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator(), false);
243      try (InternalScanner scanner =
244        new StoreScanner(new Scan().withStartRow(Bytes.toBytes(startRowId)), scanInfo, null,
245          memstore.getScanners(0))) {
246        List<Cell> results = new ArrayList<>();
247        for (int i = 0; scanner.next(results); i++) {
248          int rowId = startRowId + i;
249          Cell left = results.get(0);
250          byte[] row1 = Bytes.toBytes(rowId);
251          assertTrue("Row name",
252            CellComparator.getInstance().compareRows(left, row1, 0, row1.length) == 0);
253          assertEquals("Count of columns", QUALIFIER_COUNT, results.size());
254          List<Cell> row = new ArrayList<>();
255          for (Cell kv : results) {
256            row.add(kv);
257          }
258          isExpectedRowWithoutTimestamps(rowId, row);
259          // Clear out set. Otherwise row results accumulate.
260          results.clear();
261        }
262      }
263    }
264  }
265
266  @Override
267  @Test
268  public void testGet_memstoreAndSnapShot() throws IOException {
269    byte[] row = Bytes.toBytes("testrow");
270    byte[] fam = Bytes.toBytes("testfamily");
271    byte[] qf1 = Bytes.toBytes("testqualifier1");
272    byte[] qf2 = Bytes.toBytes("testqualifier2");
273    byte[] qf3 = Bytes.toBytes("testqualifier3");
274    byte[] qf4 = Bytes.toBytes("testqualifier4");
275    byte[] qf5 = Bytes.toBytes("testqualifier5");
276    byte[] val = Bytes.toBytes("testval");
277
278    // Setting up memstore
279    memstore.add(new KeyValue(row, fam, qf1, val), null);
280    memstore.add(new KeyValue(row, fam, qf2, val), null);
281    memstore.add(new KeyValue(row, fam, qf3, val), null);
282    // Pushing to pipeline
283    ((CompactingMemStore) memstore).flushInMemory();
284    assertEquals(0, memstore.getSnapshot().getCellsCount());
285    // Creating a snapshot
286    memstore.snapshot();
287    assertEquals(3, memstore.getSnapshot().getCellsCount());
288    // Adding value to "new" memstore
289    assertEquals(0, memstore.getActive().getCellsCount());
290    memstore.add(new KeyValue(row, fam, qf4, val), null);
291    memstore.add(new KeyValue(row, fam, qf5, val), null);
292    assertEquals(2, memstore.getActive().getCellsCount());
293  }
294
295  ////////////////////////////////////
296  // Test for periodic memstore flushes
297  // based on time of oldest edit
298  ////////////////////////////////////
299
300  /**
301   * Add keyvalues with a fixed memstoreTs, and checks that memstore size is decreased as older
302   * keyvalues are deleted from the memstore.
303   */
304  @Override
305  @Test
306  public void testUpsertMemstoreSize() throws Exception {
307    MemStoreSize oldSize = memstore.size();
308
309    List<ExtendedCell> l = new ArrayList<>();
310    KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
311    KeyValue kv2 = KeyValueTestUtil.create("r", "f", "q", 101, "v");
312    KeyValue kv3 = KeyValueTestUtil.create("r", "f", "q", 102, "v");
313
314    kv1.setSequenceId(1);
315    kv2.setSequenceId(1);
316    kv3.setSequenceId(1);
317    l.add(kv1);
318    l.add(kv2);
319    l.add(kv3);
320
321    this.memstore.upsert(l, 2, null);// readpoint is 2
322    MemStoreSize newSize = this.memstore.size();
323    assert (newSize.getDataSize() > oldSize.getDataSize());
324    // The kv1 should be removed.
325    assert (memstore.getActive().getCellsCount() == 2);
326
327    KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v");
328    kv4.setSequenceId(1);
329    l.clear();
330    l.add(kv4);
331    this.memstore.upsert(l, 3, null);
332    assertEquals(newSize, this.memstore.size());
333    // The kv2 should be removed.
334    assert (memstore.getActive().getCellsCount() == 2);
335    // this.memstore = null;
336  }
337
338  /**
339   * Tests that the timeOfOldestEdit is updated correctly for the various edit operations in
340   * memstore.
341   */
342  @Override
343  @Test
344  public void testUpdateToTimeOfOldestEdit() throws Exception {
345    try {
346      EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest();
347      EnvironmentEdgeManager.injectEdge(edge);
348      long t = memstore.timeOfOldestEdit();
349      assertEquals(Long.MAX_VALUE, t);
350
351      // test the case that the timeOfOldestEdit is updated after a KV add
352      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, "v"), null);
353      t = memstore.timeOfOldestEdit();
354      assertTrue(t == 1234);
355      // The method will also assert
356      // the value is reset to Long.MAX_VALUE
357      t = runSnapshot(memstore, true);
358
359      // test the case that the timeOfOldestEdit is updated after a KV delete
360      memstore.add(KeyValueTestUtil.create("r", "f", "q", 100, KeyValue.Type.Delete, "v"), null);
361      t = memstore.timeOfOldestEdit();
362      assertTrue(t == 1234);
363      t = runSnapshot(memstore, true);
364
365      // test the case that the timeOfOldestEdit is updated after a KV upsert
366      List<ExtendedCell> l = new ArrayList<>();
367      KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v");
368      kv1.setSequenceId(100);
369      l.add(kv1);
370      memstore.upsert(l, 1000, null);
371      t = memstore.timeOfOldestEdit();
372      assertTrue(t == 1234);
373    } finally {
374      EnvironmentEdgeManager.reset();
375    }
376  }
377
378  private long runSnapshot(final AbstractMemStore hmc, boolean useForce) throws IOException {
379    // Save off old state.
380    long oldHistorySize = hmc.getSnapshot().getDataSize();
381    long prevTimeStamp = hmc.timeOfOldestEdit();
382
383    hmc.snapshot();
384    MemStoreSnapshot snapshot = hmc.snapshot();
385    if (useForce) {
386      // Make some assertions about what just happened.
387      assertTrue("History size has not increased", oldHistorySize < snapshot.getDataSize());
388      long t = hmc.timeOfOldestEdit();
389      assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE);
390      hmc.clearSnapshot(snapshot.getId());
391    } else {
392      long t = hmc.timeOfOldestEdit();
393      assertTrue("Time of oldest edit didn't remain the same", t == prevTimeStamp);
394    }
395    return prevTimeStamp;
396  }
397
398  private void isExpectedRowWithoutTimestamps(final int rowIndex, List<Cell> kvs) {
399    int i = 0;
400    for (Cell kv : kvs) {
401      byte[] expectedColname = makeQualifier(rowIndex, i++);
402      assertTrue("Column name", CellUtil.matchingQualifier(kv, expectedColname));
403      // Value is column name as bytes. Usually result is
404      // 100 bytes in size at least. This is the default size
405      // for BytesWriteable. For comparison, convert bytes to
406      // String and trim to remove trailing null bytes.
407      assertTrue("Content", CellUtil.matchingValue(kv, expectedColname));
408    }
409  }
410
411  @Test
412  public void testPuttingBackChunksAfterFlushing() throws IOException {
413    byte[] row = Bytes.toBytes("testrow");
414    byte[] fam = Bytes.toBytes("testfamily");
415    byte[] qf1 = Bytes.toBytes("testqualifier1");
416    byte[] qf2 = Bytes.toBytes("testqualifier2");
417    byte[] qf3 = Bytes.toBytes("testqualifier3");
418    byte[] qf4 = Bytes.toBytes("testqualifier4");
419    byte[] qf5 = Bytes.toBytes("testqualifier5");
420    byte[] val = Bytes.toBytes("testval");
421
422    // Setting up memstore
423    memstore.add(new KeyValue(row, fam, qf1, val), null);
424    memstore.add(new KeyValue(row, fam, qf2, val), null);
425    memstore.add(new KeyValue(row, fam, qf3, val), null);
426
427    // Creating a snapshot
428    MemStoreSnapshot snapshot = memstore.snapshot();
429    assertEquals(3, memstore.getSnapshot().getCellsCount());
430
431    // Adding value to "new" memstore
432    assertEquals(0, memstore.getActive().getCellsCount());
433    memstore.add(new KeyValue(row, fam, qf4, val), null);
434    memstore.add(new KeyValue(row, fam, qf5, val), null);
435    assertEquals(2, memstore.getActive().getCellsCount());
436    // close the scanners
437    for (KeyValueScanner scanner : snapshot.getScanners()) {
438      scanner.close();
439    }
440    memstore.clearSnapshot(snapshot.getId());
441
442    int chunkCount = chunkCreator.getPoolSize();
443    assertTrue(chunkCount > 0);
444
445  }
446
447  @Test
448  public void testPuttingBackChunksWithOpeningScanner() throws IOException {
449    byte[] row = Bytes.toBytes("testrow");
450    byte[] fam = Bytes.toBytes("testfamily");
451    byte[] qf1 = Bytes.toBytes("testqualifier1");
452    byte[] qf2 = Bytes.toBytes("testqualifier2");
453    byte[] qf3 = Bytes.toBytes("testqualifier3");
454    byte[] qf4 = Bytes.toBytes("testqualifier4");
455    byte[] qf5 = Bytes.toBytes("testqualifier5");
456    byte[] qf6 = Bytes.toBytes("testqualifier6");
457    byte[] qf7 = Bytes.toBytes("testqualifier7");
458    byte[] val = Bytes.toBytes("testval");
459
460    // Setting up memstore
461    memstore.add(new KeyValue(row, fam, qf1, val), null);
462    memstore.add(new KeyValue(row, fam, qf2, val), null);
463    memstore.add(new KeyValue(row, fam, qf3, val), null);
464
465    // Creating a snapshot
466    MemStoreSnapshot snapshot = memstore.snapshot();
467    assertEquals(3, memstore.getSnapshot().getCellsCount());
468
469    // Adding value to "new" memstore
470    assertEquals(0, memstore.getActive().getCellsCount());
471    memstore.add(new KeyValue(row, fam, qf4, val), null);
472    memstore.add(new KeyValue(row, fam, qf5, val), null);
473    assertEquals(2, memstore.getActive().getCellsCount());
474
475    // opening scanner before clear the snapshot
476    List<KeyValueScanner> scanners = memstore.getScanners(0);
477    // Shouldn't putting back the chunks to pool,since some scanners are opening
478    // based on their data
479    // close the scanners
480    for (KeyValueScanner scanner : snapshot.getScanners()) {
481      scanner.close();
482    }
483    memstore.clearSnapshot(snapshot.getId());
484
485    assertTrue(chunkCreator.getPoolSize() == 0);
486
487    // Chunks will be put back to pool after close scanners;
488    for (KeyValueScanner scanner : scanners) {
489      scanner.close();
490    }
491    assertTrue(chunkCreator.getPoolSize() > 0);
492
493    // clear chunks
494    chunkCreator.clearChunksInPool();
495
496    // Creating another snapshot
497
498    snapshot = memstore.snapshot();
499    // Adding more value
500    memstore.add(new KeyValue(row, fam, qf6, val), null);
501    memstore.add(new KeyValue(row, fam, qf7, val), null);
502    // opening scanners
503    scanners = memstore.getScanners(0);
504    // close scanners before clear the snapshot
505    for (KeyValueScanner scanner : scanners) {
506      scanner.close();
507    }
508    // Since no opening scanner, the chunks of snapshot should be put back to
509    // pool
510    // close the scanners
511    for (KeyValueScanner scanner : snapshot.getScanners()) {
512      scanner.close();
513    }
514    memstore.clearSnapshot(snapshot.getId());
515    assertTrue(chunkCreator.getPoolSize() > 0);
516  }
517
518  @Test
519  public void testPuttingBackChunksWithOpeningPipelineScanner() throws IOException {
520
521    // set memstore to do data compaction and not to use the speculative scan
522    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
523    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
524      String.valueOf(compactionType));
525    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
526
527    byte[] row = Bytes.toBytes("testrow");
528    byte[] fam = Bytes.toBytes("testfamily");
529    byte[] qf1 = Bytes.toBytes("testqualifier1");
530    byte[] qf2 = Bytes.toBytes("testqualifier2");
531    byte[] qf3 = Bytes.toBytes("testqualifier3");
532    byte[] val = Bytes.toBytes("testval");
533
534    // Setting up memstore
535    memstore.add(new KeyValue(row, fam, qf1, 1, val), null);
536    memstore.add(new KeyValue(row, fam, qf2, 1, val), null);
537    memstore.add(new KeyValue(row, fam, qf3, 1, val), null);
538
539    // Creating a pipeline
540    ((MyCompactingMemStore) memstore).disableCompaction();
541    ((CompactingMemStore) memstore).flushInMemory();
542
543    // Adding value to "new" memstore
544    assertEquals(0, memstore.getActive().getCellsCount());
545    memstore.add(new KeyValue(row, fam, qf1, 2, val), null);
546    memstore.add(new KeyValue(row, fam, qf2, 2, val), null);
547    assertEquals(2, memstore.getActive().getCellsCount());
548
549    // pipeline bucket 2
550    ((CompactingMemStore) memstore).flushInMemory();
551    // opening scanner before force flushing
552    List<KeyValueScanner> scanners = memstore.getScanners(0);
553    // Shouldn't putting back the chunks to pool,since some scanners are opening
554    // based on their data
555    ((MyCompactingMemStore) memstore).enableCompaction();
556    // trigger compaction
557    ((CompactingMemStore) memstore).flushInMemory();
558
559    // Adding value to "new" memstore
560    assertEquals(0, memstore.getActive().getCellsCount());
561    memstore.add(new KeyValue(row, fam, qf3, 3, val), null);
562    memstore.add(new KeyValue(row, fam, qf2, 3, val), null);
563    memstore.add(new KeyValue(row, fam, qf1, 3, val), null);
564    assertEquals(3, memstore.getActive().getCellsCount());
565
566    assertTrue(chunkCreator.getPoolSize() == 0);
567
568    // Chunks will be put back to pool after close scanners;
569    for (KeyValueScanner scanner : scanners) {
570      scanner.close();
571    }
572    assertTrue(chunkCreator.getPoolSize() > 0);
573
574    // clear chunks
575    chunkCreator.clearChunksInPool();
576
577    // Creating another snapshot
578
579    MemStoreSnapshot snapshot = memstore.snapshot();
580    // close the scanners
581    for (KeyValueScanner scanner : snapshot.getScanners()) {
582      scanner.close();
583    }
584    memstore.clearSnapshot(snapshot.getId());
585
586    snapshot = memstore.snapshot();
587    // Adding more value
588    memstore.add(new KeyValue(row, fam, qf2, 4, val), null);
589    memstore.add(new KeyValue(row, fam, qf3, 4, val), null);
590    // opening scanners
591    scanners = memstore.getScanners(0);
592    // close scanners before clear the snapshot
593    for (KeyValueScanner scanner : scanners) {
594      scanner.close();
595    }
596    // Since no opening scanner, the chunks of snapshot should be put back to
597    // pool
598    // close the scanners
599    for (KeyValueScanner scanner : snapshot.getScanners()) {
600      scanner.close();
601    }
602    memstore.clearSnapshot(snapshot.getId());
603    assertTrue(chunkCreator.getPoolSize() > 0);
604  }
605
606  //////////////////////////////////////////////////////////////////////////////
607  // Compaction tests
608  //////////////////////////////////////////////////////////////////////////////
609  @Test
610  public void testCompaction1Bucket() throws IOException {
611
612    // set memstore to do basic structure flattening, the "eager" option is tested in
613    // TestCompactingToCellFlatMapMemStore
614    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
615    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
616      String.valueOf(compactionType));
617    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
618
619    String[] keys1 = { "A", "A", "B", "C" }; // A1, A2, B3, C4
620
621    // test 1 bucket
622    int totalCellsLen = addRowsByKeys(memstore, keys1);
623    int oneCellOnCSLMHeapSize = 120;
624    int oneCellOnCAHeapSize = 88;
625    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
626    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
627    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
628
629    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
630    assertEquals(0, memstore.getSnapshot().getCellsCount());
631    // There is no compaction, as the compacting memstore type is basic.
632    // totalCellsLen remains the same
633    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
634      + 4 * oneCellOnCAHeapSize;
635    assertEquals(totalCellsLen, regionServicesForStores.getMemStoreSize());
636    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
637
638    MemStoreSize mss = memstore.getFlushableSize();
639    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
640    // simulate flusher
641    region.decrMemStoreSize(mss);
642    ImmutableSegment s = memstore.getSnapshot();
643    assertEquals(4, s.getCellsCount());
644    assertEquals(0, regionServicesForStores.getMemStoreSize());
645
646    memstore.clearSnapshot(snapshot.getId());
647  }
648
649  @Test
650  public void testCompaction2Buckets() throws IOException {
651
652    // set memstore to do basic structure flattening, the "eager" option is tested in
653    // TestCompactingToCellFlatMapMemStore
654    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.BASIC;
655    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
656      String.valueOf(compactionType));
657    memstore.getConfiguration().set(MemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY,
658      String.valueOf(1));
659    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
660    String[] keys1 = { "A", "A", "B", "C" };
661    String[] keys2 = { "A", "B", "D" };
662
663    int totalCellsLen1 = addRowsByKeys(memstore, keys1);
664    int oneCellOnCSLMHeapSize = 120;
665    int oneCellOnCAHeapSize = 88;
666    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
667
668    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
669    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
670
671    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
672    int counter = 0;
673    for (Segment s : memstore.getSegments()) {
674      counter += s.getCellsCount();
675    }
676    assertEquals(4, counter);
677    assertEquals(0, memstore.getSnapshot().getCellsCount());
678    // There is no compaction, as the compacting memstore type is basic.
679    // totalCellsLen remains the same
680    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
681    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
682      + 4 * oneCellOnCAHeapSize;
683    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
684
685    int totalCellsLen2 = addRowsByKeys(memstore, keys2);
686    totalHeapSize += 3 * oneCellOnCSLMHeapSize;
687    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
688    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
689
690    MemStoreSize mss = memstore.getFlushableSize();
691    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
692    assertEquals(0, memstore.getSnapshot().getCellsCount());
693    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
694    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
695      + 7 * oneCellOnCAHeapSize;
696    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
697
698    mss = memstore.getFlushableSize();
699    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
700    // simulate flusher
701    region.decrMemStoreSize(mss);
702    ImmutableSegment s = memstore.getSnapshot();
703    assertEquals(7, s.getCellsCount());
704    assertEquals(0, regionServicesForStores.getMemStoreSize());
705
706    memstore.clearSnapshot(snapshot.getId());
707  }
708
709  @Test
710  public void testCompaction3Buckets() throws IOException {
711
712    // set memstore to do data compaction and not to use the speculative scan
713    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.EAGER;
714    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
715      String.valueOf(compactionType));
716    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
717    String[] keys1 = { "A", "A", "B", "C" };
718    String[] keys2 = { "A", "B", "D" };
719    String[] keys3 = { "D", "B", "B" };
720
721    int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 4 cells.
722    int oneCellOnCSLMHeapSize = 120;
723    int oneCellOnCAHeapSize = 88;
724    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
725    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 4 * oneCellOnCSLMHeapSize;
726    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
727    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
728
729    assertEquals(0, memstore.getSnapshot().getCellsCount());
730    // One cell is duplicated and the compaction will remove it. All cells of same time so adjusting
731    // totalCellsLen
732    totalCellsLen1 = (totalCellsLen1 * 3) / 4;
733    assertEquals(totalCellsLen1, regionServicesForStores.getMemStoreSize());
734    // In memory flush to make a CellArrayMap instead of CSLM. See the overhead diff.
735    totalHeapSize = MutableSegment.DEEP_OVERHEAD + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM
736      + 3 * oneCellOnCAHeapSize;
737    assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
738
739    int totalCellsLen2 = addRowsByKeys(memstore, keys2);// Adding 3 more cells.
740    long totalHeapSize2 = totalHeapSize + 3 * oneCellOnCSLMHeapSize;
741
742    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
743    assertEquals(totalHeapSize2, ((CompactingMemStore) memstore).heapSize());
744
745    ((MyCompactingMemStore) memstore).disableCompaction();
746    MemStoreSize mss = memstore.getFlushableSize();
747    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
748    assertEquals(0, memstore.getSnapshot().getCellsCount());
749    // No change in the cells data size. ie. memstore size. as there is no compaction.
750    assertEquals(totalCellsLen1 + totalCellsLen2, regionServicesForStores.getMemStoreSize());
751    assertEquals(totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM,
752      ((CompactingMemStore) memstore).heapSize());
753
754    int totalCellsLen3 = addRowsByKeys(memstore, keys3);// 3 more cells added
755    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
756      regionServicesForStores.getMemStoreSize());
757    long totalHeapSize3 =
758      totalHeapSize2 + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM + 3 * oneCellOnCSLMHeapSize;
759    assertEquals(totalHeapSize3, ((CompactingMemStore) memstore).heapSize());
760
761    ((MyCompactingMemStore) memstore).enableCompaction();
762    mss = memstore.getFlushableSize();
763    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
764    assertEquals(0, memstore.getSnapshot().getCellsCount());
765    // active flushed to pipeline and all 3 segments compacted. Will get rid of duplicated cells.
766    // Out of total 10, only 4 cells are unique
767    totalCellsLen2 = totalCellsLen2 / 3;// 2 out of 3 cells are duplicated
768    totalCellsLen3 = 0;// All duplicated cells.
769    assertEquals(totalCellsLen1 + totalCellsLen2 + totalCellsLen3,
770      regionServicesForStores.getMemStoreSize());
771    // Only 4 unique cells left
772    assertEquals(4 * oneCellOnCAHeapSize + MutableSegment.DEEP_OVERHEAD
773      + CellArrayImmutableSegment.DEEP_OVERHEAD_CAM, ((CompactingMemStore) memstore).heapSize());
774
775    mss = memstore.getFlushableSize();
776    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
777    // simulate flusher
778    region.decrMemStoreSize(mss);
779    ImmutableSegment s = memstore.getSnapshot();
780    assertEquals(4, s.getCellsCount());
781    assertEquals(0, regionServicesForStores.getMemStoreSize());
782
783    memstore.clearSnapshot(snapshot.getId());
784  }
785
786  @Test
787  public void testMagicCompaction3Buckets() throws IOException {
788
789    MemoryCompactionPolicy compactionType = MemoryCompactionPolicy.ADAPTIVE;
790    memstore.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
791      String.valueOf(compactionType));
792    memstore.getConfiguration()
793      .setDouble(AdaptiveMemStoreCompactionStrategy.ADAPTIVE_COMPACTION_THRESHOLD_KEY, 0.45);
794    memstore.getConfiguration()
795      .setInt(AdaptiveMemStoreCompactionStrategy.COMPACTING_MEMSTORE_THRESHOLD_KEY, 2);
796    memstore.getConfiguration().setInt(CompactingMemStore.IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 1);
797    ((MyCompactingMemStore) memstore).initiateType(compactionType, memstore.getConfiguration());
798
799    String[] keys1 = { "A", "B", "D" };
800    String[] keys2 = { "A" };
801    String[] keys3 = { "A", "A", "B", "C" };
802    String[] keys4 = { "D", "B", "B" };
803
804    int totalCellsLen1 = addRowsByKeys(memstore, keys1);// Adding 3 cells.
805    int oneCellOnCSLMHeapSize = 120;
806    assertEquals(totalCellsLen1, region.getMemStoreDataSize());
807    long totalHeapSize = MutableSegment.DEEP_OVERHEAD + 3 * oneCellOnCSLMHeapSize;
808    assertEquals(totalHeapSize, memstore.heapSize());
809
810    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline - flatten
811    assertEquals(3, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
812    assertEquals(1.0,
813      ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
814    assertEquals(0, memstore.getSnapshot().getCellsCount());
815
816    addRowsByKeys(memstore, keys2);// Adding 1 more cell - flatten.
817    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
818    assertEquals(4, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
819    assertEquals(1.0,
820      ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
821    assertEquals(0, memstore.getSnapshot().getCellsCount());
822
823    addRowsByKeys(memstore, keys3);// Adding 4 more cells - merge.
824    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline without compaction
825    assertEquals(8, ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells());
826    assertEquals((4.0 / 8.0),
827      ((CompactingMemStore) memstore).getImmutableSegments().getEstimatedUniquesFrac(), 0);
828    assertEquals(0, memstore.getSnapshot().getCellsCount());
829
830    addRowsByKeys(memstore, keys4);// 3 more cells added - compact (or not)
831    ((CompactingMemStore) memstore).flushInMemory(); // push keys to pipeline and compact
832    int numCells = ((CompactingMemStore) memstore).getImmutableSegments().getNumOfCells();
833    assertTrue(4 == numCells || 11 == numCells);
834    assertEquals(0, memstore.getSnapshot().getCellsCount());
835
836    MemStoreSize mss = memstore.getFlushableSize();
837    MemStoreSnapshot snapshot = memstore.snapshot(); // push keys to snapshot
838    // simulate flusher
839    region.decrMemStoreSize(mss);
840    ImmutableSegment s = memstore.getSnapshot();
841    numCells = s.getCellsCount();
842    assertTrue(4 == numCells || 11 == numCells);
843    assertEquals(0, regionServicesForStores.getMemStoreSize());
844
845    memstore.clearSnapshot(snapshot.getId());
846  }
847
848  @Override
849  @Test
850  public void testScan() throws IOException {
851    scanMemStore(memstore, 6635);
852  }
853
854  protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
855    byte[] fam = Bytes.toBytes("testfamily");
856    byte[] qf = Bytes.toBytes("testqualifier");
857    long size = hmc.getActive().getDataSize();
858    long heapOverhead = hmc.getActive().getHeapSize();
859    int cellsCount = hmc.getActive().getCellsCount();
860    int totalLen = 0;
861    for (int i = 0; i < keys.length; i++) {
862      long timestamp = EnvironmentEdgeManager.currentTime();
863      Threads.sleep(1); // to make sure each kv gets a different ts
864      byte[] row = Bytes.toBytes(keys[i]);
865      byte[] val = Bytes.toBytes(keys[i] + i);
866      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
867      totalLen += Segment.getCellLength(kv);
868      hmc.add(kv, null);
869      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
870    }
871    regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
872      hmc.getActive().getHeapSize() - heapOverhead, 0,
873      hmc.getActive().getCellsCount() - cellsCount);
874    return totalLen;
875  }
876
877  // for controlling the val size when adding a new cell
878  protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] val) {
879    byte[] fam = Bytes.toBytes("testfamily");
880    byte[] qf = Bytes.toBytes("testqualifier");
881    long size = hmc.getActive().getDataSize();
882    long heapOverhead = hmc.getActive().getHeapSize();
883    int cellsCount = hmc.getActive().getCellsCount();
884    int totalLen = 0;
885    for (int i = 0; i < keys.length; i++) {
886      long timestamp = EnvironmentEdgeManager.currentTime();
887      Threads.sleep(1); // to make sure each kv gets a different ts
888      byte[] row = Bytes.toBytes(keys[i]);
889      KeyValue kv = new KeyValue(row, fam, qf, timestamp, val);
890      totalLen += Segment.getCellLength(kv);
891      hmc.add(kv, null);
892      LOG.debug("added kv: " + kv.getKeyString() + ", timestamp:" + kv.getTimestamp());
893    }
894    regionServicesForStores.addMemStoreSize(hmc.getActive().getDataSize() - size,
895      hmc.getActive().getHeapSize() - heapOverhead, 0, cellsCount);
896    return totalLen;
897  }
898
899  private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge {
900    long t = 1234;
901
902    @Override
903    public long currentTime() {
904      return t;
905    }
906  }
907
908  static protected class MyCompactingMemStore extends CompactingMemStore {
909
910    public MyCompactingMemStore(Configuration conf, CellComparator c, HStore store,
911      RegionServicesForStores regionServices, MemoryCompactionPolicy compactionPolicy)
912      throws IOException {
913      super(conf, c, store, regionServices, compactionPolicy);
914    }
915
916    void disableCompaction() {
917      allowCompaction.set(false);
918    }
919
920    void enableCompaction() {
921      allowCompaction.set(true);
922    }
923
924    void initiateType(MemoryCompactionPolicy compactionType, Configuration conf)
925      throws IllegalArgumentIOException {
926      compactor.initiateCompactionStrategy(compactionType, conf, "CF_TEST");
927    }
928  }
929}