001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertFalse;
021import static org.junit.jupiter.api.Assertions.assertNotSame;
022import static org.junit.jupiter.api.Assertions.assertSame;
023import static org.junit.jupiter.api.Assertions.assertTrue;
024import static org.mockito.Mockito.mock;
025import static org.mockito.Mockito.when;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.List;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FSDataInputStream;
032import org.apache.hadoop.fs.FSDataOutputStream;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.HBaseTestingUtil;
036import org.apache.hadoop.hbase.HDFSBlocksDistribution;
037import org.apache.hadoop.hbase.io.FileLink;
038import org.apache.hadoop.hbase.testclassification.MediumTests;
039import org.apache.hadoop.hbase.testclassification.RegionServerTests;
040import org.apache.hadoop.hdfs.MiniDFSCluster;
041import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
042import org.junit.jupiter.api.AfterEach;
043import org.junit.jupiter.api.BeforeEach;
044import org.junit.jupiter.api.Tag;
045import org.junit.jupiter.api.Test;
046
047@Tag(RegionServerTests.TAG)
048@Tag(MediumTests.TAG)
049public class TestInputStreamBlockDistribution {
050
051  private Configuration conf;
052  private FileSystem fs;
053  private Path testPath;
054
055  @BeforeEach
056  public void setUp() throws Exception {
057    HBaseTestingUtil testUtil = new HBaseTestingUtil();
058    conf = testUtil.getConfiguration();
059    conf.setInt("dfs.blocksize", 1024 * 1024);
060    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);
061
062    testUtil.startMiniDFSCluster(1);
063    MiniDFSCluster cluster = testUtil.getDFSCluster();
064    fs = cluster.getFileSystem();
065
066    testPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");
067
068    writeSomeData(fs, testPath, 256 << 20, (byte) 2);
069  }
070
071  @AfterEach
072  public void tearDown() throws Exception {
073    fs.delete(testPath, false);
074    fs.close();
075  }
076
077  @Test
078  public void itDerivesLocalityFromHFileInputStream() throws Exception {
079    try (FSDataInputStream stream = fs.open(testPath)) {
080      HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
081      InputStreamBlockDistribution test =
082        new InputStreamBlockDistribution(stream, getMockedStoreFileInfo(initial, false));
083
084      assertSame(initial, test.getHDFSBlockDistribution());
085
086      test.setLastCachedAt(test.getCachePeriodMs() + 1);
087
088      assertNotSame(initial, test.getHDFSBlockDistribution());
089    }
090
091  }
092
093  @Test
094  public void itDerivesLocalityFromFileLinkInputStream() throws Exception {
095    List<Path> files = new ArrayList<Path>();
096    files.add(testPath);
097
098    FileLink link = new FileLink(files);
099    try (FSDataInputStream stream = link.open(fs)) {
100
101      HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
102
103      InputStreamBlockDistribution test =
104        new InputStreamBlockDistribution(stream, getMockedStoreFileInfo(initial, true));
105
106      assertSame(initial, test.getHDFSBlockDistribution());
107
108      test.setLastCachedAt(test.getCachePeriodMs() + 1);
109
110      assertNotSame(initial, test.getHDFSBlockDistribution());
111    }
112  }
113
114  @Test
115  public void itFallsBackOnLastKnownValueWhenUnsupported() {
116    FSDataInputStream fakeStream = mock(FSDataInputStream.class);
117    HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
118
119    InputStreamBlockDistribution test =
120      new InputStreamBlockDistribution(fakeStream, getMockedStoreFileInfo(initial, false));
121
122    assertSame(initial, test.getHDFSBlockDistribution());
123    test.setLastCachedAt(test.getCachePeriodMs() + 1);
124
125    // fakeStream is not an HdfsDataInputStream or FileLink, so will fail to resolve
126    assertSame(initial, test.getHDFSBlockDistribution());
127    assertTrue(test.isStreamUnsupported());
128  }
129
130  @Test
131  public void itFallsBackOnLastKnownValueOnException() throws IOException {
132    HdfsDataInputStream fakeStream = mock(HdfsDataInputStream.class);
133    when(fakeStream.getAllBlocks()).thenThrow(new IOException("test"));
134
135    HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
136
137    InputStreamBlockDistribution test =
138      new InputStreamBlockDistribution(fakeStream, getMockedStoreFileInfo(initial, false));
139
140    assertSame(initial, test.getHDFSBlockDistribution());
141    test.setLastCachedAt(test.getCachePeriodMs() + 1);
142
143    // fakeStream throws an exception, so falls back on original
144    assertSame(initial, test.getHDFSBlockDistribution());
145
146    assertFalse(test.isStreamUnsupported());
147  }
148
149  /**
150   * Write up to 'size' bytes with value 'v' into a new file called 'path'.
151   */
152  private void writeSomeData(FileSystem fs, Path path, long size, byte v) throws IOException {
153    byte[] data = new byte[4096];
154    for (int i = 0; i < data.length; i++) {
155      data[i] = v;
156    }
157
158    FSDataOutputStream stream = fs.create(path);
159    try {
160      long written = 0;
161      while (written < size) {
162        stream.write(data, 0, data.length);
163        written += data.length;
164      }
165    } finally {
166      stream.close();
167    }
168  }
169
170  private StoreFileInfo getMockedStoreFileInfo(HDFSBlocksDistribution distribution,
171    boolean isFileLink) {
172    StoreFileInfo mock = mock(StoreFileInfo.class);
173    when(mock.getHDFSBlockDistribution()).thenReturn(distribution);
174    when(mock.getConf()).thenReturn(conf);
175    when(mock.isLink()).thenReturn(isFileLink);
176    return mock;
177  }
178}