001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.createBucketEntry;
021import static org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.getByteBuff;
022import static org.junit.jupiter.api.Assertions.assertArrayEquals;
023import static org.junit.jupiter.api.Assertions.assertEquals;
024import static org.junit.jupiter.api.Assertions.assertNotEquals;
025import static org.junit.jupiter.api.Assertions.assertNotNull;
026import static org.junit.jupiter.api.Assertions.fail;
027
028import java.io.File;
029import java.io.IOException;
030import java.nio.ByteBuffer;
031import java.nio.channels.FileChannel;
032import java.util.ArrayList;
033import java.util.List;
034import org.apache.hadoop.hbase.io.ByteBuffAllocator;
035import org.apache.hadoop.hbase.nio.ByteBuff;
036import org.apache.hadoop.hbase.nio.RefCnt;
037import org.apache.hadoop.hbase.testclassification.IOTests;
038import org.apache.hadoop.hbase.testclassification.SmallTests;
039import org.junit.jupiter.api.AfterEach;
040import org.junit.jupiter.api.BeforeEach;
041import org.junit.jupiter.api.Tag;
042import org.junit.jupiter.api.Test;
043import org.mockito.Mockito;
044import org.mockito.invocation.InvocationOnMock;
045import org.mockito.stubbing.Answer;
046
047/**
048 * Basic test for {@link FileIOEngine}
049 */
050@Tag(IOTests.TAG)
051@Tag(SmallTests.TAG)
052public class TestFileIOEngine {
053
054  private static final long TOTAL_CAPACITY = 6 * 1024 * 1024; // 6 MB
055  private static final String[] FILE_PATHS =
056    { "testFileIOEngine1", "testFileIOEngine2", "testFileIOEngine3" };
057  private static final long SIZE_PER_FILE = TOTAL_CAPACITY / FILE_PATHS.length; // 2 MB per File
058  private final static List<Long> boundaryStartPositions = new ArrayList<Long>();
059  private final static List<Long> boundaryStopPositions = new ArrayList<Long>();
060
061  private FileIOEngine fileIOEngine;
062
063  static {
064    boundaryStartPositions.add(0L);
065    for (int i = 1; i < FILE_PATHS.length; i++) {
066      boundaryStartPositions.add(SIZE_PER_FILE * i - 1);
067      boundaryStartPositions.add(SIZE_PER_FILE * i);
068      boundaryStartPositions.add(SIZE_PER_FILE * i + 1);
069    }
070    for (int i = 1; i < FILE_PATHS.length; i++) {
071      boundaryStopPositions.add(SIZE_PER_FILE * i - 1);
072      boundaryStopPositions.add(SIZE_PER_FILE * i);
073      boundaryStopPositions.add(SIZE_PER_FILE * i + 1);
074    }
075    boundaryStopPositions.add(SIZE_PER_FILE * FILE_PATHS.length - 1);
076  }
077
078  @BeforeEach
079  public void setUp() throws IOException {
080    fileIOEngine = new FileIOEngine(TOTAL_CAPACITY, false, FILE_PATHS);
081  }
082
083  @AfterEach
084  public void cleanUp() {
085    fileIOEngine.shutdown();
086    for (String filePath : FILE_PATHS) {
087      File file = new File(filePath);
088      if (file.exists()) {
089        file.delete();
090      }
091    }
092  }
093
094  @Test
095  public void testFileIOEngine() throws IOException {
096    for (int i = 0; i < 500; i++) {
097      int len = (int) Math.floor(Math.random() * 100) + 1;
098      long offset = (long) Math.floor(Math.random() * TOTAL_CAPACITY % (TOTAL_CAPACITY - len));
099      if (i < boundaryStartPositions.size()) {
100        // make the boundary start positon
101        offset = boundaryStartPositions.get(i);
102      } else if ((i - boundaryStartPositions.size()) < boundaryStopPositions.size()) {
103        // make the boundary stop positon
104        offset = boundaryStopPositions.get(i - boundaryStartPositions.size()) - len + 1;
105      } else if (i % 2 == 0) {
106        // make the cross-files block writing/reading
107        offset = Math.max(1, i % FILE_PATHS.length) * SIZE_PER_FILE - len / 2;
108      }
109      byte[] data1 = new byte[len];
110      for (int j = 0; j < data1.length; ++j) {
111        data1[j] = (byte) (Math.random() * 255);
112      }
113      fileIOEngine.write(ByteBuffer.wrap(data1), offset);
114
115      BucketEntry be = createBucketEntry(offset, len);
116      fileIOEngine.read(be);
117      ByteBuff data2 = getByteBuff(be);
118      assertArrayEquals(data1, data2.array());
119    }
120  }
121
122  @Test
123  public void testFileIOEngineHandlesZeroLengthInput() throws IOException {
124    byte[] data1 = new byte[0];
125
126    fileIOEngine.write(ByteBuffer.wrap(data1), 0);
127    BucketEntry be = createBucketEntry(0, 0);
128    fileIOEngine.read(be);
129    ByteBuff data2 = getByteBuff(be);
130    assertArrayEquals(data1, data2.array());
131  }
132
133  @Test
134  public void testReadFailedShouldReleaseByteBuff() {
135    ByteBuffAllocator alloc = Mockito.mock(ByteBuffAllocator.class);
136    final RefCnt refCnt = RefCnt.create();
137    Mockito.when(alloc.allocate(Mockito.anyInt())).thenAnswer(new Answer<ByteBuff>() {
138      @Override
139      public ByteBuff answer(InvocationOnMock invocation) throws Throwable {
140        int len = invocation.getArgument(0);
141        return ByteBuff.wrap(new ByteBuffer[] { ByteBuffer.allocate(len + 1) }, refCnt);
142      }
143    });
144    int len = 10;
145    byte[] data1 = new byte[len];
146    assertEquals(1, refCnt.refCnt());
147    try {
148      fileIOEngine.write(ByteBuffer.wrap(data1), 0);
149      BucketEntry be = createBucketEntry(0, len, alloc);
150      fileIOEngine.read(be);
151      fail();
152    } catch (IOException ioe) {
153      // expected exception.
154    }
155    assertEquals(0, refCnt.refCnt());
156  }
157
158  @Test
159  public void testClosedChannelException() throws IOException {
160    fileIOEngine.closeFileChannels();
161    int len = 5;
162    long offset = 0L;
163    int val = (int) (Math.random() * 255);
164    for (int i = 0; i < 2; i++) {
165      ByteBuff src = TestByteBufferIOEngine.createByteBuffer(len, val, i % 2 == 0);
166      int pos = src.position(), lim = src.limit();
167      fileIOEngine.write(src, offset);
168      src.position(pos).limit(lim);
169
170      BucketEntry be = createBucketEntry(offset, len);
171      fileIOEngine.read(be);
172      ByteBuff dst = getByteBuff(be);
173
174      assertEquals(src.remaining(), len);
175      assertEquals(dst.remaining(), len);
176      assertEquals(0, ByteBuff.compareTo(src, pos, len, dst, dst.position(), dst.remaining()));
177    }
178  }
179
180  @Test
181  public void testRefreshFileConnection() throws IOException {
182    FileChannel[] fileChannels = fileIOEngine.getFileChannels();
183    FileChannel fileChannel = fileChannels[0];
184    assertNotNull(fileChannel);
185    fileChannel.close();
186    fileIOEngine.refreshFileConnection(0, new IOException("Test Exception"));
187    FileChannel[] reopenedFileChannels = fileIOEngine.getFileChannels();
188    FileChannel reopenedFileChannel = reopenedFileChannels[0];
189    assertNotEquals(fileChannel, reopenedFileChannel);
190    assertEquals(fileChannels.length, reopenedFileChannels.length);
191    for (int i = 1; i < fileChannels.length; i++) {
192      assertEquals(fileChannels[i], reopenedFileChannels[i]);
193    }
194  }
195}