/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Process the return from super-class {@link TableInputFormat} (TIF) so as to undo any clumping of
 * {@link InputSplit}s around RegionServers. Spread splits broadly to distribute read-load over
 * RegionServers in the cluster. The super-class TIF returns splits in hbase:meta table order.
 * Adjacent or near-adjacent hbase:meta Regions can be hosted on the same RegionServer -- nothing
 * prevents this. This hbase:meta ordering of InputSplit placement can be lumpy, making it so some
 * RegionServers end up hosting lots of InputSplit scans while contemporaneously other RegionServers
 * host few or none. This class does a pass over the return from the super-class to better spread
 * the load. See the helpful Flipkart blog post below for a description; the base of this code
 * comes from there (used with permission).
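 * <p>
 * A minimal usage sketch (the table name "tablename" and {@code MyMapper} are hypothetical; the
 * {@code TableMapReduceUtil} overload shown takes the InputFormat class as its last parameter):
 *
 * <pre>
 * Job job = Job.getInstance(HBaseConfiguration.create());
 * Scan scan = new Scan();
 * // Same wiring as for TableInputFormat, but pass RoundRobinTableInputFormat as the InputFormat.
 * TableMapReduceUtil.initTableMapperJob("tablename", scan, MyMapper.class,
 *   ImmutableBytesWritable.class, Result.class, job, true, RoundRobinTableInputFormat.class);
 * </pre>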
 * @see <a href=
 *      "https://tech.flipkart.com/is-data-locality-always-out-of-the-box-in-hadoop-not-really-2ae9c95163cb">Is
 *      data locality always out of the box in Hadoop? Not really</a>
 */
@InterfaceAudience.Public
public class RoundRobinTableInputFormat extends TableInputFormat {
  private Boolean hbaseRegionsizecalculatorEnableOriginalValue = null;
  /**
   * Boolean config for whether the superclass should produce InputSplits with 'lengths'. If true,
   * TIF will query every RegionServer to get the 'size' of all involved Regions, and this 'size'
   * will be used as the InputSplit length. If false, we skip this query and the superclass's
   * returned InputSplits will have lengths of zero. This override sets the flag to false: all
   * returned lengths will be zero, which makes sorting on 'length' a no-op, so the ordering
   * produced by this override prevails. That's what we want.
   */
  static final String HBASE_REGIONSIZECALCULATOR_ENABLE = "hbase.regionsizecalculator.enable";

  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    try {
      // Do a round robin on what we get back from the super-class.
      configure();
      return roundRobin(getSuperSplits(context));
    } finally {
      unconfigure();
    }
  }

  /**
   * Call the super-class's getSplits. Pulled out as its own method so it can be overridden.
   */
  List<InputSplit> getSuperSplits(JobContext context) throws IOException {
    return super.getSplits(context);
  }

  /**
   * Spread the splits list so as to avoid clumping on RegionServers. Order splits so every server
   * gets one split before a server gets a second, and so on; i.e. round-robin the splits amongst
   * the servers in the cluster.
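   * <p>
   * For example, if the super-class returns [a1, a2, a3, b1, b2, c1], where the letter names the
   * hosting RegionServer, the round-robin pass emits one split per server per round, e.g. [a1, b1,
   * c1, a2, b2, a3] (the relative order of servers depends on HashMap iteration, so it is not
   * deterministic across runs).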
   */
  List<InputSplit> roundRobin(List<InputSplit> inputs) throws IOException {
    if ((inputs == null) || inputs.isEmpty()) {
      return inputs;
    }
    List<InputSplit> result = new ArrayList<>(inputs.size());
    // Prepare a hashmap with each region server as key and its list of InputSplits as value
    Map<String, List<InputSplit>> regionServerSplits = new HashMap<>();
    for (InputSplit is : inputs) {
      if (is instanceof TableSplit) {
        String regionServer = ((TableSplit) is).getRegionLocation();
        // isBlank also covers null, so no separate null check is needed.
        if (!StringUtils.isBlank(regionServer)) {
          regionServerSplits.computeIfAbsent(regionServer, k -> new ArrayList<>()).add(is);
          continue;
        }
      }
      // If it is not a TableSplit, or the region server is unknown, add it anyway.
      result.add(is);
    }
    // Write out splits in a manner that spreads splits for a RegionServer to avoid 'clumping'.
    while (!regionServerSplits.isEmpty()) {
      Iterator<List<InputSplit>> iter = regionServerSplits.values().iterator();
      while (iter.hasNext()) {
        List<InputSplit> inputSplitListForRegion = iter.next();
        if (!inputSplitListForRegion.isEmpty()) {
          result.add(inputSplitListForRegion.remove(0));
        }
        if (inputSplitListForRegion.isEmpty()) {
          iter.remove();
        }
      }
    }
    return result;
  }

  /**
   * Adds a configuration to the Context that disables the remote rpc'ing the super-class TIF does
   * to figure Region sizes when calculating InputSplits; see the super-class, where it rpc's to
   * every server to find the size of all involved Regions. Here we disable this super-class
   * action, which means InputSplits will have a length of zero. If all InputSplits have a length
   * of zero, the ordering done in here 'passes through' Hadoop's length-first sort unchanged. The
   * super-class TIF asks every node for the current size of each of the participating Table
   * Regions because it wants to schedule the biggest Regions first (this fixation comes from
   * Hadoop itself -- see JobSubmitter, where it sorts inputs by size). This extra diligence takes
   * time and is of no utility in this RRTIF, where spread is of more import than size-first. Also,
   * if a rolling restart is happening when we go to launch the job, the job launch may fail
   * because the request for Region size fails -- even after retries -- since a rolled RegionServer
   * may take a while to come online: e.g. it can take Java 90 seconds to allocate a 160G heap, and
   * the RegionServer is offline during this time. The job launch will then fail with 'Connection
   * rejected'. So, we set 'hbase.regionsizecalculator.enable' to false here in RRTIF.
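   * <p>
   * (Hadoop's JobSubmitter sorts the splits in descending order of {@code InputSplit#getLength()}
   * with a stable sort, so when every length is zero the round-robin order produced here survives
   * the sort intact.)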
   * @see #unconfigure()
   */
  void configure() {
    // Save any explicitly-set value so unconfigure() can restore it after getSplits completes.
    if (getConf().get(HBASE_REGIONSIZECALCULATOR_ENABLE) != null) {
      this.hbaseRegionsizecalculatorEnableOriginalValue =
        getConf().getBoolean(HBASE_REGIONSIZECALCULATOR_ENABLE, true);
    }
    getConf().setBoolean(HBASE_REGIONSIZECALCULATOR_ENABLE, false);
  }

  /**
   * @see #configure()
   */
  void unconfigure() {
    if (this.hbaseRegionsizecalculatorEnableOriginalValue == null) {
      getConf().unset(HBASE_REGIONSIZECALCULATOR_ENABLE);
    } else {
      getConf().setBoolean(HBASE_REGIONSIZECALCULATOR_ENABLE,
        this.hbaseRegionsizecalculatorEnableOriginalValue);
    }
  }

  /**
   * Pass the table name as the first argument. Set the ZooKeeper ensemble to use via the system
   * property 'hbase.zookeeper.quorum'.
   */
  public static void main(String[] args) throws IOException {
    if (args.length < 1) {
      System.err.println("Usage: RoundRobinTableInputFormat <TABLE_NAME>");
      System.exit(1);
    }
    TableInputFormat tif = new RoundRobinTableInputFormat();
    final Configuration configuration = HBaseConfiguration.create();
    configuration.setBoolean(HBASE_REGIONSIZECALCULATOR_ENABLE, false);
    configuration.set(HConstants.ZOOKEEPER_QUORUM,
      System.getProperty(HConstants.ZOOKEEPER_QUORUM, "localhost"));
    configuration.set(TableInputFormat.INPUT_TABLE, args[0]);
    tif.setConf(configuration);
    // Print the computed splits so the round-robin spread can be eyeballed.
    List<InputSplit> splits = tif.getSplits(new JobContextImpl(configuration, new JobID()));
    for (InputSplit split : splits) {
      System.out.println(split);
    }
  }
}