001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import java.io.IOException; 021import java.nio.ByteBuffer; 022import org.apache.hadoop.hbase.HBaseClassTestRule; 023import org.apache.hadoop.hbase.io.ByteBuffAllocator; 024import org.apache.hadoop.hbase.io.hfile.Cacheable; 025import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; 026import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; 027import org.apache.hadoop.hbase.nio.ByteBuff; 028import org.apache.hadoop.hbase.nio.RefCnt; 029import org.apache.hadoop.hbase.testclassification.IOTests; 030import org.apache.hadoop.hbase.testclassification.SmallTests; 031import org.junit.Assert; 032import org.junit.ClassRule; 033import org.junit.Test; 034import org.junit.experimental.categories.Category; 035 036/** 037 * Basic test for {@link ByteBufferIOEngine} 038 */ 039@Category({ IOTests.class, SmallTests.class }) 040public class TestByteBufferIOEngine { 041 042 @ClassRule 043 public static final HBaseClassTestRule CLASS_RULE = 044 HBaseClassTestRule.forClass(TestByteBufferIOEngine.class); 045 046 /** 047 * Override the {@link BucketEntry} so that we can set an arbitrary offset. 048 */ 049 private static class MockBucketEntry extends BucketEntry { 050 private long off; 051 052 MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) { 053 super(offset & 0xFF00, length, 0, false, RefCnt.create(), allocator); 054 this.off = offset; 055 } 056 057 @Override 058 long offset() { 059 return this.off; 060 } 061 } 062 063 private static BufferGrabbingDeserializer DESERIALIZER = new BufferGrabbingDeserializer(); 064 static { 065 int id = CacheableDeserializerIdManager.registerDeserializer(DESERIALIZER); 066 DESERIALIZER.setIdentifier(id); 067 } 068 069 static BucketEntry createBucketEntry(long offset, int len) { 070 return createBucketEntry(offset, len, ByteBuffAllocator.HEAP); 071 } 072 073 static BucketEntry createBucketEntry(long offset, int len, ByteBuffAllocator allocator) { 074 BucketEntry be = new MockBucketEntry(offset, len, allocator); 075 be.setDeserializerReference(DESERIALIZER); 076 return be; 077 } 078 079 static ByteBuff getByteBuff(BucketEntry be) { 080 return ((BufferGrabbingDeserializer) be.deserializerReference()).buf; 081 } 082 083 @Test 084 public void testByteBufferIOEngine() throws Exception { 085 int capacity = 32 * 1024 * 1024; // 32 MB 086 int testNum = 100; 087 int maxBlockSize = 64 * 1024; 088 ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity); 089 int testOffsetAtStartNum = testNum / 10; 090 int testOffsetAtEndNum = testNum / 10; 091 for (int i = 0; i < testNum; i++) { 092 byte val = (byte) (Math.random() * 255); 093 int blockSize = (int) (Math.random() * maxBlockSize); 094 if (blockSize == 0) { 095 blockSize = 1; 096 } 097 098 ByteBuff src = createByteBuffer(blockSize, val, i % 2 == 0); 099 int pos = src.position(), lim = src.limit(); 100 int offset; 101 if (testOffsetAtStartNum > 0) { 102 testOffsetAtStartNum--; 103 offset = 0; 104 } else if (testOffsetAtEndNum > 0) { 105 testOffsetAtEndNum--; 106 offset = capacity - blockSize; 107 } else { 108 offset = (int) (Math.random() * (capacity - maxBlockSize)); 109 } 110 ioEngine.write(src, offset); 111 src.position(pos).limit(lim); 112 113 BucketEntry be = createBucketEntry(offset, blockSize); 114 ioEngine.read(be); 115 ByteBuff dst = getByteBuff(be); 116 Assert.assertEquals(src.remaining(), blockSize); 117 Assert.assertEquals(dst.remaining(), blockSize); 118 Assert.assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst, 119 dst.position(), dst.remaining())); 120 } 121 assert testOffsetAtStartNum == 0; 122 assert testOffsetAtEndNum == 0; 123 } 124 125 /** 126 * A CacheableDeserializer implementation which just store reference to the {@link ByteBuff} to be 127 * deserialized. 128 */ 129 static class BufferGrabbingDeserializer implements CacheableDeserializer<Cacheable> { 130 private ByteBuff buf; 131 private int identifier; 132 133 @Override 134 public Cacheable deserialize(final ByteBuff b, ByteBuffAllocator alloc) 135 throws IOException { 136 this.buf = b; 137 return null; 138 } 139 140 public void setIdentifier(int identifier) { 141 this.identifier = identifier; 142 } 143 144 @Override 145 public int getDeserializerIdentifier() { 146 return identifier; 147 } 148 } 149 150 static ByteBuff createByteBuffer(int len, int val, boolean useHeap) { 151 ByteBuffer b = useHeap ? ByteBuffer.allocate(2 * len) : ByteBuffer.allocateDirect(2 * len); 152 int pos = (int) (Math.random() * len); 153 b.position(pos).limit(pos + len); 154 for (int i = pos; i < pos + len; i++) { 155 b.put(i, (byte) val); 156 } 157 return ByteBuff.wrap(b); 158 } 159 160 @Test 161 public void testByteBufferIOEngineWithMBB() throws Exception { 162 int capacity = 32 * 1024 * 1024; // 32 MB 163 int testNum = 100; 164 int maxBlockSize = 64 * 1024; 165 ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity); 166 int testOffsetAtStartNum = testNum / 10; 167 int testOffsetAtEndNum = testNum / 10; 168 for (int i = 0; i < testNum; i++) { 169 byte val = (byte) (Math.random() * 255); 170 int blockSize = (int) (Math.random() * maxBlockSize); 171 if (blockSize == 0) { 172 blockSize = 1; 173 } 174 ByteBuff src = createByteBuffer(blockSize, val, i % 2 == 0); 175 int pos = src.position(), lim = src.limit(); 176 int offset; 177 if (testOffsetAtStartNum > 0) { 178 testOffsetAtStartNum--; 179 offset = 0; 180 } else if (testOffsetAtEndNum > 0) { 181 testOffsetAtEndNum--; 182 offset = capacity - blockSize; 183 } else { 184 offset = (int) (Math.random() * (capacity - maxBlockSize)); 185 } 186 ioEngine.write(src, offset); 187 src.position(pos).limit(lim); 188 189 BucketEntry be = createBucketEntry(offset, blockSize); 190 ioEngine.read(be); 191 ByteBuff dst = getByteBuff(be); 192 Assert.assertEquals(src.remaining(), blockSize); 193 Assert.assertEquals(dst.remaining(), blockSize); 194 Assert.assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst, 195 dst.position(), dst.remaining())); 196 } 197 assert testOffsetAtStartNum == 0; 198 assert testOffsetAtEndNum == 0; 199 } 200}