001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.io.hfile.bucket; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021 022import java.io.IOException; 023import java.nio.ByteBuffer; 024import org.apache.hadoop.hbase.io.ByteBuffAllocator; 025import org.apache.hadoop.hbase.io.hfile.Cacheable; 026import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; 027import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager; 028import org.apache.hadoop.hbase.nio.ByteBuff; 029import org.apache.hadoop.hbase.testclassification.IOTests; 030import org.apache.hadoop.hbase.testclassification.SmallTests; 031import org.junit.jupiter.api.Tag; 032import org.junit.jupiter.api.Test; 033 034/** 035 * Basic test for {@link ByteBufferIOEngine} 036 */ 037@Tag(IOTests.TAG) 038@Tag(SmallTests.TAG) 039public class TestByteBufferIOEngine { 040 041 /** 042 * Override the {@link BucketEntry} so that we can set an arbitrary offset. 043 */ 044 private static class MockBucketEntry extends BucketEntry { 045 private long off; 046 047 MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) { 048 super(offset & 0xFF00, length, length, 0, false, (entry) -> { 049 return ByteBuffAllocator.NONE; 050 }, allocator); 051 this.off = offset; 052 } 053 054 @Override 055 long offset() { 056 return this.off; 057 } 058 } 059 060 private static BufferGrabbingDeserializer DESERIALIZER = new BufferGrabbingDeserializer(); 061 static { 062 int id = CacheableDeserializerIdManager.registerDeserializer(DESERIALIZER); 063 DESERIALIZER.setIdentifier(id); 064 } 065 066 static BucketEntry createBucketEntry(long offset, int len) { 067 return createBucketEntry(offset, len, ByteBuffAllocator.HEAP); 068 } 069 070 static BucketEntry createBucketEntry(long offset, int len, ByteBuffAllocator allocator) { 071 BucketEntry be = new MockBucketEntry(offset, len, allocator); 072 be.setDeserializerReference(DESERIALIZER); 073 return be; 074 } 075 076 static ByteBuff getByteBuff(BucketEntry be) { 077 return ((BufferGrabbingDeserializer) be.deserializerReference()).buf; 078 } 079 080 @Test 081 public void testByteBufferIOEngine() throws Exception { 082 int capacity = 32 * 1024 * 1024; // 32 MB 083 int testNum = 100; 084 int maxBlockSize = 64 * 1024; 085 ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity); 086 int testOffsetAtStartNum = testNum / 10; 087 int testOffsetAtEndNum = testNum / 10; 088 for (int i = 0; i < testNum; i++) { 089 byte val = (byte) (Math.random() * 255); 090 int blockSize = (int) (Math.random() * maxBlockSize); 091 if (blockSize == 0) { 092 blockSize = 1; 093 } 094 095 ByteBuff src = createByteBuffer(blockSize, val, i % 2 == 0); 096 int pos = src.position(), lim = src.limit(); 097 int offset; 098 if (testOffsetAtStartNum > 0) { 099 testOffsetAtStartNum--; 100 offset = 0; 101 } else if (testOffsetAtEndNum > 0) { 102 testOffsetAtEndNum--; 103 offset = capacity - blockSize; 104 } else { 105 offset = (int) (Math.random() * (capacity - maxBlockSize)); 106 } 107 ioEngine.write(src, offset); 108 src.position(pos).limit(lim); 109 110 BucketEntry be = createBucketEntry(offset, blockSize); 111 ioEngine.read(be); 112 ByteBuff dst = getByteBuff(be); 113 assertEquals(src.remaining(), blockSize); 114 assertEquals(dst.remaining(), blockSize); 115 assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst, dst.position(), 116 dst.remaining())); 117 } 118 assert testOffsetAtStartNum == 0; 119 assert testOffsetAtEndNum == 0; 120 } 121 122 /** 123 * A CacheableDeserializer implementation which just store reference to the {@link ByteBuff} to be 124 * deserialized. 125 */ 126 static class BufferGrabbingDeserializer implements CacheableDeserializer<Cacheable> { 127 private ByteBuff buf; 128 private int identifier; 129 130 @Override 131 public Cacheable deserialize(final ByteBuff b, ByteBuffAllocator alloc) throws IOException { 132 this.buf = b; 133 return null; 134 } 135 136 public void setIdentifier(int identifier) { 137 this.identifier = identifier; 138 } 139 140 @Override 141 public int getDeserializerIdentifier() { 142 return identifier; 143 } 144 } 145 146 static ByteBuff createByteBuffer(int len, int val, boolean useHeap) { 147 ByteBuffer b = useHeap ? ByteBuffer.allocate(2 * len) : ByteBuffer.allocateDirect(2 * len); 148 int pos = (int) (Math.random() * len); 149 b.position(pos).limit(pos + len); 150 for (int i = pos; i < pos + len; i++) { 151 b.put(i, (byte) val); 152 } 153 return ByteBuff.wrap(b); 154 } 155 156 @Test 157 public void testByteBufferIOEngineWithMBB() throws Exception { 158 int capacity = 32 * 1024 * 1024; // 32 MB 159 int testNum = 100; 160 int maxBlockSize = 64 * 1024; 161 ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity); 162 int testOffsetAtStartNum = testNum / 10; 163 int testOffsetAtEndNum = testNum / 10; 164 for (int i = 0; i < testNum; i++) { 165 byte val = (byte) (Math.random() * 255); 166 int blockSize = (int) (Math.random() * maxBlockSize); 167 if (blockSize == 0) { 168 blockSize = 1; 169 } 170 ByteBuff src = createByteBuffer(blockSize, val, i % 2 == 0); 171 int pos = src.position(), lim = src.limit(); 172 int offset; 173 if (testOffsetAtStartNum > 0) { 174 testOffsetAtStartNum--; 175 offset = 0; 176 } else if (testOffsetAtEndNum > 0) { 177 testOffsetAtEndNum--; 178 offset = capacity - blockSize; 179 } else { 180 offset = (int) (Math.random() * (capacity - maxBlockSize)); 181 } 182 ioEngine.write(src, offset); 183 src.position(pos).limit(lim); 184 185 BucketEntry be = createBucketEntry(offset, blockSize); 186 ioEngine.read(be); 187 ByteBuff dst = getByteBuff(be); 188 assertEquals(src.remaining(), blockSize); 189 assertEquals(dst.remaining(), blockSize); 190 assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst, dst.position(), 191 dst.remaining())); 192 } 193 assert testOffsetAtStartNum == 0; 194 assert testOffsetAtEndNum == 0; 195 } 196}