/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;

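/**
 * Tests for {@link MemStoreLAB}: chunk-relative allocation layout, rejection of oversized
 * allocations, allocation safety under many concurrent threads, and chunk retirement / pool
 * reclamation (HBASE-16195).
 */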
@Category({RegionServerTests.class, MediumTests.class})
public class TestMemStoreLAB {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMemStoreLAB.class);

  private final static Configuration conf = new Configuration();

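  // Row key, column family and qualifier used to build the KeyValues allocated in these tests.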
  private static final byte[] rk = Bytes.toBytes("r1");
  private static final byte[] cf = Bytes.toBytes("f");
  private static final byte[] q = Bytes.toBytes("q");

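  // Initialize the shared ChunkCreator with a small 1 KB chunk size and a ~50 MB global limit
  // so the allocation tests below cycle through many chunks quickly.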
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ChunkCreator.initialize(1 * 1024, false, 50 * 1024000L, 0.2f,
        MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
  }

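  // Restore the ChunkCreator to the default chunk size and a heap-derived global memstore
  // limit so other tests running in the same JVM see the standard configuration.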
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    long globalMemStoreLimit =
        (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()
            * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, globalMemStoreLimit, 0.2f,
      MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
  }

  /**
   * Test a bunch of random-sized allocations, verifying that each allocation is laid out
   * directly after the previous one within its chunk.
   */
  @Test
  public void testLABRandomAllocation() {
    Random rand = new Random();
    MemStoreLAB mslab = new MemStoreLABImpl();
    int expectedOff = 0;
    ByteBuffer lastBuffer = null;
    int lastChunkId = -1;
    // 100K iterations of small random-sized allocations; should be reasonable
    // for a unit test and still roll over enough chunks to cover wraparound
    // behavior
    for (int i = 0; i < 100000; i++) {
      int valSize = rand.nextInt(3);
      KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
      int size = kv.getSerializedSize();
      ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv);
      if (newKv.getBuffer() != lastBuffer) {
        // since we add the chunkID at the 0th offset of the chunk and the
        // chunkid is an int we need to account for those 4 bytes
        expectedOff = Bytes.SIZEOF_INT;
        lastBuffer = newKv.getBuffer();
        int chunkId = newKv.getBuffer().getInt(0);
        assertTrue("chunkid should be different", chunkId != lastChunkId);
        lastChunkId = chunkId;
      }
      assertEquals(expectedOff, newKv.getOffset());
      assertTrue("Allocation overruns buffer",
          newKv.getOffset() + size <= newKv.getBuffer().capacity());
      expectedOff += size;
    }
  }

  @Test
  public void testLABLargeAllocation() {
    MemStoreLAB mslab = new MemStoreLABImpl();
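    // A 2MB value is far larger than the LAB's maximum single allocation
    // (MemStoreLABImpl.MAX_ALLOC_DEFAULT), so copyCellInto is expected to return
    // null rather than copying the cell into a chunk.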
    KeyValue kv = new KeyValue(rk, cf, q, new byte[2 * 1024 * 1024]);
    Cell newCell = mslab.copyCellInto(kv);
    assertNull("2MB allocation shouldn't be satisfied by LAB.", newCell);
  }

  /**
   * Test allocation from lots of threads, making sure the results don't
   * overlap in any way
   */
  @Test
  public void testLABThreading() throws Exception {
    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx =
      new MultithreadedTestUtil.TestContext(conf);

    final AtomicInteger totalAllocated = new AtomicInteger();

    final MemStoreLAB mslab = new MemStoreLABImpl();
    List<List<AllocRecord>> allocations = Lists.newArrayList();

    for (int i = 0; i < 10; i++) {
      final List<AllocRecord> allocsByThisThread = Lists.newLinkedList();
      allocations.add(allocsByThisThread);

      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        private Random r = new Random();
        @Override
        public void doAnAction() throws Exception {
          int valSize = r.nextInt(3);
          KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
          int size = kv.getSerializedSize();
          ByteBufferKeyValue newCell = (ByteBufferKeyValue) mslab.copyCellInto(kv);
          totalAllocated.addAndGet(size);
          allocsByThisThread.add(new AllocRecord(newCell.getBuffer(), newCell.getOffset(), size));
        }
      };
      ctx.addThread(t);
    }

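    // Run the allocator threads until roughly 50 MB in total has been copied into the LAB,
    // or until the test context stops (e.g. because a thread failed).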
    ctx.startThreads();
    while (totalAllocated.get() < 50 * 1024 * 1000 && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();
    // Partition the allocations by the chunk buffer they point into, and make sure
    // the offsets are unique within each chunk
    Map<ByteBuffer, Map<Integer, AllocRecord>> mapsByChunk =
      Maps.newHashMap();

    int sizeCounted = 0;
    for (AllocRecord rec : Iterables.concat(allocations)) {
      sizeCounted += rec.size;
      if (rec.size == 0) {
        continue;
      }
      Map<Integer, AllocRecord> mapForThisByteArray =
        mapsByChunk.get(rec.alloc);
      if (mapForThisByteArray == null) {
        mapForThisByteArray = Maps.newTreeMap();
        mapsByChunk.put(rec.alloc, mapForThisByteArray);
      }
      AllocRecord oldVal = mapForThisByteArray.put(rec.offset, rec);
      assertNull("Already had an entry " + oldVal + " for allocation " + rec,
          oldVal);
    }
    assertEquals("Sanity check test", sizeCounted, totalAllocated.get());

    // Now check each chunk to make sure the allocations within it don't overlap
    for (Map<Integer, AllocRecord> allocsInChunk : mapsByChunk.values()) {
      // since we add the chunkID at the 0th offset of the chunk and the
      // chunkid is an int we need to account for those 4 bytes
      int expectedOff = Bytes.SIZEOF_INT;
      for (AllocRecord alloc : allocsInChunk.values()) {
        assertEquals(expectedOff, alloc.offset);
        assertTrue("Allocation overruns buffer",
            alloc.offset + alloc.size <= alloc.alloc.capacity());
        expectedOff += alloc.size;
      }
    }
  }

  /**
   * Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure
   * there's no memory leak (HBASE-16195)
   * @throws Exception if any error occurred
   */
  @Test
  public void testLABChunkQueue() throws Exception {
    ChunkCreator oldInstance = null;
    try {
      MemStoreLABImpl mslab = new MemStoreLABImpl();
      // with the default settings, there should be no chunks pre-allocated in the pool
      assertTrue(mslab.getPooledChunks().isEmpty());
      oldInstance = ChunkCreator.instance;
      ChunkCreator.instance = null;
      // reset mslab with chunk pool
      Configuration conf = HBaseConfiguration.create();
      conf.setDouble(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.1);
      // set chunk size to the default max alloc size, so we can easily trigger chunk retirement
      conf.setLong(MemStoreLABImpl.CHUNK_SIZE_KEY, MemStoreLABImpl.MAX_ALLOC_DEFAULT);
      // reconstruct mslab
      long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
          .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
      ChunkCreator.initialize(MemStoreLABImpl.MAX_ALLOC_DEFAULT, false,
        globalMemStoreLimit, 0.1f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null);
      ChunkCreator.clearDisableFlag();
      mslab = new MemStoreLABImpl(conf);
      // launch multiple threads to trigger frequent chunk retirement
      List<Thread> threads = new ArrayList<>();
      final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
          new byte[MemStoreLABImpl.MAX_ALLOC_DEFAULT - 32]);
      for (int i = 0; i < 10; i++) {
        threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i, kv));
      }
      for (Thread thread : threads) {
        thread.start();
      }
      // let it run for some time
      Thread.sleep(1000);
      for (Thread thread : threads) {
        thread.interrupt();
      }
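      // interrupt() on these test threads only flips their stop flag (see
      // getChunkQueueTestThread), so busy-wait here until every thread has exited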
      boolean threadsRunning = true;
      boolean alive = false;
      while (threadsRunning) {
        alive = false;
        for (Thread thread : threads) {
          if (thread.isAlive()) {
            alive = true;
            break;
          }
        }
        if (!alive) {
          threadsRunning = false;
        }
      }
      // at this point none of the chunk ids have been returned to the creator yet,
      // so there must still be mapped chunks
      assertTrue("Some chunks should still be mapped before the mslab is closed",
          ChunkCreator.instance.numberOfMappedChunks() != 0);
      int pooledChunksNum = mslab.getPooledChunks().size();
      // close the mslab
      mslab.close();
      // make sure all chunks were reclaimed back to the pool
      int queueLength = mslab.getNumOfChunksReturnedToPool();
      assertTrue("All chunks in the chunk queue should be reclaimed or removed"
          + " after the mslab is closed, but " + (pooledChunksNum - queueLength) + " were not",
          pooledChunksNum - queueLength == 0);
    } finally {
      ChunkCreator.instance = oldInstance;
    }
  }

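  /**
   * Creates a daemon thread that keeps copying the given cell (sized close to the chunk size
   * in the test above) into the given MSLAB, so that chunks are retired as fast as possible.
   * Note that {@code interrupt()} is overridden to act as a cooperative stop signal rather
   * than a real thread interrupt.
   */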
  private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName,
      Cell cellToCopyInto) {
    Thread thread = new Thread() {
      volatile boolean stopped = false;

      @Override
      public void run() {
        while (!stopped) {
          // keep triggering chunk retirement
          mslab.copyCellInto(cellToCopyInto);
        }
      }

      @Override
      public void interrupt() {
        this.stopped = true;
      }
    };
    thread.setName(threadName);
    thread.setDaemon(true);
    return thread;
  }

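  /**
   * Records a single allocation: the chunk buffer the cell was copied into, the offset within
   * that buffer, and the serialized size. Used by {@link #testLABThreading()} to verify that
   * concurrent allocations never overlap.
   */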
  private static class AllocRecord implements Comparable<AllocRecord> {
    private final ByteBuffer alloc;
    private final int offset;
    private final int size;

    public AllocRecord(ByteBuffer alloc, int offset, int size) {
      this.alloc = alloc;
      this.offset = offset;
      this.size = size;
    }

    @Override
    public int compareTo(AllocRecord e) {
      if (alloc != e.alloc) {
        throw new RuntimeException("Can only compare within a particular array");
      }
      return Ints.compare(this.offset, e.offset);
    }

    @Override
    public String toString() {
      return "AllocRecord(offset=" + this.offset + ", size=" + size + ")";
    }
  }
}