001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertTrue;
022
023import java.util.concurrent.atomic.AtomicInteger;
024import org.apache.hadoop.hbase.nio.ByteBuff;
025import org.apache.hadoop.hbase.testclassification.RPCTests;
026import org.apache.hadoop.hbase.testclassification.SmallTests;
027import org.junit.jupiter.api.Tag;
028import org.junit.jupiter.api.Test;
029
030import org.apache.hbase.thirdparty.io.netty.util.ResourceLeakDetector;
031import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLogLevel;
032import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLogger;
033import org.apache.hbase.thirdparty.io.netty.util.internal.logging.InternalLoggerFactory;
034import org.apache.hbase.thirdparty.io.netty.util.internal.logging.Slf4JLoggerFactory;
035
036@Tag(RPCTests.TAG)
037@Tag(SmallTests.TAG)
038public class TestByteBuffAllocatorLeakDetection {
039
040  @SuppressWarnings("unused")
041  @Test
042  public void testLeakDetection() throws InterruptedException {
043    InternalLoggerFactory original = InternalLoggerFactory.getDefaultFactory();
044    AtomicInteger leaksDetected = new AtomicInteger();
045    InternalLoggerFactory.setDefaultFactory(new MockedLoggerFactory(leaksDetected));
046
047    ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
048    assertTrue(ResourceLeakDetector.isEnabled());
049
050    try {
051      int maxBuffersInPool = 10;
052      int bufSize = 1024;
053      int minSize = bufSize / 8;
054      ByteBuffAllocator alloc = new ByteBuffAllocator(true, maxBuffersInPool, bufSize, minSize);
055
056      // tracking leaks happens on creation of a RefCnt, through a call to detector.track().
057      // If a leak occurs, but detector.track() is never called again, the leak will not be
058      // realized. Further, causing a leak requires a GC event. So below we do some allocations,
059      // cause some GC's, do more allocations, and then expect a leak to show up.
060
061      // first allocate on-heap. we expect to not see a leak from this, because we
062      // dont count on-heap references
063      alloc.allocate(minSize - 1);
064      System.gc();
065      Thread.sleep(1000);
066
067      // cause an allocation to trigger a leak detection, if there were one.
068      // keep a reference so we don't trigger a leak right away from this.
069      ByteBuff reference = alloc.allocate(minSize * 2);
070      assertEquals(0, leaksDetected.get());
071
072      // allocate, but don't keep a reference. this should cause a leak
073      alloc.allocate(minSize * 2);
074      System.gc();
075      Thread.sleep(1000);
076
077      // allocate again, this should cause the above leak to be detected
078      alloc.allocate(minSize * 2);
079      assertEquals(1, leaksDetected.get());
080    } finally {
081      InternalLoggerFactory.setDefaultFactory(original);
082    }
083  }
084
085  private static class MockedLoggerFactory extends Slf4JLoggerFactory {
086
087    private AtomicInteger leaksDetected;
088
089    public MockedLoggerFactory(AtomicInteger leaksDetected) {
090      this.leaksDetected = leaksDetected;
091    }
092
093    @Override
094    public InternalLogger newInstance(String name) {
095      InternalLogger delegate = super.newInstance(name);
096      return new MockedLogger(leaksDetected, delegate);
097    }
098  }
099
100  private static class MockedLogger implements InternalLogger {
101
102    private AtomicInteger leaksDetected;
103    private InternalLogger delegate;
104
105    public MockedLogger(AtomicInteger leaksDetected, InternalLogger delegate) {
106      this.leaksDetected = leaksDetected;
107      this.delegate = delegate;
108    }
109
110    private void maybeCountLeak(String msgOrFormat) {
111      if (msgOrFormat.startsWith("LEAK")) {
112        leaksDetected.incrementAndGet();
113      }
114    }
115
116    @Override
117    public void error(String msg) {
118      maybeCountLeak(msg);
119      delegate.error(msg);
120    }
121
122    @Override
123    public void error(String format, Object arg) {
124      maybeCountLeak(format);
125      delegate.error(format, arg);
126    }
127
128    @Override
129    public void error(String format, Object argA, Object argB) {
130      maybeCountLeak(format);
131      delegate.error(format, argA, argB);
132    }
133
134    @Override
135    public void error(String format, Object... arguments) {
136      maybeCountLeak(format);
137      delegate.error(format, arguments);
138    }
139
140    @Override
141    public void error(String msg, Throwable t) {
142      maybeCountLeak(msg);
143      delegate.error(msg, t);
144    }
145
146    @Override
147    public void error(Throwable t) {
148      delegate.error(t);
149    }
150
151    @Override
152    public String name() {
153      return delegate.name();
154    }
155
156    @Override
157    public boolean isTraceEnabled() {
158      return delegate.isTraceEnabled();
159    }
160
161    @Override
162    public void trace(String msg) {
163      delegate.trace(msg);
164    }
165
166    @Override
167    public void trace(String format, Object arg) {
168      delegate.trace(format, arg);
169    }
170
171    @Override
172    public void trace(String format, Object argA, Object argB) {
173      delegate.trace(format, argA, argB);
174    }
175
176    @Override
177    public void trace(String format, Object... arguments) {
178      delegate.trace(format, arguments);
179    }
180
181    @Override
182    public void trace(String msg, Throwable t) {
183      delegate.trace(msg, t);
184    }
185
186    @Override
187    public void trace(Throwable t) {
188      delegate.trace(t);
189    }
190
191    @Override
192    public boolean isDebugEnabled() {
193      return delegate.isDebugEnabled();
194    }
195
196    @Override
197    public void debug(String msg) {
198      delegate.debug(msg);
199    }
200
201    @Override
202    public void debug(String format, Object arg) {
203      delegate.debug(format, arg);
204    }
205
206    @Override
207    public void debug(String format, Object argA, Object argB) {
208      delegate.debug(format, argA, argB);
209    }
210
211    @Override
212    public void debug(String format, Object... arguments) {
213      delegate.debug(format, arguments);
214    }
215
216    @Override
217    public void debug(String msg, Throwable t) {
218      delegate.debug(msg, t);
219    }
220
221    @Override
222    public void debug(Throwable t) {
223      delegate.debug(t);
224    }
225
226    @Override
227    public boolean isInfoEnabled() {
228      return delegate.isInfoEnabled();
229    }
230
231    @Override
232    public void info(String msg) {
233      delegate.info(msg);
234    }
235
236    @Override
237    public void info(String format, Object arg) {
238      delegate.info(format, arg);
239    }
240
241    @Override
242    public void info(String format, Object argA, Object argB) {
243      delegate.info(format, argA, argB);
244    }
245
246    @Override
247    public void info(String format, Object... arguments) {
248      delegate.info(format, arguments);
249    }
250
251    @Override
252    public void info(String msg, Throwable t) {
253      delegate.info(msg, t);
254    }
255
256    @Override
257    public void info(Throwable t) {
258      delegate.info(t);
259    }
260
261    @Override
262    public boolean isWarnEnabled() {
263      return delegate.isWarnEnabled();
264    }
265
266    @Override
267    public void warn(String msg) {
268      delegate.warn(msg);
269    }
270
271    @Override
272    public void warn(String format, Object arg) {
273      delegate.warn(format, arg);
274    }
275
276    @Override
277    public void warn(String format, Object... arguments) {
278      delegate.warn(format, arguments);
279    }
280
281    @Override
282    public void warn(String format, Object argA, Object argB) {
283      delegate.warn(format, argA, argB);
284    }
285
286    @Override
287    public void warn(String msg, Throwable t) {
288      delegate.warn(msg, t);
289    }
290
291    @Override
292    public void warn(Throwable t) {
293      delegate.warn(t);
294    }
295
296    @Override
297    public boolean isErrorEnabled() {
298      return delegate.isErrorEnabled();
299    }
300
301    @Override
302    public boolean isEnabled(InternalLogLevel level) {
303      return delegate.isEnabled(level);
304    }
305
306    @Override
307    public void log(InternalLogLevel level, String msg) {
308      delegate.log(level, msg);
309    }
310
311    @Override
312    public void log(InternalLogLevel level, String format, Object arg) {
313      delegate.log(level, format, arg);
314    }
315
316    @Override
317    public void log(InternalLogLevel level, String format, Object argA, Object argB) {
318      delegate.log(level, format, argA, argB);
319    }
320
321    @Override
322    public void log(InternalLogLevel level, String format, Object... arguments) {
323      delegate.log(level, format, arguments);
324    }
325
326    @Override
327    public void log(InternalLogLevel level, String msg, Throwable t) {
328      delegate.log(level, msg, t);
329    }
330
331    @Override
332    public void log(InternalLogLevel level, Throwable t) {
333      delegate.log(level, t);
334    }
335  }
336}