001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.exceptions.UnexpectedStateException;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
043
044/**
045 * Test the {@link org.apache.hadoop.hbase.regionserver.ChunkCreator.MemStoreChunkPool} class
046 */
047@Category({RegionServerTests.class, SmallTests.class})
048public class TestMemStoreChunkPool {
049
050  @ClassRule
051  public static final HBaseClassTestRule CLASS_RULE =
052      HBaseClassTestRule.forClass(TestMemStoreChunkPool.class);
053
054  private final static Configuration conf = new Configuration();
055  private static ChunkCreator chunkCreator;
056  private static boolean chunkPoolDisabledBeforeTest;
057
058  @BeforeClass
059  public static void setUpBeforeClass() throws Exception {
060    conf.setBoolean(MemStoreLAB.USEMSLAB_KEY, true);
061    conf.setFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.2f);
062    chunkPoolDisabledBeforeTest = ChunkCreator.chunkPoolDisabled;
063    ChunkCreator.chunkPoolDisabled = false;
064    long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
065        .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
066    chunkCreator = ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false,
067      globalMemStoreLimit, 0.2f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
068    assertTrue(chunkCreator != null);
069  }
070
071  @AfterClass
072  public static void tearDownAfterClass() throws Exception {
073    ChunkCreator.chunkPoolDisabled = chunkPoolDisabledBeforeTest;
074  }
075
076  @After
077  public void tearDown() throws Exception {
078    chunkCreator.clearChunksInPool();
079  }
080
081  @Test
082  public void testReusingChunks() {
083    Random rand = new Random();
084    MemStoreLAB mslab = new MemStoreLABImpl(conf);
085    int expectedOff = 0;
086    ByteBuffer lastBuffer = null;
087    final byte[] rk = Bytes.toBytes("r1");
088    final byte[] cf = Bytes.toBytes("f");
089    final byte[] q = Bytes.toBytes("q");
090    // Randomly allocate some bytes
091    for (int i = 0; i < 100; i++) {
092      int valSize = rand.nextInt(1000);
093      KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
094      int size = kv.getSerializedSize();
095      ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv);
096      if (newKv.getBuffer() != lastBuffer) {
097        expectedOff = 4;
098        lastBuffer = newKv.getBuffer();
099      }
100      assertEquals(expectedOff, newKv.getOffset());
101      assertTrue("Allocation overruns buffer",
102          newKv.getOffset() + size <= newKv.getBuffer().capacity());
103      expectedOff += size;
104    }
105    // chunks will be put back to pool after close
106    mslab.close();
107    int chunkCount = chunkCreator.getPoolSize();
108    assertTrue(chunkCount > 0);
109    // reconstruct mslab
110    mslab = new MemStoreLABImpl(conf);
111    // chunk should be got from the pool, so we can reuse it.
112    KeyValue kv = new KeyValue(rk, cf, q, new byte[10]);
113    mslab.copyCellInto(kv);
114    assertEquals(chunkCount - 1, chunkCreator.getPoolSize());
115  }
116
117  @Test
118  public void testPuttingBackChunksAfterFlushing() throws UnexpectedStateException {
119    byte[] row = Bytes.toBytes("testrow");
120    byte[] fam = Bytes.toBytes("testfamily");
121    byte[] qf1 = Bytes.toBytes("testqualifier1");
122    byte[] qf2 = Bytes.toBytes("testqualifier2");
123    byte[] qf3 = Bytes.toBytes("testqualifier3");
124    byte[] qf4 = Bytes.toBytes("testqualifier4");
125    byte[] qf5 = Bytes.toBytes("testqualifier5");
126    byte[] val = Bytes.toBytes("testval");
127
128    DefaultMemStore memstore = new DefaultMemStore();
129
130    // Setting up memstore
131    memstore.add(new KeyValue(row, fam, qf1, val), null);
132    memstore.add(new KeyValue(row, fam, qf2, val), null);
133    memstore.add(new KeyValue(row, fam, qf3, val), null);
134
135    // Creating a snapshot
136    MemStoreSnapshot snapshot = memstore.snapshot();
137    assertEquals(3, memstore.getSnapshot().getCellsCount());
138
139    // Adding value to "new" memstore
140    assertEquals(0, memstore.getActive().getCellsCount());
141    memstore.add(new KeyValue(row, fam, qf4, val), null);
142    memstore.add(new KeyValue(row, fam, qf5, val), null);
143    assertEquals(2, memstore.getActive().getCellsCount());
144    // close the scanner - this is how the snapshot will be used
145    for(KeyValueScanner scanner : snapshot.getScanners()) {
146      scanner.close();
147    }
148    memstore.clearSnapshot(snapshot.getId());
149
150    int chunkCount = chunkCreator.getPoolSize();
151    assertTrue(chunkCount > 0);
152
153  }
154
155  @Test
156  public void testPuttingBackChunksWithOpeningScanner()
157      throws IOException {
158    byte[] row = Bytes.toBytes("testrow");
159    byte[] fam = Bytes.toBytes("testfamily");
160    byte[] qf1 = Bytes.toBytes("testqualifier1");
161    byte[] qf2 = Bytes.toBytes("testqualifier2");
162    byte[] qf3 = Bytes.toBytes("testqualifier3");
163    byte[] qf4 = Bytes.toBytes("testqualifier4");
164    byte[] qf5 = Bytes.toBytes("testqualifier5");
165    byte[] qf6 = Bytes.toBytes("testqualifier6");
166    byte[] qf7 = Bytes.toBytes("testqualifier7");
167    byte[] val = Bytes.toBytes("testval");
168
169    DefaultMemStore memstore = new DefaultMemStore();
170
171    // Setting up memstore
172    memstore.add(new KeyValue(row, fam, qf1, val), null);
173    memstore.add(new KeyValue(row, fam, qf2, val), null);
174    memstore.add(new KeyValue(row, fam, qf3, val), null);
175
176    // Creating a snapshot
177    MemStoreSnapshot snapshot = memstore.snapshot();
178    assertEquals(3, memstore.getSnapshot().getCellsCount());
179
180    // Adding value to "new" memstore
181    assertEquals(0, memstore.getActive().getCellsCount());
182    memstore.add(new KeyValue(row, fam, qf4, val), null);
183    memstore.add(new KeyValue(row, fam, qf5, val), null);
184    assertEquals(2, memstore.getActive().getCellsCount());
185
186    // opening scanner before clear the snapshot
187    List<KeyValueScanner> scanners = memstore.getScanners(0);
188    // Shouldn't putting back the chunks to pool,since some scanners are opening
189    // based on their data
190    // close the snapshot scanner
191    for(KeyValueScanner scanner : snapshot.getScanners()) {
192      scanner.close();
193    }
194    memstore.clearSnapshot(snapshot.getId());
195
196    assertTrue(chunkCreator.getPoolSize() == 0);
197
198    // Chunks will be put back to pool after close scanners;
199    for (KeyValueScanner scanner : scanners) {
200      scanner.close();
201    }
202    assertTrue(chunkCreator.getPoolSize() > 0);
203
204    // clear chunks
205    chunkCreator.clearChunksInPool();
206
207    // Creating another snapshot
208    snapshot = memstore.snapshot();
209    // Adding more value
210    memstore.add(new KeyValue(row, fam, qf6, val), null);
211    memstore.add(new KeyValue(row, fam, qf7, val), null);
212    // opening scanners
213    scanners = memstore.getScanners(0);
214    // close scanners before clear the snapshot
215    for (KeyValueScanner scanner : scanners) {
216      scanner.close();
217    }
218    // Since no opening scanner, the chunks of snapshot should be put back to
219    // pool
220    // close the snapshot scanners
221    for(KeyValueScanner scanner : snapshot.getScanners()) {
222      scanner.close();
223    }
224    memstore.clearSnapshot(snapshot.getId());
225    assertTrue(chunkCreator.getPoolSize() > 0);
226  }
227
228  @Test
229  public void testPutbackChunksMultiThreaded() throws Exception {
230    final int maxCount = 10;
231    final int initialCount = 5;
232    final int chunkSize = 40;
233    final int valSize = 7;
234    ChunkCreator oldCreator = ChunkCreator.getInstance();
235    ChunkCreator newCreator = new ChunkCreator(chunkSize, false, 400, 1, 0.5f, null, 0);
236    assertEquals(initialCount, newCreator.getPoolSize());
237    assertEquals(maxCount, newCreator.getMaxCount());
238    ChunkCreator.instance = newCreator;// Replace the global ref with the new one we created.
239                                             // Used it for the testing. Later in finally we put
240                                             // back the original
241    final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
242        new byte[valSize]);
243    try {
244      Runnable r = new Runnable() {
245        @Override
246        public void run() {
247          MemStoreLAB memStoreLAB = new MemStoreLABImpl(conf);
248          for (int i = 0; i < maxCount; i++) {
249            memStoreLAB.copyCellInto(kv);// Try allocate size = chunkSize. Means every
250                                         // allocate call will result in a new chunk
251          }
252          // Close MemStoreLAB so that all chunks will be tried to be put back to pool
253          memStoreLAB.close();
254        }
255      };
256      Thread t1 = new Thread(r);
257      Thread t2 = new Thread(r);
258      Thread t3 = new Thread(r);
259      t1.start();
260      t2.start();
261      t3.start();
262      t1.join();
263      t2.join();
264      t3.join();
265      assertTrue(newCreator.getPoolSize() <= maxCount);
266    } finally {
267      ChunkCreator.instance = oldCreator;
268    }
269  }
270}