001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionRegistry;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableBuilder;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.JobContext;
import org.junit.Assume;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
063
064@Category({ SmallTests.class })
065public class TestTableInputFormatBase {
066
067  @ClassRule
068  public static final HBaseClassTestRule CLASS_RULE =
069    HBaseClassTestRule.forClass(TestTableInputFormatBase.class);
070
071  @Test
072  public void testReuseRegionSizeCalculator() throws IOException {
073    JobContext context = mock(JobContext.class);
074    Configuration conf = HBaseConfiguration.create();
075    conf.set(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
076      ConnectionForMergeTesting.class.getName());
077    conf.set(TableInputFormat.INPUT_TABLE, "testTable");
078    conf.setBoolean(TableInputFormatBase.MAPREDUCE_INPUT_AUTOBALANCE, true);
079    when(context.getConfiguration()).thenReturn(conf);
080
081    TableInputFormat format = Mockito.spy(new TableInputFormatForMergeTesting());
082    format.setConf(conf);
083    // initialize so that table is set, otherwise cloneOnFinish
084    // will be true and each getSplits call will re-initialize everything
085    format.initialize(context);
086    format.getSplits(context);
087    format.getSplits(context);
088
089    // re-initialize which will cause the next getSplits call to create a new RegionSizeCalculator
090    format.initialize(context);
091    format.getSplits(context);
092    format.getSplits(context);
093
094    // should only be 2 despite calling getSplits 4 times
095    Mockito.verify(format, Mockito.times(2)).createRegionSizeCalculator(Mockito.any(),
096      Mockito.any());
097  }
098
099  @Test
100  public void testTableInputFormatBaseReverseDNSForIPv6() throws UnknownHostException {
101    String address = "ipv6.google.com";
102    String localhost = null;
103    InetAddress addr = null;
104    TableInputFormat inputFormat = new TableInputFormat();
105    try {
106      localhost = InetAddress.getByName(address).getCanonicalHostName();
107      addr = Inet6Address.getByName(address);
108    } catch (UnknownHostException e) {
109      // google.com is down, we can probably forgive this test.
110      return;
111    }
112    System.out.println("Should retrun the hostname for this host " + localhost + " addr : " + addr);
113    String actualHostName = inputFormat.reverseDNS(addr);
114    assertEquals("Should retrun the hostname for this host. Expected : " + localhost + " Actual : "
115      + actualHostName, localhost, actualHostName);
116  }
117
118  @Test
119  public void testNonSuccessiveSplitsAreNotMerged() throws IOException {
120    JobContext context = mock(JobContext.class);
121    Configuration conf = HBaseConfiguration.create();
122    conf.set(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
123      ConnectionForMergeTesting.class.getName());
124    conf.set(TableInputFormat.INPUT_TABLE, "testTable");
125    conf.setBoolean(TableInputFormatBase.MAPREDUCE_INPUT_AUTOBALANCE, true);
126    when(context.getConfiguration()).thenReturn(conf);
127
128    TableInputFormat tifExclude = new TableInputFormatForMergeTesting();
129    tifExclude.setConf(conf);
130    // split["b", "c"] is excluded, split["o", "p"] and split["p", "q"] are merged,
131    // but split["a", "b"] and split["c", "d"] are not merged.
132    assertEquals(ConnectionForMergeTesting.START_KEYS.length - 1 - 1,
133      tifExclude.getSplits(context).size());
134  }
135
136  /**
137   * Subclass of {@link TableInputFormat} to use in {@link #testNonSuccessiveSplitsAreNotMerged}.
138   * This class overrides {@link TableInputFormatBase#includeRegionInSplit} to exclude specific
139   * splits.
140   */
141  private static class TableInputFormatForMergeTesting extends TableInputFormat {
142    private byte[] prefixStartKey = Bytes.toBytes("b");
143    private byte[] prefixEndKey = Bytes.toBytes("c");
144    private RegionSizeCalculator sizeCalculator;
145
146    /**
147     * Exclude regions which contain rows starting with "b".
148     */
149    @Override
150    protected boolean includeRegionInSplit(final byte[] startKey, final byte[] endKey) {
151      if (
152        Bytes.compareTo(startKey, prefixEndKey) < 0 && (Bytes.compareTo(prefixStartKey, endKey) < 0
153          || Bytes.equals(endKey, HConstants.EMPTY_END_ROW))
154      ) {
155        return false;
156      } else {
157        return true;
158      }
159    }
160
161    @Override
162    protected void initializeTable(Connection connection, TableName tableName) throws IOException {
163      super.initializeTable(connection, tableName);
164      ConnectionForMergeTesting cft = (ConnectionForMergeTesting) connection;
165      sizeCalculator = cft.getRegionSizeCalculator();
166    }
167
168    @Override
169    protected RegionSizeCalculator createRegionSizeCalculator(RegionLocator locator, Admin admin)
170      throws IOException {
171      return sizeCalculator;
172    }
173  }
174
175  /**
176   * Connection class to use in {@link #testNonSuccessiveSplitsAreNotMerged}. This class returns
177   * mocked {@link Table}, {@link RegionLocator}, {@link RegionSizeCalculator}, and {@link Admin}.
178   */
179  private static class ConnectionForMergeTesting implements Connection {
180    public static final byte[][] SPLITS = new byte[][] { Bytes.toBytes("a"), Bytes.toBytes("b"),
181      Bytes.toBytes("c"), Bytes.toBytes("d"), Bytes.toBytes("e"), Bytes.toBytes("f"),
182      Bytes.toBytes("g"), Bytes.toBytes("h"), Bytes.toBytes("i"), Bytes.toBytes("j"),
183      Bytes.toBytes("k"), Bytes.toBytes("l"), Bytes.toBytes("m"), Bytes.toBytes("n"),
184      Bytes.toBytes("o"), Bytes.toBytes("p"), Bytes.toBytes("q"), Bytes.toBytes("r"),
185      Bytes.toBytes("s"), Bytes.toBytes("t"), Bytes.toBytes("u"), Bytes.toBytes("v"),
186      Bytes.toBytes("w"), Bytes.toBytes("x"), Bytes.toBytes("y"), Bytes.toBytes("z") };
187
188    public static final byte[][] START_KEYS;
189    public static final byte[][] END_KEYS;
190    static {
191      START_KEYS = new byte[SPLITS.length + 1][];
192      START_KEYS[0] = HConstants.EMPTY_BYTE_ARRAY;
193      for (int i = 0; i < SPLITS.length; i++) {
194        START_KEYS[i + 1] = SPLITS[i];
195      }
196
197      END_KEYS = new byte[SPLITS.length + 1][];
198      for (int i = 0; i < SPLITS.length; i++) {
199        END_KEYS[i] = SPLITS[i];
200      }
201      END_KEYS[SPLITS.length] = HConstants.EMPTY_BYTE_ARRAY;
202    }
203
204    public static final Map<byte[], Long> SIZE_MAP = new TreeMap<>(Bytes.BYTES_COMPARATOR);
205    static {
206      for (byte[] startKey : START_KEYS) {
207        SIZE_MAP.put(startKey, 1024L * 1024L * 1024L);
208      }
209      SIZE_MAP.put(Bytes.toBytes("a"), 200L * 1024L * 1024L);
210      SIZE_MAP.put(Bytes.toBytes("b"), 200L * 1024L * 1024L);
211      SIZE_MAP.put(Bytes.toBytes("c"), 200L * 1024L * 1024L);
212      SIZE_MAP.put(Bytes.toBytes("o"), 200L * 1024L * 1024L);
213      SIZE_MAP.put(Bytes.toBytes("p"), 200L * 1024L * 1024L);
214    }
215
216    ConnectionForMergeTesting(Configuration conf, ExecutorService pool, User user,
217      ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException {
218    }
219
220    @Override
221    public void abort(String why, Throwable e) {
222    }
223
224    @Override
225    public boolean isAborted() {
226      return false;
227    }
228
229    @Override
230    public Configuration getConfiguration() {
231      throw new UnsupportedOperationException();
232    }
233
234    @Override
235    public Table getTable(TableName tableName) throws IOException {
236      Table table = mock(Table.class);
237      when(table.getName()).thenReturn(tableName);
238      return table;
239    }
240
241    @Override
242    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
243      throw new UnsupportedOperationException();
244    }
245
246    @Override
247    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
248      throw new UnsupportedOperationException();
249    }
250
251    @Override
252    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
253      throw new UnsupportedOperationException();
254    }
255
256    @Override
257    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
258      final Map<byte[], HRegionLocation> locationMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
259      for (byte[] startKey : START_KEYS) {
260        HRegionLocation hrl =
261          new HRegionLocation(RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).build(),
262            ServerName.valueOf("localhost", 0, 0));
263        locationMap.put(startKey, hrl);
264      }
265
266      RegionLocator locator = mock(RegionLocator.class);
267      when(locator.getRegionLocation(any(byte[].class), anyBoolean()))
268        .thenAnswer(new Answer<HRegionLocation>() {
269          @Override
270          public HRegionLocation answer(InvocationOnMock invocationOnMock) throws Throwable {
271            Object[] args = invocationOnMock.getArguments();
272            byte[] key = (byte[]) args[0];
273            return locationMap.get(key);
274          }
275        });
276      when(locator.getStartEndKeys())
277        .thenReturn(new Pair<byte[][], byte[][]>(START_KEYS, END_KEYS));
278      return locator;
279    }
280
281    public RegionSizeCalculator getRegionSizeCalculator() {
282      RegionSizeCalculator sizeCalculator = mock(RegionSizeCalculator.class);
283      when(sizeCalculator.getRegionSize(any(byte[].class))).thenAnswer(new Answer<Long>() {
284        @Override
285        public Long answer(InvocationOnMock invocationOnMock) throws Throwable {
286          Object[] args = invocationOnMock.getArguments();
287          byte[] regionId = (byte[]) args[0];
288          byte[] startKey = RegionInfo.getStartKey(regionId);
289          return SIZE_MAP.get(startKey);
290        }
291      });
292      return sizeCalculator;
293    }
294
295    @Override
296    public Admin getAdmin() throws IOException {
297      Admin admin = mock(Admin.class);
298      // return non-null admin to pass null checks
299      return admin;
300    }
301
302    @Override
303    public void close() throws IOException {
304    }
305
306    @Override
307    public boolean isClosed() {
308      return false;
309    }
310
311    @Override
312    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
313      throw new UnsupportedOperationException();
314    }
315
316    @Override
317    public void clearRegionLocationCache() {
318    }
319
320    @Override
321    public AsyncConnection toAsyncConnection() {
322      throw new UnsupportedOperationException();
323    }
324
325    @Override
326    public String getClusterId() {
327      return null;
328    }
329  }
330}