/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.io.FileLink;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link InputStreamBlockDistribution}. Each test gets its own single-datanode mini DFS
 * cluster holding one data file, and checks which {@link HDFSBlocksDistribution} instance the
 * class under test hands back before and after {@code setLastCachedAt} is called.
 */
@Tag(RegionServerTests.TAG)
@Tag(MediumTests.TAG)
public class TestInputStreamBlockDistribution {

  /** Owns the mini DFS cluster; kept as a field so {@link #tearDown()} can shut it down. */
  private HBaseTestingUtil testUtil;
  private Configuration conf;
  private FileSystem fs;
  private Path testPath;

  /**
   * Starts a one-datanode mini DFS cluster with a 1 MiB block size and a 2 MiB client read
   * prefetch size, then writes a 256 MiB test file into it.
   */
  @BeforeEach
  public void setUp() throws Exception {
    testUtil = new HBaseTestingUtil();
    conf = testUtil.getConfiguration();
    conf.setInt("dfs.blocksize", 1024 * 1024);
    conf.setInt("dfs.client.read.prefetch.size", 2 * 1024 * 1024);

    testUtil.startMiniDFSCluster(1);
    MiniDFSCluster cluster = testUtil.getDFSCluster();
    fs = cluster.getFileSystem();

    testPath = new Path(testUtil.getDefaultRootDirPath(), "test.file");

    writeSomeData(fs, testPath, 256 << 20, (byte) 2);
  }

  /**
   * Deletes the test file, closes the file system and shuts the mini DFS cluster down. The null
   * guards cover a {@link #setUp()} that failed part-way, and the nested {@code finally} blocks
   * make sure a failing delete or close does not leave the cluster running.
   */
  @AfterEach
  public void tearDown() throws Exception {
    try {
      if (fs != null) {
        try {
          if (testPath != null) {
            fs.delete(testPath, false);
          }
        } finally {
          fs.close();
        }
      }
    } finally {
      if (testUtil != null) {
        testUtil.shutdownMiniDFSCluster();
      }
    }
  }

  /**
   * With a stream opened directly on the file: the initial distribution is returned first, and a
   * different instance is returned after {@code setLastCachedAt(getCachePeriodMs() + 1)}.
   */
  @Test
  public void itDerivesLocalityFromHFileInputStream() throws Exception {
    try (FSDataInputStream stream = fs.open(testPath)) {
      HDFSBlocksDistribution initial = new HDFSBlocksDistribution();
      InputStreamBlockDistribution test =
        new InputStreamBlockDistribution(stream, getMockedStoreFileInfo(initial, false));

      assertSame(initial, test.getHDFSBlockDistribution());

      test.setLastCachedAt(test.getCachePeriodMs() + 1);

      assertNotSame(initial, test.getHDFSBlockDistribution());
    }
  }

  /**
   * Same expectations as {@link #itDerivesLocalityFromHFileInputStream()}, but the stream is
   * opened through a {@link FileLink} and the mocked store file info reports itself as a link.
   */
  @Test
  public void itDerivesLocalityFromFileLinkInputStream() throws Exception {
    List<Path> files = new ArrayList<>();
    files.add(testPath);

    FileLink link = new FileLink(files);
    try (FSDataInputStream stream = link.open(fs)) {
      HDFSBlocksDistribution initial = new HDFSBlocksDistribution();

      InputStreamBlockDistribution test =
        new InputStreamBlockDistribution(stream, getMockedStoreFileInfo(initial, true));

      assertSame(initial, test.getHDFSBlockDistribution());

      test.setLastCachedAt(test.getCachePeriodMs() + 1);

      assertNotSame(initial, test.getHDFSBlockDistribution());
    }
  }

  /**
   * With a plain mocked {@link FSDataInputStream}: the initial distribution keeps being returned
   * and the stream is reported as unsupported.
   */
  @Test
  public void itFallsBackOnLastKnownValueWhenUnsupported() {
    FSDataInputStream fakeStream = mock(FSDataInputStream.class);
    HDFSBlocksDistribution initial = new HDFSBlocksDistribution();

    InputStreamBlockDistribution test =
      new InputStreamBlockDistribution(fakeStream, getMockedStoreFileInfo(initial, false));

    assertSame(initial, test.getHDFSBlockDistribution());
    test.setLastCachedAt(test.getCachePeriodMs() + 1);

    // fakeStream is not an HdfsDataInputStream or FileLink, so will fail to resolve
    assertSame(initial, test.getHDFSBlockDistribution());
    assertTrue(test.isStreamUnsupported());
  }

  /**
   * With a mocked {@link HdfsDataInputStream} whose {@code getAllBlocks()} throws: the initial
   * distribution keeps being returned, but the stream is not reported as unsupported.
   */
  @Test
  public void itFallsBackOnLastKnownValueOnException() throws IOException {
    HdfsDataInputStream fakeStream = mock(HdfsDataInputStream.class);
    when(fakeStream.getAllBlocks()).thenThrow(new IOException("test"));

    HDFSBlocksDistribution initial = new HDFSBlocksDistribution();

    InputStreamBlockDistribution test =
      new InputStreamBlockDistribution(fakeStream, getMockedStoreFileInfo(initial, false));

    assertSame(initial, test.getHDFSBlockDistribution());
    test.setLastCachedAt(test.getCachePeriodMs() + 1);

    // fakeStream throws an exception, so falls back on original
    assertSame(initial, test.getHDFSBlockDistribution());

    assertFalse(test.isStreamUnsupported());
  }

  /**
   * Write exactly 'size' bytes with value 'v' into a new file called 'path'.
   * @param fs   file system to create the file on
   * @param path file to create
   * @param size number of bytes to write; nothing is written when it is zero or negative
   * @param v    value of every byte written
   * @throws IOException if creating, writing or closing the file fails
   */
  private void writeSomeData(FileSystem fs, Path path, long size, byte v) throws IOException {
    byte[] data = new byte[4096];
    Arrays.fill(data, v);

    try (FSDataOutputStream stream = fs.create(path)) {
      long written = 0;
      while (written < size) {
        // Clamp the final chunk so the file does not overshoot the requested size.
        int chunk = (int) Math.min(data.length, size - written);
        stream.write(data, 0, chunk);
        written += chunk;
      }
    }
  }

  /**
   * Builds a Mockito mock of {@link StoreFileInfo} that returns the given distribution, this
   * test's configuration, and the given link flag.
   * @param distribution value returned from {@code getHDFSBlockDistribution()}
   * @param isFileLink   value returned from {@code isLink()}
   * @return the stubbed mock
   */
  private StoreFileInfo getMockedStoreFileInfo(HDFSBlocksDistribution distribution,
    boolean isFileLink) {
    StoreFileInfo mock = mock(StoreFileInfo.class);
    when(mock.getHDFSBlockDistribution()).thenReturn(distribution);
    when(mock.getConf()).thenReturn(conf);
    when(mock.isLink()).thenReturn(isFileLink);
    return mock;
  }
}