001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertTrue;
023
024import java.io.IOException;
025import java.lang.management.ManagementFactory;
026import java.nio.ByteBuffer;
027import java.util.List;
028import java.util.Random;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.ByteBufferKeyValue;
031import org.apache.hadoop.hbase.HBaseClassTestRule;
032import org.apache.hadoop.hbase.KeyValue;
033import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
034import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
035import org.apache.hadoop.hbase.testclassification.RegionServerTests;
036import org.apache.hadoop.hbase.testclassification.SmallTests;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.junit.After;
039import org.junit.AfterClass;
040import org.junit.BeforeClass;
041import org.junit.ClassRule;
042import org.junit.Test;
043import org.junit.experimental.categories.Category;
044
045/**
046 * Test the {@link org.apache.hadoop.hbase.regionserver.ChunkCreator.MemStoreChunkPool} class
047 */
048@Category({RegionServerTests.class, SmallTests.class})
049public class TestMemStoreChunkPool {
050
051  @ClassRule
052  public static final HBaseClassTestRule CLASS_RULE =
053      HBaseClassTestRule.forClass(TestMemStoreChunkPool.class);
054
055  private final static Configuration conf = new Configuration();
056  private static ChunkCreator chunkCreator;
057  private static boolean chunkPoolDisabledBeforeTest;
058
059  @BeforeClass
060  public static void setUpBeforeClass() throws Exception {
061    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
062    conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
063    chunkPoolDisabledBeforeTest = ChunkCreator.chunkPoolDisabled;
064    ChunkCreator.chunkPoolDisabled = false;
065    long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
066        .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
067    chunkCreator = ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false,
068      globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT,
069      null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
070    assertNotNull(chunkCreator);
071  }
072
073  @AfterClass
074  public static void tearDownAfterClass() throws Exception {
075    ChunkCreator.chunkPoolDisabled = chunkPoolDisabledBeforeTest;
076  }
077
078  @After
079  public void tearDown() throws Exception {
080    chunkCreator.clearChunksInPool();
081  }
082
083  @Test
084  public void testReusingChunks() {
085    Random rand = new Random();
086    MemStoreLAB mslab = new MemStoreLABImpl(conf);
087    int expectedOff = 0;
088    ByteBuffer lastBuffer = null;
089    final byte[] rk = Bytes.toBytes("r1");
090    final byte[] cf = Bytes.toBytes("f");
091    final byte[] q = Bytes.toBytes("q");
092    // Randomly allocate some bytes
093    for (int i = 0; i < 100; i++) {
094      int valSize = rand.nextInt(1000);
095      KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
096      int size = kv.getSerializedSize();
097      ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv);
098      if (newKv.getBuffer() != lastBuffer) {
099        expectedOff = 4;
100        lastBuffer = newKv.getBuffer();
101      }
102      assertEquals(expectedOff, newKv.getOffset());
103      assertTrue("Allocation overruns buffer",
104          newKv.getOffset() + size <= newKv.getBuffer().capacity());
105      expectedOff += size;
106    }
107    // chunks will be put back to pool after close
108    mslab.close();
109    int chunkCount = chunkCreator.getPoolSize();
110    assertTrue(chunkCount > 0);
111    // reconstruct mslab
112    mslab = new MemStoreLABImpl(conf);
113    // chunk should be got from the pool, so we can reuse it.
114    KeyValue kv = new KeyValue(rk, cf, q, new byte[10]);
115    mslab.copyCellInto(kv);
116    assertEquals(chunkCount - 1, chunkCreator.getPoolSize());
117  }
118
119  @Test
120  public void testPuttingBackChunksAfterFlushing() throws UnexpectedStateException {
121    byte[] row = Bytes.toBytes("testrow");
122    byte[] fam = Bytes.toBytes("testfamily");
123    byte[] qf1 = Bytes.toBytes("testqualifier1");
124    byte[] qf2 = Bytes.toBytes("testqualifier2");
125    byte[] qf3 = Bytes.toBytes("testqualifier3");
126    byte[] qf4 = Bytes.toBytes("testqualifier4");
127    byte[] qf5 = Bytes.toBytes("testqualifier5");
128    byte[] val = Bytes.toBytes("testval");
129
130    DefaultMemStore memstore = new DefaultMemStore();
131
132    // Setting up memstore
133    memstore.add(new KeyValue(row, fam, qf1, val), null);
134    memstore.add(new KeyValue(row, fam, qf2, val), null);
135    memstore.add(new KeyValue(row, fam, qf3, val), null);
136
137    // Creating a snapshot
138    MemStoreSnapshot snapshot = memstore.snapshot();
139    assertEquals(3, memstore.getSnapshot().getCellsCount());
140
141    // Adding value to "new" memstore
142    assertEquals(0, memstore.getActive().getCellsCount());
143    memstore.add(new KeyValue(row, fam, qf4, val), null);
144    memstore.add(new KeyValue(row, fam, qf5, val), null);
145    assertEquals(2, memstore.getActive().getCellsCount());
146    // close the scanner - this is how the snapshot will be used
147    for(KeyValueScanner scanner : snapshot.getScanners()) {
148      scanner.close();
149    }
150    memstore.clearSnapshot(snapshot.getId());
151
152    int chunkCount = chunkCreator.getPoolSize();
153    assertTrue(chunkCount > 0);
154
155  }
156
157  @Test
158  public void testPuttingBackChunksWithOpeningScanner()
159      throws IOException {
160    byte[] row = Bytes.toBytes("testrow");
161    byte[] fam = Bytes.toBytes("testfamily");
162    byte[] qf1 = Bytes.toBytes("testqualifier1");
163    byte[] qf2 = Bytes.toBytes("testqualifier2");
164    byte[] qf3 = Bytes.toBytes("testqualifier3");
165    byte[] qf4 = Bytes.toBytes("testqualifier4");
166    byte[] qf5 = Bytes.toBytes("testqualifier5");
167    byte[] qf6 = Bytes.toBytes("testqualifier6");
168    byte[] qf7 = Bytes.toBytes("testqualifier7");
169    byte[] val = Bytes.toBytes("testval");
170
171    DefaultMemStore memstore = new DefaultMemStore();
172
173    // Setting up memstore
174    memstore.add(new KeyValue(row, fam, qf1, val), null);
175    memstore.add(new KeyValue(row, fam, qf2, val), null);
176    memstore.add(new KeyValue(row, fam, qf3, val), null);
177
178    // Creating a snapshot
179    MemStoreSnapshot snapshot = memstore.snapshot();
180    assertEquals(3, memstore.getSnapshot().getCellsCount());
181
182    // Adding value to "new" memstore
183    assertEquals(0, memstore.getActive().getCellsCount());
184    memstore.add(new KeyValue(row, fam, qf4, val), null);
185    memstore.add(new KeyValue(row, fam, qf5, val), null);
186    assertEquals(2, memstore.getActive().getCellsCount());
187
188    // opening scanner before clear the snapshot
189    List<KeyValueScanner> scanners = memstore.getScanners(0);
190    // Shouldn't putting back the chunks to pool,since some scanners are opening
191    // based on their data
192    // close the snapshot scanner
193    for(KeyValueScanner scanner : snapshot.getScanners()) {
194      scanner.close();
195    }
196    memstore.clearSnapshot(snapshot.getId());
197
198    assertTrue(chunkCreator.getPoolSize() == 0);
199
200    // Chunks will be put back to pool after close scanners;
201    for (KeyValueScanner scanner : scanners) {
202      scanner.close();
203    }
204    assertTrue(chunkCreator.getPoolSize() > 0);
205
206    // clear chunks
207    chunkCreator.clearChunksInPool();
208
209    // Creating another snapshot
210    snapshot = memstore.snapshot();
211    // Adding more value
212    memstore.add(new KeyValue(row, fam, qf6, val), null);
213    memstore.add(new KeyValue(row, fam, qf7, val), null);
214    // opening scanners
215    scanners = memstore.getScanners(0);
216    // close scanners before clear the snapshot
217    for (KeyValueScanner scanner : scanners) {
218      scanner.close();
219    }
220    // Since no opening scanner, the chunks of snapshot should be put back to
221    // pool
222    // close the snapshot scanners
223    for(KeyValueScanner scanner : snapshot.getScanners()) {
224      scanner.close();
225    }
226    memstore.clearSnapshot(snapshot.getId());
227    assertTrue(chunkCreator.getPoolSize() > 0);
228  }
229
230  @Test
231  public void testPutbackChunksMultiThreaded() throws Exception {
232    final int maxCount = 10;
233    final int initialCount = 5;
234    final int chunkSize = 40;
235    final int valSize = 7;
236    ChunkCreator oldCreator = ChunkCreator.getInstance();
237    ChunkCreator newCreator = new ChunkCreator(chunkSize, false, 400, 1, 0.5f, null, 0);
238    assertEquals(initialCount, newCreator.getPoolSize());
239    assertEquals(maxCount, newCreator.getMaxCount());
240    ChunkCreator.instance = newCreator;// Replace the global ref with the new one we created.
241                                             // Used it for the testing. Later in finally we put
242                                             // back the original
243    final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
244        new byte[valSize]);
245    try {
246      Runnable r = new Runnable() {
247        @Override
248        public void run() {
249          MemStoreLAB memStoreLAB = new MemStoreLABImpl(conf);
250          for (int i = 0; i < maxCount; i++) {
251            memStoreLAB.copyCellInto(kv);// Try allocate size = chunkSize. Means every
252                                         // allocate call will result in a new chunk
253          }
254          // Close MemStoreLAB so that all chunks will be tried to be put back to pool
255          memStoreLAB.close();
256        }
257      };
258      Thread t1 = new Thread(r);
259      Thread t2 = new Thread(r);
260      Thread t3 = new Thread(r);
261      t1.start();
262      t2.start();
263      t3.start();
264      t1.join();
265      t2.join();
266      t3.join();
267      assertTrue(newCreator.getPoolSize() <= maxCount);
268    } finally {
269      ChunkCreator.instance = oldCreator;
270    }
271  }
272}