001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.io.hfile.bucket;
019
020import static org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.createBucketEntry;
021import static org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.getByteBuff;
022import static org.junit.Assert.assertArrayEquals;
023import static org.junit.Assert.assertEquals;
024import static org.junit.Assert.assertNotEquals;
025import static org.junit.Assert.assertNotNull;
026import static org.junit.Assert.fail;
027
028import java.io.File;
029import java.io.IOException;
030import java.nio.ByteBuffer;
031import java.nio.channels.FileChannel;
032import java.util.ArrayList;
033import java.util.List;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.io.ByteBuffAllocator;
036import org.apache.hadoop.hbase.nio.ByteBuff;
037import org.apache.hadoop.hbase.nio.RefCnt;
038import org.apache.hadoop.hbase.testclassification.IOTests;
039import org.apache.hadoop.hbase.testclassification.SmallTests;
040import org.junit.After;
041import org.junit.Assert;
042import org.junit.Before;
043import org.junit.ClassRule;
044import org.junit.Test;
045import org.junit.experimental.categories.Category;
046import org.mockito.Mockito;
047import org.mockito.invocation.InvocationOnMock;
048import org.mockito.stubbing.Answer;
049
050/**
051 * Basic test for {@link FileIOEngine}
052 */
053@Category({ IOTests.class, SmallTests.class })
054public class TestFileIOEngine {
055
056  @ClassRule
057  public static final HBaseClassTestRule CLASS_RULE =
058    HBaseClassTestRule.forClass(TestFileIOEngine.class);
059
060  private static final long TOTAL_CAPACITY = 6 * 1024 * 1024; // 6 MB
061  private static final String[] FILE_PATHS =
062    { "testFileIOEngine1", "testFileIOEngine2", "testFileIOEngine3" };
063  private static final long SIZE_PER_FILE = TOTAL_CAPACITY / FILE_PATHS.length; // 2 MB per File
064  private final static List<Long> boundaryStartPositions = new ArrayList<Long>();
065  private final static List<Long> boundaryStopPositions = new ArrayList<Long>();
066
067  private FileIOEngine fileIOEngine;
068
069  static {
070    boundaryStartPositions.add(0L);
071    for (int i = 1; i < FILE_PATHS.length; i++) {
072      boundaryStartPositions.add(SIZE_PER_FILE * i - 1);
073      boundaryStartPositions.add(SIZE_PER_FILE * i);
074      boundaryStartPositions.add(SIZE_PER_FILE * i + 1);
075    }
076    for (int i = 1; i < FILE_PATHS.length; i++) {
077      boundaryStopPositions.add(SIZE_PER_FILE * i - 1);
078      boundaryStopPositions.add(SIZE_PER_FILE * i);
079      boundaryStopPositions.add(SIZE_PER_FILE * i + 1);
080    }
081    boundaryStopPositions.add(SIZE_PER_FILE * FILE_PATHS.length - 1);
082  }
083
084  @Before
085  public void setUp() throws IOException {
086    fileIOEngine = new FileIOEngine(TOTAL_CAPACITY, false, FILE_PATHS);
087  }
088
089  @After
090  public void cleanUp() {
091    fileIOEngine.shutdown();
092    for (String filePath : FILE_PATHS) {
093      File file = new File(filePath);
094      if (file.exists()) {
095        file.delete();
096      }
097    }
098  }
099
100  @Test
101  public void testFileIOEngine() throws IOException {
102    for (int i = 0; i < 500; i++) {
103      int len = (int) Math.floor(Math.random() * 100) + 1;
104      long offset = (long) Math.floor(Math.random() * TOTAL_CAPACITY % (TOTAL_CAPACITY - len));
105      if (i < boundaryStartPositions.size()) {
106        // make the boundary start positon
107        offset = boundaryStartPositions.get(i);
108      } else if ((i - boundaryStartPositions.size()) < boundaryStopPositions.size()) {
109        // make the boundary stop positon
110        offset = boundaryStopPositions.get(i - boundaryStartPositions.size()) - len + 1;
111      } else if (i % 2 == 0) {
112        // make the cross-files block writing/reading
113        offset = Math.max(1, i % FILE_PATHS.length) * SIZE_PER_FILE - len / 2;
114      }
115      byte[] data1 = new byte[len];
116      for (int j = 0; j < data1.length; ++j) {
117        data1[j] = (byte) (Math.random() * 255);
118      }
119      fileIOEngine.write(ByteBuffer.wrap(data1), offset);
120
121      BucketEntry be = createBucketEntry(offset, len);
122      fileIOEngine.read(be);
123      ByteBuff data2 = getByteBuff(be);
124      assertArrayEquals(data1, data2.array());
125    }
126  }
127
128  @Test
129  public void testFileIOEngineHandlesZeroLengthInput() throws IOException {
130    byte[] data1 = new byte[0];
131
132    fileIOEngine.write(ByteBuffer.wrap(data1), 0);
133    BucketEntry be = createBucketEntry(0, 0);
134    fileIOEngine.read(be);
135    ByteBuff data2 = getByteBuff(be);
136    assertArrayEquals(data1, data2.array());
137  }
138
139  @Test
140  public void testReadFailedShouldReleaseByteBuff() {
141    ByteBuffAllocator alloc = Mockito.mock(ByteBuffAllocator.class);
142    final RefCnt refCnt = RefCnt.create();
143    Mockito.when(alloc.allocate(Mockito.anyInt())).thenAnswer(new Answer<ByteBuff>() {
144      @Override
145      public ByteBuff answer(InvocationOnMock invocation) throws Throwable {
146        int len = invocation.getArgument(0);
147        return ByteBuff.wrap(new ByteBuffer[] { ByteBuffer.allocate(len + 1) }, refCnt);
148      }
149    });
150    int len = 10;
151    byte[] data1 = new byte[len];
152    assertEquals(1, refCnt.refCnt());
153    try {
154      fileIOEngine.write(ByteBuffer.wrap(data1), 0);
155      BucketEntry be = createBucketEntry(0, len, alloc);
156      fileIOEngine.read(be);
157      fail();
158    } catch (IOException ioe) {
159      // expected exception.
160    }
161    assertEquals(0, refCnt.refCnt());
162  }
163
164  @Test
165  public void testClosedChannelException() throws IOException {
166    fileIOEngine.closeFileChannels();
167    int len = 5;
168    long offset = 0L;
169    int val = (int) (Math.random() * 255);
170    for (int i = 0; i < 2; i++) {
171      ByteBuff src = TestByteBufferIOEngine.createByteBuffer(len, val, i % 2 == 0);
172      int pos = src.position(), lim = src.limit();
173      fileIOEngine.write(src, offset);
174      src.position(pos).limit(lim);
175
176      BucketEntry be = createBucketEntry(offset, len);
177      fileIOEngine.read(be);
178      ByteBuff dst = getByteBuff(be);
179
180      Assert.assertEquals(src.remaining(), len);
181      Assert.assertEquals(dst.remaining(), len);
182      Assert.assertEquals(0,
183        ByteBuff.compareTo(src, pos, len, dst, dst.position(), dst.remaining()));
184    }
185  }
186
187  @Test
188  public void testRefreshFileConnection() throws IOException {
189    FileChannel[] fileChannels = fileIOEngine.getFileChannels();
190    FileChannel fileChannel = fileChannels[0];
191    assertNotNull(fileChannel);
192    fileChannel.close();
193    fileIOEngine.refreshFileConnection(0, new IOException("Test Exception"));
194    FileChannel[] reopenedFileChannels = fileIOEngine.getFileChannels();
195    FileChannel reopenedFileChannel = reopenedFileChannels[0];
196    assertNotEquals(fileChannel, reopenedFileChannel);
197    assertEquals(fileChannels.length, reopenedFileChannels.length);
198    for (int i = 1; i < fileChannels.length; i++) {
199      assertEquals(fileChannels[i], reopenedFileChannels[i]);
200    }
201  }
202}