/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;

/**
 * Basic test of {@link RoundRobinTableInputFormat}; i.e. RRTIF.
 */
@Category({ SmallTests.class })
public class TestRoundRobinTableInputFormat {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRoundRobinTableInputFormat.class);

  /** Number of distinct server locations the generated splits are spread over. */
  private static final int SERVERS_COUNT = 5;

  /**
   * Row keys used as split boundaries; each adjacent pair becomes one split, so
   * {@code KEYS.length - 1} splits are generated.
   */
  private static final String[] KEYS = { "aa", "ab", "ac", "ad", "ae", "ba", "bb", "bc", "bd", "be",
    "ca", "cb", "cc", "cd", "ce", "da", "db", "dc", "dd", "de", "ea", "eb", "ec", "ed", "ee", "fa",
    "fb", "fc", "fd", "fe", "ga", "gb", "gc", "gd", "ge", "ha", "hb", "hc", "hd", "he", "ia", "ib",
    "ic", "id", "ie", "ja", "jb", "jc", "jd", "je", "jf" };

  /**
   * Test default behavior.
   */
  @Test
  public void testRoundRobinSplit() throws IOException, InterruptedException {
    final List<InputSplit> splits = createSplits();
    Collections.shuffle(splits);
    List<InputSplit> sortedSplits = new RoundRobinTableInputFormat().roundRobin(splits);
    testDistribution(sortedSplits);
    // Now test that order is preserved even after being passed through the SplitComparator
    // that sorts InputSplit by length as is done up in Hadoop in JobSubmitter.
    // Keep a reference to the array that gets sorted: sorting the temporary returned by
    // toArray() and then inspecting the list would compare the list against itself.
    InputSplit[] resorted = sortedSplits.toArray(new InputSplit[0]);
    Arrays.sort(resorted, new SplitComparator());
    assertEquals(sortedSplits.size(), resorted.length);
    // Assert the sort is retained even after passing through SplitComparator. Compare by
    // identity: createSplits() passes "" in what appears to be the encoded-region-name
    // position, so comparing region names would not tell splits apart.
    for (int i = 0; i < sortedSplits.size(); i++) {
      assertSame("Split at index " + i + " moved after SplitComparator sort", sortedSplits.get(i),
        resorted[i]);
    }
  }

  /** Returns Splits made out of {@link #KEYS}. Splits are for five Servers. Length is ZERO! */
  private List<InputSplit> createSplits() {
    List<InputSplit> splits = new ArrayList<>(KEYS.length - 1);
    for (int i = 0; i < KEYS.length - 1; i++) {
      // Location is "1".."SERVERS_COUNT", assigned cyclically by index.
      InputSplit split =
        new TableSplit(TableName.valueOf("test"), new Scan(), Bytes.toBytes(KEYS[i]),
          Bytes.toBytes(KEYS[i + 1]), String.valueOf(i % SERVERS_COUNT + 1), "", 0);
      splits.add(split);
    }
    return splits;
  }

  /**
   * Asserts that every consecutive window of {@link #SERVERS_COUNT} splits in {@code list}
   * contains exactly one split per server location.
   * @param list splits whose first location is a numeric string in {@code 1..SERVERS_COUNT}
   */
  private void testDistribution(List<InputSplit> list) throws IOException, InterruptedException {
    // Guard the window count below: a short list must fail rather than check fewer windows.
    assertEquals(KEYS.length - 1, list.size());
    for (int i = 0; i < list.size() / SERVERS_COUNT; i++) {
      int[] counts = new int[SERVERS_COUNT];
      for (int j = i * SERVERS_COUNT; j < i * SERVERS_COUNT + SERVERS_COUNT; j++) {
        counts[Integer.parseInt(list.get(j).getLocations()[0]) - 1]++;
      }
      for (int value : counts) {
        assertEquals("Each server must appear exactly once in window " + i, 1, value);
      }
    }
  }

  /**
   * Private comparator copied from private JobSubmitter Hadoop class...
   * hadoop/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
   * Used so we can run the sort done up in JobSubmitter here in tests.
   */
  private static class SplitComparator implements Comparator<InputSplit> {
    @Override
    public int compare(InputSplit o1, InputSplit o2) {
      try {
        return Long.compare(o1.getLength(), o2.getLength());
      } catch (IOException e) {
        throw new RuntimeException("exception in compare", e);
      } catch (InterruptedException e) {
        // Restore the interrupt flag before converting to an unchecked exception.
        Thread.currentThread().interrupt();
        throw new RuntimeException("exception in compare", e);
      }
    }
  }

  /**
   * Assert that lengths are descending. RRTIF writes lengths in descending order so any subsequent
   * sort using dumb SplitComparator as is done in JobSubmitter up in Hadoop keeps our RRTIF
   * ordering.
   */
  private void assertLengthDescending(List<InputSplit> list)
    throws IOException, InterruptedException {
    long previousLength = Long.MAX_VALUE;
    for (InputSplit is : list) {
      long length = is.getLength();
      assertTrue(previousLength + " " + length, previousLength > length);
      previousLength = length;
    }
  }

  /**
   * Test that configure/unconfigure set and properly undo the HBASE_REGIONSIZECALCULATOR_ENABLE
   * configuration.
   */
  @Test
  public void testConfigureUnconfigure() {
    Configuration configuration = HBaseConfiguration.create();
    RoundRobinTableInputFormat rrtif = new RoundRobinTableInputFormat();
    rrtif.setConf(configuration);
    JobContext jobContext = Mockito.mock(JobContext.class);
    Mockito.when(jobContext.getConfiguration()).thenReturn(configuration);
    // Assert when done, HBASE_REGIONSIZECALCULATOR_ENABLE is still unset.
    configuration.unset(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
    rrtif.configure();
    rrtif.unconfigure();
    String value = configuration.get(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
    assertNull(value);
    // Assert HBASE_REGIONSIZECALCULATOR_ENABLE is still false when done.
    checkRetainsBooleanValue(jobContext, rrtif, false);
    // Assert HBASE_REGIONSIZECALCULATOR_ENABLE is still true when done.
    checkRetainsBooleanValue(jobContext, rrtif, true);
  }

  /**
   * Sets HBASE_REGIONSIZECALCULATOR_ENABLE to {@code b}, runs a configure/unconfigure cycle on
   * {@code rrtif}, and asserts the configuration still reads back as {@code b}.
   */
  private void checkRetainsBooleanValue(JobContext jobContext, RoundRobinTableInputFormat rrtif,
    final boolean b) {
    jobContext.getConfiguration()
      .setBoolean(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE, b);
    rrtif.configure();
    rrtif.unconfigure();
    String value = jobContext.getConfiguration()
      .get(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
    assertEquals(b, Boolean.valueOf(value));
  }
}