/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ByteBufferKeyValue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.primitives.Ints;

@Category({RegionServerTests.class, MediumTests.class})
public class TestMemStoreLAB {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestMemStoreLAB.class);

  private final static Configuration conf = new Configuration();

  private static final byte[] rk = Bytes.toBytes("r1");
  private static final byte[] cf = Bytes.toBytes("f");
  private static final byte[] q = Bytes.toBytes("q");

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    ChunkCreator.initialize(1 * 1024, false, 50 * 1024000L, 0.2f,
      MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    long globalMemStoreLimit =
      (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax()
        * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, globalMemStoreLimit, 0.2f,
      MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null,
      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
  }

  /**
   * Test a bunch of random allocations
   */
  @Test
  public void testLABRandomAllocation() {
    Random rand = new Random();
    MemStoreLAB mslab = new MemStoreLABImpl();
    int expectedOff = 0;
    ByteBuffer lastBuffer = null;
    int lastChunkId = -1;
    // 100K iterations of small (0-2 byte value) allocations; this spans many
    // chunks, so it covers wraparound behavior while staying reasonable for a
    // unit test
    for (int i = 0; i < 100000; i++) {
      int valSize = rand.nextInt(3);
      KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
      int size = kv.getSerializedSize();
      ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv);
      if (newKv.getBuffer() != lastBuffer) {
        // since we add the chunkID at the 0th offset of the chunk and the
        // chunkid is an int we need to account for those 4 bytes
        expectedOff = Bytes.SIZEOF_INT;
        lastBuffer = newKv.getBuffer();
        int chunkId = newKv.getBuffer().getInt(0);
        assertTrue("chunkid should be different", chunkId != lastChunkId);
        lastChunkId = chunkId;
      }
      assertEquals(expectedOff, newKv.getOffset());
      assertTrue("Allocation overruns buffer",
        newKv.getOffset() + size <= newKv.getBuffer().capacity());
      expectedOff += size;
    }
  }

  @Test
  public void testLABLargeAllocation() {
    MemStoreLAB mslab = new MemStoreLABImpl();
    KeyValue kv = new KeyValue(rk, cf, q, new byte[2 * 1024 * 1024]);
    Cell newCell = mslab.copyCellInto(kv);
    assertNull("2MB allocation shouldn't be satisfied by LAB.", newCell);
  }

  /**
   * Test allocation from lots of threads, making sure the results don't
   * overlap in any way
   */
  @Test
  public void testLABThreading() throws Exception {
    Configuration conf = new Configuration();
    MultithreadedTestUtil.TestContext ctx =
      new MultithreadedTestUtil.TestContext(conf);

    final AtomicInteger totalAllocated = new AtomicInteger();

    final MemStoreLAB mslab = new MemStoreLABImpl();
    List<List<AllocRecord>> allocations = Lists.newArrayList();

    for (int i = 0; i < 10; i++) {
      final List<AllocRecord> allocsByThisThread = Lists.newLinkedList();
      allocations.add(allocsByThisThread);

      TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) {
        private Random r = new Random();

        @Override
        public void doAnAction() throws Exception {
          int valSize = r.nextInt(3);
          KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]);
          int size = kv.getSerializedSize();
          ByteBufferKeyValue newCell = (ByteBufferKeyValue) mslab.copyCellInto(kv);
          totalAllocated.addAndGet(size);
          allocsByThisThread.add(new AllocRecord(newCell.getBuffer(), newCell.getOffset(), size));
        }
      };
      ctx.addThread(t);
    }

    ctx.startThreads();
    while (totalAllocated.get() < 50 * 1024 * 1000 && ctx.shouldRun()) {
      Thread.sleep(10);
    }
    ctx.stop();
    // Partition the allocations by the actual byte[] they point into,
    // make sure offsets are unique for each chunk
    Map<ByteBuffer, Map<Integer, AllocRecord>> mapsByChunk = Maps.newHashMap();

    int sizeCounted = 0;
    for (AllocRecord rec : Iterables.concat(allocations)) {
      sizeCounted += rec.size;
      if (rec.size == 0) {
        continue;
      }
      Map<Integer, AllocRecord> mapForThisByteArray = mapsByChunk.get(rec.alloc);
      if (mapForThisByteArray == null) {
        mapForThisByteArray = Maps.newTreeMap();
        mapsByChunk.put(rec.alloc,
          mapForThisByteArray);
      }
      AllocRecord oldVal = mapForThisByteArray.put(rec.offset, rec);
      assertNull("Already had an entry " + oldVal + " for allocation " + rec, oldVal);
    }
    assertEquals("Sanity check test", sizeCounted, totalAllocated.get());

    // Now check each byte array to make sure allocations don't overlap
    for (Map<Integer, AllocRecord> allocsInChunk : mapsByChunk.values()) {
      // since we add the chunkID at the 0th offset of the chunk and the
      // chunkid is an int we need to account for those 4 bytes
      int expectedOff = Bytes.SIZEOF_INT;
      for (AllocRecord alloc : allocsInChunk.values()) {
        assertEquals(expectedOff, alloc.offset);
        assertTrue("Allocation overruns buffer",
          alloc.offset + alloc.size <= alloc.alloc.capacity());
        expectedOff += alloc.size;
      }
    }
  }

  /**
   * Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure
   * there's no memory leak (HBASE-16195)
   * @throws Exception if any error occurred
   */
  @Test
  public void testLABChunkQueue() throws Exception {
    ChunkCreator oldInstance = null;
    try {
      MemStoreLABImpl mslab = new MemStoreLABImpl();
      // with the default settings, there should be no chunks initialized in the pool
      assertTrue(mslab.getPooledChunks().isEmpty());
      oldInstance = ChunkCreator.instance;
      ChunkCreator.instance = null;
      // reset mslab with chunk pool
      Configuration conf = HBaseConfiguration.create();
      conf.setDouble(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.1);
      // set chunk size to the default max alloc size, so we can easily trigger chunk retirement
      conf.setLong(MemStoreLABImpl.CHUNK_SIZE_KEY, MemStoreLABImpl.MAX_ALLOC_DEFAULT);
      // reconstruct mslab
      long globalMemStoreLimit = (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage()
        .getMax() * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false));
      ChunkCreator.initialize(MemStoreLABImpl.MAX_ALLOC_DEFAULT, false,
        globalMemStoreLimit, 0.1f, MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT,
        null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
      ChunkCreator.clearDisableFlag();
      mslab = new MemStoreLABImpl(conf);
      // launch multiple threads to trigger frequent chunk retirement
      List<Thread> threads = new ArrayList<>();
      final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"),
        new byte[MemStoreLABImpl.MAX_ALLOC_DEFAULT - 32]);
      for (int i = 0; i < 10; i++) {
        threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i, kv));
      }
      for (Thread thread : threads) {
        thread.start();
      }
      // let it run for some time
      Thread.sleep(1000);
      for (Thread thread : threads) {
        thread.interrupt();
      }
      boolean threadsRunning = true;
      boolean alive = false;
      while (threadsRunning) {
        alive = false;
        for (Thread thread : threads) {
          if (thread.isAlive()) {
            alive = true;
            break;
          }
        }
        if (!alive) {
          threadsRunning = false;
        }
      }
      // none of the chunks have been returned to the pool yet, so some chunks
      // must still be mapped
      assertTrue("Some chunks should still be mapped before the mslab is closed",
        ChunkCreator.instance.numberOfMappedChunks() != 0);
      int pooledChunksNum = mslab.getPooledChunks().size();
      // close the mslab
      mslab.close();
      // make sure all chunks were reclaimed back to the pool
      int queueLength = mslab.getNumOfChunksReturnedToPool();
      assertTrue("All chunks in chunk queue should be reclaimed or removed"
        + " after mslab closed but actually: " + (pooledChunksNum - queueLength),
        pooledChunksNum - queueLength == 0);
    } finally {
      ChunkCreator.instance = oldInstance;
    }
  }

  private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName,
      Cell cellToCopyInto) {
    Thread thread = new Thread() {
      volatile boolean stopped = false;

      @Override
      public void run() {
        while (!stopped) {
          // keep triggering chunk retirement
          mslab.copyCellInto(cellToCopyInto);
        }
      }

      @Override
      public void interrupt() {
        this.stopped = true;
      }
    };
    thread.setName(threadName);
    thread.setDaemon(true);
    return thread;
  }

  private static class AllocRecord implements Comparable<AllocRecord> {
    private final ByteBuffer alloc;
    private final int offset;
    private final int size;

    public AllocRecord(ByteBuffer alloc, int offset, int size) {
      this.alloc = alloc;
      this.offset = offset;
      this.size = size;
    }

    @Override
    public int compareTo(AllocRecord e) {
      if (alloc != e.alloc) {
        throw new RuntimeException("Can only compare within a particular array");
      }
      return Ints.compare(this.offset, e.offset);
    }

    @Override
    public String toString() {
      return "AllocRecord(offset=" + this.offset + ", size=" + size + ")";
    }
  }
}