/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

/**
 * Basic test of {@link RoundRobinTableInputFormat}; i.e. RRTIF.
 */
@Category({ SmallTests.class })
public class TestRoundRobinTableInputFormat {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRoundRobinTableInputFormat.class);

  private static final int SERVERS_COUNT = 5;
  private static final String[] KEYS = { "aa", "ab", "ac", "ad", "ae", "ba", "bb", "bc", "bd", "be",
    "ca", "cb", "cc", "cd", "ce", "da", "db", "dc", "dd", "de", "ea", "eb", "ec", "ed", "ee", "fa",
    "fb", "fc", "fd", "fe", "ga", "gb", "gc", "gd", "ge", "ha", "hb", "hc", "hd", "he", "ia", "ib",
    "ic", "id", "ie", "ja", "jb", "jc", "jd", "je", "jf" };

  /**
   * Test default behavior.
   */
  @Test
  public void testRoundRobinSplit() throws IOException, InterruptedException {
    final List<InputSplit> splits = createSplits();
    Collections.shuffle(splits);
    List<InputSplit> sortedSplits = new RoundRobinTableInputFormat().roundRobin(splits);
    testDistribution(sortedSplits);
    // Now test that order is preserved even after being passed through the SplitComparator
    // that sorts InputSplit by length as is done up in Hadoop in JobSubmitter.
    InputSplit[] copy = sortedSplits.toArray(new InputSplit[0]);
    Arrays.sort(copy, new SplitComparator());
    // Assert the round-robin order is retained even after passing through SplitComparator.
    for (int i = 0; i < sortedSplits.size(); i++) {
      TableSplit sortedTs = (TableSplit) sortedSplits.get(i);
      TableSplit copyTs = (TableSplit) copy[i];
      assertEquals(sortedTs.getEncodedRegionName(), copyTs.getEncodedRegionName());
    }
  }

  /** Returns splits made out of {@link #KEYS}. Splits are for five servers; length is zero. */
  private List<InputSplit> createSplits() {
    List<InputSplit> splits = new ArrayList<>(KEYS.length - 1);
    for (int i = 0; i < KEYS.length - 1; i++) {
      InputSplit split =
        new TableSplit(TableName.valueOf("test"), new Scan(), Bytes.toBytes(KEYS[i]),
          Bytes.toBytes(KEYS[i + 1]), String.valueOf(i % SERVERS_COUNT + 1), "", 0);
      splits.add(split);
    }
    return splits;
  }

  /**
   * Assert that every consecutive window of {@link #SERVERS_COUNT} splits in the passed list
   * touches each of the servers exactly once.
   */
  private void testDistribution(List<InputSplit> list) throws IOException, InterruptedException {
    for (int i = 0; i < KEYS.length / SERVERS_COUNT; i++) {
      int[] counts = new int[SERVERS_COUNT];
      for (int j = i * SERVERS_COUNT; j < i * SERVERS_COUNT + SERVERS_COUNT; j++) {
        counts[Integer.parseInt(list.get(j).getLocations()[0]) - 1]++;
      }
      for (int value : counts) {
        assertEquals(1, value);
      }
    }
  }

  /**
   * Private comparator copied from the private JobSubmitter Hadoop class...
   * hadoop/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
   * Used so we can run the sort done up in JobSubmitter here in tests.
   */
  private static class SplitComparator implements Comparator<InputSplit> {
    @Override
    public int compare(InputSplit o1, InputSplit o2) {
      try {
        return Long.compare(o1.getLength(), o2.getLength());
      } catch (IOException | InterruptedException e) {
        throw new RuntimeException("exception in compare", e);
      }
    }
  }

  /**
   * Assert that lengths are descending. RRTIF writes lengths in descending order so any
   * subsequent sort using the SplitComparator, as is done in JobSubmitter up in Hadoop, keeps our
   * RRTIF ordering.
   */
  private void assertLengthDescending(List<InputSplit> list)
    throws IOException, InterruptedException {
    long previousLength = Long.MAX_VALUE;
    for (InputSplit is : list) {
      long length = is.getLength();
      assertTrue(previousLength + " " + length, previousLength > length);
      previousLength = length;
    }
  }
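
  /**
   * Illustrative addition (not part of the original test): a minimal use of
   * {@link #assertLengthDescending(List)}, which is otherwise unused in this class. It builds
   * splits with explicitly descending lengths, using the same TableSplit constructor as
   * {@link #createSplits()}, and checks the helper accepts them.
   */
  @Test
  public void testAssertLengthDescending() throws IOException, InterruptedException {
    List<InputSplit> descending = new ArrayList<>(KEYS.length - 1);
    for (int i = 0; i < KEYS.length - 1; i++) {
      // Lengths count down from KEYS.length - 1 to 1, so they are strictly descending.
      descending.add(new TableSplit(TableName.valueOf("test"), new Scan(), Bytes.toBytes(KEYS[i]),
        Bytes.toBytes(KEYS[i + 1]), String.valueOf(i % SERVERS_COUNT + 1), "",
        KEYS.length - 1 - i));
    }
    assertLengthDescending(descending);
  }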

  /**
   * Test that configure/unconfigure set and properly undo the HBASE_REGIONSIZECALCULATOR_ENABLE
   * configuration.
   */
  @Test
  public void testConfigureUnconfigure() {
    Configuration configuration = HBaseConfiguration.create();
    RoundRobinTableInputFormat rrtif = new RoundRobinTableInputFormat();
    rrtif.setConf(configuration);
    JobContext jobContext = Mockito.mock(JobContext.class);
    Mockito.when(jobContext.getConfiguration()).thenReturn(configuration);
    // Assert when done, HBASE_REGIONSIZECALCULATOR_ENABLE is still unset.
    configuration.unset(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
    rrtif.configure();
    rrtif.unconfigure();
    String value = configuration.get(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
    assertNull(value);
    // Assert HBASE_REGIONSIZECALCULATOR_ENABLE is still false when done.
    checkRetainsBooleanValue(jobContext, rrtif, false);
    // Assert HBASE_REGIONSIZECALCULATOR_ENABLE is still true when done.
    checkRetainsBooleanValue(jobContext, rrtif, true);
  }
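
  /**
   * Illustrative addition (not part of the original test): a sketch of checking the value that
   * configure() leaves in place while it is active. It assumes configure() turns the region size
   * calculator off by setting HBASE_REGIONSIZECALCULATOR_ENABLE to false; that expected value is
   * this sketch's assumption, not something asserted elsewhere in this file.
   */
  @Test
  public void testConfigureDisablesRegionSizeCalculator() {
    Configuration configuration = HBaseConfiguration.create();
    RoundRobinTableInputFormat rrtif = new RoundRobinTableInputFormat();
    rrtif.setConf(configuration);
    configuration.unset(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
    rrtif.configure();
    try {
      // Assumed behavior: configure() sets the key to false while splits are being computed.
      assertEquals("false",
        configuration.get(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE));
    } finally {
      rrtif.unconfigure();
    }
  }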

  /**
   * Sets HBASE_REGIONSIZECALCULATOR_ENABLE to <code>b</code>, runs configure/unconfigure, and
   * asserts the pre-existing value survives.
   */
  private void checkRetainsBooleanValue(JobContext jobContext, RoundRobinTableInputFormat rrtif,
    final boolean b) {
    jobContext.getConfiguration()
      .setBoolean(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE, b);
    rrtif.configure();
    rrtif.unconfigure();
    String value = jobContext.getConfiguration()
      .get(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
    assertEquals(b, Boolean.valueOf(value));
  }
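
  /**
   * Illustrative addition (not part of the original test): a sanity check of the
   * {@link #createSplits()} fixture. Before any round-robin is applied, split locations should
   * cycle through servers "1".."5" in key order, one split per pair of adjacent KEYS.
   */
  @Test
  public void testCreateSplitsFixture() throws IOException, InterruptedException {
    List<InputSplit> splits = createSplits();
    assertEquals(KEYS.length - 1, splits.size());
    for (int i = 0; i < splits.size(); i++) {
      // Same location scheme used when the splits were created in createSplits().
      assertEquals(String.valueOf(i % SERVERS_COUNT + 1), splits.get(i).getLocations()[0]);
    }
  }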
}