001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import java.io.IOException;
021import java.nio.ByteBuffer;
022import org.apache.hadoop.hbase.HBaseClassTestRule;
023import org.apache.hadoop.hbase.io.ByteBuffAllocator;
024import org.apache.hadoop.hbase.io.hfile.Cacheable;
025import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
026import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
027import org.apache.hadoop.hbase.nio.ByteBuff;
028import org.apache.hadoop.hbase.testclassification.IOTests;
029import org.apache.hadoop.hbase.testclassification.SmallTests;
030import org.junit.Assert;
031import org.junit.ClassRule;
032import org.junit.Test;
033import org.junit.experimental.categories.Category;
034
035/**
036 * Basic test for {@link ByteBufferIOEngine}
037 */
038@Category({ IOTests.class, SmallTests.class })
039public class TestByteBufferIOEngine {
040
041  @ClassRule
042  public static final HBaseClassTestRule CLASS_RULE =
043      HBaseClassTestRule.forClass(TestByteBufferIOEngine.class);
044
045  /**
046   * Override the {@link BucketEntry} so that we can set an arbitrary offset.
047   */
048  private static class MockBucketEntry extends BucketEntry {
049    private long off;
050
051    MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) {
052      super(offset & 0xFF00, length, 0, false, (entry) -> {
053        return ByteBuffAllocator.NONE;
054      }, allocator);
055      this.off = offset;
056    }
057
058    @Override
059    long offset() {
060      return this.off;
061    }
062  }
063
064  private static BufferGrabbingDeserializer DESERIALIZER = new BufferGrabbingDeserializer();
065  static {
066    int id = CacheableDeserializerIdManager.registerDeserializer(DESERIALIZER);
067    DESERIALIZER.setIdentifier(id);
068  }
069
070  static BucketEntry createBucketEntry(long offset, int len) {
071    return createBucketEntry(offset, len, ByteBuffAllocator.HEAP);
072  }
073
074  static BucketEntry createBucketEntry(long offset, int len, ByteBuffAllocator allocator) {
075    BucketEntry be = new MockBucketEntry(offset, len, allocator);
076    be.setDeserializerReference(DESERIALIZER);
077    return be;
078  }
079
080  static ByteBuff getByteBuff(BucketEntry be) {
081    return ((BufferGrabbingDeserializer) be.deserializerReference()).buf;
082  }
083
084  @Test
085  public void testByteBufferIOEngine() throws Exception {
086    int capacity = 32 * 1024 * 1024; // 32 MB
087    int testNum = 100;
088    int maxBlockSize = 64 * 1024;
089    ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity);
090    int testOffsetAtStartNum = testNum / 10;
091    int testOffsetAtEndNum = testNum / 10;
092    for (int i = 0; i < testNum; i++) {
093      byte val = (byte) (Math.random() * 255);
094      int blockSize = (int) (Math.random() * maxBlockSize);
095      if (blockSize == 0) {
096        blockSize = 1;
097      }
098
099      ByteBuff src = createByteBuffer(blockSize, val, i % 2 == 0);
100      int pos = src.position(), lim = src.limit();
101      int offset;
102      if (testOffsetAtStartNum > 0) {
103        testOffsetAtStartNum--;
104        offset = 0;
105      } else if (testOffsetAtEndNum > 0) {
106        testOffsetAtEndNum--;
107        offset = capacity - blockSize;
108      } else {
109        offset = (int) (Math.random() * (capacity - maxBlockSize));
110      }
111      ioEngine.write(src, offset);
112      src.position(pos).limit(lim);
113
114      BucketEntry be = createBucketEntry(offset, blockSize);
115      ioEngine.read(be);
116      ByteBuff dst = getByteBuff(be);
117      Assert.assertEquals(src.remaining(), blockSize);
118      Assert.assertEquals(dst.remaining(), blockSize);
119      Assert.assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst,
120        dst.position(), dst.remaining()));
121    }
122    assert testOffsetAtStartNum == 0;
123    assert testOffsetAtEndNum == 0;
124  }
125
126  /**
127   * A CacheableDeserializer implementation which just store reference to the {@link ByteBuff} to be
128   * deserialized.
129   */
130  static class BufferGrabbingDeserializer implements CacheableDeserializer<Cacheable> {
131    private ByteBuff buf;
132    private int identifier;
133
134    @Override
135    public Cacheable deserialize(final ByteBuff b, ByteBuffAllocator alloc)
136        throws IOException {
137      this.buf = b;
138      return null;
139    }
140
141    public void setIdentifier(int identifier) {
142      this.identifier = identifier;
143    }
144
145    @Override
146    public int getDeserializerIdentifier() {
147      return identifier;
148    }
149  }
150
151  static ByteBuff createByteBuffer(int len, int val, boolean useHeap) {
152    ByteBuffer b = useHeap ? ByteBuffer.allocate(2 * len) : ByteBuffer.allocateDirect(2 * len);
153    int pos = (int) (Math.random() * len);
154    b.position(pos).limit(pos + len);
155    for (int i = pos; i < pos + len; i++) {
156      b.put(i, (byte) val);
157    }
158    return ByteBuff.wrap(b);
159  }
160
161  @Test
162  public void testByteBufferIOEngineWithMBB() throws Exception {
163    int capacity = 32 * 1024 * 1024; // 32 MB
164    int testNum = 100;
165    int maxBlockSize = 64 * 1024;
166    ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity);
167    int testOffsetAtStartNum = testNum / 10;
168    int testOffsetAtEndNum = testNum / 10;
169    for (int i = 0; i < testNum; i++) {
170      byte val = (byte) (Math.random() * 255);
171      int blockSize = (int) (Math.random() * maxBlockSize);
172      if (blockSize == 0) {
173        blockSize = 1;
174      }
175      ByteBuff src = createByteBuffer(blockSize, val, i % 2 == 0);
176      int pos = src.position(), lim = src.limit();
177      int offset;
178      if (testOffsetAtStartNum > 0) {
179        testOffsetAtStartNum--;
180        offset = 0;
181      } else if (testOffsetAtEndNum > 0) {
182        testOffsetAtEndNum--;
183        offset = capacity - blockSize;
184      } else {
185        offset = (int) (Math.random() * (capacity - maxBlockSize));
186      }
187      ioEngine.write(src, offset);
188      src.position(pos).limit(lim);
189
190      BucketEntry be = createBucketEntry(offset, blockSize);
191      ioEngine.read(be);
192      ByteBuff dst = getByteBuff(be);
193      Assert.assertEquals(src.remaining(), blockSize);
194      Assert.assertEquals(dst.remaining(), blockSize);
195      Assert.assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst,
196        dst.position(), dst.remaining()));
197    }
198    assert testOffsetAtStartNum == 0;
199    assert testOffsetAtEndNum == 0;
200  }
201}