001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021import static org.mockito.ArgumentMatchers.any;
022import static org.mockito.ArgumentMatchers.anyBoolean;
023import static org.mockito.Mockito.mock;
024import static org.mockito.Mockito.when;
025
026import java.io.IOException;
027import java.net.Inet6Address;
028import java.net.InetAddress;
029import java.net.UnknownHostException;
030import java.util.Map;
031import java.util.TreeMap;
032import java.util.concurrent.ExecutorService;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.HRegionLocation;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.Admin;
041import org.apache.hadoop.hbase.client.AsyncConnection;
042import org.apache.hadoop.hbase.client.BufferedMutator;
043import org.apache.hadoop.hbase.client.BufferedMutatorParams;
044import org.apache.hadoop.hbase.client.Connection;
045import org.apache.hadoop.hbase.client.ConnectionUtils;
046import org.apache.hadoop.hbase.client.RegionInfo;
047import org.apache.hadoop.hbase.client.RegionInfoBuilder;
048import org.apache.hadoop.hbase.client.RegionLocator;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.TableBuilder;
051import org.apache.hadoop.hbase.security.User;
052import org.apache.hadoop.hbase.testclassification.SmallTests;
053import org.apache.hadoop.hbase.util.Bytes;
054import org.apache.hadoop.hbase.util.Pair;
055import org.apache.hadoop.mapreduce.JobContext;
056import org.junit.ClassRule;
057import org.junit.Test;
058import org.junit.experimental.categories.Category;
059import org.mockito.invocation.InvocationOnMock;
060import org.mockito.stubbing.Answer;
061
062@Category({SmallTests.class})
063public class TestTableInputFormatBase {
064
065  @ClassRule
066  public static final HBaseClassTestRule CLASS_RULE =
067      HBaseClassTestRule.forClass(TestTableInputFormatBase.class);
068
069  @Test
070  public void testTableInputFormatBaseReverseDNSForIPv6()
071      throws UnknownHostException {
072    String address = "ipv6.google.com";
073    String localhost = null;
074    InetAddress addr = null;
075    TableInputFormat inputFormat = new TableInputFormat();
076    try {
077      localhost = InetAddress.getByName(address).getCanonicalHostName();
078      addr = Inet6Address.getByName(address);
079    } catch (UnknownHostException e) {
080      // google.com is down, we can probably forgive this test.
081      return;
082    }
083    System.out.println("Should retrun the hostname for this host " +
084        localhost + " addr : " + addr);
085    String actualHostName = inputFormat.reverseDNS(addr);
086    assertEquals("Should retrun the hostname for this host. Expected : " +
087        localhost + " Actual : " + actualHostName, localhost, actualHostName);
088  }
089
090  @Test
091  public void testNonSuccessiveSplitsAreNotMerged() throws IOException {
092    JobContext context = mock(JobContext.class);
093    Configuration conf = HBaseConfiguration.create();
094    conf.set(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
095        ConnectionForMergeTesting.class.getName());
096    conf.set(TableInputFormat.INPUT_TABLE, "testTable");
097    conf.setBoolean(TableInputFormatBase.MAPREDUCE_INPUT_AUTOBALANCE, true);
098    when(context.getConfiguration()).thenReturn(conf);
099
100    TableInputFormat tifExclude = new TableInputFormatForMergeTesting();
101    tifExclude.setConf(conf);
102    // split["b", "c"] is excluded, split["o", "p"] and split["p", "q"] are merged,
103    // but split["a", "b"] and split["c", "d"] are not merged.
104    assertEquals(ConnectionForMergeTesting.START_KEYS.length - 1 - 1,
105        tifExclude.getSplits(context).size());
106  }
107
108  /**
109   * Subclass of {@link TableInputFormat} to use in {@link #testNonSuccessiveSplitsAreNotMerged}.
110   * This class overrides {@link TableInputFormatBase#includeRegionInSplit}
111   * to exclude specific splits.
112   */
113  private static class TableInputFormatForMergeTesting extends TableInputFormat {
114    private byte[] prefixStartKey = Bytes.toBytes("b");
115    private byte[] prefixEndKey = Bytes.toBytes("c");
116    private RegionSizeCalculator sizeCalculator;
117
118    /**
119     * Exclude regions which contain rows starting with "b".
120     */
121    @Override
122    protected boolean includeRegionInSplit(final byte[] startKey, final byte [] endKey) {
123      if (Bytes.compareTo(startKey, prefixEndKey) < 0
124          && (Bytes.compareTo(prefixStartKey, endKey) < 0
125              || Bytes.equals(endKey, HConstants.EMPTY_END_ROW))) {
126        return false;
127      } else {
128        return true;
129      }
130    }
131
132    @Override
133    protected void initializeTable(Connection connection, TableName tableName) throws IOException {
134      super.initializeTable(connection, tableName);
135      ConnectionForMergeTesting cft = (ConnectionForMergeTesting) connection;
136      sizeCalculator = cft.getRegionSizeCalculator();
137    }
138
139    @Override
140    protected RegionSizeCalculator createRegionSizeCalculator(RegionLocator locator, Admin admin)
141      throws IOException {
142      return sizeCalculator;
143    }
144  }
145
146  /**
147   * Connection class to use in {@link #testNonSuccessiveSplitsAreNotMerged}.
148   * This class returns mocked {@link Table}, {@link RegionLocator}, {@link RegionSizeCalculator},
149   * and {@link Admin}.
150   */
151  private static class ConnectionForMergeTesting implements Connection {
152    public static final byte[][] SPLITS = new byte[][] {
153      Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"), Bytes.toBytes("d"),
154      Bytes.toBytes("e"), Bytes.toBytes("f"), Bytes.toBytes("g"), Bytes.toBytes("h"),
155      Bytes.toBytes("i"), Bytes.toBytes("j"), Bytes.toBytes("k"), Bytes.toBytes("l"),
156      Bytes.toBytes("m"), Bytes.toBytes("n"), Bytes.toBytes("o"), Bytes.toBytes("p"),
157      Bytes.toBytes("q"), Bytes.toBytes("r"), Bytes.toBytes("s"), Bytes.toBytes("t"),
158      Bytes.toBytes("u"), Bytes.toBytes("v"), Bytes.toBytes("w"), Bytes.toBytes("x"),
159      Bytes.toBytes("y"), Bytes.toBytes("z")
160    };
161
162    public static final byte[][] START_KEYS;
163    public static final byte[][] END_KEYS;
164    static {
165      START_KEYS = new byte[SPLITS.length + 1][];
166      START_KEYS[0] = HConstants.EMPTY_BYTE_ARRAY;
167      for (int i = 0; i < SPLITS.length; i++) {
168        START_KEYS[i + 1] = SPLITS[i];
169      }
170
171      END_KEYS = new byte[SPLITS.length + 1][];
172      for (int i = 0; i < SPLITS.length; i++) {
173        END_KEYS[i] = SPLITS[i];
174      }
175      END_KEYS[SPLITS.length] = HConstants.EMPTY_BYTE_ARRAY;
176    }
177
178    public static final Map<byte[], Long> SIZE_MAP = new TreeMap<>(Bytes.BYTES_COMPARATOR);
179    static {
180      for (byte[] startKey : START_KEYS) {
181        SIZE_MAP.put(startKey, 1024L * 1024L * 1024L);
182      }
183      SIZE_MAP.put(Bytes.toBytes("a"), 200L * 1024L * 1024L);
184      SIZE_MAP.put(Bytes.toBytes("b"), 200L * 1024L * 1024L);
185      SIZE_MAP.put(Bytes.toBytes("c"), 200L * 1024L * 1024L);
186      SIZE_MAP.put(Bytes.toBytes("o"), 200L * 1024L * 1024L);
187      SIZE_MAP.put(Bytes.toBytes("p"), 200L * 1024L * 1024L);
188    }
189
190    ConnectionForMergeTesting(Configuration conf, ExecutorService pool, User user)
191        throws IOException {
192    }
193
194    @Override
195    public void abort(String why, Throwable e) {
196    }
197
198    @Override
199    public boolean isAborted() {
200      return false;
201    }
202
203    @Override
204    public Configuration getConfiguration() {
205      throw new UnsupportedOperationException();
206    }
207
208    @Override
209    public Table getTable(TableName tableName) throws IOException {
210      Table table = mock(Table.class);
211      when(table.getName()).thenReturn(tableName);
212      return table;
213    }
214
215    @Override
216    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
217      throw new UnsupportedOperationException();
218    }
219
220    @Override
221    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
222      throw new UnsupportedOperationException();
223    }
224
225    @Override
226    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
227      throw new UnsupportedOperationException();
228    }
229
230    @Override
231    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
232      final Map<byte[], HRegionLocation> locationMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
233      for (byte[] startKey : START_KEYS) {
234        HRegionLocation hrl = new HRegionLocation(
235            RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).build(),
236            ServerName.valueOf("localhost", 0, 0));
237        locationMap.put(startKey, hrl);
238      }
239
240      RegionLocator locator = mock(RegionLocator.class);
241      when(locator.getRegionLocation(any(byte [].class), anyBoolean())).
242        thenAnswer(new Answer<HRegionLocation>() {
243          @Override
244          public HRegionLocation answer(InvocationOnMock invocationOnMock) throws Throwable {
245            Object [] args = invocationOnMock.getArguments();
246            byte [] key = (byte [])args[0];
247            return locationMap.get(key);
248          }
249        });
250      when(locator.getStartEndKeys()).
251        thenReturn(new Pair<byte[][], byte[][]>(START_KEYS, END_KEYS));
252      return locator;
253    }
254
255    public RegionSizeCalculator getRegionSizeCalculator() {
256      RegionSizeCalculator sizeCalculator = mock(RegionSizeCalculator.class);
257      when(sizeCalculator.getRegionSize(any(byte[].class))).
258        thenAnswer(new Answer<Long>() {
259          @Override
260          public Long answer(InvocationOnMock invocationOnMock) throws Throwable {
261            Object [] args = invocationOnMock.getArguments();
262            byte [] regionId = (byte [])args[0];
263            byte[] startKey = RegionInfo.getStartKey(regionId);
264            return SIZE_MAP.get(startKey);
265          }
266        });
267      return sizeCalculator;
268    }
269
270    @Override
271    public Admin getAdmin() throws IOException {
272      Admin admin = mock(Admin.class);
273      // return non-null admin to pass null checks
274      return admin;
275    }
276
277    @Override
278    public void close() throws IOException {
279    }
280
281    @Override
282    public boolean isClosed() {
283      return false;
284    }
285
286    @Override
287    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
288      throw new UnsupportedOperationException();
289    }
290
291    @Override
292    public void clearRegionLocationCache() {
293    }
294
295    @Override
296    public AsyncConnection toAsyncConnection() {
297      throw new UnsupportedOperationException();
298    }
299
300    @Override
301    public String getClusterId() {
302      return null;
303    }
304  }
305}