001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import java.io.IOException;
021import java.nio.ByteBuffer;
022import org.apache.hadoop.hbase.HBaseClassTestRule;
023import org.apache.hadoop.hbase.io.ByteBuffAllocator;
024import org.apache.hadoop.hbase.io.hfile.Cacheable;
025import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
026import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
027import org.apache.hadoop.hbase.nio.ByteBuff;
028import org.apache.hadoop.hbase.nio.RefCnt;
029import org.apache.hadoop.hbase.testclassification.IOTests;
030import org.apache.hadoop.hbase.testclassification.SmallTests;
031import org.junit.Assert;
032import org.junit.ClassRule;
033import org.junit.Test;
034import org.junit.experimental.categories.Category;
035
036/**
037 * Basic test for {@link ByteBufferIOEngine}
038 */
039@Category({ IOTests.class, SmallTests.class })
040public class TestByteBufferIOEngine {
041
042  @ClassRule
043  public static final HBaseClassTestRule CLASS_RULE =
044      HBaseClassTestRule.forClass(TestByteBufferIOEngine.class);
045
046  /**
047   * Override the {@link BucketEntry} so that we can set an arbitrary offset.
048   */
049  private static class MockBucketEntry extends BucketEntry {
050    private long off;
051
052    MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) {
053      super(offset & 0xFF00, length, 0, false, RefCnt.create(), allocator);
054      this.off = offset;
055    }
056
057    @Override
058    long offset() {
059      return this.off;
060    }
061  }
062
063  private static BufferGrabbingDeserializer DESERIALIZER = new BufferGrabbingDeserializer();
064  static {
065    int id = CacheableDeserializerIdManager.registerDeserializer(DESERIALIZER);
066    DESERIALIZER.setIdentifier(id);
067  }
068
069  static BucketEntry createBucketEntry(long offset, int len) {
070    return createBucketEntry(offset, len, ByteBuffAllocator.HEAP);
071  }
072
073  static BucketEntry createBucketEntry(long offset, int len, ByteBuffAllocator allocator) {
074    BucketEntry be = new MockBucketEntry(offset, len, allocator);
075    be.setDeserializerReference(DESERIALIZER);
076    return be;
077  }
078
079  static ByteBuff getByteBuff(BucketEntry be) {
080    return ((BufferGrabbingDeserializer) be.deserializerReference()).buf;
081  }
082
083  @Test
084  public void testByteBufferIOEngine() throws Exception {
085    int capacity = 32 * 1024 * 1024; // 32 MB
086    int testNum = 100;
087    int maxBlockSize = 64 * 1024;
088    ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity);
089    int testOffsetAtStartNum = testNum / 10;
090    int testOffsetAtEndNum = testNum / 10;
091    for (int i = 0; i < testNum; i++) {
092      byte val = (byte) (Math.random() * 255);
093      int blockSize = (int) (Math.random() * maxBlockSize);
094      if (blockSize == 0) {
095        blockSize = 1;
096      }
097
098      ByteBuff src = createByteBuffer(blockSize, val, i % 2 == 0);
099      int pos = src.position(), lim = src.limit();
100      int offset;
101      if (testOffsetAtStartNum > 0) {
102        testOffsetAtStartNum--;
103        offset = 0;
104      } else if (testOffsetAtEndNum > 0) {
105        testOffsetAtEndNum--;
106        offset = capacity - blockSize;
107      } else {
108        offset = (int) (Math.random() * (capacity - maxBlockSize));
109      }
110      ioEngine.write(src, offset);
111      src.position(pos).limit(lim);
112
113      BucketEntry be = createBucketEntry(offset, blockSize);
114      ioEngine.read(be);
115      ByteBuff dst = getByteBuff(be);
116      Assert.assertEquals(src.remaining(), blockSize);
117      Assert.assertEquals(dst.remaining(), blockSize);
118      Assert.assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst,
119        dst.position(), dst.remaining()));
120    }
121    assert testOffsetAtStartNum == 0;
122    assert testOffsetAtEndNum == 0;
123  }
124
125  /**
126   * A CacheableDeserializer implementation which just store reference to the {@link ByteBuff} to be
127   * deserialized.
128   */
129  static class BufferGrabbingDeserializer implements CacheableDeserializer<Cacheable> {
130    private ByteBuff buf;
131    private int identifier;
132
133    @Override
134    public Cacheable deserialize(final ByteBuff b, ByteBuffAllocator alloc)
135        throws IOException {
136      this.buf = b;
137      return null;
138    }
139
140    public void setIdentifier(int identifier) {
141      this.identifier = identifier;
142    }
143
144    @Override
145    public int getDeserializerIdentifier() {
146      return identifier;
147    }
148  }
149
150  static ByteBuff createByteBuffer(int len, int val, boolean useHeap) {
151    ByteBuffer b = useHeap ? ByteBuffer.allocate(2 * len) : ByteBuffer.allocateDirect(2 * len);
152    int pos = (int) (Math.random() * len);
153    b.position(pos).limit(pos + len);
154    for (int i = pos; i < pos + len; i++) {
155      b.put(i, (byte) val);
156    }
157    return ByteBuff.wrap(b);
158  }
159
160  @Test
161  public void testByteBufferIOEngineWithMBB() throws Exception {
162    int capacity = 32 * 1024 * 1024; // 32 MB
163    int testNum = 100;
164    int maxBlockSize = 64 * 1024;
165    ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity);
166    int testOffsetAtStartNum = testNum / 10;
167    int testOffsetAtEndNum = testNum / 10;
168    for (int i = 0; i < testNum; i++) {
169      byte val = (byte) (Math.random() * 255);
170      int blockSize = (int) (Math.random() * maxBlockSize);
171      if (blockSize == 0) {
172        blockSize = 1;
173      }
174      ByteBuff src = createByteBuffer(blockSize, val, i % 2 == 0);
175      int pos = src.position(), lim = src.limit();
176      int offset;
177      if (testOffsetAtStartNum > 0) {
178        testOffsetAtStartNum--;
179        offset = 0;
180      } else if (testOffsetAtEndNum > 0) {
181        testOffsetAtEndNum--;
182        offset = capacity - blockSize;
183      } else {
184        offset = (int) (Math.random() * (capacity - maxBlockSize));
185      }
186      ioEngine.write(src, offset);
187      src.position(pos).limit(lim);
188
189      BucketEntry be = createBucketEntry(offset, blockSize);
190      ioEngine.read(be);
191      ByteBuff dst = getByteBuff(be);
192      Assert.assertEquals(src.remaining(), blockSize);
193      Assert.assertEquals(dst.remaining(), blockSize);
194      Assert.assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst,
195        dst.position(), dst.remaining()));
196    }
197    assert testOffsetAtStartNum == 0;
198    assert testOffsetAtEndNum == 0;
199  }
200}