001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.*;
021
022import java.lang.management.ManagementFactory;
023import java.nio.ByteBuffer;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Map;
027import java.util.Random;
028import java.util.concurrent.atomic.AtomicInteger;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.ByteBufferKeyValue;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseConfiguration;
034import org.apache.hadoop.hbase.KeyValue;
035import org.apache.hadoop.hbase.KeyValueUtil;
036import org.apache.hadoop.hbase.MultithreadedTestUtil;
037import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
038import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
039import org.apache.hadoop.hbase.testclassification.RegionServerTests;
040import org.apache.hadoop.hbase.testclassification.SmallTests;
041import org.apache.hadoop.hbase.util.Bytes;
042import org.junit.AfterClass;
043import org.junit.BeforeClass;
044import org.junit.ClassRule;
045import org.junit.Test;
046import org.junit.experimental.categories.Category;
047
048import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
049import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
050import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
051import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;
052
053@Category({RegionServerTests.class, SmallTests.class})
054public class TestMemStoreLAB {
055
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058      HBaseClassTestRule.forClass(TestMemStoreLAB.class);
059
060  private final static Configuration conf = new Configuration();
061
062  private static final byte[] rk = Bytes.toBytes("r1");
063  private static final byte[] cf = Bytes.toBytes("f");
064  private static final byte[] q = Bytes.toBytes("q");
065
066  @BeforeClass
067  public static void setUpBeforeClass() throws Exception {
068    ChunkCreator.initialize(1 * 1024, false, 50 * 1024000L, 0.2f,
069        MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
070  }
071
072  @AfterClass
073  public static void tearDownAfterClass() throws Exception {
074    long globalMemStoreLimit =
075        (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()
076            * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
077    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, globalMemStoreLimit, 0.2f,
078      MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
079  }
080
081  /**
082   * Test a bunch of random allocations
083   */
084  @Test
085  public void testLABRandomAllocation() {
086    Random rand = new Random();
087    MemStoreLAB mslab = new MemStoreLABImpl();
088    int expectedOff = 0;
089    ByteBuffer lastBuffer = null;
090    int lastChunkId = -1;
091    // 100K iterations by 0-1K alloc -> 50MB expected
092    // should be reasonable for unit test and also cover wraparound
093    // behavior
094    for (int i = 0; i < 100000; i++) {
095      int valSize = rand.nextInt(3);
096      KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
097      int size = KeyValueUtil.length(kv);
098      ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv);
099      if (newKv.getBuffer() != lastBuffer) {
100        // since we add the chunkID at the 0th offset of the chunk and the
101        // chunkid is an int we need to account for those 4 bytes
102        expectedOff = Bytes.SIZEOF_INT;
103        lastBuffer = newKv.getBuffer();
104        int chunkId = newKv.getBuffer().getInt(0);
105        assertTrue("chunkid should be different", chunkId != lastChunkId);
106        lastChunkId = chunkId;
107      }
108      assertEquals(expectedOff, newKv.getOffset());
109      assertTrue("Allocation overruns buffer",
110          newKv.getOffset() + size <= newKv.getBuffer().capacity());
111      expectedOff += size;
112    }
113  }
114
115  @Test
116  public void testLABLargeAllocation() {
117    MemStoreLAB mslab = new MemStoreLABImpl();
118    KeyValue kv = new KeyValue(rk, cf, q, new byte[2 * 1024 * 1024]);
119    Cell newCell = mslab.copyCellInto(kv);
120    assertNull("2MB allocation shouldn't be satisfied by LAB.", newCell);
121  }
122
123  /**
124   * Test allocation from lots of threads, making sure the results don't
125   * overlap in any way
126   */
127  @Test
128  public void testLABThreading() throws Exception {
129    Configuration conf = new Configuration();
130    MultithreadedTestUtil.TestContext ctx =
131      new MultithreadedTestUtil.TestContext(conf);
132
133    final AtomicInteger totalAllocated = new AtomicInteger();
134
135    final MemStoreLAB mslab = new MemStoreLABImpl();
136    List<List<AllocRecord>> allocations = Lists.newArrayList();
137
138    for (int i = 0; i < 10; i++) {
139      final List<AllocRecord> allocsByThisThread = Lists.newLinkedList();
140      allocations.add(allocsByThisThread);
141
142      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
143        private Random r = new Random();
144        @Override
145        public void doAnAction() throws Exception {
146          int valSize = r.nextInt(3);
147          KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
148          int size = KeyValueUtil.length(kv);
149          ByteBufferKeyValue newCell = (ByteBufferKeyValue) mslab.copyCellInto(kv);
150          totalAllocated.addAndGet(size);
151          allocsByThisThread.add(new AllocRecord(newCell.getBuffer(), newCell.getOffset(), size));
152        }
153      };
154      ctx.addThread(t);
155    }
156
157    ctx.startThreads();
158    while (totalAllocated.get() < 50*1024*1000 && ctx.shouldRun()) {
159      Thread.sleep(10);
160    }
161    ctx.stop();
162    // Partition the allocations by the actual byte[] they point into,
163    // make sure offsets are unique for each chunk
164    Map<ByteBuffer, Map<Integer, AllocRecord>> mapsByChunk =
165      Maps.newHashMap();
166
167    int sizeCounted = 0;
168    for (AllocRecord rec : Iterables.concat(allocations)) {
169      sizeCounted += rec.size;
170      if (rec.size == 0) continue;
171      Map<Integer, AllocRecord> mapForThisByteArray =
172        mapsByChunk.get(rec.alloc);
173      if (mapForThisByteArray == null) {
174        mapForThisByteArray = Maps.newTreeMap();
175        mapsByChunk.put(rec.alloc, mapForThisByteArray);
176      }
177      AllocRecord oldVal = mapForThisByteArray.put(rec.offset, rec);
178      assertNull("Already had an entry " + oldVal + " for allocation " + rec,
179          oldVal);
180    }
181    assertEquals("Sanity check test", sizeCounted, totalAllocated.get());
182
183    // Now check each byte array to make sure allocations don't overlap
184    for (Map<Integer, AllocRecord> allocsInChunk : mapsByChunk.values()) {
185      // since we add the chunkID at the 0th offset of the chunk and the
186      // chunkid is an int we need to account for those 4 bytes
187      int expectedOff = Bytes.SIZEOF_INT;
188      for (AllocRecord alloc : allocsInChunk.values()) {
189        assertEquals(expectedOff, alloc.offset);
190        assertTrue("Allocation overruns buffer",
191            alloc.offset + alloc.size <= alloc.alloc.capacity());
192        expectedOff += alloc.size;
193      }
194    }
195  }
196
197  /**
198   * Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure
199   * there's no memory leak (HBASE-16195)
200   * @throws Exception if any error occurred
201   */
202  @Test
203  public void testLABChunkQueue() throws Exception {
204    ChunkCreator oldInstance = null;
205    try {
206      MemStoreLABImpl mslab = new MemStoreLABImpl();
207      // by default setting, there should be no chunks initialized in the pool
208      assertTrue(mslab.getPooledChunks().isEmpty());
209      oldInstance = ChunkCreator.instance;
210      ChunkCreator.instance = null;
211      // reset mslab with chunk pool
212      Configuration conf = HBaseConfiguration.create();
213      conf.setDouble(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.1);
214      // set chunk size to default max alloc size, so we could easily trigger chunk retirement
215      conf.setLong(MemStoreLABImpl.CHUNK_SIZE_KEY, MemStoreLABImpl.MAX_ALLOC_DEFAULT);
216      // reconstruct mslab
217      long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
218          .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
219      ChunkCreator.initialize(MemStoreLABImpl.MAX_ALLOC_DEFAULT, false,
220        globalMemStoreLimit, 0.1f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
221      ChunkCreator.clearDisableFlag();
222      mslab = new MemStoreLABImpl(conf);
223      // launch multiple threads to trigger frequent chunk retirement
224      List<Thread> threads = new ArrayList<>();
225      final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
226          new byte[MemStoreLABImpl.MAX_ALLOC_DEFAULT - 32]);
227      for (int i = 0; i < 10; i++) {
228        threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i, kv));
229      }
230      for (Thread thread : threads) {
231        thread.start();
232      }
233      // let it run for some time
234      Thread.sleep(1000);
235      for (Thread thread : threads) {
236        thread.interrupt();
237      }
238      boolean threadsRunning = true;
239      boolean alive = false;
240      while (threadsRunning) {
241        alive = false;
242        for (Thread thread : threads) {
243          if (thread.isAlive()) {
244            alive = true;
245            break;
246          }
247        }
248        if (!alive) {
249          threadsRunning = false;
250        }
251      }
252      // none of the chunkIds would have been returned back
253      assertTrue("All the chunks must have been cleared",
254          ChunkCreator.instance.numberOfMappedChunks() != 0);
255      int pooledChunksNum = mslab.getPooledChunks().size();
256      // close the mslab
257      mslab.close();
258      // make sure all chunks where reclaimed back to pool
259      int queueLength = mslab.getNumOfChunksReturnedToPool();
260      assertTrue("All chunks in chunk queue should be reclaimed or removed"
261          + " after mslab closed but actually: " + (pooledChunksNum-queueLength),
262          pooledChunksNum-queueLength == 0);
263    } finally {
264      ChunkCreator.instance = oldInstance;
265    }
266  }
267
268  private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName,
269      Cell cellToCopyInto) {
270    Thread thread = new Thread() {
271      volatile boolean stopped = false;
272
273      @Override
274      public void run() {
275        while (!stopped) {
276          // keep triggering chunk retirement
277          mslab.copyCellInto(cellToCopyInto);
278        }
279      }
280
281      @Override
282      public void interrupt() {
283        this.stopped = true;
284      }
285    };
286    thread.setName(threadName);
287    thread.setDaemon(true);
288    return thread;
289  }
290
291  private static class AllocRecord implements Comparable<AllocRecord>{
292    private final ByteBuffer alloc;
293    private final int offset;
294    private final int size;
295
296    public AllocRecord(ByteBuffer alloc, int offset, int size) {
297      super();
298      this.alloc = alloc;
299      this.offset = offset;
300      this.size = size;
301    }
302
303    @Override
304    public int compareTo(AllocRecord e) {
305      if (alloc != e.alloc) {
306        throw new RuntimeException("Can only compare within a particular array");
307      }
308      return Ints.compare(this.offset, e.offset);
309    }
310
311    @Override
312    public String toString() {
313      return "AllocRecord(offset=" + this.offset + ", size=" + size + ")";
314    }
315  }
316}
317