001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import static org.junit.Assert.*; 021import static org.mockito.Mockito.any; 022import static org.mockito.Mockito.anyBoolean; 023import static org.mockito.Mockito.mock; 024import static org.mockito.Mockito.when; 025 026import java.io.IOException; 027import java.net.Inet6Address; 028import java.net.InetAddress; 029import java.net.UnknownHostException; 030import java.util.Map; 031import java.util.TreeMap; 032import java.util.concurrent.ExecutorService; 033import org.apache.hadoop.conf.Configuration; 034import org.apache.hadoop.hbase.HBaseClassTestRule; 035import org.apache.hadoop.hbase.HBaseConfiguration; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.HRegionLocation; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.Admin; 041import org.apache.hadoop.hbase.client.BufferedMutator; 042import org.apache.hadoop.hbase.client.BufferedMutatorParams; 043import org.apache.hadoop.hbase.client.ClusterConnection; 044import org.apache.hadoop.hbase.client.Connection; 045import org.apache.hadoop.hbase.client.RegionInfo; 046import org.apache.hadoop.hbase.client.RegionInfoBuilder; 047import org.apache.hadoop.hbase.client.RegionLocator; 048import org.apache.hadoop.hbase.client.Table; 049import org.apache.hadoop.hbase.client.TableBuilder; 050import org.apache.hadoop.hbase.security.User; 051import org.apache.hadoop.hbase.testclassification.SmallTests; 052import org.apache.hadoop.hbase.util.Bytes; 053import org.apache.hadoop.hbase.util.Pair; 054import org.apache.hadoop.mapreduce.JobContext; 055import org.junit.ClassRule; 056import org.junit.Test; 057import org.junit.experimental.categories.Category; 058import org.mockito.invocation.InvocationOnMock; 059import org.mockito.stubbing.Answer; 060 061@Category({SmallTests.class}) 062public class TestTableInputFormatBase { 063 064 @ClassRule 065 public static final HBaseClassTestRule CLASS_RULE = 066 HBaseClassTestRule.forClass(TestTableInputFormatBase.class); 067 068 @Test 069 public void testTableInputFormatBaseReverseDNSForIPv6() 070 throws UnknownHostException { 071 String address = "ipv6.google.com"; 072 String localhost = null; 073 InetAddress addr = null; 074 TableInputFormat inputFormat = new TableInputFormat(); 075 try { 076 localhost = InetAddress.getByName(address).getCanonicalHostName(); 077 addr = Inet6Address.getByName(address); 078 } catch (UnknownHostException e) { 079 // google.com is down, we can probably forgive this test. 080 return; 081 } 082 System.out.println("Should retrun the hostname for this host " + 083 localhost + " addr : " + addr); 084 String actualHostName = inputFormat.reverseDNS(addr); 085 assertEquals("Should retrun the hostname for this host. Expected : " + 086 localhost + " Actual : " + actualHostName, localhost, actualHostName); 087 } 088 089 @Test 090 public void testNonSuccessiveSplitsAreNotMerged() throws IOException { 091 JobContext context = mock(JobContext.class); 092 Configuration conf = HBaseConfiguration.create(); 093 conf.set(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL, 094 ConnectionForMergeTesting.class.getName()); 095 conf.set(TableInputFormat.INPUT_TABLE, "testTable"); 096 conf.setBoolean(TableInputFormatBase.MAPREDUCE_INPUT_AUTOBALANCE, true); 097 when(context.getConfiguration()).thenReturn(conf); 098 099 TableInputFormat tifExclude = new TableInputFormatForMergeTesting(); 100 tifExclude.setConf(conf); 101 // split["b", "c"] is excluded, split["o", "p"] and split["p", "q"] are merged, 102 // but split["a", "b"] and split["c", "d"] are not merged. 103 assertEquals(ConnectionForMergeTesting.START_KEYS.length - 1 - 1, 104 tifExclude.getSplits(context).size()); 105 } 106 107 /** 108 * Subclass of {@link TableInputFormat} to use in {@link #testNonSuccessiveSplitsAreNotMerged}. 109 * This class overrides {@link TableInputFormatBase#includeRegionInSplit} 110 * to exclude specific splits. 111 */ 112 private static class TableInputFormatForMergeTesting extends TableInputFormat { 113 private byte[] prefixStartKey = Bytes.toBytes("b"); 114 private byte[] prefixEndKey = Bytes.toBytes("c"); 115 private RegionSizeCalculator sizeCalculator; 116 117 /** 118 * Exclude regions which contain rows starting with "b". 119 */ 120 @Override 121 protected boolean includeRegionInSplit(final byte[] startKey, final byte [] endKey) { 122 if (Bytes.compareTo(startKey, prefixEndKey) < 0 123 && (Bytes.compareTo(prefixStartKey, endKey) < 0 124 || Bytes.equals(endKey, HConstants.EMPTY_END_ROW))) { 125 return false; 126 } else { 127 return true; 128 } 129 } 130 131 @Override 132 protected void initializeTable(Connection connection, TableName tableName) throws IOException { 133 super.initializeTable(connection, tableName); 134 ConnectionForMergeTesting cft = (ConnectionForMergeTesting) connection; 135 sizeCalculator = cft.getRegionSizeCalculator(); 136 } 137 138 @Override 139 protected RegionSizeCalculator createRegionSizeCalculator(RegionLocator locator, Admin admin) 140 throws IOException { 141 return sizeCalculator; 142 } 143 } 144 145 /** 146 * Connection class to use in {@link #testNonSuccessiveSplitsAreNotMerged}. 147 * This class returns mocked {@link Table}, {@link RegionLocator}, {@link RegionSizeCalculator}, 148 * and {@link Admin}. 149 */ 150 private static class ConnectionForMergeTesting implements Connection { 151 public static final byte[][] SPLITS = new byte[][] { 152 Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"), Bytes.toBytes("d"), 153 Bytes.toBytes("e"), Bytes.toBytes("f"), Bytes.toBytes("g"), Bytes.toBytes("h"), 154 Bytes.toBytes("i"), Bytes.toBytes("j"), Bytes.toBytes("k"), Bytes.toBytes("l"), 155 Bytes.toBytes("m"), Bytes.toBytes("n"), Bytes.toBytes("o"), Bytes.toBytes("p"), 156 Bytes.toBytes("q"), Bytes.toBytes("r"), Bytes.toBytes("s"), Bytes.toBytes("t"), 157 Bytes.toBytes("u"), Bytes.toBytes("v"), Bytes.toBytes("w"), Bytes.toBytes("x"), 158 Bytes.toBytes("y"), Bytes.toBytes("z") 159 }; 160 161 public static final byte[][] START_KEYS; 162 public static final byte[][] END_KEYS; 163 static { 164 START_KEYS = new byte[SPLITS.length + 1][]; 165 START_KEYS[0] = HConstants.EMPTY_BYTE_ARRAY; 166 for (int i = 0; i < SPLITS.length; i++) { 167 START_KEYS[i + 1] = SPLITS[i]; 168 } 169 170 END_KEYS = new byte[SPLITS.length + 1][]; 171 for (int i = 0; i < SPLITS.length; i++) { 172 END_KEYS[i] = SPLITS[i]; 173 } 174 END_KEYS[SPLITS.length] = HConstants.EMPTY_BYTE_ARRAY; 175 } 176 177 public static final Map<byte[], Long> SIZE_MAP = new TreeMap<>(Bytes.BYTES_COMPARATOR); 178 static { 179 for (byte[] startKey : START_KEYS) { 180 SIZE_MAP.put(startKey, 1024L * 1024L * 1024L); 181 } 182 SIZE_MAP.put(Bytes.toBytes("a"), 200L * 1024L * 1024L); 183 SIZE_MAP.put(Bytes.toBytes("b"), 200L * 1024L * 1024L); 184 SIZE_MAP.put(Bytes.toBytes("c"), 200L * 1024L * 1024L); 185 SIZE_MAP.put(Bytes.toBytes("o"), 200L * 1024L * 1024L); 186 SIZE_MAP.put(Bytes.toBytes("p"), 200L * 1024L * 1024L); 187 } 188 189 ConnectionForMergeTesting(Configuration conf, ExecutorService pool, User user) 190 throws IOException { 191 } 192 193 @Override 194 public void abort(String why, Throwable e) { 195 } 196 197 @Override 198 public boolean isAborted() { 199 return false; 200 } 201 202 @Override 203 public Configuration getConfiguration() { 204 throw new UnsupportedOperationException(); 205 } 206 207 @Override 208 public Table getTable(TableName tableName) throws IOException { 209 Table table = mock(Table.class); 210 when(table.getName()).thenReturn(tableName); 211 return table; 212 } 213 214 @Override 215 public Table getTable(TableName tableName, ExecutorService pool) throws IOException { 216 throw new UnsupportedOperationException(); 217 } 218 219 @Override 220 public BufferedMutator getBufferedMutator(TableName tableName) throws IOException { 221 throw new UnsupportedOperationException(); 222 } 223 224 @Override 225 public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException { 226 throw new UnsupportedOperationException(); 227 } 228 229 @Override 230 public RegionLocator getRegionLocator(TableName tableName) throws IOException { 231 final Map<byte[], HRegionLocation> locationMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); 232 for (byte[] startKey : START_KEYS) { 233 HRegionLocation hrl = new HRegionLocation( 234 RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).build(), 235 ServerName.valueOf("localhost", 0, 0)); 236 locationMap.put(startKey, hrl); 237 } 238 239 RegionLocator locator = mock(RegionLocator.class); 240 when(locator.getRegionLocation(any(byte [].class), anyBoolean())). 241 thenAnswer(new Answer<HRegionLocation>() { 242 @Override 243 public HRegionLocation answer(InvocationOnMock invocationOnMock) throws Throwable { 244 Object [] args = invocationOnMock.getArguments(); 245 byte [] key = (byte [])args[0]; 246 return locationMap.get(key); 247 } 248 }); 249 when(locator.getStartEndKeys()). 250 thenReturn(new Pair<byte[][], byte[][]>(START_KEYS, END_KEYS)); 251 return locator; 252 } 253 254 public RegionSizeCalculator getRegionSizeCalculator() { 255 RegionSizeCalculator sizeCalculator = mock(RegionSizeCalculator.class); 256 when(sizeCalculator.getRegionSize(any(byte[].class))). 257 thenAnswer(new Answer<Long>() { 258 @Override 259 public Long answer(InvocationOnMock invocationOnMock) throws Throwable { 260 Object [] args = invocationOnMock.getArguments(); 261 byte [] regionId = (byte [])args[0]; 262 byte[] startKey = RegionInfo.getStartKey(regionId); 263 return SIZE_MAP.get(startKey); 264 } 265 }); 266 return sizeCalculator; 267 } 268 269 @Override 270 public Admin getAdmin() throws IOException { 271 Admin admin = mock(Admin.class); 272 // return non-null admin to pass null checks 273 return admin; 274 } 275 276 @Override 277 public void close() throws IOException { 278 } 279 280 @Override 281 public boolean isClosed() { 282 return false; 283 } 284 285 @Override 286 public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) { 287 throw new UnsupportedOperationException(); 288 } 289 290 @Override 291 public void clearRegionLocationCache() { 292 } 293 } 294}