001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021import static org.mockito.ArgumentMatchers.any;
022import static org.mockito.ArgumentMatchers.anyBoolean;
023import static org.mockito.Mockito.mock;
024import static org.mockito.Mockito.when;
025
026import java.io.IOException;
027import java.net.Inet6Address;
028import java.net.InetAddress;
029import java.net.UnknownHostException;
030import java.util.Map;
031import java.util.TreeMap;
032import java.util.concurrent.ExecutorService;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.HRegionLocation;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.Admin;
041import org.apache.hadoop.hbase.client.AsyncConnection;
042import org.apache.hadoop.hbase.client.BufferedMutator;
043import org.apache.hadoop.hbase.client.BufferedMutatorParams;
044import org.apache.hadoop.hbase.client.Connection;
045import org.apache.hadoop.hbase.client.ConnectionUtils;
046import org.apache.hadoop.hbase.client.RegionInfo;
047import org.apache.hadoop.hbase.client.RegionInfoBuilder;
048import org.apache.hadoop.hbase.client.RegionLocator;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.TableBuilder;
051import org.apache.hadoop.hbase.security.User;
052import org.apache.hadoop.hbase.testclassification.SmallTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.Pair;
055import org.apache.hadoop.mapreduce.JobContext;
056import org.junit.ClassRule;
057import org.junit.Test;
058import org.junit.experimental.categories.Category;
059import org.mockito.Mockito;
060import org.mockito.invocation.InvocationOnMock;
061import org.mockito.stubbing.Answer;
062
063@Category({ SmallTests.class })
064public class TestTableInputFormatBase {
065
066  @ClassRule
067  public static final HBaseClassTestRule CLASS_RULE =
068    HBaseClassTestRule.forClass(TestTableInputFormatBase.class);
069
070  @Test
071  public void testReuseRegionSizeCalculator() throws IOException {
072    JobContext context = mock(JobContext.class);
073    Configuration conf = HBaseConfiguration.create();
074    conf.set(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
075      ConnectionForMergeTesting.class.getName());
076    conf.set(TableInputFormat.INPUT_TABLE, "testTable");
077    conf.setBoolean(TableInputFormatBase.MAPREDUCE_INPUT_AUTOBALANCE, true);
078    when(context.getConfiguration()).thenReturn(conf);
079
080    TableInputFormat format = Mockito.spy(new TableInputFormatForMergeTesting());
081    format.setConf(conf);
082    // initialize so that table is set, otherwise cloneOnFinish
083    // will be true and each getSplits call will re-initialize everything
084    format.initialize(context);
085    format.getSplits(context);
086    format.getSplits(context);
087
088    // re-initialize which will cause the next getSplits call to create a new RegionSizeCalculator
089    format.initialize(context);
090    format.getSplits(context);
091    format.getSplits(context);
092
093    // should only be 2 despite calling getSplits 4 times
094    Mockito.verify(format, Mockito.times(2)).createRegionSizeCalculator(Mockito.any(),
095      Mockito.any());
096  }
097
098  @Test
099  public void testTableInputFormatBaseReverseDNSForIPv6() throws UnknownHostException {
100    String address = "ipv6.google.com";
101    String localhost = null;
102    InetAddress addr = null;
103    TableInputFormat inputFormat = new TableInputFormat();
104    try {
105      localhost = InetAddress.getByName(address).getCanonicalHostName();
106      addr = Inet6Address.getByName(address);
107    } catch (UnknownHostException e) {
108      // google.com is down, we can probably forgive this test.
109      return;
110    }
111    System.out.println("Should retrun the hostname for this host " + localhost + " addr : " + addr);
112    String actualHostName = inputFormat.reverseDNS(addr);
113    assertEquals("Should retrun the hostname for this host. Expected : " + localhost + " Actual : "
114      + actualHostName, localhost, actualHostName);
115  }
116
117  @Test
118  public void testNonSuccessiveSplitsAreNotMerged() throws IOException {
119    JobContext context = mock(JobContext.class);
120    Configuration conf = HBaseConfiguration.create();
121    conf.set(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
122      ConnectionForMergeTesting.class.getName());
123    conf.set(TableInputFormat.INPUT_TABLE, "testTable");
124    conf.setBoolean(TableInputFormatBase.MAPREDUCE_INPUT_AUTOBALANCE, true);
125    when(context.getConfiguration()).thenReturn(conf);
126
127    TableInputFormat tifExclude = new TableInputFormatForMergeTesting();
128    tifExclude.setConf(conf);
129    // split["b", "c"] is excluded, split["o", "p"] and split["p", "q"] are merged,
130    // but split["a", "b"] and split["c", "d"] are not merged.
131    assertEquals(ConnectionForMergeTesting.START_KEYS.length - 1 - 1,
132      tifExclude.getSplits(context).size());
133  }
134
135  /**
136   * Subclass of {@link TableInputFormat} to use in {@link #testNonSuccessiveSplitsAreNotMerged}.
137   * This class overrides {@link TableInputFormatBase#includeRegionInSplit} to exclude specific
138   * splits.
139   */
140  private static class TableInputFormatForMergeTesting extends TableInputFormat {
141    private byte[] prefixStartKey = Bytes.toBytes("b");
142    private byte[] prefixEndKey = Bytes.toBytes("c");
143    private RegionSizeCalculator sizeCalculator;
144
145    /**
146     * Exclude regions which contain rows starting with "b".
147     */
148    @Override
149    protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
150      if (
151        Bytes.compareTo(startKey, prefixEndKey) < 0 && (Bytes.compareTo(prefixStartKey, endKey) < 0
152          || Bytes.equals(endKey, HConstants.EMPTY_END_ROW))
153      ) {
154        return false;
155      } else {
156        return true;
157      }
158    }
159
160    @Override
161    protected void initializeTable(Connection connection, TableName tableName) throws IOException {
162      super.initializeTable(connection, tableName);
163      ConnectionForMergeTesting cft = (ConnectionForMergeTesting) connection;
164      sizeCalculator = cft.getRegionSizeCalculator();
165    }
166
167    @Override
168    protected RegionSizeCalculator createRegionSizeCalculator(RegionLocator locator, Admin admin)
169      throws IOException {
170      return sizeCalculator;
171    }
172  }
173
174  /**
175   * Connection class to use in {@link #testNonSuccessiveSplitsAreNotMerged}. This class returns
176   * mocked {@link Table}, {@link RegionLocator}, {@link RegionSizeCalculator}, and {@link Admin}.
177   */
178  private static class ConnectionForMergeTesting implements Connection {
179    public static final byte[][] SPLITS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b"),
180      Bytes.toBytes("c"), Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"),
181      Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j"),
182      Bytes.toBytes("k"), Bytes.toBytes("l"), Bytes.toBytes("m"), Bytes.toBytes("n"),
183      Bytes.toBytes("o"), Bytes.toBytes("p"), Bytes.toBytes("q"), Bytes.toBytes("r"),
184      Bytes.toBytes("s"), Bytes.toBytes("t"), Bytes.toBytes("u"), Bytes.toBytes("v"),
185      Bytes.toBytes("w"), Bytes.toBytes("x"), Bytes.toBytes("y"), Bytes.toBytes("z") };
186
187    public static final byte[][] START_KEYS;
188    public static final byte[][] END_KEYS;
189    static {
190      START_KEYS = new byte[SPLITS.length + 1][];
191      START_KEYS[0] = HConstants.EMPTY_BYTE_ARRAY;
192      for (int i = 0; i < SPLITS.length; i++) {
193        START_KEYS[i + 1] = SPLITS[i];
194      }
195
196      END_KEYS = new byte[SPLITS.length + 1][];
197      for (int i = 0; i < SPLITS.length; i++) {
198        END_KEYS[i] = SPLITS[i];
199      }
200      END_KEYS[SPLITS.length] = HConstants.EMPTY_BYTE_ARRAY;
201    }
202
203    public static final Map<byte[], Long> SIZE_MAP = new TreeMap<>(Bytes.BYTES_COMPARATOR);
204    static {
205      for (byte[] startKey : START_KEYS) {
206        SIZE_MAP.put(startKey, 1024L * 1024L * 1024L);
207      }
208      SIZE_MAP.put(Bytes.toBytes("a"), 200L * 1024L * 1024L);
209      SIZE_MAP.put(Bytes.toBytes("b"), 200L * 1024L * 1024L);
210      SIZE_MAP.put(Bytes.toBytes("c"), 200L * 1024L * 1024L);
211      SIZE_MAP.put(Bytes.toBytes("o"), 200L * 1024L * 1024L);
212      SIZE_MAP.put(Bytes.toBytes("p"), 200L * 1024L * 1024L);
213    }
214
215    ConnectionForMergeTesting(Configuration conf, ExecutorService pool, User user,
216      Map<String, byte[]> connectionAttributes) throws IOException {
217    }
218
219    @Override
220    public void abort(String why, Throwable e) {
221    }
222
223    @Override
224    public boolean isAborted() {
225      return false;
226    }
227
228    @Override
229    public Configuration getConfiguration() {
230      throw new UnsupportedOperationException();
231    }
232
233    @Override
234    public Table getTable(TableName tableName) throws IOException {
235      Table table = mock(Table.class);
236      when(table.getName()).thenReturn(tableName);
237      return table;
238    }
239
240    @Override
241    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
242      throw new UnsupportedOperationException();
243    }
244
245    @Override
246    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
247      throw new UnsupportedOperationException();
248    }
249
250    @Override
251    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
252      throw new UnsupportedOperationException();
253    }
254
255    @Override
256    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
257      final Map<byte[], HRegionLocation> locationMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
258      for (byte[] startKey : START_KEYS) {
259        HRegionLocation hrl =
260          new HRegionLocation(RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).build(),
261            ServerName.valueOf("localhost", 0, 0));
262        locationMap.put(startKey, hrl);
263      }
264
265      RegionLocator locator = mock(RegionLocator.class);
266      when(locator.getRegionLocation(any(byte[].class), anyBoolean()))
267        .thenAnswer(new Answer<HRegionLocation>() {
268          @Override
269          public HRegionLocation answer(InvocationOnMock invocationOnMock) throws Throwable {
270            Object[] args = invocationOnMock.getArguments();
271            byte[] key = (byte[]) args[0];
272            return locationMap.get(key);
273          }
274        });
275      when(locator.getStartEndKeys())
276        .thenReturn(new Pair<byte[][], byte[][]>(START_KEYS, END_KEYS));
277      return locator;
278    }
279
280    public RegionSizeCalculator getRegionSizeCalculator() {
281      RegionSizeCalculator sizeCalculator = mock(RegionSizeCalculator.class);
282      when(sizeCalculator.getRegionSize(any(byte[].class))).thenAnswer(new Answer<Long>() {
283        @Override
284        public Long answer(InvocationOnMock invocationOnMock) throws Throwable {
285          Object[] args = invocationOnMock.getArguments();
286          byte[] regionId = (byte[]) args[0];
287          byte[] startKey = RegionInfo.getStartKey(regionId);
288          return SIZE_MAP.get(startKey);
289        }
290      });
291      return sizeCalculator;
292    }
293
294    @Override
295    public Admin getAdmin() throws IOException {
296      Admin admin = mock(Admin.class);
297      // return non-null admin to pass null checks
298      return admin;
299    }
300
301    @Override
302    public void close() throws IOException {
303    }
304
305    @Override
306    public boolean isClosed() {
307      return false;
308    }
309
310    @Override
311    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
312      throw new UnsupportedOperationException();
313    }
314
315    @Override
316    public void clearRegionLocationCache() {
317    }
318
319    @Override
320    public AsyncConnection toAsyncConnection() {
321      throw new UnsupportedOperationException();
322    }
323
324    @Override
325    public String getClusterId() {
326      return null;
327    }
328  }
329}