001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.hbase.client.Scan.SCAN_ATTRIBUTES_TABLE_NAME;
021
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.List;
025import java.util.Map;
026import java.util.TreeMap;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.atomic.AtomicInteger;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.HConstants;
033import org.apache.hadoop.hbase.HRegionLocation;
034import org.apache.hadoop.hbase.ServerName;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.Admin;
037import org.apache.hadoop.hbase.client.BufferedMutator;
038import org.apache.hadoop.hbase.client.BufferedMutatorParams;
039import org.apache.hadoop.hbase.client.ClusterConnection;
040import org.apache.hadoop.hbase.client.Connection;
041import org.apache.hadoop.hbase.client.RegionInfoBuilder;
042import org.apache.hadoop.hbase.client.RegionLocator;
043import org.apache.hadoop.hbase.client.Result;
044import org.apache.hadoop.hbase.client.Scan;
045import org.apache.hadoop.hbase.client.Table;
046import org.apache.hadoop.hbase.client.TableBuilder;
047import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
048import org.apache.hadoop.hbase.security.User;
049import org.apache.hadoop.hbase.testclassification.SmallTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.Pair;
052import org.apache.hadoop.mapreduce.InputSplit;
053import org.apache.hadoop.mapreduce.JobContext;
054import org.apache.hadoop.mapreduce.RecordReader;
055import org.apache.hadoop.mapreduce.TaskAttemptContext;
056import org.junit.Assert;
057import org.junit.ClassRule;
058import org.junit.Rule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.junit.rules.TestName;
062import org.mockito.Mockito;
063import org.mockito.invocation.InvocationOnMock;
064import org.mockito.stubbing.Answer;
065
066/**
067 * Tests of MultiTableInputFormatBase.
068 */
069@Category({SmallTests.class})
070public class TestMultiTableInputFormatBase {
071
072  @ClassRule
073  public static final HBaseClassTestRule CLASS_RULE =
074      HBaseClassTestRule.forClass(TestMultiTableInputFormatBase.class);
075
076  @Rule public final TestName name = new TestName();
077
078  /**
079   * Test getSplits only puts up one Connection.
080   * In past it has put up many Connections. Each Connection setup comes with a fresh new cache
081   * so we have to do fresh hit on hbase:meta. Should only do one Connection when doing getSplits
082   * even if a MultiTableInputFormat.
083   * @throws IOException
084   */
085  @Test
086  public void testMRSplitsConnectionCount() throws IOException {
087    // Make instance of MTIFB.
088    MultiTableInputFormatBase mtif = new MultiTableInputFormatBase() {
089      @Override
090      public RecordReader<ImmutableBytesWritable, Result> createRecordReader(InputSplit split,
091          TaskAttemptContext context)
092      throws IOException, InterruptedException {
093        return super.createRecordReader(split, context);
094      }
095    };
096    // Pass it a mocked JobContext. Make the JC return our Configuration.
097    // Load the Configuration so it returns our special Connection so we can interpolate
098    // canned responses.
099    JobContext mockedJobContext = Mockito.mock(JobContext.class);
100    Configuration c = HBaseConfiguration.create();
101    c.set(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL, MRSplitsConnection.class.getName());
102    Mockito.when(mockedJobContext.getConfiguration()).thenReturn(c);
103    // Invent a bunch of scans. Have each Scan go against a different table so a good spread.
104    List<Scan> scans = new ArrayList<>();
105    for (int i = 0; i < 10; i++) {
106      Scan scan = new Scan();
107      String tableName = this.name.getMethodName() + i;
108      scan.setAttribute(SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName));
109      scans.add(scan);
110    }
111    mtif.setScans(scans);
112    // Get splits. Assert that that more than one.
113    List<InputSplit> splits = mtif.getSplits(mockedJobContext);
114    Assert.assertTrue(splits.size() > 0);
115    // Assert only one Connection was made (see the static counter we have in the mocked
116    // Connection MRSplitsConnection Constructor.
117    Assert.assertEquals(1, MRSplitsConnection.creations.get());
118  }
119
120  /**
121   * Connection to use above in Test.
122   */
123  public static class MRSplitsConnection implements Connection {
124    private final Configuration configuration;
125    static final AtomicInteger creations = new AtomicInteger(0);
126
127    MRSplitsConnection (Configuration conf, ExecutorService pool, User user) throws IOException {
128      this.configuration = conf;
129      creations.incrementAndGet();
130    }
131
132    @Override
133    public void abort(String why, Throwable e) {
134
135    }
136
137    @Override
138    public boolean isAborted() {
139      return false;
140    }
141
142    @Override
143    public Configuration getConfiguration() {
144      return this.configuration;
145    }
146
147    @Override
148    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
149      return null;
150    }
151
152    @Override
153    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
154      return null;
155    }
156
157    @Override
158    public RegionLocator getRegionLocator(final TableName tableName) throws IOException {
159      // Make up array of start keys. We start off w/ empty byte array.
160      final byte [][] startKeys = new byte [][] {HConstants.EMPTY_BYTE_ARRAY,
161          Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"),
162          Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
163          Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
164          Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"),
165          Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"),
166          Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), Bytes.toBytes("sss"),
167          Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
168          Bytes.toBytes("zzz")};
169      // Make an array of end keys. We end with the empty byte array.
170      final byte [][] endKeys = new byte[][] {
171          Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"),
172          Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
173          Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
174          Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"),
175          Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"),
176          Bytes.toBytes("qqq"), Bytes.toBytes("rrr"), Bytes.toBytes("sss"),
177          Bytes.toBytes("ttt"), Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
178          Bytes.toBytes("zzz"),
179          HConstants.EMPTY_BYTE_ARRAY};
180      // Now make a map of start keys to HRegionLocations. Let the server namber derive from
181      // the start key.
182      final Map<byte [], HRegionLocation> map =
183          new TreeMap<byte [], HRegionLocation>(Bytes.BYTES_COMPARATOR);
184      for (byte [] startKey: startKeys) {
185        HRegionLocation hrl = new HRegionLocation(
186            RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).build(),
187            ServerName.valueOf(Bytes.toString(startKey), 0, 0));
188        map.put(startKey, hrl);
189      }
190      // Get a list of the locations.
191      final List<HRegionLocation> locations = new ArrayList<HRegionLocation>(map.values());
192      // Now make a RegionLocator mock backed by the abpve map and list of locations.
193      RegionLocator mockedRegionLocator = Mockito.mock(RegionLocator.class);
194      Mockito.when(mockedRegionLocator.getRegionLocation(Mockito.any(byte [].class),
195            Mockito.anyBoolean())).
196          thenAnswer(new Answer<HRegionLocation>() {
197            @Override
198            public HRegionLocation answer(InvocationOnMock invocationOnMock) throws Throwable {
199              Object [] args = invocationOnMock.getArguments();
200              byte [] key = (byte [])args[0];
201              return map.get(key);
202            }
203          });
204      Mockito.when(mockedRegionLocator.getAllRegionLocations()).thenReturn(locations);
205      Mockito.when(mockedRegionLocator.getStartEndKeys()).
206          thenReturn(new Pair<byte [][], byte[][]>(startKeys, endKeys));
207      Mockito.when(mockedRegionLocator.getName()).thenReturn(tableName);
208      return mockedRegionLocator;
209    }
210
211    @Override
212    public Admin getAdmin() throws IOException {
213      Admin admin = Mockito.mock(Admin.class);
214      Mockito.when(admin.getConfiguration()).thenReturn(getConfiguration());
215      return admin;
216    }
217
218    @Override
219    public Table getTable(TableName tableName) throws IOException {
220      Table table = Mockito.mock(Table.class);
221      Mockito.when(table.getName()).thenReturn(tableName);
222      return table;
223    }
224
225    @Override
226    public void close() throws IOException {
227
228    }
229
230    @Override
231    public boolean isClosed() {
232      return false;
233    }
234
235    @Override
236    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
237      return Mockito.mock(TableBuilder.class);
238    }
239  }
240}