001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.Assert.assertFalse; 021import static org.junit.Assert.assertNotSame; 022import static org.junit.Assert.assertSame; 023import static org.junit.Assert.assertTrue; 024import static org.mockito.Mockito.mock; 025import static org.mockito.Mockito.when; 026 027import java.io.IOException; 028import java.util.ArrayList; 029import java.util.List; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FSDataInputStream; 032import org.apache.hadoop.fs.FSDataOutputStream; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.HBaseClassTestRule; 036import org.apache.hadoop.hbase.HBaseTestingUtility; 037import org.apache.hadoop.hbase.HDFSBlocksDistribution; 038import org.apache.hadoop.hbase.io.FileLink; 039import org.apache.hadoop.hbase.testclassification.MediumTests; 040import org.apache.hadoop.hbase.testclassification.RegionServerTests; 041import org.apache.hadoop.hdfs.MiniDFSCluster; 042import org.apache.hadoop.hdfs.client.HdfsDataInputStream; 043import org.junit.After; 044import org.junit.Before; 045import org.junit.ClassRule; 046import org.junit.Test; 047import org.junit.experimental.categories.Category; 048 049@Category({ RegionServerTests.class, MediumTests.class }) 050public class TestInputStreamBlockDistribution { 051 052 @ClassRule 053 public static final HBaseClassTestRule CLASS_RULE = 054 HBaseClassTestRule.forClass(TestInputStreamBlockDistribution.class); 055 056 private Configuration conf; 057 private FileSystem fs; 058 private Path testPath; 059 060 @Before 061 public void setUp() throws Exception { 062 HBaseTestingUtility testUtil = new HBaseTestingUtility(); 063 conf = testUtil.getConfiguration(); 064 conf.setInt("dfs.blocksize", 1024 * 1024); 065 conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024); 066 067 testUtil.startMiniDFSCluster(1); 068 MiniDFSCluster cluster = testUtil.getDFSCluster(); 069 fs = cluster.getFileSystem(); 070 071 testPath = new Path(testUtil.getDefaultRootDirPath(), "test.file"); 072 073 writeSomeData(fs, testPath, 256 << 20, (byte) 2); 074 } 075 076 @After 077 public void tearDown() throws Exception { 078 fs.delete(testPath, false); 079 fs.close(); 080 } 081 082 @Test 083 public void itDerivesLocalityFromHFileInputStream() throws Exception { 084 try (FSDataInputStream stream = fs.open(testPath)) { 085 HDFSBlocksDistribution initial = new HDFSBlocksDistribution(); 086 InputStreamBlockDistribution test = 087 new InputStreamBlockDistribution(stream, getMockedStoreFileInfo(initial, false)); 088 089 assertSame(initial, test.getHDFSBlockDistribution()); 090 091 test.setLastCachedAt(test.getCachePeriodMs() + 1); 092 093 assertNotSame(initial, test.getHDFSBlockDistribution()); 094 } 095 096 } 097 098 @Test 099 public void itDerivesLocalityFromFileLinkInputStream() throws Exception { 100 List<Path> files = new ArrayList<Path>(); 101 files.add(testPath); 102 103 FileLink link = new FileLink(files); 104 try (FSDataInputStream stream = link.open(fs)) { 105 106 HDFSBlocksDistribution initial = new HDFSBlocksDistribution(); 107 108 InputStreamBlockDistribution test = 109 new InputStreamBlockDistribution(stream, getMockedStoreFileInfo(initial, true)); 110 111 assertSame(initial, test.getHDFSBlockDistribution()); 112 113 test.setLastCachedAt(test.getCachePeriodMs() + 1); 114 115 assertNotSame(initial, test.getHDFSBlockDistribution()); 116 } 117 } 118 119 @Test 120 public void itFallsBackOnLastKnownValueWhenUnsupported() { 121 FSDataInputStream fakeStream = mock(FSDataInputStream.class); 122 HDFSBlocksDistribution initial = new HDFSBlocksDistribution(); 123 124 InputStreamBlockDistribution test = 125 new InputStreamBlockDistribution(fakeStream, getMockedStoreFileInfo(initial, false)); 126 127 assertSame(initial, test.getHDFSBlockDistribution()); 128 test.setLastCachedAt(test.getCachePeriodMs() + 1); 129 130 // fakeStream is not an HdfsDataInputStream or FileLink, so will fail to resolve 131 assertSame(initial, test.getHDFSBlockDistribution()); 132 assertTrue(test.isStreamUnsupported()); 133 } 134 135 @Test 136 public void itFallsBackOnLastKnownValueOnException() throws IOException { 137 HdfsDataInputStream fakeStream = mock(HdfsDataInputStream.class); 138 when(fakeStream.getAllBlocks()).thenThrow(new IOException("test")); 139 140 HDFSBlocksDistribution initial = new HDFSBlocksDistribution(); 141 142 InputStreamBlockDistribution test = 143 new InputStreamBlockDistribution(fakeStream, getMockedStoreFileInfo(initial, false)); 144 145 assertSame(initial, test.getHDFSBlockDistribution()); 146 test.setLastCachedAt(test.getCachePeriodMs() + 1); 147 148 // fakeStream throws an exception, so falls back on original 149 assertSame(initial, test.getHDFSBlockDistribution()); 150 151 assertFalse(test.isStreamUnsupported()); 152 } 153 154 /** 155 * Write up to 'size' bytes with value 'v' into a new file called 'path'. 156 */ 157 private void writeSomeData(FileSystem fs, Path path, long size, byte v) throws IOException { 158 byte[] data = new byte[4096]; 159 for (int i = 0; i < data.length; i++) { 160 data[i] = v; 161 } 162 163 FSDataOutputStream stream = fs.create(path); 164 try { 165 long written = 0; 166 while (written < size) { 167 stream.write(data, 0, data.length); 168 written += data.length; 169 } 170 } finally { 171 stream.close(); 172 } 173 } 174 175 private StoreFileInfo getMockedStoreFileInfo(HDFSBlocksDistribution distribution, 176 boolean isFileLink) { 177 StoreFileInfo mock = mock(StoreFileInfo.class); 178 when(mock.getHDFSBlockDistribution()).thenReturn(distribution); 179 when(mock.getConf()).thenReturn(conf); 180 when(mock.isLink()).thenReturn(isFileLink); 181 return mock; 182 } 183}