001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.*;
021import static org.mockito.Mockito.any;
022import static org.mockito.Mockito.anyBoolean;
023import static org.mockito.Mockito.mock;
024import static org.mockito.Mockito.when;
025
026import java.io.IOException;
027import java.net.Inet6Address;
028import java.net.InetAddress;
029import java.net.UnknownHostException;
030import java.util.Map;
031import java.util.TreeMap;
032import java.util.concurrent.ExecutorService;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.HRegionLocation;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.Admin;
041import org.apache.hadoop.hbase.client.BufferedMutator;
042import org.apache.hadoop.hbase.client.BufferedMutatorParams;
043import org.apache.hadoop.hbase.client.ClusterConnection;
044import org.apache.hadoop.hbase.client.Connection;
045import org.apache.hadoop.hbase.client.RegionInfo;
046import org.apache.hadoop.hbase.client.RegionInfoBuilder;
047import org.apache.hadoop.hbase.client.RegionLocator;
048import org.apache.hadoop.hbase.client.Table;
049import org.apache.hadoop.hbase.client.TableBuilder;
050import org.apache.hadoop.hbase.security.User;
051import org.apache.hadoop.hbase.testclassification.SmallTests;
052import org.apache.hadoop.hbase.util.Bytes;
053import org.apache.hadoop.hbase.util.Pair;
054import org.apache.hadoop.mapreduce.JobContext;
055import org.junit.ClassRule;
056import org.junit.Test;
057import org.junit.experimental.categories.Category;
058import org.mockito.invocation.InvocationOnMock;
059import org.mockito.stubbing.Answer;
060
061@Category({SmallTests.class})
062public class TestTableInputFormatBase {
063
064  @ClassRule
065  public static final HBaseClassTestRule CLASS_RULE =
066      HBaseClassTestRule.forClass(TestTableInputFormatBase.class);
067
068  @Test
069  public void testTableInputFormatBaseReverseDNSForIPv6()
070      throws UnknownHostException {
071    String address = "ipv6.google.com";
072    String localhost = null;
073    InetAddress addr = null;
074    TableInputFormat inputFormat = new TableInputFormat();
075    try {
076      localhost = InetAddress.getByName(address).getCanonicalHostName();
077      addr = Inet6Address.getByName(address);
078    } catch (UnknownHostException e) {
079      // google.com is down, we can probably forgive this test.
080      return;
081    }
082    System.out.println("Should retrun the hostname for this host " +
083        localhost + " addr : " + addr);
084    String actualHostName = inputFormat.reverseDNS(addr);
085    assertEquals("Should retrun the hostname for this host. Expected : " +
086        localhost + " Actual : " + actualHostName, localhost, actualHostName);
087  }
088
089  @Test
090  public void testNonSuccessiveSplitsAreNotMerged() throws IOException {
091    JobContext context = mock(JobContext.class);
092    Configuration conf = HBaseConfiguration.create();
093    conf.set(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
094        ConnectionForMergeTesting.class.getName());
095    conf.set(TableInputFormat.INPUT_TABLE, "testTable");
096    conf.setBoolean(TableInputFormatBase.MAPREDUCE_INPUT_AUTOBALANCE, true);
097    when(context.getConfiguration()).thenReturn(conf);
098
099    TableInputFormat tifExclude = new TableInputFormatForMergeTesting();
100    tifExclude.setConf(conf);
101    // split["b", "c"] is excluded, split["o", "p"] and split["p", "q"] are merged,
102    // but split["a", "b"] and split["c", "d"] are not merged.
103    assertEquals(ConnectionForMergeTesting.START_KEYS.length - 1 - 1,
104        tifExclude.getSplits(context).size());
105  }
106
107  /**
108   * Subclass of {@link TableInputFormat} to use in {@link #testNonSuccessiveSplitsAreNotMerged}.
109   * This class overrides {@link TableInputFormatBase#includeRegionInSplit}
110   * to exclude specific splits.
111   */
112  private static class TableInputFormatForMergeTesting extends TableInputFormat {
113    private byte[] prefixStartKey = Bytes.toBytes("b");
114    private byte[] prefixEndKey = Bytes.toBytes("c");
115    private RegionSizeCalculator sizeCalculator;
116
117    /**
118     * Exclude regions which contain rows starting with "b".
119     */
120    @Override
121    protected boolean includeRegionInSplit(final byte[] startKey, final byte [] endKey) {
122      if (Bytes.compareTo(startKey, prefixEndKey) < 0
123          && (Bytes.compareTo(prefixStartKey, endKey) < 0
124              || Bytes.equals(endKey, HConstants.EMPTY_END_ROW))) {
125        return false;
126      } else {
127        return true;
128      }
129    }
130
131    @Override
132    protected void initializeTable(Connection connection, TableName tableName) throws IOException {
133      super.initializeTable(connection, tableName);
134      ConnectionForMergeTesting cft = (ConnectionForMergeTesting) connection;
135      sizeCalculator = cft.getRegionSizeCalculator();
136    }
137
138    @Override
139    protected RegionSizeCalculator createRegionSizeCalculator(RegionLocator locator, Admin admin)
140      throws IOException {
141      return sizeCalculator;
142    }
143  }
144
145  /**
146   * Connection class to use in {@link #testNonSuccessiveSplitsAreNotMerged}.
147   * This class returns mocked {@link Table}, {@link RegionLocator}, {@link RegionSizeCalculator},
148   * and {@link Admin}.
149   */
150  private static class ConnectionForMergeTesting implements Connection {
151    public static final byte[][] SPLITS = new byte[][] {
152      Bytes.toBytes("a"), Bytes.toBytes("b"), Bytes.toBytes("c"), Bytes.toBytes("d"),
153      Bytes.toBytes("e"), Bytes.toBytes("f"), Bytes.toBytes("g"), Bytes.toBytes("h"),
154      Bytes.toBytes("i"), Bytes.toBytes("j"), Bytes.toBytes("k"), Bytes.toBytes("l"),
155      Bytes.toBytes("m"), Bytes.toBytes("n"), Bytes.toBytes("o"), Bytes.toBytes("p"),
156      Bytes.toBytes("q"), Bytes.toBytes("r"), Bytes.toBytes("s"), Bytes.toBytes("t"),
157      Bytes.toBytes("u"), Bytes.toBytes("v"), Bytes.toBytes("w"), Bytes.toBytes("x"),
158      Bytes.toBytes("y"), Bytes.toBytes("z")
159    };
160
161    public static final byte[][] START_KEYS;
162    public static final byte[][] END_KEYS;
163    static {
164      START_KEYS = new byte[SPLITS.length + 1][];
165      START_KEYS[0] = HConstants.EMPTY_BYTE_ARRAY;
166      for (int i = 0; i < SPLITS.length; i++) {
167        START_KEYS[i + 1] = SPLITS[i];
168      }
169
170      END_KEYS = new byte[SPLITS.length + 1][];
171      for (int i = 0; i < SPLITS.length; i++) {
172        END_KEYS[i] = SPLITS[i];
173      }
174      END_KEYS[SPLITS.length] = HConstants.EMPTY_BYTE_ARRAY;
175    }
176
177    public static final Map<byte[], Long> SIZE_MAP = new TreeMap<>(Bytes.BYTES_COMPARATOR);
178    static {
179      for (byte[] startKey : START_KEYS) {
180        SIZE_MAP.put(startKey, 1024L * 1024L * 1024L);
181      }
182      SIZE_MAP.put(Bytes.toBytes("a"), 200L * 1024L * 1024L);
183      SIZE_MAP.put(Bytes.toBytes("b"), 200L * 1024L * 1024L);
184      SIZE_MAP.put(Bytes.toBytes("c"), 200L * 1024L * 1024L);
185      SIZE_MAP.put(Bytes.toBytes("o"), 200L * 1024L * 1024L);
186      SIZE_MAP.put(Bytes.toBytes("p"), 200L * 1024L * 1024L);
187    }
188
189    ConnectionForMergeTesting(Configuration conf, ExecutorService pool, User user)
190        throws IOException {
191    }
192
193    @Override
194    public void abort(String why, Throwable e) {
195    }
196
197    @Override
198    public boolean isAborted() {
199      return false;
200    }
201
202    @Override
203    public Configuration getConfiguration() {
204      throw new UnsupportedOperationException();
205    }
206
207    @Override
208    public Table getTable(TableName tableName) throws IOException {
209      Table table = mock(Table.class);
210      when(table.getName()).thenReturn(tableName);
211      return table;
212    }
213
214    @Override
215    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
216      throw new UnsupportedOperationException();
217    }
218
219    @Override
220    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
221      throw new UnsupportedOperationException();
222    }
223
224    @Override
225    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
226      throw new UnsupportedOperationException();
227    }
228
229    @Override
230    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
231      final Map<byte[], HRegionLocation> locationMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
232      for (byte[] startKey : START_KEYS) {
233        HRegionLocation hrl = new HRegionLocation(
234            RegionInfoBuilder.newBuilder(tableName).setStartKey(startKey).build(),
235            ServerName.valueOf("localhost", 0, 0));
236        locationMap.put(startKey, hrl);
237      }
238
239      RegionLocator locator = mock(RegionLocator.class);
240      when(locator.getRegionLocation(any(byte [].class), anyBoolean())).
241        thenAnswer(new Answer<HRegionLocation>() {
242          @Override
243          public HRegionLocation answer(InvocationOnMock invocationOnMock) throws Throwable {
244            Object [] args = invocationOnMock.getArguments();
245            byte [] key = (byte [])args[0];
246            return locationMap.get(key);
247          }
248        });
249      when(locator.getStartEndKeys()).
250        thenReturn(new Pair<byte[][], byte[][]>(START_KEYS, END_KEYS));
251      return locator;
252    }
253
254    public RegionSizeCalculator getRegionSizeCalculator() {
255      RegionSizeCalculator sizeCalculator = mock(RegionSizeCalculator.class);
256      when(sizeCalculator.getRegionSize(any(byte[].class))).
257        thenAnswer(new Answer<Long>() {
258          @Override
259          public Long answer(InvocationOnMock invocationOnMock) throws Throwable {
260            Object [] args = invocationOnMock.getArguments();
261            byte [] regionId = (byte [])args[0];
262            byte[] startKey = RegionInfo.getStartKey(regionId);
263            return SIZE_MAP.get(startKey);
264          }
265        });
266      return sizeCalculator;
267    }
268
269    @Override
270    public Admin getAdmin() throws IOException {
271      Admin admin = mock(Admin.class);
272      // return non-null admin to pass null checks
273      return admin;
274    }
275
276    @Override
277    public void close() throws IOException {
278    }
279
280    @Override
281    public boolean isClosed() {
282      return false;
283    }
284
285    @Override
286    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
287      throw new UnsupportedOperationException();
288    }
289
290    @Override
291    public void clearRegionLocationCache() {
292    }
293  }
294}