001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.mockito.ArgumentMatchers.any; 022import static org.mockito.ArgumentMatchers.anyBoolean; 023import static org.mockito.Mockito.mock; 024import static org.mockito.Mockito.when; 025 026import java.io.IOException; 027import java.net.Inet6Address; 028import java.net.InetAddress; 029import java.net.UnknownHostException; 030import java.util.Map; 031import java.util.TreeMap; 032import java.util.concurrent.ExecutorService; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.HBaseConfiguration; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.HRegionLocation; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.Admin; 040import org.apache.hadoop.hbase.client.AsyncConnection; 041import org.apache.hadoop.hbase.client.BufferedMutator; 042import org.apache.hadoop.hbase.client.BufferedMutatorParams; 043import org.apache.hadoop.hbase.client.Connection; 044import org.apache.hadoop.hbase.client.ConnectionRegistry; 045import org.apache.hadoop.hbase.client.ConnectionUtils; 046import org.apache.hadoop.hbase.client.RegionInfo; 047import org.apache.hadoop.hbase.client.RegionInfoBuilder; 048import org.apache.hadoop.hbase.client.RegionLocator; 049import org.apache.hadoop.hbase.client.Table; 050import org.apache.hadoop.hbase.client.TableBuilder; 051import org.apache.hadoop.hbase.security.User; 052import org.apache.hadoop.hbase.testclassification.SmallTests; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.Pair; 055import org.apache.hadoop.mapreduce.JobContext; 056import org.junit.jupiter.api.Tag; 057import org.junit.jupiter.api.Test; 058import org.mockito.Mockito; 059import org.mockito.invocation.InvocationOnMock; 060import org.mockito.stubbing.Answer; 061 062@Tag(SmallTests.TAG) 063public class TestTableInputFormatBase { 064 065 @Test 066 public void testReuseRegionSizeCalculator() throws IOException { 067 JobContext context = mock(JobContext.class); 068 Configuration conf = HBaseConfiguration.create(); 069 conf.set(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, 070 ConnectionForMergeTesting.class.getName()); 071 conf.set(TableInputFormat.INPUT_TABLE, "testTable"); 072 conf.setBoolean(TableInputFormatBase.MAPREDUCE_INPUT_AUTOBALANCE, true); 073 when(context.getConfiguration()).thenReturn(conf); 074 075 TableInputFormat format = Mockito.spy(new TableInputFormatForMergeTesting()); 076 format.setConf(conf); 077 // initialize so that table is set, otherwise cloneOnFinish 078 // will be true and each getSplits call will re-initialize everything 079 format.initialize(context); 080 format.getSplits(context); 081 format.getSplits(context); 082 083 // re-initialize which will cause the next getSplits call to create a new RegionSizeCalculator 084 format.initialize(context); 085 format.getSplits(context); 086 format.getSplits(context); 087 088 // should only be 2 despite calling getSplits 4 times 089 Mockito.verify(format, Mockito.times(2)).createRegionSizeCalculator(Mockito.any(), 090 Mockito.any()); 091 } 092 093 @Test 094 public void testTableInputFormatBaseReverseDNSForIPv6() throws UnknownHostException { 095 String address = "ipv6.google.com"; 096 String localhost = null; 097 InetAddress addr = null; 098 TableInputFormat inputFormat = new TableInputFormat(); 099 try { 100 localhost = InetAddress.getByName(address).getCanonicalHostName(); 101 addr = Inet6Address.getByName(address); 102 } catch (UnknownHostException e) { 103 // google.com is down, we can probably forgive this test. 104 return; 105 } 106 System.out.println("Should retrun the hostname for this host " + localhost + " addr : " + addr); 107 String actualHostName = inputFormat.reverseDNS(addr); 108 assertEquals("Should retrun the hostname for this host. Expected : " + localhost + " Actual : " 109 + actualHostName, localhost, actualHostName); 110 } 111 112 @Test 113 public void testNonSuccessiveSplitsAreNotMerged() throws IOException { 114 JobContext context = mock(JobContext.class); 115 Configuration conf = HBaseConfiguration.create(); 116 conf.set(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, 117 ConnectionForMergeTesting.class.getName()); 118 conf.set(TableInputFormat.INPUT_TABLE, "testTable"); 119 conf.setBoolean(TableInputFormatBase.MAPREDUCE_INPUT_AUTOBALANCE, true); 120 when(context.getConfiguration()).thenReturn(conf); 121 122 TableInputFormat tifExclude = new TableInputFormatForMergeTesting(); 123 tifExclude.setConf(conf); 124 // split["b", "c"] is excluded, split["o", "p"] and split["p", "q"] are merged, 125 // but split["a", "b"] and split["c", "d"] are not merged. 126 assertEquals(ConnectionForMergeTesting.START_KEYS.length - 1 - 1, 127 tifExclude.getSplits(context).size()); 128 } 129 130 /** 131 * Subclass of {@link TableInputFormat} to use in {@link #testNonSuccessiveSplitsAreNotMerged}. 132 * This class overrides {@link TableInputFormatBase#includeRegionInSplit} to exclude specific 133 * splits. 134 */ 135 private static class TableInputFormatForMergeTesting extends TableInputFormat { 136 private byte[] prefixStartKey = Bytes.toBytes("b"); 137 private byte[] prefixEndKey = Bytes.toBytes("c"); 138 private RegionSizeCalculator sizeCalculator; 139 140 /** 141 * Exclude regions which contain rows starting with "b". 142 */ 143 @Override 144 protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) { 145 if ( 146 Bytes.compareTo(startKey, prefixEndKey) < 0 && (Bytes.compareTo(prefixStartKey, endKey) < 0 147 || Bytes.equals(endKey, HConstants.EMPTY_END_ROW)) 148 ) { 149 return false; 150 } else { 151 return true; 152 } 153 } 154 155 @Override 156 protected void initializeTable(Connection connection, TableName tableName) throws IOException { 157 super.initializeTable(connection, tableName); 158 ConnectionForMergeTesting cft = (ConnectionForMergeTesting) connection; 159 sizeCalculator = cft.getRegionSizeCalculator(); 160 } 161 162 @Override 163 protected RegionSizeCalculator createRegionSizeCalculator(RegionLocator locator, Admin admin) 164 throws IOException { 165 return sizeCalculator; 166 } 167 } 168 169 /** 170 * Connection class to use in {@link #testNonSuccessiveSplitsAreNotMerged}. This class returns 171 * mocked {@link Table}, {@link RegionLocator}, {@link RegionSizeCalculator}, and {@link Admin}. 172 */ 173 private static class ConnectionForMergeTesting implements Connection { 174 public static final byte[][] SPLITS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b"), 175 Bytes.toBytes("c"), Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"), 176 Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j"), 177 Bytes.toBytes("k"), Bytes.toBytes("l"), Bytes.toBytes("m"), Bytes.toBytes("n"), 178 Bytes.toBytes("o"), Bytes.toBytes("p"), Bytes.toBytes("q"), Bytes.toBytes("r"), 179 Bytes.toBytes("s"), Bytes.toBytes("t"), Bytes.toBytes("u"), Bytes.toBytes("v"), 180 Bytes.toBytes("w"), Bytes.toBytes("x"), Bytes.toBytes("y"), Bytes.toBytes("z") }; 181 182 public static final byte[][] START_KEYS; 183 public static final byte[][] END_KEYS; 184 static { 185 START_KEYS = new byte[SPLITS.length + 1][]; 186 START_KEYS[0] = HConstants.EMPTY_BYTE_ARRAY; 187 for (int i = 0; i < SPLITS.length; i++) { 188 START_KEYS[i + 1] = SPLITS[i]; 189 } 190 191 END_KEYS = new byte[SPLITS.length + 1][]; 192 for (int i = 0; i < SPLITS.length; i++) { 193 END_KEYS[i] = SPLITS[i]; 194 } 195 END_KEYS[SPLITS.length] = HConstants.EMPTY_BYTE_ARRAY; 196 } 197 198 public static final Map<byte[], Long> SIZE_MAP = new TreeMap<>(Bytes.BYTES_COMPARATOR); 199 static { 200 for (byte[] startKey : START_KEYS) { 201 SIZE_MAP.put(startKey, 1024L * 1024L * 1024L); 202 } 203 SIZE_MAP.put(Bytes.toBytes("a"), 200L * 1024L * 1024L); 204 SIZE_MAP.put(Bytes.toBytes("b"), 200L * 1024L * 1024L); 205 SIZE_MAP.put(Bytes.toBytes("c"), 200L * 1024L * 1024L); 206 SIZE_MAP.put(Bytes.toBytes("o"), 200L * 1024L * 1024L); 207 SIZE_MAP.put(Bytes.toBytes("p"), 200L * 1024L * 1024L); 208 } 209 210 ConnectionForMergeTesting(Configuration conf, ExecutorService pool, User user, 211 ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException { 212 } 213 214 @Override 215 public void abort(String why, Throwable e) { 216 } 217 218 @Override 219 public boolean isAborted() { 220 return false; 221 } 222 223 @Override 224 public Configuration getConfiguration() { 225 throw new UnsupportedOperationException(); 226 } 227 228 @Override 229 public Table getTable(TableName tableName) throws IOException { 230 Table table = mock(Table.class); 231 when(table.getName()).thenReturn(tableName); 232 return table; 233 } 234 235 @Override 236 public Table getTable(TableName tableName, ExecutorService pool) throws IOException { 237 throw new UnsupportedOperationException(); 238 } 239 240 @Override 241 public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { 242 throw new UnsupportedOperationException(); 243 } 244 245 @Override 246 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException { 247 throw new UnsupportedOperationException(); 248 } 249 250 @Override 251 public RegionLocator getRegionLocator(TableName tableName) throws IOException { 252 final Map<byte[], HRegionLocation> locationMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 253 for (byte[] startKey : START_KEYS) { 254 HRegionLocation hrl = 255 new HRegionLocation(RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).build(), 256 ServerName.valueOf("localhost", 0, 0)); 257 locationMap.put(startKey, hrl); 258 } 259 260 RegionLocator locator = mock(RegionLocator.class); 261 when(locator.getRegionLocation(any(byte[].class), anyBoolean())) 262 .thenAnswer(new Answer<HRegionLocation>() { 263 @Override 264 public HRegionLocation answer(InvocationOnMock invocationOnMock) throws Throwable { 265 Object[] args = invocationOnMock.getArguments(); 266 byte[] key = (byte[]) args[0]; 267 return locationMap.get(key); 268 } 269 }); 270 when(locator.getStartEndKeys()) 271 .thenReturn(new Pair<byte[][], byte[][]>(START_KEYS, END_KEYS)); 272 return locator; 273 } 274 275 public RegionSizeCalculator getRegionSizeCalculator() { 276 RegionSizeCalculator sizeCalculator = mock(RegionSizeCalculator.class); 277 when(sizeCalculator.getRegionSize(any(byte[].class))).thenAnswer(new Answer<Long>() { 278 @Override 279 public Long answer(InvocationOnMock invocationOnMock) throws Throwable { 280 Object[] args = invocationOnMock.getArguments(); 281 byte[] regionId = (byte[]) args[0]; 282 byte[] startKey = RegionInfo.getStartKey(regionId); 283 return SIZE_MAP.get(startKey); 284 } 285 }); 286 return sizeCalculator; 287 } 288 289 @Override 290 public Admin getAdmin() throws IOException { 291 Admin admin = mock(Admin.class); 292 // return non-null admin to pass null checks 293 return admin; 294 } 295 296 @Override 297 public void close() throws IOException { 298 } 299 300 @Override 301 public boolean isClosed() { 302 return false; 303 } 304 305 @Override 306 public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { 307 throw new UnsupportedOperationException(); 308 } 309 310 @Override 311 public void clearRegionLocationCache() { 312 } 313 314 @Override 315 public AsyncConnection toAsyncConnection() { 316 throw new UnsupportedOperationException(); 317 } 318 319 @Override 320 public String getClusterId() { 321 return null; 322 } 323 } 324}