001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.*; 021 022import java.lang.management.ManagementFactory; 023import java.nio.ByteBuffer; 024import java.util.ArrayList; 025import java.util.List; 026import java.util.Map; 027import java.util.Random; 028import java.util.concurrent.atomic.AtomicInteger; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.hbase.ByteBufferKeyValue; 031import org.apache.hadoop.hbase.Cell; 032import org.apache.hadoop.hbase.HBaseClassTestRule; 033import org.apache.hadoop.hbase.HBaseConfiguration; 034import org.apache.hadoop.hbase.KeyValue; 035import org.apache.hadoop.hbase.MultithreadedTestUtil; 036import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; 037import org.apache.hadoop.hbase.io.util.MemorySizeUtil; 038import org.apache.hadoop.hbase.testclassification.RegionServerTests; 039import org.apache.hadoop.hbase.testclassification.SmallTests; 040import org.apache.hadoop.hbase.util.Bytes; 041import org.junit.AfterClass; 042import org.junit.BeforeClass; 043import org.junit.ClassRule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046 047import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 048import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 049import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 050import org.apache.hbase.thirdparty.com.google.common.primitives.Ints; 051 052@Category({RegionServerTests.class, SmallTests.class}) 053public class TestMemStoreLAB { 054 055 @ClassRule 056 public static final HBaseClassTestRule CLASS_RULE = 057 HBaseClassTestRule.forClass(TestMemStoreLAB.class); 058 059 private final static Configuration conf = new Configuration(); 060 061 private static final byte[] rk = Bytes.toBytes("r1"); 062 private static final byte[] cf = Bytes.toBytes("f"); 063 private static final byte[] q = Bytes.toBytes("q"); 064 065 @BeforeClass 066 public static void setUpBeforeClass() throws Exception { 067 ChunkCreator.initialize(1 * 1024, false, 50 * 1024000L, 0.2f, 068 MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); 069 } 070 071 @AfterClass 072 public static void tearDownAfterClass() throws Exception { 073 long globalMemStoreLimit = 074 (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() 075 * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); 076 ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, globalMemStoreLimit, 0.2f, 077 MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); 078 } 079 080 /** 081 * Test a bunch of random allocations 082 */ 083 @Test 084 public void testLABRandomAllocation() { 085 Random rand = new Random(); 086 MemStoreLAB mslab = new MemStoreLABImpl(); 087 int expectedOff = 0; 088 ByteBuffer lastBuffer = null; 089 int lastChunkId = -1; 090 // 100K iterations by 0-1K alloc -> 50MB expected 091 // should be reasonable for unit test and also cover wraparound 092 // behavior 093 for (int i = 0; i < 100000; i++) { 094 int valSize = rand.nextInt(3); 095 KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]); 096 int size = kv.getSerializedSize(); 097 ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv); 098 if (newKv.getBuffer() != lastBuffer) { 099 // since we add the chunkID at the 0th offset of the chunk and the 100 // chunkid is an int we need to account for those 4 bytes 101 expectedOff = Bytes.SIZEOF_INT; 102 lastBuffer = newKv.getBuffer(); 103 int chunkId = newKv.getBuffer().getInt(0); 104 assertTrue("chunkid should be different", chunkId != lastChunkId); 105 lastChunkId = chunkId; 106 } 107 assertEquals(expectedOff, newKv.getOffset()); 108 assertTrue("Allocation overruns buffer", 109 newKv.getOffset() + size <= newKv.getBuffer().capacity()); 110 expectedOff += size; 111 } 112 } 113 114 @Test 115 public void testLABLargeAllocation() { 116 MemStoreLAB mslab = new MemStoreLABImpl(); 117 KeyValue kv = new KeyValue(rk, cf, q, new byte[2 * 1024 * 1024]); 118 Cell newCell = mslab.copyCellInto(kv); 119 assertNull("2MB allocation shouldn't be satisfied by LAB.", newCell); 120 } 121 122 /** 123 * Test allocation from lots of threads, making sure the results don't 124 * overlap in any way 125 */ 126 @Test 127 public void testLABThreading() throws Exception { 128 Configuration conf = new Configuration(); 129 MultithreadedTestUtil.TestContext ctx = 130 new MultithreadedTestUtil.TestContext(conf); 131 132 final AtomicInteger totalAllocated = new AtomicInteger(); 133 134 final MemStoreLAB mslab = new MemStoreLABImpl(); 135 List<List<AllocRecord>> allocations = Lists.newArrayList(); 136 137 for (int i = 0; i < 10; i++) { 138 final List<AllocRecord> allocsByThisThread = Lists.newLinkedList(); 139 allocations.add(allocsByThisThread); 140 141 TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { 142 private Random r = new Random(); 143 @Override 144 public void doAnAction() throws Exception { 145 int valSize = r.nextInt(3); 146 KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]); 147 int size = kv.getSerializedSize(); 148 ByteBufferKeyValue newCell = (ByteBufferKeyValue) mslab.copyCellInto(kv); 149 totalAllocated.addAndGet(size); 150 allocsByThisThread.add(new AllocRecord(newCell.getBuffer(), newCell.getOffset(), size)); 151 } 152 }; 153 ctx.addThread(t); 154 } 155 156 ctx.startThreads(); 157 while (totalAllocated.get() < 50*1024*1000 && ctx.shouldRun()) { 158 Thread.sleep(10); 159 } 160 ctx.stop(); 161 // Partition the allocations by the actual byte[] they point into, 162 // make sure offsets are unique for each chunk 163 Map<ByteBuffer, Map<Integer, AllocRecord>> mapsByChunk = 164 Maps.newHashMap(); 165 166 int sizeCounted = 0; 167 for (AllocRecord rec : Iterables.concat(allocations)) { 168 sizeCounted += rec.size; 169 if (rec.size == 0) continue; 170 Map<Integer, AllocRecord> mapForThisByteArray = 171 mapsByChunk.get(rec.alloc); 172 if (mapForThisByteArray == null) { 173 mapForThisByteArray = Maps.newTreeMap(); 174 mapsByChunk.put(rec.alloc, mapForThisByteArray); 175 } 176 AllocRecord oldVal = mapForThisByteArray.put(rec.offset, rec); 177 assertNull("Already had an entry " + oldVal + " for allocation " + rec, 178 oldVal); 179 } 180 assertEquals("Sanity check test", sizeCounted, totalAllocated.get()); 181 182 // Now check each byte array to make sure allocations don't overlap 183 for (Map<Integer, AllocRecord> allocsInChunk : mapsByChunk.values()) { 184 // since we add the chunkID at the 0th offset of the chunk and the 185 // chunkid is an int we need to account for those 4 bytes 186 int expectedOff = Bytes.SIZEOF_INT; 187 for (AllocRecord alloc : allocsInChunk.values()) { 188 assertEquals(expectedOff, alloc.offset); 189 assertTrue("Allocation overruns buffer", 190 alloc.offset + alloc.size <= alloc.alloc.capacity()); 191 expectedOff += alloc.size; 192 } 193 } 194 } 195 196 /** 197 * Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure 198 * there's no memory leak (HBASE-16195) 199 * @throws Exception if any error occurred 200 */ 201 @Test 202 public void testLABChunkQueue() throws Exception { 203 ChunkCreator oldInstance = null; 204 try { 205 MemStoreLABImpl mslab = new MemStoreLABImpl(); 206 // by default setting, there should be no chunks initialized in the pool 207 assertTrue(mslab.getPooledChunks().isEmpty()); 208 oldInstance = ChunkCreator.instance; 209 ChunkCreator.instance = null; 210 // reset mslab with chunk pool 211 Configuration conf = HBaseConfiguration.create(); 212 conf.setDouble(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.1); 213 // set chunk size to default max alloc size, so we could easily trigger chunk retirement 214 conf.setLong(MemStoreLABImpl.CHUNK_SIZE_KEY, MemStoreLABImpl.MAX_ALLOC_DEFAULT); 215 // reconstruct mslab 216 long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage() 217 .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); 218 ChunkCreator.initialize(MemStoreLABImpl.MAX_ALLOC_DEFAULT, false, 219 globalMemStoreLimit, 0.1f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null); 220 ChunkCreator.clearDisableFlag(); 221 mslab = new MemStoreLABImpl(conf); 222 // launch multiple threads to trigger frequent chunk retirement 223 List<Thread> threads = new ArrayList<>(); 224 final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), 225 new byte[MemStoreLABImpl.MAX_ALLOC_DEFAULT - 32]); 226 for (int i = 0; i < 10; i++) { 227 threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i, kv)); 228 } 229 for (Thread thread : threads) { 230 thread.start(); 231 } 232 // let it run for some time 233 Thread.sleep(1000); 234 for (Thread thread : threads) { 235 thread.interrupt(); 236 } 237 boolean threadsRunning = true; 238 boolean alive = false; 239 while (threadsRunning) { 240 alive = false; 241 for (Thread thread : threads) { 242 if (thread.isAlive()) { 243 alive = true; 244 break; 245 } 246 } 247 if (!alive) { 248 threadsRunning = false; 249 } 250 } 251 // none of the chunkIds would have been returned back 252 assertTrue("All the chunks must have been cleared", 253 ChunkCreator.instance.numberOfMappedChunks() != 0); 254 int pooledChunksNum = mslab.getPooledChunks().size(); 255 // close the mslab 256 mslab.close(); 257 // make sure all chunks where reclaimed back to pool 258 int queueLength = mslab.getNumOfChunksReturnedToPool(); 259 assertTrue("All chunks in chunk queue should be reclaimed or removed" 260 + " after mslab closed but actually: " + (pooledChunksNum-queueLength), 261 pooledChunksNum-queueLength == 0); 262 } finally { 263 ChunkCreator.instance = oldInstance; 264 } 265 } 266 267 private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName, 268 Cell cellToCopyInto) { 269 Thread thread = new Thread() { 270 volatile boolean stopped = false; 271 272 @Override 273 public void run() { 274 while (!stopped) { 275 // keep triggering chunk retirement 276 mslab.copyCellInto(cellToCopyInto); 277 } 278 } 279 280 @Override 281 public void interrupt() { 282 this.stopped = true; 283 } 284 }; 285 thread.setName(threadName); 286 thread.setDaemon(true); 287 return thread; 288 } 289 290 private static class AllocRecord implements Comparable<AllocRecord>{ 291 private final ByteBuffer alloc; 292 private final int offset; 293 private final int size; 294 295 public AllocRecord(ByteBuffer alloc, int offset, int size) { 296 super(); 297 this.alloc = alloc; 298 this.offset = offset; 299 this.size = size; 300 } 301 302 @Override 303 public int compareTo(AllocRecord e) { 304 if (alloc != e.alloc) { 305 throw new RuntimeException("Can only compare within a particular array"); 306 } 307 return Ints.compare(this.offset, e.offset); 308 } 309 310 @Override 311 public String toString() { 312 return "AllocRecord(offset=" + this.offset + ", size=" + size + ")"; 313 } 314 } 315} 316