001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNull;
022import static org.junit.Assert.assertTrue;
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Arrays;
026import java.util.Collections;
027import java.util.Comparator;
028import java.util.List;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.hbase.HBaseClassTestRule;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.Scan;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.mapreduce.InputSplit;
037import org.apache.hadoop.mapreduce.JobContext;
038import org.junit.ClassRule;
039import org.junit.Test;
040import org.junit.experimental.categories.Category;
041import org.mockito.Mockito;
042
043/**
044 * Basic test of {@link RoundRobinTableInputFormat}; i.e. RRTIF.
045 */
046@Category({SmallTests.class})
047public class TestRoundRobinTableInputFormat {
048  @ClassRule
049  public static final HBaseClassTestRule CLASS_RULE =
050      HBaseClassTestRule.forClass(TestRoundRobinTableInputFormat.class);
051
052  private static final int SERVERS_COUNT = 5;
053  private static final String[] KEYS = {
054    "aa", "ab", "ac", "ad", "ae",
055    "ba", "bb", "bc", "bd", "be",
056    "ca", "cb", "cc", "cd", "ce",
057    "da", "db", "dc", "dd", "de",
058    "ea", "eb", "ec", "ed", "ee",
059    "fa", "fb", "fc", "fd", "fe",
060    "ga", "gb", "gc", "gd", "ge",
061    "ha", "hb", "hc", "hd", "he",
062    "ia", "ib", "ic", "id", "ie",
063    "ja", "jb", "jc", "jd", "je", "jf"
064  };
065
066  /**
067   * Test default behavior.
068   */
069  @Test
070  public void testRoundRobinSplit() throws IOException, InterruptedException {
071    final List<InputSplit> splits = createSplits();
072    Collections.shuffle(splits);
073    List<InputSplit> sortedSplits = new RoundRobinTableInputFormat().roundRobin(splits);
074    testDistribution(sortedSplits);
075    // Now test that order is preserved even after being passed through the SplitComparator
076    // that sorts InputSplit by length as is done up in Hadoop in JobSubmitter.
077    List<InputSplit> copy = new ArrayList<>(sortedSplits);
078    Arrays.sort(copy.toArray(new InputSplit[0]), new SplitComparator());
079    // Assert the sort is retained even after passing through SplitComparator.
080    for (int i = 0; i < sortedSplits.size(); i++) {
081      TableSplit sortedTs = (TableSplit)sortedSplits.get(i);
082      TableSplit copyTs = (TableSplit)copy.get(i);
083      assertEquals(sortedTs.getEncodedRegionName(), copyTs.getEncodedRegionName());
084    }
085  }
086
087  /**
088   * @return Splits made out of {@link #KEYS}. Splits are for five Servers. Length is ZERO!
089   */
090  private List<InputSplit> createSplits() {
091    List<InputSplit> splits = new ArrayList<>(KEYS.length - 1);
092    for (int i = 0; i < KEYS.length - 1; i++) {
093      InputSplit split = new TableSplit(TableName.valueOf("test"), new Scan(),
094        Bytes.toBytes(KEYS[i]), Bytes.toBytes(KEYS[i + 1]), String.valueOf(i % SERVERS_COUNT + 1),
095        "", 0);
096      splits.add(split);
097    }
098    return splits;
099  }
100
101  private void testDistribution(List<InputSplit> list) throws IOException, InterruptedException {
102    for (int i = 0; i < KEYS.length/SERVERS_COUNT; i++) {
103      int [] counts = new int[SERVERS_COUNT];
104      for (int j = i * SERVERS_COUNT; j < i * SERVERS_COUNT + SERVERS_COUNT; j++) {
105        counts[Integer.parseInt(list.get(j).getLocations()[0]) - 1]++;
106      }
107      for (int value : counts) {
108        assertEquals(value, 1);
109      }
110    }
111  }
112
113  /**
114   * Private comparator copied from private JobSubmmiter Hadoop class...
115   * hadoop/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
116   * Used so we can run the sort done up in JobSubmitter here in tests.
117   */
118  private static class SplitComparator implements Comparator<InputSplit> {
119    @Override
120    public int compare(InputSplit o1, InputSplit o2) {
121      try {
122        return Long.compare(o1.getLength(), o2.getLength());
123      } catch (IOException|InterruptedException e) {
124        throw new RuntimeException("exception in compare", e);
125      }
126    }
127  }
128
129  /**
130   * Assert that lengths are descending. RRTIF writes lengths in descending order so any
131   * subsequent sort using dump SplitComparator as is done in JobSubmitter up in Hadoop keeps
132   * our RRTIF ordering.
133   */
134  private void assertLengthDescending(List<InputSplit> list)
135    throws IOException, InterruptedException {
136    long previousLength = Long.MAX_VALUE;
137    for (InputSplit is: list) {
138      long length = is.getLength();
139      assertTrue(previousLength + " " + length, previousLength > length);
140      previousLength = length;
141    }
142  }
143
144  /**
145   * Test that configure/unconfigure set and properly undo the HBASE_REGIONSIZECALCULATOR_ENABLE
146   * configuration.
147   */
148  @Test
149  public void testConfigureUnconfigure() {
150    Configuration configuration = HBaseConfiguration.create();
151    RoundRobinTableInputFormat rrtif = new RoundRobinTableInputFormat();
152    rrtif.setConf(configuration);
153    JobContext jobContext = Mockito.mock(JobContext.class);
154    Mockito.when(jobContext.getConfiguration()).thenReturn(configuration);
155    // Assert when done, HBASE_REGIONSIZECALCULATOR_ENABLE is still unset.
156    configuration.unset(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
157    rrtif.configure();
158    rrtif.unconfigure();
159    String value = configuration.get(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
160    assertNull(value);
161    // Assert HBASE_REGIONSIZECALCULATOR_ENABLE is still false when done.
162    checkRetainsBooleanValue(jobContext, rrtif, false);
163    // Assert HBASE_REGIONSIZECALCULATOR_ENABLE is still true when done.
164    checkRetainsBooleanValue(jobContext, rrtif, true);
165  }
166
167  private void checkRetainsBooleanValue(JobContext jobContext, RoundRobinTableInputFormat rrtif,
168      final boolean b) {
169    jobContext.getConfiguration().
170      setBoolean(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE, b);
171    rrtif.configure();
172    rrtif.unconfigure();
173    String value = jobContext.getConfiguration().
174      get(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
175    assertEquals(b, Boolean.valueOf(value));
176  }
177}