001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.hbase.client.Scan.SCAN_ATTRIBUTES_TABLE_NAME;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Map;
026import java.util.TreeMap;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.atomic.AtomicInteger;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.HRegionLocation;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.Admin;
037import org.apache.hadoop.hbase.client.AsyncConnection;
038import org.apache.hadoop.hbase.client.BufferedMutator;
039import org.apache.hadoop.hbase.client.BufferedMutatorParams;
040import org.apache.hadoop.hbase.client.Connection;
041import org.apache.hadoop.hbase.client.ConnectionRegistry;
042import org.apache.hadoop.hbase.client.ConnectionUtils;
043import org.apache.hadoop.hbase.client.RegionInfoBuilder;
044import org.apache.hadoop.hbase.client.RegionLocator;
045import org.apache.hadoop.hbase.client.Result;
046import org.apache.hadoop.hbase.client.Scan;
047import org.apache.hadoop.hbase.client.Table;
048import org.apache.hadoop.hbase.client.TableBuilder;
049import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
050import org.apache.hadoop.hbase.security.User;
051import org.apache.hadoop.hbase.testclassification.SmallTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.Pair;
054import org.apache.hadoop.mapreduce.InputSplit;
055import org.apache.hadoop.mapreduce.JobContext;
056import org.apache.hadoop.mapreduce.RecordReader;
057import org.apache.hadoop.mapreduce.TaskAttemptContext;
058import org.junit.Assert;
059import org.junit.ClassRule;
060import org.junit.Rule;
061import org.junit.Test;
062import org.junit.experimental.categories.Category;
063import org.junit.rules.TestName;
064import org.mockito.Mockito;
065import org.mockito.invocation.InvocationOnMock;
066import org.mockito.stubbing.Answer;
067
068/**
069 * Tests of MultiTableInputFormatBase.
070 */
071@Category({ SmallTests.class })
072public class TestMultiTableInputFormatBase {
073
074  @ClassRule
075  public static final HBaseClassTestRule CLASS_RULE =
076    HBaseClassTestRule.forClass(TestMultiTableInputFormatBase.class);
077
078  @Rule
079  public final TestName name = new TestName();
080
081  /**
082   * Test getSplits only puts up one Connection. In past it has put up many Connections. Each
083   * Connection setup comes with a fresh new cache so we have to do fresh hit on hbase:meta. Should
084   * only do one Connection when doing getSplits even if a MultiTableInputFormat.
085   */
086  @Test
087  public void testMRSplitsConnectionCount() throws IOException {
088    // Make instance of MTIFB.
089    MultiTableInputFormatBase mtif = new MultiTableInputFormatBase() {
090      @Override
091      public RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit split,
092        TaskAttemptContext context) throws IOException, InterruptedException {
093        return super.createRecordReader(split, context);
094      }
095    };
096    // Pass it a mocked JobContext. Make the JC return our Configuration.
097    // Load the Configuration so it returns our special Connection so we can interpolate
098    // canned responses.
099    JobContext mockedJobContext = Mockito.mock(JobContext.class);
100    Configuration c = HBaseConfiguration.create();
101    c.set(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL, MRSplitsConnection.class.getName());
102    Mockito.when(mockedJobContext.getConfiguration()).thenReturn(c);
103    // Invent a bunch of scans. Have each Scan go against a different table so a good spread.
104    List<Scan> scans = new ArrayList<>();
105    for (int i = 0; i < 10; i++) {
106      Scan scan = new Scan();
107      String tableName = this.name.getMethodName() + i;
108      scan.setAttribute(SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
109      scans.add(scan);
110    }
111    mtif.setScans(scans);
112    // Get splits. Assert that that more than one.
113    List<InputSplit> splits = mtif.getSplits(mockedJobContext);
114    Assert.assertTrue(splits.size() > 0);
115    // Assert only one Connection was made (see the static counter we have in the mocked
116    // Connection MRSplitsConnection Constructor.
117    Assert.assertEquals(1, MRSplitsConnection.creations.get());
118  }
119
120  /**
121   * Connection to use above in Test.
122   */
123  public static class MRSplitsConnection implements Connection {
124    private final Configuration configuration;
125    static final AtomicInteger creations = new AtomicInteger(0);
126
127    MRSplitsConnection(Configuration conf, ExecutorService pool, User user,
128      ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException {
129      this.configuration = conf;
130      creations.incrementAndGet();
131    }
132
133    @Override
134    public void abort(String why, Throwable e) {
135
136    }
137
138    @Override
139    public boolean isAborted() {
140      return false;
141    }
142
143    @Override
144    public Configuration getConfiguration() {
145      return this.configuration;
146    }
147
148    @Override
149    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
150      return null;
151    }
152
153    @Override
154    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
155      return null;
156    }
157
158    @Override
159    public RegionLocator getRegionLocator(final TableName tableName) throws IOException {
160      // Make up array of start keys. We start off w/ empty byte array.
161      final byte[][] startKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaaa"),
162        Bytes.toBytes("bbb"), Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
163        Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), Bytes.toBytes("iii"),
164        Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), Bytes.toBytes("ooo"),
165        Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), Bytes.toBytes("sss"),
166        Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("zzz") };
167      // Make an array of end keys. We end with the empty byte array.
168      final byte[][] endKeys =
169        new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), Bytes.toBytes("ccc"),
170          Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"),
171          Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"),
172          Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
173          Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"),
174          Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), HConstants.EMPTY_BYTE_ARRAY };
175      // Now make a map of start keys to HRegionLocations. Let the server namber derive from
176      // the start key.
177      final Map<byte[], HRegionLocation> map =
178        new TreeMap<byte[], HRegionLocation>(Bytes.BYTES_COMPARATOR);
179      for (byte[] startKey : startKeys) {
180        HRegionLocation hrl =
181          new HRegionLocation(RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).build(),
182            ServerName.valueOf(Bytes.toString(startKey), 0, 0));
183        map.put(startKey, hrl);
184      }
185      // Get a list of the locations.
186      final List<HRegionLocation> locations = new ArrayList<HRegionLocation>(map.values());
187      // Now make a RegionLocator mock backed by the abpve map and list of locations.
188      RegionLocator mockedRegionLocator = Mockito.mock(RegionLocator.class);
189      Mockito
190        .when(
191          mockedRegionLocator.getRegionLocation(Mockito.any(byte[].class), Mockito.anyBoolean()))
192        .thenAnswer(new Answer<HRegionLocation>() {
193          @Override
194          public HRegionLocation answer(InvocationOnMock invocationOnMock) throws Throwable {
195            Object[] args = invocationOnMock.getArguments();
196            byte[] key = (byte[]) args[0];
197            return map.get(key);
198          }
199        });
200      Mockito.when(mockedRegionLocator.getAllRegionLocations()).thenReturn(locations);
201      Mockito.when(mockedRegionLocator.getStartEndKeys())
202        .thenReturn(new Pair<byte[][], byte[][]>(startKeys, endKeys));
203      Mockito.when(mockedRegionLocator.getName()).thenReturn(tableName);
204      return mockedRegionLocator;
205    }
206
207    @Override
208    public Admin getAdmin() throws IOException {
209      Admin admin = Mockito.mock(Admin.class);
210      Mockito.when(admin.getConfiguration()).thenReturn(getConfiguration());
211      return admin;
212    }
213
214    @Override
215    public Table getTable(TableName tableName) throws IOException {
216      Table table = Mockito.mock(Table.class);
217      Mockito.when(table.getName()).thenReturn(tableName);
218      return table;
219    }
220
221    @Override
222    public void close() throws IOException {
223
224    }
225
226    @Override
227    public boolean isClosed() {
228      return false;
229    }
230
231    @Override
232    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
233      return Mockito.mock(TableBuilder.class);
234    }
235
236    @Override
237    public void clearRegionLocationCache() {
238    }
239
240    @Override
241    public AsyncConnection toAsyncConnection() {
242      return null;
243    }
244
245    @Override
246    public String getClusterId() {
247      return null;
248    }
249
250  }
251}