/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.client.Scan.SCAN_ATTRIBUTES_TABLE_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionRegistry;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableBuilder;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Tests of MultiTableInputFormatBase.
 */
@Tag(SmallTests.TAG)
public class TestMultiTableInputFormatBase {

  /**
   * Test that getSplits only puts up one Connection. In the past it has put up many Connections.
   * Each Connection setup comes with a fresh new cache, so we would have to do a fresh hit on
   * hbase:meta each time. getSplits should only make one Connection even when running a
   * MultiTableInputFormat over many tables.
   */
  @Test
  public void testMRSplitsConnectionCount(TestInfo testInfo) throws IOException {
    // Make an instance of MTIFB.
    MultiTableInputFormatBase mtif = new MultiTableInputFormatBase() {
      @Override
      public RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit split,
        TaskAttemptContext context) throws IOException, InterruptedException {
        return super.createRecordReader(split, context);
      }
    };
    // Pass it a mocked JobContext. Make the JC return our Configuration.
    // Load the Configuration so it returns our special Connection so we can interpolate
    // canned responses.
    JobContext mockedJobContext = Mockito.mock(JobContext.class);
    Configuration c = HBaseConfiguration.create();
    c.set(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, MRSplitsConnection.class.getName());
    Mockito.when(mockedJobContext.getConfiguration()).thenReturn(c);
    // Invent a bunch of scans. Have each Scan go against a different table so a good spread.
    List<Scan> scans = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      Scan scan = new Scan();
      String tableName = testInfo.getTestMethod().get().getName() + i;
      scan.setAttribute(SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
      scans.add(scan);
    }
    mtif.setScans(scans);
    // Get splits. Assert that there is more than one.
    List<InputSplit> splits = mtif.getSplits(mockedJobContext);
    assertTrue(splits.size() > 0);
    // Assert only one Connection was made (see the static counter we have in the mocked
    // Connection MRSplitsConnection constructor).
    assertEquals(1, MRSplitsConnection.creations.get());
  }

  /**
   * Canned Connection implementation used by the test above. Counts its own instantiations in the
   * static {@link #creations} counter and hands back mocked RegionLocators/Admins/Tables so
   * getSplits can run without a cluster.
   */
  public static class MRSplitsConnection implements Connection {
    private final Configuration configuration;
    // Bumped once per constructor call; the test asserts getSplits only triggers it once.
    static final AtomicInteger creations = new AtomicInteger(0);

    MRSplitsConnection(Configuration conf, ExecutorService pool, User user,
      ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException {
      this.configuration = conf;
      creations.incrementAndGet();
    }

    @Override
    public void abort(String why, Throwable e) {

    }

    @Override
    public boolean isAborted() {
      return false;
    }

    @Override
    public Configuration getConfiguration() {
      return this.configuration;
    }

    @Override
    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
      return null;
    }

    @Override
    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
      return null;
    }

    /**
     * Returns a mocked RegionLocator backed by a fixed set of invented region boundaries so the
     * caller sees a multi-region table without touching hbase:meta.
     */
    @Override
    public RegionLocator getRegionLocator(final TableName tableName) throws IOException {
      // Make up an array of start keys. We start off w/ the empty byte array.
      final byte[][] startKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaaa"),
        Bytes.toBytes("bbb"), Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
        Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"),
        Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), Bytes.toBytes("ooo"),
        Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), Bytes.toBytes("sss"),
        Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("zzz") };
      // Make an array of end keys. We end with the empty byte array.
      final byte[][] endKeys =
        new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), Bytes.toBytes("ccc"),
          Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"),
          Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"),
          Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
          Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"),
          Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), HConstants.EMPTY_BYTE_ARRAY };
      // Now make a map of start keys to HRegionLocations. Let the server name derive from
      // the start key.
      final Map<byte[], HRegionLocation> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
      for (byte[] startKey : startKeys) {
        HRegionLocation hrl =
          new HRegionLocation(RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).build(),
            ServerName.valueOf(Bytes.toString(startKey), 0, 0));
        map.put(startKey, hrl);
      }
      // Get a list of the locations.
      final List<HRegionLocation> locations = new ArrayList<>(map.values());
      // Now make a RegionLocator mock backed by the above map and list of locations.
      RegionLocator mockedRegionLocator = Mockito.mock(RegionLocator.class);
      Mockito
        .when(
          mockedRegionLocator.getRegionLocation(Mockito.any(byte[].class), Mockito.anyBoolean()))
        .thenAnswer(invocation -> map.get((byte[]) invocation.getArgument(0)));
      Mockito.when(mockedRegionLocator.getAllRegionLocations()).thenReturn(locations);
      Mockito.when(mockedRegionLocator.getStartEndKeys())
        .thenReturn(new Pair<>(startKeys, endKeys));
      Mockito.when(mockedRegionLocator.getName()).thenReturn(tableName);
      return mockedRegionLocator;
    }

    @Override
    public Admin getAdmin() throws IOException {
      Admin admin = Mockito.mock(Admin.class);
      Mockito.when(admin.getConfiguration()).thenReturn(getConfiguration());
      return admin;
    }

    @Override
    public Table getTable(TableName tableName) throws IOException {
      Table table = Mockito.mock(Table.class);
      Mockito.when(table.getName()).thenReturn(tableName);
      return table;
    }

    @Override
    public void close() throws IOException {

    }

    @Override
    public boolean isClosed() {
      return false;
    }

    @Override
    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
      return Mockito.mock(TableBuilder.class);
    }

    @Override
    public void clearRegionLocationCache() {
    }

    @Override
    public AsyncConnection toAsyncConnection() {
      return null;
    }

    @Override
    public String getClusterId() {
      return null;
    }

  }
}