001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import java.io.IOException; 021import java.nio.ByteBuffer; 022import org.apache.hadoop.hbase.HBaseClassTestRule; 023import org.apache.hadoop.hbase.io.ByteBuffAllocator; 024import org.apache.hadoop.hbase.io.hfile.Cacheable; 025import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; 026import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; 027import org.apache.hadoop.hbase.nio.ByteBuff; 028import org.apache.hadoop.hbase.testclassification.IOTests; 029import org.apache.hadoop.hbase.testclassification.SmallTests; 030import org.junit.Assert; 031import org.junit.ClassRule; 032import org.junit.Test; 033import org.junit.experimental.categories.Category; 034 035/** 036 * Basic test for {@link ByteBufferIOEngine} 037 */ 038@Category({ IOTests.class, SmallTests.class }) 039public class TestByteBufferIOEngine { 040 041 @ClassRule 042 public static final HBaseClassTestRule CLASS_RULE = 043 HBaseClassTestRule.forClass(TestByteBufferIOEngine.class); 044 045 /** 046 * Override the {@link BucketEntry} so that we can set an arbitrary offset. 047 */ 048 private static class MockBucketEntry extends BucketEntry { 049 private long off; 050 051 MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) { 052 super(offset & 0xFF00, length, 0, false, (entry) -> { 053 return ByteBuffAllocator.NONE; 054 }, allocator); 055 this.off = offset; 056 } 057 058 @Override 059 long offset() { 060 return this.off; 061 } 062 } 063 064 private static BufferGrabbingDeserializer DESERIALIZER = new BufferGrabbingDeserializer(); 065 static { 066 int id = CacheableDeserializerIdManager.registerDeserializer(DESERIALIZER); 067 DESERIALIZER.setIdentifier(id); 068 } 069 070 static BucketEntry createBucketEntry(long offset, int len) { 071 return createBucketEntry(offset, len, ByteBuffAllocator.HEAP); 072 } 073 074 static BucketEntry createBucketEntry(long offset, int len, ByteBuffAllocator allocator) { 075 BucketEntry be = new MockBucketEntry(offset, len, allocator); 076 be.setDeserializerReference(DESERIALIZER); 077 return be; 078 } 079 080 static ByteBuff getByteBuff(BucketEntry be) { 081 return ((BufferGrabbingDeserializer) be.deserializerReference()).buf; 082 } 083 084 @Test 085 public void testByteBufferIOEngine() throws Exception { 086 int capacity = 32 * 1024 * 1024; // 32 MB 087 int testNum = 100; 088 int maxBlockSize = 64 * 1024; 089 ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity); 090 int testOffsetAtStartNum = testNum / 10; 091 int testOffsetAtEndNum = testNum / 10; 092 for (int i = 0; i < testNum; i++) { 093 byte val = (byte) (Math.random() * 255); 094 int blockSize = (int) (Math.random() * maxBlockSize); 095 if (blockSize == 0) { 096 blockSize = 1; 097 } 098 099 ByteBuff src = createByteBuffer(blockSize, val, i % 2 == 0); 100 int pos = src.position(), lim = src.limit(); 101 int offset; 102 if (testOffsetAtStartNum > 0) { 103 testOffsetAtStartNum--; 104 offset = 0; 105 } else if (testOffsetAtEndNum > 0) { 106 testOffsetAtEndNum--; 107 offset = capacity - blockSize; 108 } else { 109 offset = (int) (Math.random() * (capacity - maxBlockSize)); 110 } 111 ioEngine.write(src, offset); 112 src.position(pos).limit(lim); 113 114 BucketEntry be = createBucketEntry(offset, blockSize); 115 ioEngine.read(be); 116 ByteBuff dst = getByteBuff(be); 117 Assert.assertEquals(src.remaining(), blockSize); 118 Assert.assertEquals(dst.remaining(), blockSize); 119 Assert.assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst, 120 dst.position(), dst.remaining())); 121 } 122 assert testOffsetAtStartNum == 0; 123 assert testOffsetAtEndNum == 0; 124 } 125 126 /** 127 * A CacheableDeserializer implementation which just store reference to the {@link ByteBuff} to be 128 * deserialized. 129 */ 130 static class BufferGrabbingDeserializer implements CacheableDeserializer<Cacheable> { 131 private ByteBuff buf; 132 private int identifier; 133 134 @Override 135 public Cacheable deserialize(final ByteBuff b, ByteBuffAllocator alloc) throws IOException { 136 this.buf = b; 137 return null; 138 } 139 140 public void setIdentifier(int identifier) { 141 this.identifier = identifier; 142 } 143 144 @Override 145 public int getDeserializerIdentifier() { 146 return identifier; 147 } 148 } 149 150 static ByteBuff createByteBuffer(int len, int val, boolean useHeap) { 151 ByteBuffer b = useHeap ? ByteBuffer.allocate(2 * len) : ByteBuffer.allocateDirect(2 * len); 152 int pos = (int) (Math.random() * len); 153 b.position(pos).limit(pos + len); 154 for (int i = pos; i < pos + len; i++) { 155 b.put(i, (byte) val); 156 } 157 return ByteBuff.wrap(b); 158 } 159 160 @Test 161 public void testByteBufferIOEngineWithMBB() throws Exception { 162 int capacity = 32 * 1024 * 1024; // 32 MB 163 int testNum = 100; 164 int maxBlockSize = 64 * 1024; 165 ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity); 166 int testOffsetAtStartNum = testNum / 10; 167 int testOffsetAtEndNum = testNum / 10; 168 for (int i = 0; i < testNum; i++) { 169 byte val = (byte) (Math.random() * 255); 170 int blockSize = (int) (Math.random() * maxBlockSize); 171 if (blockSize == 0) { 172 blockSize = 1; 173 } 174 ByteBuff src = createByteBuffer(blockSize, val, i % 2 == 0); 175 int pos = src.position(), lim = src.limit(); 176 int offset; 177 if (testOffsetAtStartNum > 0) { 178 testOffsetAtStartNum--; 179 offset = 0; 180 } else if (testOffsetAtEndNum > 0) { 181 testOffsetAtEndNum--; 182 offset = capacity - blockSize; 183 } else { 184 offset = (int) (Math.random() * (capacity - maxBlockSize)); 185 } 186 ioEngine.write(src, offset); 187 src.position(pos).limit(lim); 188 189 BucketEntry be = createBucketEntry(offset, blockSize); 190 ioEngine.read(be); 191 ByteBuff dst = getByteBuff(be); 192 Assert.assertEquals(src.remaining(), blockSize); 193 Assert.assertEquals(dst.remaining(), blockSize); 194 Assert.assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst, 195 dst.position(), dst.remaining())); 196 } 197 assert testOffsetAtStartNum == 0; 198 assert testOffsetAtEndNum == 0; 199 } 200}