001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.junit.jupiter.api.Assertions.assertNull;
022import static org.junit.jupiter.api.Assertions.assertTrue;
023
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collections;
028import java.util.Comparator;
029import java.util.List;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.HBaseConfiguration;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.client.Scan;
034import org.apache.hadoop.hbase.testclassification.SmallTests;
035import org.apache.hadoop.hbase.util.Bytes;
036import org.apache.hadoop.mapreduce.InputSplit;
037import org.apache.hadoop.mapreduce.JobContext;
038import org.junit.jupiter.api.Tag;
039import org.junit.jupiter.api.Test;
040import org.mockito.Mockito;
041
042/**
043 * Basic test of {@link RoundRobinTableInputFormat}; i.e. RRTIF.
044 */
045@Tag(SmallTests.TAG)
046public class TestRoundRobinTableInputFormat {
047
048  private static final int SERVERS_COUNT = 5;
049  private static final String[] KEYS = { "aa", "ab", "ac", "ad", "ae", "ba", "bb", "bc", "bd", "be",
050    "ca", "cb", "cc", "cd", "ce", "da", "db", "dc", "dd", "de", "ea", "eb", "ec", "ed", "ee", "fa",
051    "fb", "fc", "fd", "fe", "ga", "gb", "gc", "gd", "ge", "ha", "hb", "hc", "hd", "he", "ia", "ib",
052    "ic", "id", "ie", "ja", "jb", "jc", "jd", "je", "jf" };
053
054  /**
055   * Test default behavior.
056   */
057  @Test
058  public void testRoundRobinSplit() throws IOException, InterruptedException {
059    final List<InputSplit> splits = createSplits();
060    Collections.shuffle(splits);
061    List<InputSplit> sortedSplits = new RoundRobinTableInputFormat().roundRobin(splits);
062    testDistribution(sortedSplits);
063    // Now test that order is preserved even after being passed through the SplitComparator
064    // that sorts InputSplit by length as is done up in Hadoop in JobSubmitter.
065    List<InputSplit> copy = new ArrayList<>(sortedSplits);
066    Arrays.sort(copy.toArray(new InputSplit[0]), new SplitComparator());
067    // Assert the sort is retained even after passing through SplitComparator.
068    for (int i = 0; i < sortedSplits.size(); i++) {
069      TableSplit sortedTs = (TableSplit) sortedSplits.get(i);
070      TableSplit copyTs = (TableSplit) copy.get(i);
071      assertEquals(sortedTs.getEncodedRegionName(), copyTs.getEncodedRegionName());
072    }
073  }
074
075  /** Returns Splits made out of {@link #KEYS}. Splits are for five Servers. Length is ZERO! */
076  private List<InputSplit> createSplits() {
077    List<InputSplit> splits = new ArrayList<>(KEYS.length - 1);
078    for (int i = 0; i < KEYS.length - 1; i++) {
079      InputSplit split =
080        new TableSplit(TableName.valueOf("test"), new Scan(), Bytes.toBytes(KEYS[i]),
081          Bytes.toBytes(KEYS[i + 1]), String.valueOf(i % SERVERS_COUNT + 1), "", 0);
082      splits.add(split);
083    }
084    return splits;
085  }
086
087  private void testDistribution(List<InputSplit> list) throws IOException, InterruptedException {
088    for (int i = 0; i < KEYS.length / SERVERS_COUNT; i++) {
089      int[] counts = new int[SERVERS_COUNT];
090      for (int j = i * SERVERS_COUNT; j < i * SERVERS_COUNT + SERVERS_COUNT; j++) {
091        counts[Integer.parseInt(list.get(j).getLocations()[0]) - 1]++;
092      }
093      for (int value : counts) {
094        assertEquals(value, 1);
095      }
096    }
097  }
098
099  /**
100   * Private comparator copied from private JobSubmmiter Hadoop class...
101   * hadoop/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
102   * Used so we can run the sort done up in JobSubmitter here in tests.
103   */
104  private static class SplitComparator implements Comparator<InputSplit> {
105    @Override
106    public int compare(InputSplit o1, InputSplit o2) {
107      try {
108        return Long.compare(o1.getLength(), o2.getLength());
109      } catch (IOException | InterruptedException e) {
110        throw new RuntimeException("exception in compare", e);
111      }
112    }
113  }
114
115  /**
116   * Assert that lengths are descending. RRTIF writes lengths in descending order so any subsequent
117   * sort using dump SplitComparator as is done in JobSubmitter up in Hadoop keeps our RRTIF
118   * ordering.
119   */
120  private void assertLengthDescending(List<InputSplit> list)
121    throws IOException, InterruptedException {
122    long previousLength = Long.MAX_VALUE;
123    for (InputSplit is : list) {
124      long length = is.getLength();
125      assertTrue(previousLength > length, previousLength + " " + length);
126      previousLength = length;
127    }
128  }
129
130  /**
131   * Test that configure/unconfigure set and properly undo the HBASE_REGIONSIZECALCULATOR_ENABLE
132   * configuration.
133   */
134  @Test
135  public void testConfigureUnconfigure() {
136    Configuration configuration = HBaseConfiguration.create();
137    RoundRobinTableInputFormat rrtif = new RoundRobinTableInputFormat();
138    rrtif.setConf(configuration);
139    JobContext jobContext = Mockito.mock(JobContext.class);
140    Mockito.when(jobContext.getConfiguration()).thenReturn(configuration);
141    // Assert when done, HBASE_REGIONSIZECALCULATOR_ENABLE is still unset.
142    configuration.unset(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
143    rrtif.configure();
144    rrtif.unconfigure();
145    String value = configuration.get(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
146    assertNull(value);
147    // Assert HBASE_REGIONSIZECALCULATOR_ENABLE is still false when done.
148    checkRetainsBooleanValue(jobContext, rrtif, false);
149    // Assert HBASE_REGIONSIZECALCULATOR_ENABLE is still true when done.
150    checkRetainsBooleanValue(jobContext, rrtif, true);
151  }
152
153  private void checkRetainsBooleanValue(JobContext jobContext, RoundRobinTableInputFormat rrtif,
154    final boolean b) {
155    jobContext.getConfiguration()
156      .setBoolean(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE, b);
157    rrtif.configure();
158    rrtif.unconfigure();
159    String value = jobContext.getConfiguration()
160      .get(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
161    assertEquals(b, Boolean.valueOf(value));
162  }
163}