/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.client.Scan.SCAN_ATTRIBUTES_TABLE_NAME;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableBuilder;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Tests of MultiTableInputFormatBase.
 */
@Category({SmallTests.class})
public class TestMultiTableInputFormatBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMultiTableInputFormatBase.class);

  @Rule public final TestName name = new TestName();

  /**
   * Test that getSplits only puts up one Connection. In the past it put up many Connections.
   * Each Connection setup comes with a fresh cache, so each one forces a fresh hit on
   * hbase:meta. We should only make one Connection when doing getSplits, even for a
   * MultiTableInputFormat.
   * @throws IOException
   */
  @Test
  public void testMRSplitsConnectionCount() throws IOException {
    // Make instance of MTIFB.
    MultiTableInputFormatBase mtif = new MultiTableInputFormatBase() {
      @Override
      public RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit split,
          TaskAttemptContext context) throws IOException, InterruptedException {
        return super.createRecordReader(split, context);
      }
    };
    // Pass it a mocked JobContext. Make the JC return our Configuration.
    // Load the Configuration so it returns our special Connection so we can serve up
    // canned responses.
    JobContext mockedJobContext = Mockito.mock(JobContext.class);
    Configuration c = HBaseConfiguration.create();
    c.set(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL, MRSplitsConnection.class.getName());
    Mockito.when(mockedJobContext.getConfiguration()).thenReturn(c);
    // Invent a bunch of scans. Have each Scan go against a different table so we get a
    // good spread.
    List<Scan> scans = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      Scan scan = new Scan();
      String tableName = this.name.getMethodName() + i;
      scan.setAttribute(SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
      scans.add(scan);
    }
    mtif.setScans(scans);
    // Get splits. Assert that we got at least one.
    List<InputSplit> splits = mtif.getSplits(mockedJobContext);
    Assert.assertTrue(splits.size() > 0);
    // Assert only one Connection was made (see the static counter we have in the mocked
    // Connection MRSplitsConnection constructor).
    Assert.assertEquals(1, MRSplitsConnection.creations.get());
  }

  /**
   * Connection to use above in Test.
   */
  public static class MRSplitsConnection implements Connection {
    private final Configuration configuration;
    static final AtomicInteger creations = new AtomicInteger(0);

    MRSplitsConnection(Configuration conf, ExecutorService pool, User user) throws IOException {
      this.configuration = conf;
      creations.incrementAndGet();
    }

    @Override
    public void abort(String why, Throwable e) {
    }

    @Override
    public boolean isAborted() {
      return false;
    }

    @Override
    public Configuration getConfiguration() {
      return this.configuration;
    }

    @Override
    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
      return null;
    }

    @Override
    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
      return null;
    }

    @Override
    public RegionLocator getRegionLocator(final TableName tableName) throws IOException {
      // Make up an array of start keys. We start off w/ the empty byte array.
      final byte [][] startKeys = new byte [][] {HConstants.EMPTY_BYTE_ARRAY,
          Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"),
          Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
          Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
          Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"),
          Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"),
          Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), Bytes.toBytes("sss"),
          Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
          Bytes.toBytes("zzz")};
      // Make an array of end keys. We end with the empty byte array.
      final byte [][] endKeys = new byte [][] {
          Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"),
          Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
          Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
          Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"),
          Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"),
          Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), Bytes.toBytes("sss"),
          Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
          Bytes.toBytes("zzz"),
          HConstants.EMPTY_BYTE_ARRAY};
      // Now make a map of start keys to HRegionLocations. Let the server name derive from
      // the start key.
      final Map<byte [], HRegionLocation> map =
          new TreeMap<byte [], HRegionLocation>(Bytes.BYTES_COMPARATOR);
      for (byte [] startKey: startKeys) {
        HRegionLocation hrl = new HRegionLocation(
            RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).build(),
            ServerName.valueOf(Bytes.toString(startKey), 0, 0));
        map.put(startKey, hrl);
      }
      // Get a list of the locations.
      final List<HRegionLocation> locations = new ArrayList<HRegionLocation>(map.values());
      // Now make a RegionLocator mock backed by the above map and list of locations.
      RegionLocator mockedRegionLocator = Mockito.mock(RegionLocator.class);
      Mockito.when(mockedRegionLocator.getRegionLocation(Mockito.any(byte [].class),
          Mockito.anyBoolean())).
          thenAnswer(new Answer<HRegionLocation>() {
            @Override
            public HRegionLocation answer(InvocationOnMock invocationOnMock) throws Throwable {
              Object [] args = invocationOnMock.getArguments();
              byte [] key = (byte [])args[0];
              return map.get(key);
            }
          });
      Mockito.when(mockedRegionLocator.getAllRegionLocations()).thenReturn(locations);
      Mockito.when(mockedRegionLocator.getStartEndKeys()).
          thenReturn(new Pair<byte [][], byte [][]>(startKeys, endKeys));
      Mockito.when(mockedRegionLocator.getName()).thenReturn(tableName);
      return mockedRegionLocator;
    }

    @Override
    public Admin getAdmin() throws IOException {
      Admin admin = Mockito.mock(Admin.class);
      Mockito.when(admin.getConfiguration()).thenReturn(getConfiguration());
      return admin;
    }

    @Override
    public Table getTable(TableName tableName) throws IOException {
      Table table = Mockito.mock(Table.class);
      Mockito.when(table.getName()).thenReturn(tableName);
      return table;
    }

    @Override
    public void close() throws IOException {
    }

    @Override
    public boolean isClosed() {
      return false;
    }

    @Override
    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
      return Mockito.mock(TableBuilder.class);
    }
  }
}