/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Process the return from super-class {@link TableInputFormat} (TIF) so as to undo any clumping of
 * {@link InputSplit}s around RegionServers. Spread splits broadly to distribute read-load over
 * RegionServers in the cluster. The super-class TIF returns splits in hbase:meta table order.
 * Adjacent or near-adjacent hbase:meta Regions can be hosted on the same RegionServer -- nothing
 * prevents this. This hbase:meta ordering of InputSplit placement can be lumpy making it so some
 * RegionServers end up hosting lots of InputSplit scans while contemporaneously other RegionServers
 * host few or none. This class does a pass over the return from the super-class to better spread
 * the load. See the below helpful Flipkart blog post for a description and from where the base of
 * this code comes from (with permission).
 * @see <a href=
 *      "https://tech.flipkart.com/is-data-locality-always-out-of-the-box-in-hadoop-not-really-2ae9c95163cb">
 *      Is data locality always out of the box in Hadoop? Not really</a>
 */
@InterfaceAudience.Public
public class RoundRobinTableInputFormat extends TableInputFormat {
  /**
   * Value of {@link #HBASE_REGIONSIZECALCULATOR_ENABLE} found in the configuration by the most
   * recent {@link #configure()} call, or null if the key was not set at that time. Used by
   * {@link #unconfigure()} to put the configuration back the way it was found.
   */
  private Boolean hbaseRegionsizecalculatorEnableOriginalValue = null;

  /**
   * Boolean config for whether superclass should produce InputSplits with 'lengths'. If true, TIF
   * will query every RegionServer to get the 'size' of all involved Regions and this 'size' will be
   * used as the InputSplit length. If false, we skip this query and the super-classes returned
   * InputSplits will have lengths of zero. This override will set the flag to false. All returned
   * lengths will be zero. Makes it so sorting on 'length' becomes a noop. The sort returned by this
   * override will prevail. That's what we want.
   */
  static final String HBASE_REGIONSIZECALCULATOR_ENABLE = "hbase.regionsizecalculator.enable";

  /**
   * Returns the super-class splits re-ordered round-robin across RegionServers. The region size
   * calculation is disabled for the duration of the call and the configuration is restored
   * afterwards, whether or not the call succeeds.
   * @param context the job context handed on to the super-class
   * @return the splits in round-robin order
   * @throws IOException if the super-class fails to compute the splits
   */
  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    try {
      // Do a round robin on what we get back from the super-class.
      configure();
      return roundRobin(getSuperSplits(context));
    } finally {
      unconfigure();
    }
  }

  /**
   * Call super-classes' getSplits. Have it out here as its own method so can be overridden.
   */
  List<InputSplit> getSuperSplits(JobContext context) throws IOException {
    return super.getSplits(context);
  }

  /**
   * Spread the splits list so as to avoid clumping on RegionServers. Order splits so every server
   * gets one split before a server gets a second, and so on; i.e. round-robin the splits amongst
   * the servers in the cluster.
   * @param inputs splits to re-order; may be null or empty, in which case it is returned as is
   * @return a new list holding every element of <code>inputs</code>: first the splits that are not
   *         a {@link TableSplit} or have no region location (in their original order), then the
   *         remaining splits interleaved by region location
   */
  List<InputSplit> roundRobin(List<InputSplit> inputs) throws IOException {
    if ((inputs == null) || inputs.isEmpty()) {
      return inputs;
    }
    List<InputSplit> result = new ArrayList<>(inputs.size());
    // Prepare a hashmap with each region server as key and list of Input Splits as value
    Map<String, List<InputSplit>> regionServerSplits = new HashMap<>();
    for (InputSplit is : inputs) {
      if (is instanceof TableSplit) {
        String regionServer = ((TableSplit) is).getRegionLocation();
        // isNotBlank is null-safe so no separate null check is needed.
        if (StringUtils.isNotBlank(regionServer)) {
          regionServerSplits.computeIfAbsent(regionServer, k -> new ArrayList<>()).add(is);
          continue;
        }
      }
      // If not a TableSplit or region server not found, add it anyways.
      result.add(is);
    }
    // Write out splits in a manner that spreads splits for a RegionServer to avoid 'clumping'.
    // In round N every server still in the map contributes its Nth split. A server is dropped from
    // the map as soon as its last split has been written, so any list still present at the start
    // of round N is guaranteed to have more than N elements. Reading by index avoids the repeated
    // element shifting that removing from the head of an ArrayList would cost.
    for (int round = 0; !regionServerSplits.isEmpty(); round++) {
      Iterator<List<InputSplit>> iter = regionServerSplits.values().iterator();
      while (iter.hasNext()) {
        List<InputSplit> inputSplitListForRegion = iter.next();
        result.add(inputSplitListForRegion.get(round));
        if (round + 1 >= inputSplitListForRegion.size()) {
          iter.remove();
        }
      }
    }
    return result;
  }

  /**
   * Adds a configuration to the Context disabling remote rpc'ing to figure Region size when
   * calculating InputSplits. See up in super-class TIF where we rpc to every server to find the
   * size of all involved Regions. Here we disable this super-class action. This means InputSplits
   * will have a length of zero. If all InputSplits have zero-length InputSplits, the ordering done
   * in here will 'pass-through' Hadoop's length-first sort. The superclass TIF will ask every node
   * for the current size of each of the participating Table Regions. It does this because it wants
   * to schedule the biggest Regions first (This fixation comes of hadoop itself -- see JobSubmitter
   * where it sorts inputs by size). This extra diligence takes time and is of no utility in this
   * RRTIF where spread is of more import than size-first. Also, if a rolling restart is happening
   * when we go to launch the job, the job launch may fail because the request for Region size fails
   * -- even after retries -- because rolled RegionServer may take a while to come online: e.g. it
   * takes java 90 seconds to allocate a 160G heap. RegionServer is offline during this time. The
   * job launch will fail with 'Connection rejected'. So, we set
   * 'hbase.regionsizecalculator.enable' to false here in RRTIF.
   * @see #unconfigure()
   */
  void configure() {
    Configuration conf = getConf();
    if (conf.get(HBASE_REGIONSIZECALCULATOR_ENABLE) != null) {
      this.hbaseRegionsizecalculatorEnableOriginalValue =
        conf.getBoolean(HBASE_REGIONSIZECALCULATOR_ENABLE, true);
    } else {
      // Forget anything remembered by an earlier call; otherwise unconfigure() would write a stale
      // value into a configuration that no longer has the key set.
      this.hbaseRegionsizecalculatorEnableOriginalValue = null;
    }
    conf.setBoolean(HBASE_REGIONSIZECALCULATOR_ENABLE, false);
  }

  /**
   * Restores 'hbase.regionsizecalculator.enable' to what {@link #configure()} found: the key is
   * unset if it was absent, otherwise it is set back to the remembered value.
   * @see #configure()
   */
  void unconfigure() {
    if (this.hbaseRegionsizecalculatorEnableOriginalValue == null) {
      getConf().unset(HBASE_REGIONSIZECALCULATOR_ENABLE);
    } else {
      getConf().setBoolean(HBASE_REGIONSIZECALCULATOR_ENABLE,
        this.hbaseRegionsizecalculatorEnableOriginalValue);
    }
  }

  /**
   * Prints the round-robin ordered splits of a table to stdout. Pass table name as argument. Set
   * the zk ensemble to use with the System property 'hbase.zookeeper.quorum'.
   * @param args the table name as the first element
   * @throws IllegalArgumentException if no table name is passed
   */
  public static void main(String[] args) throws IOException {
    if (args == null || args.length < 1) {
      throw new IllegalArgumentException(
        "Usage: " + RoundRobinTableInputFormat.class.getSimpleName() + " <tablename>");
    }
    TableInputFormat tif = new RoundRobinTableInputFormat();
    final Configuration configuration = HBaseConfiguration.create();
    configuration.setBoolean(HBASE_REGIONSIZECALCULATOR_ENABLE, false);
    configuration.set(HConstants.ZOOKEEPER_QUORUM,
      System.getProperty(HConstants.ZOOKEEPER_QUORUM, "localhost"));
    configuration.set(TableInputFormat.INPUT_TABLE, args[0]);
    tif.setConf(configuration);
    List<InputSplit> splits = tif.getSplits(new JobContextImpl(configuration, new JobID()));
    for (InputSplit split : splits) {
      System.out.println(split);
    }
  }
}