001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertFalse;
021import static org.junit.Assert.assertNotSame;
022import static org.junit.Assert.assertSame;
023import static org.junit.Assert.assertTrue;
024import static org.mockito.Mockito.mock;
025import static org.mockito.Mockito.when;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.List;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FSDataInputStream;
032import org.apache.hadoop.fs.FSDataOutputStream;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.HBaseClassTestRule;
036import org.apache.hadoop.hbase.HBaseTestingUtility;
037import org.apache.hadoop.hbase.HDFSBlocksDistribution;
038import org.apache.hadoop.hbase.io.FileLink;
039import org.apache.hadoop.hbase.testclassification.MediumTests;
040import org.apache.hadoop.hbase.testclassification.RegionServerTests;
041import org.apache.hadoop.hdfs.MiniDFSCluster;
042import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
043import org.junit.After;
044import org.junit.Before;
045import org.junit.ClassRule;
046import org.junit.Test;
047import org.junit.experimental.categories.Category;
048
049@Category({ RegionServerTests.class, MediumTests.class })
050public class TestInputStreamBlockDistribution {
051
052  @ClassRule
053  public static final HBaseClassTestRule CLASS_RULE =
054    HBaseClassTestRule.forClass(TestInputStreamBlockDistribution.class);
055
056  private Configuration conf;
057  private FileSystem fs;
058  private Path testPath;
059
060  @Before
061  public void setUp() throws Exception {
062    HBaseTestingUtility testUtil = new HBaseTestingUtility();
063    conf = testUtil.getConfiguration();
064    conf.setInt("dfs.blocksize", 1024 * 1024);
065    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);
066
067    testUtil.startMiniDFSCluster(1);
068    MiniDFSCluster cluster = testUtil.getDFSCluster();
069    fs = cluster.getFileSystem();
070
071    testPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");
072
073    writeSomeData(fs, testPath, 256 << 20, (byte) 2);
074  }
075
076  @After
077  public void tearDown() throws Exception {
078    fs.delete(testPath, false);
079    fs.close();
080  }
081
082  @Test
083  public void itDerivesLocalityFromHFileInputStream() throws Exception {
084    try (FSDataInputStream stream = fs.open(testPath)) {
085      HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
086      InputStreamBlockDistribution test =
087        new InputStreamBlockDistribution(stream, getMockedStoreFileInfo(initial, false));
088
089      assertSame(initial, test.getHDFSBlockDistribution());
090
091      test.setLastCachedAt(test.getCachePeriodMs() + 1);
092
093      assertNotSame(initial, test.getHDFSBlockDistribution());
094    }
095
096  }
097
098  @Test
099  public void itDerivesLocalityFromFileLinkInputStream() throws Exception {
100    List<Path> files = new ArrayList<Path>();
101    files.add(testPath);
102
103    FileLink link = new FileLink(files);
104    try (FSDataInputStream stream = link.open(fs)) {
105
106      HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
107
108      InputStreamBlockDistribution test =
109        new InputStreamBlockDistribution(stream, getMockedStoreFileInfo(initial, true));
110
111      assertSame(initial, test.getHDFSBlockDistribution());
112
113      test.setLastCachedAt(test.getCachePeriodMs() + 1);
114
115      assertNotSame(initial, test.getHDFSBlockDistribution());
116    }
117  }
118
119  @Test
120  public void itFallsBackOnLastKnownValueWhenUnsupported() {
121    FSDataInputStream fakeStream = mock(FSDataInputStream.class);
122    HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
123
124    InputStreamBlockDistribution test =
125      new InputStreamBlockDistribution(fakeStream, getMockedStoreFileInfo(initial, false));
126
127    assertSame(initial, test.getHDFSBlockDistribution());
128    test.setLastCachedAt(test.getCachePeriodMs() + 1);
129
130    // fakeStream is not an HdfsDataInputStream or FileLink, so will fail to resolve
131    assertSame(initial, test.getHDFSBlockDistribution());
132    assertTrue(test.isStreamUnsupported());
133  }
134
135  @Test
136  public void itFallsBackOnLastKnownValueOnException() throws IOException {
137    HdfsDataInputStream fakeStream = mock(HdfsDataInputStream.class);
138    when(fakeStream.getAllBlocks()).thenThrow(new IOException("test"));
139
140    HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
141
142    InputStreamBlockDistribution test =
143      new InputStreamBlockDistribution(fakeStream, getMockedStoreFileInfo(initial, false));
144
145    assertSame(initial, test.getHDFSBlockDistribution());
146    test.setLastCachedAt(test.getCachePeriodMs() + 1);
147
148    // fakeStream throws an exception, so falls back on original
149    assertSame(initial, test.getHDFSBlockDistribution());
150
151    assertFalse(test.isStreamUnsupported());
152  }
153
154  /**
155   * Write up to 'size' bytes with value 'v' into a new file called 'path'.
156   */
157  private void writeSomeData(FileSystem fs, Path path, long size, byte v) throws IOException {
158    byte[] data = new byte[4096];
159    for (int i = 0; i < data.length; i++) {
160      data[i] = v;
161    }
162
163    FSDataOutputStream stream = fs.create(path);
164    try {
165      long written = 0;
166      while (written < size) {
167        stream.write(data, 0, data.length);
168        written += data.length;
169      }
170    } finally {
171      stream.close();
172    }
173  }
174
175  private StoreFileInfo getMockedStoreFileInfo(HDFSBlocksDistribution distribution,
176    boolean isFileLink) {
177    StoreFileInfo mock = mock(StoreFileInfo.class);
178    when(mock.getHDFSBlockDistribution()).thenReturn(distribution);
179    when(mock.getConf()).thenReturn(conf);
180    when(mock.isLink()).thenReturn(isFileLink);
181    return mock;
182  }
183}