001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021
022import java.io.IOException;
023import java.nio.ByteBuffer;
024import org.apache.hadoop.hbase.io.ByteBuffAllocator;
025import org.apache.hadoop.hbase.io.hfile.Cacheable;
026import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
027import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
028import org.apache.hadoop.hbase.nio.ByteBuff;
029import org.apache.hadoop.hbase.testclassification.IOTests;
030import org.apache.hadoop.hbase.testclassification.SmallTests;
031import org.junit.jupiter.api.Tag;
032import org.junit.jupiter.api.Test;
033
034/**
035 * Basic test for {@link ByteBufferIOEngine}
036 */
037@Tag(IOTests.TAG)
038@Tag(SmallTests.TAG)
039public class TestByteBufferIOEngine {
040
041  /**
042   * Override the {@link BucketEntry} so that we can set an arbitrary offset.
043   */
044  private static class MockBucketEntry extends BucketEntry {
045    private long off;
046
047    MockBucketEntry(long offset, int length, ByteBuffAllocator allocator) {
048      super(offset & 0xFF00, length, length, 0, false, (entry) -> {
049        return ByteBuffAllocator.NONE;
050      }, allocator);
051      this.off = offset;
052    }
053
054    @Override
055    long offset() {
056      return this.off;
057    }
058  }
059
060  private static BufferGrabbingDeserializer DESERIALIZER = new BufferGrabbingDeserializer();
061  static {
062    int id = CacheableDeserializerIdManager.registerDeserializer(DESERIALIZER);
063    DESERIALIZER.setIdentifier(id);
064  }
065
066  static BucketEntry createBucketEntry(long offset, int len) {
067    return createBucketEntry(offset, len, ByteBuffAllocator.HEAP);
068  }
069
070  static BucketEntry createBucketEntry(long offset, int len, ByteBuffAllocator allocator) {
071    BucketEntry be = new MockBucketEntry(offset, len, allocator);
072    be.setDeserializerReference(DESERIALIZER);
073    return be;
074  }
075
076  static ByteBuff getByteBuff(BucketEntry be) {
077    return ((BufferGrabbingDeserializer) be.deserializerReference()).buf;
078  }
079
080  @Test
081  public void testByteBufferIOEngine() throws Exception {
082    int capacity = 32 * 1024 * 1024; // 32 MB
083    int testNum = 100;
084    int maxBlockSize = 64 * 1024;
085    ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity);
086    int testOffsetAtStartNum = testNum / 10;
087    int testOffsetAtEndNum = testNum / 10;
088    for (int i = 0; i < testNum; i++) {
089      byte val = (byte) (Math.random() * 255);
090      int blockSize = (int) (Math.random() * maxBlockSize);
091      if (blockSize == 0) {
092        blockSize = 1;
093      }
094
095      ByteBuff src = createByteBuffer(blockSize, val, i % 2 == 0);
096      int pos = src.position(), lim = src.limit();
097      int offset;
098      if (testOffsetAtStartNum > 0) {
099        testOffsetAtStartNum--;
100        offset = 0;
101      } else if (testOffsetAtEndNum > 0) {
102        testOffsetAtEndNum--;
103        offset = capacity - blockSize;
104      } else {
105        offset = (int) (Math.random() * (capacity - maxBlockSize));
106      }
107      ioEngine.write(src, offset);
108      src.position(pos).limit(lim);
109
110      BucketEntry be = createBucketEntry(offset, blockSize);
111      ioEngine.read(be);
112      ByteBuff dst = getByteBuff(be);
113      assertEquals(src.remaining(), blockSize);
114      assertEquals(dst.remaining(), blockSize);
115      assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst, dst.position(),
116        dst.remaining()));
117    }
118    assert testOffsetAtStartNum == 0;
119    assert testOffsetAtEndNum == 0;
120  }
121
122  /**
123   * A CacheableDeserializer implementation which just store reference to the {@link ByteBuff} to be
124   * deserialized.
125   */
126  static class BufferGrabbingDeserializer implements CacheableDeserializer<Cacheable> {
127    private ByteBuff buf;
128    private int identifier;
129
130    @Override
131    public Cacheable deserialize(final ByteBuff b, ByteBuffAllocator alloc) throws IOException {
132      this.buf = b;
133      return null;
134    }
135
136    public void setIdentifier(int identifier) {
137      this.identifier = identifier;
138    }
139
140    @Override
141    public int getDeserializerIdentifier() {
142      return identifier;
143    }
144  }
145
146  static ByteBuff createByteBuffer(int len, int val, boolean useHeap) {
147    ByteBuffer b = useHeap ? ByteBuffer.allocate(2 * len) : ByteBuffer.allocateDirect(2 * len);
148    int pos = (int) (Math.random() * len);
149    b.position(pos).limit(pos + len);
150    for (int i = pos; i < pos + len; i++) {
151      b.put(i, (byte) val);
152    }
153    return ByteBuff.wrap(b);
154  }
155
156  @Test
157  public void testByteBufferIOEngineWithMBB() throws Exception {
158    int capacity = 32 * 1024 * 1024; // 32 MB
159    int testNum = 100;
160    int maxBlockSize = 64 * 1024;
161    ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity);
162    int testOffsetAtStartNum = testNum / 10;
163    int testOffsetAtEndNum = testNum / 10;
164    for (int i = 0; i < testNum; i++) {
165      byte val = (byte) (Math.random() * 255);
166      int blockSize = (int) (Math.random() * maxBlockSize);
167      if (blockSize == 0) {
168        blockSize = 1;
169      }
170      ByteBuff src = createByteBuffer(blockSize, val, i % 2 == 0);
171      int pos = src.position(), lim = src.limit();
172      int offset;
173      if (testOffsetAtStartNum > 0) {
174        testOffsetAtStartNum--;
175        offset = 0;
176      } else if (testOffsetAtEndNum > 0) {
177        testOffsetAtEndNum--;
178        offset = capacity - blockSize;
179      } else {
180        offset = (int) (Math.random() * (capacity - maxBlockSize));
181      }
182      ioEngine.write(src, offset);
183      src.position(pos).limit(lim);
184
185      BucketEntry be = createBucketEntry(offset, blockSize);
186      ioEngine.read(be);
187      ByteBuff dst = getByteBuff(be);
188      assertEquals(src.remaining(), blockSize);
189      assertEquals(dst.remaining(), blockSize);
190      assertEquals(0, ByteBuff.compareTo(src, src.position(), src.remaining(), dst, dst.position(),
191        dst.remaining()));
192    }
193    assert testOffsetAtStartNum == 0;
194    assert testOffsetAtEndNum == 0;
195  }
196}