001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.assertEquals; 021import static org.mockito.ArgumentMatchers.any; 022import static org.mockito.ArgumentMatchers.anyBoolean; 023import static org.mockito.Mockito.mock; 024import static org.mockito.Mockito.when; 025 026import java.io.IOException; 027import java.net.Inet6Address; 028import java.net.InetAddress; 029import java.net.UnknownHostException; 030import java.util.Map; 031import java.util.TreeMap; 032import java.util.concurrent.ExecutorService; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseConfiguration; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.HRegionLocation; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.Admin; 041import org.apache.hadoop.hbase.client.AsyncConnection; 042import org.apache.hadoop.hbase.client.BufferedMutator; 043import org.apache.hadoop.hbase.client.BufferedMutatorParams; 044import org.apache.hadoop.hbase.client.Connection; 045import org.apache.hadoop.hbase.client.ConnectionUtils; 046import org.apache.hadoop.hbase.client.RegionInfo; 047import org.apache.hadoop.hbase.client.RegionInfoBuilder; 048import org.apache.hadoop.hbase.client.RegionLocator; 049import org.apache.hadoop.hbase.client.Table; 050import org.apache.hadoop.hbase.client.TableBuilder; 051import org.apache.hadoop.hbase.security.User; 052import org.apache.hadoop.hbase.testclassification.SmallTests; 053import org.apache.hadoop.hbase.util.Bytes; 054import org.apache.hadoop.hbase.util.Pair; 055import org.apache.hadoop.mapreduce.JobContext; 056import org.junit.ClassRule; 057import org.junit.Test; 058import org.junit.experimental.categories.Category; 059import org.mockito.Mockito; 060import org.mockito.invocation.InvocationOnMock; 061import org.mockito.stubbing.Answer; 062 063@Category({ SmallTests.class }) 064public class TestTableInputFormatBase { 065 066 @ClassRule 067 public static final HBaseClassTestRule CLASS_RULE = 068 HBaseClassTestRule.forClass(TestTableInputFormatBase.class); 069 070 @Test 071 public void testReuseRegionSizeCalculator() throws IOException { 072 JobContext context = mock(JobContext.class); 073 Configuration conf = HBaseConfiguration.create(); 074 conf.set(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, 075 ConnectionForMergeTesting.class.getName()); 076 conf.set(TableInputFormat.INPUT_TABLE, "testTable"); 077 conf.setBoolean(TableInputFormatBase.MAPREDUCE_INPUT_AUTOBALANCE, true); 078 when(context.getConfiguration()).thenReturn(conf); 079 080 TableInputFormat format = Mockito.spy(new TableInputFormatForMergeTesting()); 081 format.setConf(conf); 082 // initialize so that table is set, otherwise cloneOnFinish 083 // will be true and each getSplits call will re-initialize everything 084 format.initialize(context); 085 format.getSplits(context); 086 format.getSplits(context); 087 088 // re-initialize which will cause the next getSplits call to create a new RegionSizeCalculator 089 format.initialize(context); 090 format.getSplits(context); 091 format.getSplits(context); 092 093 // should only be 2 despite calling getSplits 4 times 094 Mockito.verify(format, Mockito.times(2)).createRegionSizeCalculator(Mockito.any(), 095 Mockito.any()); 096 } 097 098 @Test 099 public void testTableInputFormatBaseReverseDNSForIPv6() throws UnknownHostException { 100 String address = "ipv6.google.com"; 101 String localhost = null; 102 InetAddress addr = null; 103 TableInputFormat inputFormat = new TableInputFormat(); 104 try { 105 localhost = InetAddress.getByName(address).getCanonicalHostName(); 106 addr = Inet6Address.getByName(address); 107 } catch (UnknownHostException e) { 108 // google.com is down, we can probably forgive this test. 109 return; 110 } 111 System.out.println("Should retrun the hostname for this host " + localhost + " addr : " + addr); 112 String actualHostName = inputFormat.reverseDNS(addr); 113 assertEquals("Should retrun the hostname for this host. Expected : " + localhost + " Actual : " 114 + actualHostName, localhost, actualHostName); 115 } 116 117 @Test 118 public void testNonSuccessiveSplitsAreNotMerged() throws IOException { 119 JobContext context = mock(JobContext.class); 120 Configuration conf = HBaseConfiguration.create(); 121 conf.set(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, 122 ConnectionForMergeTesting.class.getName()); 123 conf.set(TableInputFormat.INPUT_TABLE, "testTable"); 124 conf.setBoolean(TableInputFormatBase.MAPREDUCE_INPUT_AUTOBALANCE, true); 125 when(context.getConfiguration()).thenReturn(conf); 126 127 TableInputFormat tifExclude = new TableInputFormatForMergeTesting(); 128 tifExclude.setConf(conf); 129 // split["b", "c"] is excluded, split["o", "p"] and split["p", "q"] are merged, 130 // but split["a", "b"] and split["c", "d"] are not merged. 131 assertEquals(ConnectionForMergeTesting.START_KEYS.length - 1 - 1, 132 tifExclude.getSplits(context).size()); 133 } 134 135 /** 136 * Subclass of {@link TableInputFormat} to use in {@link #testNonSuccessiveSplitsAreNotMerged}. 137 * This class overrides {@link TableInputFormatBase#includeRegionInSplit} to exclude specific 138 * splits. 139 */ 140 private static class TableInputFormatForMergeTesting extends TableInputFormat { 141 private byte[] prefixStartKey = Bytes.toBytes("b"); 142 private byte[] prefixEndKey = Bytes.toBytes("c"); 143 private RegionSizeCalculator sizeCalculator; 144 145 /** 146 * Exclude regions which contain rows starting with "b". 147 */ 148 @Override 149 protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) { 150 if ( 151 Bytes.compareTo(startKey, prefixEndKey) < 0 && (Bytes.compareTo(prefixStartKey, endKey) < 0 152 || Bytes.equals(endKey, HConstants.EMPTY_END_ROW)) 153 ) { 154 return false; 155 } else { 156 return true; 157 } 158 } 159 160 @Override 161 protected void initializeTable(Connection connection, TableName tableName) throws IOException { 162 super.initializeTable(connection, tableName); 163 ConnectionForMergeTesting cft = (ConnectionForMergeTesting) connection; 164 sizeCalculator = cft.getRegionSizeCalculator(); 165 } 166 167 @Override 168 protected RegionSizeCalculator createRegionSizeCalculator(RegionLocator locator, Admin admin) 169 throws IOException { 170 return sizeCalculator; 171 } 172 } 173 174 /** 175 * Connection class to use in {@link #testNonSuccessiveSplitsAreNotMerged}. This class returns 176 * mocked {@link Table}, {@link RegionLocator}, {@link RegionSizeCalculator}, and {@link Admin}. 177 */ 178 private static class ConnectionForMergeTesting implements Connection { 179 public static final byte[][] SPLITS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b"), 180 Bytes.toBytes("c"), Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"), 181 Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j"), 182 Bytes.toBytes("k"), Bytes.toBytes("l"), Bytes.toBytes("m"), Bytes.toBytes("n"), 183 Bytes.toBytes("o"), Bytes.toBytes("p"), Bytes.toBytes("q"), Bytes.toBytes("r"), 184 Bytes.toBytes("s"), Bytes.toBytes("t"), Bytes.toBytes("u"), Bytes.toBytes("v"), 185 Bytes.toBytes("w"), Bytes.toBytes("x"), Bytes.toBytes("y"), Bytes.toBytes("z") }; 186 187 public static final byte[][] START_KEYS; 188 public static final byte[][] END_KEYS; 189 static { 190 START_KEYS = new byte[SPLITS.length + 1][]; 191 START_KEYS[0] = HConstants.EMPTY_BYTE_ARRAY; 192 for (int i = 0; i < SPLITS.length; i++) { 193 START_KEYS[i + 1] = SPLITS[i]; 194 } 195 196 END_KEYS = new byte[SPLITS.length + 1][]; 197 for (int i = 0; i < SPLITS.length; i++) { 198 END_KEYS[i] = SPLITS[i]; 199 } 200 END_KEYS[SPLITS.length] = HConstants.EMPTY_BYTE_ARRAY; 201 } 202 203 public static final Map<byte[], Long> SIZE_MAP = new TreeMap<>(Bytes.BYTES_COMPARATOR); 204 static { 205 for (byte[] startKey : START_KEYS) { 206 SIZE_MAP.put(startKey, 1024L * 1024L * 1024L); 207 } 208 SIZE_MAP.put(Bytes.toBytes("a"), 200L * 1024L * 1024L); 209 SIZE_MAP.put(Bytes.toBytes("b"), 200L * 1024L * 1024L); 210 SIZE_MAP.put(Bytes.toBytes("c"), 200L * 1024L * 1024L); 211 SIZE_MAP.put(Bytes.toBytes("o"), 200L * 1024L * 1024L); 212 SIZE_MAP.put(Bytes.toBytes("p"), 200L * 1024L * 1024L); 213 } 214 215 ConnectionForMergeTesting(Configuration conf, ExecutorService pool, User user, 216 Map<String, byte[]> connectionAttributes) throws IOException { 217 } 218 219 @Override 220 public void abort(String why, Throwable e) { 221 } 222 223 @Override 224 public boolean isAborted() { 225 return false; 226 } 227 228 @Override 229 public Configuration getConfiguration() { 230 throw new UnsupportedOperationException(); 231 } 232 233 @Override 234 public Table getTable(TableName tableName) throws IOException { 235 Table table = mock(Table.class); 236 when(table.getName()).thenReturn(tableName); 237 return table; 238 } 239 240 @Override 241 public Table getTable(TableName tableName, ExecutorService pool) throws IOException { 242 throw new UnsupportedOperationException(); 243 } 244 245 @Override 246 public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { 247 throw new UnsupportedOperationException(); 248 } 249 250 @Override 251 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException { 252 throw new UnsupportedOperationException(); 253 } 254 255 @Override 256 public RegionLocator getRegionLocator(TableName tableName) throws IOException { 257 final Map<byte[], HRegionLocation> locationMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 258 for (byte[] startKey : START_KEYS) { 259 HRegionLocation hrl = 260 new HRegionLocation(RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).build(), 261 ServerName.valueOf("localhost", 0, 0)); 262 locationMap.put(startKey, hrl); 263 } 264 265 RegionLocator locator = mock(RegionLocator.class); 266 when(locator.getRegionLocation(any(byte[].class), anyBoolean())) 267 .thenAnswer(new Answer<HRegionLocation>() { 268 @Override 269 public HRegionLocation answer(InvocationOnMock invocationOnMock) throws Throwable { 270 Object[] args = invocationOnMock.getArguments(); 271 byte[] key = (byte[]) args[0]; 272 return locationMap.get(key); 273 } 274 }); 275 when(locator.getStartEndKeys()) 276 .thenReturn(new Pair<byte[][], byte[][]>(START_KEYS, END_KEYS)); 277 return locator; 278 } 279 280 public RegionSizeCalculator getRegionSizeCalculator() { 281 RegionSizeCalculator sizeCalculator = mock(RegionSizeCalculator.class); 282 when(sizeCalculator.getRegionSize(any(byte[].class))).thenAnswer(new Answer<Long>() { 283 @Override 284 public Long answer(InvocationOnMock invocationOnMock) throws Throwable { 285 Object[] args = invocationOnMock.getArguments(); 286 byte[] regionId = (byte[]) args[0]; 287 byte[] startKey = RegionInfo.getStartKey(regionId); 288 return SIZE_MAP.get(startKey); 289 } 290 }); 291 return sizeCalculator; 292 } 293 294 @Override 295 public Admin getAdmin() throws IOException { 296 Admin admin = mock(Admin.class); 297 // return non-null admin to pass null checks 298 return admin; 299 } 300 301 @Override 302 public void close() throws IOException { 303 } 304 305 @Override 306 public boolean isClosed() { 307 return false; 308 } 309 310 @Override 311 public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { 312 throw new UnsupportedOperationException(); 313 } 314 315 @Override 316 public void clearRegionLocationCache() { 317 } 318 319 @Override 320 public AsyncConnection toAsyncConnection() { 321 throw new UnsupportedOperationException(); 322 } 323 324 @Override 325 public String getClusterId() { 326 return null; 327 } 328 } 329}