/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/**
 * Basic test of {@link RoundRobinTableInputFormat}; i.e. RRTIF.
 */
@Tag(SmallTests.TAG)
public class TestRoundRobinTableInputFormat {

  /** Number of distinct server locations the generated splits are spread over. */
  private static final int SERVERS_COUNT = 5;

  /**
   * Row keys used as split boundaries. Each adjacent pair forms one split, so these 51 keys yield
   * 50 splits; i.e. ten splits for each of the {@link #SERVERS_COUNT} servers.
   */
  private static final String[] KEYS = { "aa", "ab", "ac", "ad", "ae", "ba", "bb", "bc", "bd", "be",
    "ca", "cb", "cc", "cd", "ce", "da", "db", "dc", "dd", "de", "ea", "eb", "ec", "ed", "ee", "fa",
    "fb", "fc", "fd", "fe", "ga", "gb", "gc", "gd", "ge", "ha", "hb", "hc", "hd", "he", "ia", "ib",
    "ic", "id", "ie", "ja", "jb", "jc", "jd", "je", "jf" };

  /**
   * Test default behavior: shuffled splits come back from
   * {@link RoundRobinTableInputFormat#roundRobin(List)} spread evenly over the servers, and that
   * ordering survives a subsequent sort by length.
   */
  @Test
  public void testRoundRobinSplit() throws IOException, InterruptedException {
    final List<InputSplit> splits = createSplits();
    Collections.shuffle(splits);
    List<InputSplit> sortedSplits = new RoundRobinTableInputFormat().roundRobin(splits);
    testDistribution(sortedSplits);
    // Now test that order is preserved even after being passed through the SplitComparator
    // that sorts InputSplit by length as is done up in Hadoop in JobSubmitter.
    // List#toArray hands back a fresh array, so it has to be kept in a variable: sorting an
    // anonymous array would leave nothing sorted to compare against.
    InputSplit[] resorted = sortedSplits.toArray(new InputSplit[0]);
    Arrays.sort(resorted, new SplitComparator());
    // Assert the sort is retained even after passing through SplitComparator.
    assertEquals(sortedSplits.size(), resorted.length);
    for (int i = 0; i < resorted.length; i++) {
      // Compare the splits themselves. Every split made by createSplits() has the same (empty)
      // encoded region name, so comparing on that would not notice a reordering.
      assertEquals(sortedSplits.get(i), resorted[i],
        "Split order changed at index " + i + " after sorting with SplitComparator");
    }
  }

  /**
   * Returns Splits made out of {@link #KEYS}. Splits are for five Servers. Length is ZERO!
   * <p>
   * Split {@code i} runs from {@code KEYS[i]} to {@code KEYS[i + 1]}, is located on the server
   * named {@code i % SERVERS_COUNT + 1} (so "1" through "5") and has an empty encoded region name.
   * @return a mutable list of {@code KEYS.length - 1} splits, in key order
   */
  private List<InputSplit> createSplits() {
    final int splitCount = KEYS.length - 1;
    List<InputSplit> splits = new ArrayList<>(splitCount);
    for (int i = 0; i < splitCount; i++) {
      String location = String.valueOf(i % SERVERS_COUNT + 1);
      splits.add(new TableSplit(TableName.valueOf("test"), new Scan(), Bytes.toBytes(KEYS[i]),
        Bytes.toBytes(KEYS[i + 1]), location, "", 0));
    }
    return splits;
  }

  /**
   * Asserts that the passed splits are evenly interleaved: each consecutive run of
   * {@link #SERVERS_COUNT} splits must contain exactly one split from each server.
   * @param list splits in the order under test; expected to be all splits from
   *             {@link #createSplits()}
   */
  private void testDistribution(List<InputSplit> list) throws IOException, InterruptedException {
    // Check the size up front so a short list fails with a clear message rather than an
    // IndexOutOfBoundsException, and so no trailing splits go unchecked.
    assertEquals(KEYS.length - 1, list.size(), "Unexpected number of splits");
    int windows = list.size() / SERVERS_COUNT;
    for (int window = 0; window < windows; window++) {
      int[] counts = new int[SERVERS_COUNT];
      int start = window * SERVERS_COUNT;
      for (int j = start; j < start + SERVERS_COUNT; j++) {
        // Locations are the 1-based server names written by createSplits().
        counts[Integer.parseInt(list.get(j).getLocations()[0]) - 1]++;
      }
      for (int server = 0; server < counts.length; server++) {
        assertEquals(1, counts[server], "Occurrences of server " + (server + 1)
          + " among the splits at indices [" + start + ", " + (start + SERVERS_COUNT) + ")");
      }
    }
  }

  /**
   * Private comparator copied from private JobSubmitter Hadoop class...
   * hadoop/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
   * Used so we can run the sort done up in JobSubmitter here in tests.
   * <p>
   * Orders splits by {@link InputSplit#getLength()} using {@link Long#compare(long, long)}.
   * NOTE(review): confirm this direction still matches the comparator in JobSubmitter; it makes
   * no difference while all test splits have the same length.
   */
  private static class SplitComparator implements Comparator<InputSplit> {
    @Override
    public int compare(InputSplit o1, InputSplit o2) {
      try {
        return Long.compare(o1.getLength(), o2.getLength());
      } catch (IOException | InterruptedException e) {
        // Comparator#compare cannot throw checked exceptions; rethrow unchecked, keeping the cause.
        throw new RuntimeException("exception in compare", e);
      }
    }
  }

  /**
   * Assert that lengths are descending. RRTIF writes lengths in descending order so any subsequent
   * sort using dumb SplitComparator as is done in JobSubmitter up in Hadoop keeps our RRTIF
   * ordering.
   * <p>
   * The check is strict: two adjacent splits of equal length fail it. Not currently called by any
   * test in this class.
   * @param list splits whose lengths are expected to be strictly decreasing
   */
  private void assertLengthDescending(List<InputSplit> list)
    throws IOException, InterruptedException {
    long previousLength = Long.MAX_VALUE;
    for (InputSplit is : list) {
      long length = is.getLength();
      assertTrue(previousLength > length, previousLength + " " + length);
      previousLength = length;
    }
  }

  /**
   * Test that configure/unconfigure set and properly undo the HBASE_REGIONSIZECALCULATOR_ENABLE
   * configuration.
   */
  @Test
  public void testConfigureUnconfigure() {
    Configuration configuration = HBaseConfiguration.create();
    RoundRobinTableInputFormat rrtif = new RoundRobinTableInputFormat();
    rrtif.setConf(configuration);
    JobContext jobContext = Mockito.mock(JobContext.class);
    Mockito.when(jobContext.getConfiguration()).thenReturn(configuration);
    // Assert when done, HBASE_REGIONSIZECALCULATOR_ENABLE is still unset.
    configuration.unset(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
    rrtif.configure();
    rrtif.unconfigure();
    String value = configuration.get(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
    assertNull(value);
    // Assert HBASE_REGIONSIZECALCULATOR_ENABLE is still false when done.
    checkRetainsBooleanValue(jobContext, rrtif, false);
    // Assert HBASE_REGIONSIZECALCULATOR_ENABLE is still true when done.
    checkRetainsBooleanValue(jobContext, rrtif, true);
  }

  /**
   * Sets HBASE_REGIONSIZECALCULATOR_ENABLE to {@code b}, runs a configure/unconfigure cycle and
   * asserts the setting is still present and still equal to {@code b}.
   * @param jobContext context whose configuration is the one {@code rrtif} operates on
   * @param rrtif      input format under test
   * @param b          value that must survive the configure/unconfigure cycle
   */
  private void checkRetainsBooleanValue(JobContext jobContext, RoundRobinTableInputFormat rrtif,
    final boolean b) {
    jobContext.getConfiguration()
      .setBoolean(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE, b);
    rrtif.configure();
    rrtif.unconfigure();
    String value = jobContext.getConfiguration()
      .get(RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE);
    // Boolean.valueOf(null) is false, so without this check the b == false case would also pass
    // if unconfigure() had removed the setting instead of restoring it.
    assertTrue(value != null,
      RoundRobinTableInputFormat.HBASE_REGIONSIZECALCULATOR_ENABLE + " was unset by unconfigure()");
    assertEquals(b, Boolean.valueOf(value));
  }
}