001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.apache.hadoop.hbase.regionserver.MemStoreLAB.CHUNK_SIZE_KEY; 021import static org.apache.hadoop.hbase.regionserver.MemStoreLAB.MAX_ALLOC_KEY; 022import static org.junit.jupiter.api.Assertions.assertEquals; 023import static org.junit.jupiter.api.Assertions.assertNull; 024import static org.junit.jupiter.api.Assertions.assertTrue; 025 026import java.lang.management.ManagementFactory; 027import java.nio.ByteBuffer; 028import java.util.ArrayList; 029import java.util.HashSet; 030import java.util.List; 031import java.util.Map; 032import java.util.Random; 033import java.util.Set; 034import java.util.concurrent.ThreadLocalRandom; 035import java.util.concurrent.atomic.AtomicInteger; 036import org.apache.hadoop.conf.Configuration; 037import org.apache.hadoop.hbase.ByteBufferKeyValue; 038import org.apache.hadoop.hbase.Cell; 039import org.apache.hadoop.hbase.ExtendedCell; 040import org.apache.hadoop.hbase.HBaseConfiguration; 041import org.apache.hadoop.hbase.KeyValue; 042import org.apache.hadoop.hbase.MultithreadedTestUtil; 043import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread; 044import org.apache.hadoop.hbase.io.util.MemorySizeUtil; 045import org.apache.hadoop.hbase.regionserver.ChunkCreator.ChunkType; 046import org.apache.hadoop.hbase.testclassification.MediumTests; 047import org.apache.hadoop.hbase.testclassification.RegionServerTests; 048import org.apache.hadoop.hbase.util.Bytes; 049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 050import org.junit.jupiter.api.AfterAll; 051import org.junit.jupiter.api.BeforeAll; 052import org.junit.jupiter.api.Tag; 053import org.junit.jupiter.api.Test; 054 055import org.apache.hbase.thirdparty.com.google.common.collect.Iterables; 056import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 057import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 058import org.apache.hbase.thirdparty.com.google.common.primitives.Ints; 059 060@Tag(RegionServerTests.TAG) 061@Tag(MediumTests.TAG) 062public class TestMemStoreLAB { 063 064 private final static Configuration conf = new Configuration(); 065 066 private static final byte[] rk = Bytes.toBytes("r1"); 067 private static final byte[] cf = Bytes.toBytes("f"); 068 private static final byte[] q = Bytes.toBytes("q"); 069 070 @BeforeAll 071 public static void setUpBeforeClass() throws Exception { 072 ChunkCreator.initialize(1 * 1024, false, 50 * 1024000L, 0.2f, 073 MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 074 } 075 076 @AfterAll 077 public static void tearDownAfterClass() throws Exception { 078 long globalMemStoreLimit = 079 (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() 080 * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); 081 ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, globalMemStoreLimit, 0.2f, 082 MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null, MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 083 } 084 085 /** 086 * Test a bunch of random allocations 087 */ 088 @Test 089 public void testLABRandomAllocation() { 090 MemStoreLAB mslab = new MemStoreLABImpl(); 091 int expectedOff = 0; 092 ByteBuffer lastBuffer = null; 093 int lastChunkId = -1; 094 // 100K iterations by 0-1K alloc -> 50MB expected 095 // should be reasonable for unit test and also cover wraparound 096 // behavior 097 Random rand = ThreadLocalRandom.current(); 098 for (int i = 0; i < 100000; i++) { 099 int valSize = rand.nextInt(3); 100 KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]); 101 int size = kv.getSerializedSize(); 102 ByteBufferKeyValue newKv = (ByteBufferKeyValue) mslab.copyCellInto(kv); 103 if (newKv.getBuffer() != lastBuffer) { 104 // since we add the chunkID at the 0th offset of the chunk and the 105 // chunkid is an int we need to account for those 4 bytes 106 expectedOff = Bytes.SIZEOF_INT; 107 lastBuffer = newKv.getBuffer(); 108 int chunkId = newKv.getBuffer().getInt(0); 109 assertTrue(chunkId != lastChunkId, "chunkid should be different"); 110 lastChunkId = chunkId; 111 } 112 assertEquals(expectedOff, newKv.getOffset()); 113 assertTrue(newKv.getOffset() + size <= newKv.getBuffer().capacity(), 114 "Allocation overruns buffer"); 115 expectedOff += size; 116 } 117 } 118 119 @Test 120 public void testLABLargeAllocation() { 121 MemStoreLAB mslab = new MemStoreLABImpl(); 122 KeyValue kv = new KeyValue(rk, cf, q, new byte[2 * 1024 * 1024]); 123 Cell newCell = mslab.copyCellInto(kv); 124 assertNull(newCell, "2MB allocation shouldn't be satisfied by LAB."); 125 } 126 127 /** 128 * Test allocation from lots of threads, making sure the results don't overlap in any way 129 */ 130 @Test 131 public void testLABThreading() throws Exception { 132 Configuration conf = new Configuration(); 133 MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(conf); 134 135 final AtomicInteger totalAllocated = new AtomicInteger(); 136 137 final MemStoreLAB mslab = new MemStoreLABImpl(); 138 List<List<AllocRecord>> allocations = Lists.newArrayList(); 139 140 for (int i = 0; i < 10; i++) { 141 final List<AllocRecord> allocsByThisThread = Lists.newLinkedList(); 142 allocations.add(allocsByThisThread); 143 144 TestThread t = new MultithreadedTestUtil.RepeatingTestThread(ctx) { 145 @Override 146 public void doAnAction() throws Exception { 147 int valSize = ThreadLocalRandom.current().nextInt(3); 148 KeyValue kv = new KeyValue(rk, cf, q, new byte[valSize]); 149 int size = kv.getSerializedSize(); 150 ByteBufferKeyValue newCell = (ByteBufferKeyValue) mslab.copyCellInto(kv); 151 totalAllocated.addAndGet(size); 152 allocsByThisThread.add(new AllocRecord(newCell.getBuffer(), newCell.getOffset(), size)); 153 } 154 }; 155 ctx.addThread(t); 156 } 157 158 ctx.startThreads(); 159 while (totalAllocated.get() < 50 * 1024 * 1000 && ctx.shouldRun()) { 160 Thread.sleep(10); 161 } 162 ctx.stop(); 163 // Partition the allocations by the actual byte[] they point into, 164 // make sure offsets are unique for each chunk 165 Map<ByteBuffer, Map<Integer, AllocRecord>> mapsByChunk = Maps.newHashMap(); 166 167 int sizeCounted = 0; 168 for (AllocRecord rec : Iterables.concat(allocations)) { 169 sizeCounted += rec.size; 170 if (rec.size == 0) { 171 continue; 172 } 173 Map<Integer, AllocRecord> mapForThisByteArray = mapsByChunk.get(rec.alloc); 174 if (mapForThisByteArray == null) { 175 mapForThisByteArray = Maps.newTreeMap(); 176 mapsByChunk.put(rec.alloc, mapForThisByteArray); 177 } 178 AllocRecord oldVal = mapForThisByteArray.put(rec.offset, rec); 179 assertNull(oldVal, "Already had an entry " + oldVal + " for allocation " + rec); 180 } 181 assertEquals(sizeCounted, totalAllocated.get(), "Sanity check test"); 182 183 // Now check each byte array to make sure allocations don't overlap 184 for (Map<Integer, AllocRecord> allocsInChunk : mapsByChunk.values()) { 185 // since we add the chunkID at the 0th offset of the chunk and the 186 // chunkid is an int we need to account for those 4 bytes 187 int expectedOff = Bytes.SIZEOF_INT; 188 for (AllocRecord alloc : allocsInChunk.values()) { 189 assertEquals(expectedOff, alloc.offset); 190 assertTrue(alloc.offset + alloc.size <= alloc.alloc.capacity(), 191 "Allocation overruns buffer"); 192 expectedOff += alloc.size; 193 } 194 } 195 } 196 197 /** 198 * Test frequent chunk retirement with chunk pool triggered by lots of threads, making sure 199 * there's no memory leak (HBASE-16195) 200 * @throws Exception if any error occurred 201 */ 202 @Test 203 public void testLABChunkQueue() throws Exception { 204 ChunkCreator oldInstance = null; 205 try { 206 MemStoreLABImpl mslab = new MemStoreLABImpl(); 207 // by default setting, there should be no chunks initialized in the pool 208 assertTrue(mslab.getPooledChunks().isEmpty()); 209 oldInstance = ChunkCreator.instance; 210 ChunkCreator.instance = null; 211 // reset mslab with chunk pool 212 Configuration conf = HBaseConfiguration.create(); 213 conf.setDouble(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, 0.1); 214 // set chunk size to default max alloc size, so we could easily trigger chunk retirement 215 conf.setLong(CHUNK_SIZE_KEY, MemStoreLABImpl.MAX_ALLOC_DEFAULT); 216 // reconstruct mslab 217 long globalMemStoreLimit = 218 (long) (ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax() 219 * MemorySizeUtil.getGlobalMemStoreHeapPercent(conf, false)); 220 ChunkCreator.initialize(MemStoreLABImpl.MAX_ALLOC_DEFAULT, false, globalMemStoreLimit, 0.1f, 221 MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT, null, 222 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 223 ChunkCreator.clearDisableFlag(); 224 mslab = new MemStoreLABImpl(conf); 225 // launch multiple threads to trigger frequent chunk retirement 226 List<Thread> threads = new ArrayList<>(); 227 final KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"), Bytes.toBytes("q"), 228 new byte[MemStoreLABImpl.MAX_ALLOC_DEFAULT - 32]); 229 for (int i = 0; i < 10; i++) { 230 threads.add(getChunkQueueTestThread(mslab, "testLABChunkQueue-" + i, kv)); 231 } 232 for (Thread thread : threads) { 233 thread.start(); 234 } 235 // let it run for some time 236 Thread.sleep(1000); 237 for (Thread thread : threads) { 238 thread.interrupt(); 239 } 240 boolean threadsRunning = true; 241 boolean alive = false; 242 while (threadsRunning) { 243 alive = false; 244 for (Thread thread : threads) { 245 if (thread.isAlive()) { 246 alive = true; 247 break; 248 } 249 } 250 if (!alive) { 251 threadsRunning = false; 252 } 253 } 254 // none of the chunkIds would have been returned back 255 assertTrue(ChunkCreator.instance.numberOfMappedChunks() != 0, 256 "All the chunks must have been cleared"); 257 Set<Integer> chunkIds = new HashSet<Integer>(mslab.chunks); 258 int pooledChunksNum = mslab.getPooledChunks().size(); 259 // close the mslab 260 mslab.close(); 261 // make sure all chunks where reclaimed back to pool 262 int queueLength = mslab.getNumOfChunksReturnedToPool(chunkIds); 263 assertTrue(pooledChunksNum - queueLength == 0, 264 "All chunks in chunk queue should be reclaimed or removed" 265 + " after mslab closed but actually: " + (pooledChunksNum - queueLength)); 266 } finally { 267 ChunkCreator.instance = oldInstance; 268 } 269 } 270 271 /** 272 * Test cell with right length, which constructed by testForceCopyOfBigCellInto. (HBASE-26467) 273 */ 274 @Test 275 public void testForceCopyOfBigCellInto() { 276 Configuration conf = HBaseConfiguration.create(); 277 int chunkSize = ChunkCreator.getInstance().getChunkSize(); 278 conf.setInt(CHUNK_SIZE_KEY, chunkSize); 279 conf.setInt(MAX_ALLOC_KEY, chunkSize / 2); 280 281 MemStoreLABImpl mslab = new MemStoreLABImpl(conf); 282 byte[] row = Bytes.toBytes("row"); 283 byte[] columnFamily = Bytes.toBytes("columnFamily"); 284 byte[] qualify = Bytes.toBytes("qualify"); 285 byte[] smallValue = new byte[chunkSize / 2]; 286 byte[] bigValue = new byte[chunkSize]; 287 KeyValue smallKV = 288 new KeyValue(row, columnFamily, qualify, EnvironmentEdgeManager.currentTime(), smallValue); 289 290 assertEquals(smallKV.getSerializedSize(), 291 mslab.forceCopyOfBigCellInto(smallKV).getSerializedSize()); 292 293 KeyValue bigKV = 294 new KeyValue(row, columnFamily, qualify, EnvironmentEdgeManager.currentTime(), bigValue); 295 assertEquals(bigKV.getSerializedSize(), 296 mslab.forceCopyOfBigCellInto(bigKV).getSerializedSize()); 297 298 /** 299 * Add test by HBASE-26576,all the chunks are in {@link ChunkCreator#chunkIdMap} 300 */ 301 assertTrue(mslab.chunks.size() == 2); 302 Chunk dataChunk = null; 303 Chunk jumboChunk = null; 304 305 for (Integer chunkId : mslab.chunks) { 306 Chunk chunk = ChunkCreator.getInstance().getChunk(chunkId); 307 assertTrue(chunk != null); 308 if (chunk.getChunkType() == ChunkType.JUMBO_CHUNK) { 309 jumboChunk = chunk; 310 } else if (chunk.getChunkType() == ChunkType.DATA_CHUNK) { 311 dataChunk = chunk; 312 } 313 } 314 315 assertTrue(dataChunk != null); 316 assertTrue(jumboChunk != null); 317 318 mslab.close(); 319 /** 320 * After mslab close, jumboChunk is removed from {@link ChunkCreator#chunkIdMap} but because 321 * dataChunk is recycled to pool so it is still in {@link ChunkCreator#chunkIdMap}. 322 */ 323 assertTrue(ChunkCreator.getInstance().getChunk(jumboChunk.getId()) == null); 324 assertTrue(!ChunkCreator.getInstance().isChunkInPool(jumboChunk.getId())); 325 assertTrue(ChunkCreator.getInstance().getChunk(dataChunk.getId()) == dataChunk); 326 assertTrue(ChunkCreator.getInstance().isChunkInPool(dataChunk.getId())); 327 328 } 329 330 private Thread getChunkQueueTestThread(final MemStoreLABImpl mslab, String threadName, 331 ExtendedCell cellToCopyInto) { 332 Thread thread = new Thread() { 333 volatile boolean stopped = false; 334 335 @Override 336 public void run() { 337 while (!stopped) { 338 // keep triggering chunk retirement 339 mslab.copyCellInto(cellToCopyInto); 340 } 341 } 342 343 @Override 344 public void interrupt() { 345 this.stopped = true; 346 } 347 }; 348 thread.setName(threadName); 349 thread.setDaemon(true); 350 return thread; 351 } 352 353 private static class AllocRecord implements Comparable<AllocRecord> { 354 private final ByteBuffer alloc; 355 private final int offset; 356 private final int size; 357 358 public AllocRecord(ByteBuffer alloc, int offset, int size) { 359 super(); 360 this.alloc = alloc; 361 this.offset = offset; 362 this.size = size; 363 } 364 365 @Override 366 public int compareTo(AllocRecord e) { 367 if (alloc != e.alloc) { 368 throw new RuntimeException("Can only compare within a particular array"); 369 } 370 return Ints.compare(this.offset, e.offset); 371 } 372 373 @Override 374 public String toString() { 375 return "AllocRecord(offset=" + this.offset + ", size=" + size + ")"; 376 } 377 } 378}