001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022 023import java.util.concurrent.atomic.AtomicInteger; 024import org.apache.hadoop.hbase.nio.ByteBuff; 025import org.apache.hadoop.hbase.testclassification.RPCTests; 026import org.apache.hadoop.hbase.testclassification.SmallTests; 027import org.junit.jupiter.api.Tag; 028import org.junit.jupiter.api.Test; 029 030import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector; 031import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLogLevel; 032import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLogger; 033import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLoggerFactory; 034import org.apache.hbase.thirdparty.io.netty.util.internal.logging.Slf4JLoggerFactory; 035 036@Tag(RPCTests.TAG) 037@Tag(SmallTests.TAG) 038public class TestByteBuffAllocatorLeakDetection { 039 040 @SuppressWarnings("unused") 041 @Test 042 public void testLeakDetection() throws InterruptedException { 043 InternalLoggerFactory original = InternalLoggerFactory.getDefaultFactory(); 044 AtomicInteger leaksDetected = new AtomicInteger(); 045 InternalLoggerFactory.setDefaultFactory(new MockedLoggerFactory(leaksDetected)); 046 047 ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); 048 assertTrue(ResourceLeakDetector.isEnabled()); 049 050 try { 051 int maxBuffersInPool = 10; 052 int bufSize = 1024; 053 int minSize = bufSize / 8; 054 ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, minSize); 055 056 // tracking leaks happens on creation of a RefCnt, through a call to detector.track(). 057 // If a leak occurs, but detector.track() is never called again, the leak will not be 058 // realized. Further, causing a leak requires a GC event. So below we do some allocations, 059 // cause some GC's, do more allocations, and then expect a leak to show up. 060 061 // first allocate on-heap. we expect to not see a leak from this, because we 062 // dont count on-heap references 063 alloc.allocate(minSize - 1); 064 System.gc(); 065 Thread.sleep(1000); 066 067 // cause an allocation to trigger a leak detection, if there were one. 068 // keep a reference so we don't trigger a leak right away from this. 069 ByteBuff reference = alloc.allocate(minSize * 2); 070 assertEquals(0, leaksDetected.get()); 071 072 // allocate, but don't keep a reference. this should cause a leak 073 alloc.allocate(minSize * 2); 074 System.gc(); 075 Thread.sleep(1000); 076 077 // allocate again, this should cause the above leak to be detected 078 alloc.allocate(minSize * 2); 079 assertEquals(1, leaksDetected.get()); 080 } finally { 081 InternalLoggerFactory.setDefaultFactory(original); 082 } 083 } 084 085 private static class MockedLoggerFactory extends Slf4JLoggerFactory { 086 087 private AtomicInteger leaksDetected; 088 089 public MockedLoggerFactory(AtomicInteger leaksDetected) { 090 this.leaksDetected = leaksDetected; 091 } 092 093 @Override 094 public InternalLogger newInstance(String name) { 095 InternalLogger delegate = super.newInstance(name); 096 return new MockedLogger(leaksDetected, delegate); 097 } 098 } 099 100 private static class MockedLogger implements InternalLogger { 101 102 private AtomicInteger leaksDetected; 103 private InternalLogger delegate; 104 105 public MockedLogger(AtomicInteger leaksDetected, InternalLogger delegate) { 106 this.leaksDetected = leaksDetected; 107 this.delegate = delegate; 108 } 109 110 private void maybeCountLeak(String msgOrFormat) { 111 if (msgOrFormat.startsWith("LEAK")) { 112 leaksDetected.incrementAndGet(); 113 } 114 } 115 116 @Override 117 public void error(String msg) { 118 maybeCountLeak(msg); 119 delegate.error(msg); 120 } 121 122 @Override 123 public void error(String format, Object arg) { 124 maybeCountLeak(format); 125 delegate.error(format, arg); 126 } 127 128 @Override 129 public void error(String format, Object argA, Object argB) { 130 maybeCountLeak(format); 131 delegate.error(format, argA, argB); 132 } 133 134 @Override 135 public void error(String format, Object... arguments) { 136 maybeCountLeak(format); 137 delegate.error(format, arguments); 138 } 139 140 @Override 141 public void error(String msg, Throwable t) { 142 maybeCountLeak(msg); 143 delegate.error(msg, t); 144 } 145 146 @Override 147 public void error(Throwable t) { 148 delegate.error(t); 149 } 150 151 @Override 152 public String name() { 153 return delegate.name(); 154 } 155 156 @Override 157 public boolean isTraceEnabled() { 158 return delegate.isTraceEnabled(); 159 } 160 161 @Override 162 public void trace(String msg) { 163 delegate.trace(msg); 164 } 165 166 @Override 167 public void trace(String format, Object arg) { 168 delegate.trace(format, arg); 169 } 170 171 @Override 172 public void trace(String format, Object argA, Object argB) { 173 delegate.trace(format, argA, argB); 174 } 175 176 @Override 177 public void trace(String format, Object... arguments) { 178 delegate.trace(format, arguments); 179 } 180 181 @Override 182 public void trace(String msg, Throwable t) { 183 delegate.trace(msg, t); 184 } 185 186 @Override 187 public void trace(Throwable t) { 188 delegate.trace(t); 189 } 190 191 @Override 192 public boolean isDebugEnabled() { 193 return delegate.isDebugEnabled(); 194 } 195 196 @Override 197 public void debug(String msg) { 198 delegate.debug(msg); 199 } 200 201 @Override 202 public void debug(String format, Object arg) { 203 delegate.debug(format, arg); 204 } 205 206 @Override 207 public void debug(String format, Object argA, Object argB) { 208 delegate.debug(format, argA, argB); 209 } 210 211 @Override 212 public void debug(String format, Object... arguments) { 213 delegate.debug(format, arguments); 214 } 215 216 @Override 217 public void debug(String msg, Throwable t) { 218 delegate.debug(msg, t); 219 } 220 221 @Override 222 public void debug(Throwable t) { 223 delegate.debug(t); 224 } 225 226 @Override 227 public boolean isInfoEnabled() { 228 return delegate.isInfoEnabled(); 229 } 230 231 @Override 232 public void info(String msg) { 233 delegate.info(msg); 234 } 235 236 @Override 237 public void info(String format, Object arg) { 238 delegate.info(format, arg); 239 } 240 241 @Override 242 public void info(String format, Object argA, Object argB) { 243 delegate.info(format, argA, argB); 244 } 245 246 @Override 247 public void info(String format, Object... arguments) { 248 delegate.info(format, arguments); 249 } 250 251 @Override 252 public void info(String msg, Throwable t) { 253 delegate.info(msg, t); 254 } 255 256 @Override 257 public void info(Throwable t) { 258 delegate.info(t); 259 } 260 261 @Override 262 public boolean isWarnEnabled() { 263 return delegate.isWarnEnabled(); 264 } 265 266 @Override 267 public void warn(String msg) { 268 delegate.warn(msg); 269 } 270 271 @Override 272 public void warn(String format, Object arg) { 273 delegate.warn(format, arg); 274 } 275 276 @Override 277 public void warn(String format, Object... arguments) { 278 delegate.warn(format, arguments); 279 } 280 281 @Override 282 public void warn(String format, Object argA, Object argB) { 283 delegate.warn(format, argA, argB); 284 } 285 286 @Override 287 public void warn(String msg, Throwable t) { 288 delegate.warn(msg, t); 289 } 290 291 @Override 292 public void warn(Throwable t) { 293 delegate.warn(t); 294 } 295 296 @Override 297 public boolean isErrorEnabled() { 298 return delegate.isErrorEnabled(); 299 } 300 301 @Override 302 public boolean isEnabled(InternalLogLevel level) { 303 return delegate.isEnabled(level); 304 } 305 306 @Override 307 public void log(InternalLogLevel level, String msg) { 308 delegate.log(level, msg); 309 } 310 311 @Override 312 public void log(InternalLogLevel level, String format, Object arg) { 313 delegate.log(level, format, arg); 314 } 315 316 @Override 317 public void log(InternalLogLevel level, String format, Object argA, Object argB) { 318 delegate.log(level, format, argA, argB); 319 } 320 321 @Override 322 public void log(InternalLogLevel level, String format, Object... arguments) { 323 delegate.log(level, format, arguments); 324 } 325 326 @Override 327 public void log(InternalLogLevel level, String msg, Throwable t) { 328 delegate.log(level, msg, t); 329 } 330 331 @Override 332 public void log(InternalLogLevel level, Throwable t) { 333 delegate.log(level, t); 334 } 335 } 336}