/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.nio.MultiByteBuff;
import org.apache.hadoop.hbase.nio.SingleByteBuff;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests for {@link RpcServer#allocateByteBuffToReadInto}: checks which kind of buffer (on heap,
 * pooled direct, or a mix of both) is handed out for different request sizes and pool states, and
 * that the returned {@link CallCleanup} gives pooled buffers back to the pool.
 */
@Category({ RPCTests.class, SmallTests.class })
public class TestRpcServer {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestRpcServer.class);

  /**
   * Walks through a sequence of allocations against a pool of ten 6 KB buffers. The scenarios are
   * order dependent: each one relies on the pool state left behind by the previous one.
   */
  @Test
  public void testAllocateByteBuffToReadInto() throws Exception {
    int maxBuffersInPool = 10;
    ByteBufferPool pool = new ByteBufferPool(6 * 1024, maxBuffersInPool);
    initPoolWithAllBuffers(pool, maxBuffersInPool);
    ByteBuff buff;
    Pair<ByteBuff, CallCleanup> pair;

    // When the request size is less than 1/6th of the pool buffer size. We should use on demand
    // created on heap Buffer
    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
        200);
    buff = pair.getFirst();
    assertTrue(buff.hasArray());
    // Nothing was taken from the pool, so there is nothing to clean up either.
    assertEquals(maxBuffersInPool, pool.getQueueSize());
    assertNull(pair.getSecond());

    // When the request size is > 1/6th of the pool buffer size.
    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
        1024);
    buff = pair.getFirst();
    assertFalse(buff.hasArray());
    assertEquals(maxBuffersInPool - 1, pool.getQueueSize());
    assertNotNull(pair.getSecond());
    pair.getSecond().run(); // CallCleanup#run should put back the BB to pool.
    assertEquals(maxBuffersInPool, pool.getQueueSize());

    // Request size > pool buffer size: expect two pooled buffers, 6 KB + 1 KB.
    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
        7 * 1024);
    buff = pair.getFirst();
    assertFalse(buff.hasArray());
    assertTrue(buff instanceof MultiByteBuff);
    ByteBuffer[] bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
    assertEquals(2, bbs.length);
    assertTrue(bbs[0].isDirect());
    assertTrue(bbs[1].isDirect());
    assertEquals(6 * 1024, bbs[0].limit());
    assertEquals(1024, bbs[1].limit());
    assertEquals(maxBuffersInPool - 2, pool.getQueueSize());
    assertNotNull(pair.getSecond());
    pair.getSecond().run(); // CallCleanup#run should put back the BB to pool.
    assertEquals(maxBuffersInPool, pool.getQueueSize());

    // Request size slightly above one pool buffer: expect one pooled 6 KB buffer plus a non-direct
    // buffer for the 200 byte remainder (presumably because the remainder is below the reservoir
    // minimum, as in the first scenario).
    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
        6 * 1024 + 200);
    buff = pair.getFirst();
    assertFalse(buff.hasArray());
    assertTrue(buff instanceof MultiByteBuff);
    bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
    assertEquals(2, bbs.length);
    assertTrue(bbs[0].isDirect());
    assertFalse(bbs[1].isDirect());
    assertEquals(6 * 1024, bbs[0].limit());
    assertEquals(200, bbs[1].limit());
    assertEquals(maxBuffersInPool - 1, pool.getQueueSize());
    assertNotNull(pair.getSecond());
    pair.getSecond().run(); // CallCleanup#run should put back the BB to pool.
    assertEquals(maxBuffersInPool, pool.getQueueSize());

    // Take all but one buffer out of the pool, then ask for more than the single remaining buffer
    // can hold: expect that one pooled buffer plus a non-direct buffer for the other 14 KB.
    ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool - 1];
    for (int i = 0; i < maxBuffersInPool - 1; i++) {
      buffers[i] = pool.getBuffer();
    }
    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
        20 * 1024);
    buff = pair.getFirst();
    assertFalse(buff.hasArray());
    assertTrue(buff instanceof MultiByteBuff);
    bbs = ((MultiByteBuff) buff).getEnclosingByteBuffers();
    assertEquals(2, bbs.length);
    assertTrue(bbs[0].isDirect());
    assertFalse(bbs[1].isDirect());
    assertEquals(6 * 1024, bbs[0].limit());
    assertEquals(14 * 1024, bbs[1].limit());
    assertEquals(0, pool.getQueueSize());
    assertNotNull(pair.getSecond());
    pair.getSecond().run(); // CallCleanup#run should put back the BB to pool.
    assertEquals(1, pool.getQueueSize());

    // Take the last buffer out as well; with nothing left in the pool the whole request is
    // expected to be served from a single on heap buffer, with no cleanup.
    pool.getBuffer();
    pair = RpcServer.allocateByteBuffToReadInto(pool, RpcServer.getMinSizeForReservoirUse(pool),
        7 * 1024);
    buff = pair.getFirst();
    assertTrue(buff.hasArray());
    assertTrue(buff instanceof SingleByteBuff);
    assertEquals(7 * 1024, ((SingleByteBuff) buff).getEnclosingByteBuffer().limit());
    assertNull(pair.getSecond());
  }

  /**
   * Takes {@code maxBuffersInPool} buffers out of the pool and then puts every one of them back.
   * @param pool the pool to fill
   * @param maxBuffersInPool number of buffers to take out and put back
   */
  private void initPoolWithAllBuffers(ByteBufferPool pool, int maxBuffersInPool) {
    ByteBuffer[] buffers = new ByteBuffer[maxBuffersInPool];
    // Just call getBuffer() on pool 'maxBuffersInPool' times so as to init all buffers and then
    // put back all. Makes pool with max #buffers.
    for (int i = 0; i < maxBuffersInPool; i++) {
      buffers[i] = pool.getBuffer();
    }
    for (ByteBuffer buf : buffers) {
      pool.putbackBuffer(buf);
    }
  }
}