001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master.normalizer;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.Comparator;
025import java.util.List;
026import org.apache.hadoop.hbase.HBaseIOException;
027import org.apache.hadoop.hbase.RegionMetrics;
028import org.apache.hadoop.hbase.ServerName;
029import org.apache.hadoop.hbase.Size;
030import org.apache.hadoop.hbase.TableName;
031import org.apache.hadoop.hbase.client.MasterSwitchType;
032import org.apache.hadoop.hbase.client.RegionInfo;
033import org.apache.hadoop.hbase.client.TableDescriptor;
034import org.apache.hadoop.hbase.master.MasterRpcServices;
035import org.apache.hadoop.hbase.master.MasterServices;
036import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
042
043/**
044 * Simple implementation of region normalizer.
045 *
046 * Logic in use:
047 *
048 *  <ol>
049 *  <li> Get all regions of a given table
050 *  <li> Get avg size S of each region (by total size of store files reported in RegionMetrics)
051 *  <li> Seek every single region one by one. If a region R0 is bigger than S * 2, it is
052 *  kindly requested to split. Thereon evaluate the next region R1
053 *  <li> Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge.
054 *  Thereon evaluate the next region R2
055 *  <li> Otherwise, R1 is evaluated
056 * </ol>
057 * <p>
058 * Region sizes are coarse and approximate on the order of megabytes. Additionally,
059 * "empty" regions (less than 1MB, with the previous note) are not merged away. This
060 * is by design to prevent normalization from undoing the pre-splitting of a table.
061 */
062@InterfaceAudience.Private
063public class SimpleRegionNormalizer implements RegionNormalizer {
064
065  private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
066  private static final int MIN_REGION_COUNT = 3;
067  private MasterServices masterServices;
068  private MasterRpcServices masterRpcServices;
069  private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
070
071  /**
072   * Set the master service.
073   * @param masterServices inject instance of MasterServices
074   */
075  @Override
076  public void setMasterServices(MasterServices masterServices) {
077    this.masterServices = masterServices;
078  }
079
080  @Override
081  public void setMasterRpcServices(MasterRpcServices masterRpcServices) {
082    this.masterRpcServices = masterRpcServices;
083  }
084
085  @Override
086  public void planSkipped(RegionInfo hri, PlanType type) {
087    skippedCount[type.ordinal()]++;
088  }
089
090  @Override
091  public long getSkippedCount(NormalizationPlan.PlanType type) {
092    return skippedCount[type.ordinal()];
093  }
094
095  // Comparator that gives higher priority to region Split plan
096  private Comparator<NormalizationPlan> planComparator =
097      new Comparator<NormalizationPlan>() {
098    @Override
099    public int compare(NormalizationPlan plan, NormalizationPlan plan2) {
100      if (plan instanceof SplitNormalizationPlan) {
101        return -1;
102      }
103      if (plan2 instanceof SplitNormalizationPlan) {
104        return 1;
105      }
106      return 0;
107    }
108  };
109
110  /**
111   * Computes next most "urgent" normalization action on the table.
112   * Action may be either a split, or a merge, or no action.
113   *
114   * @param table table to normalize
115   * @return normalization plan to execute
116   */
117  @Override
118  public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
119    if (table == null || table.isSystemTable()) {
120      LOG.debug("Normalization of system table " + table + " isn't allowed");
121      return null;
122    }
123    boolean splitEnabled = true, mergeEnabled = true;
124    try {
125      splitEnabled = masterRpcServices.isSplitOrMergeEnabled(null,
126        RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT)).getEnabled();
127    } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
128      LOG.debug("Unable to determine whether split is enabled", e);
129    }
130    try {
131      mergeEnabled = masterRpcServices.isSplitOrMergeEnabled(null,
132        RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE)).getEnabled();
133    } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
134      LOG.debug("Unable to determine whether merge is enabled", e);
135    }
136    if (!mergeEnabled && !splitEnabled) {
137      LOG.debug("Both split and merge are disabled for table: " + table);
138      return null;
139    }
140    List<NormalizationPlan> plans = new ArrayList<>();
141    List<RegionInfo> tableRegions = masterServices.getAssignmentManager().getRegionStates().
142      getRegionsOfTable(table);
143
144    //TODO: should we make min number of regions a config param?
145    if (tableRegions == null || tableRegions.size() < MIN_REGION_COUNT) {
146      int nrRegions = tableRegions == null ? 0 : tableRegions.size();
147      LOG.debug("Table " + table + " has " + nrRegions + " regions, required min number"
148        + " of regions for normalizer to run is " + MIN_REGION_COUNT + ", not running normalizer");
149      return null;
150    }
151
152    LOG.debug("Computing normalization plan for table: " + table +
153      ", number of regions: " + tableRegions.size());
154
155    long totalSizeMb = 0;
156    int acutalRegionCnt = 0;
157
158    for (int i = 0; i < tableRegions.size(); i++) {
159      RegionInfo hri = tableRegions.get(i);
160      long regionSize = getRegionSize(hri);
161      if (regionSize > 0) {
162        acutalRegionCnt++;
163        totalSizeMb += regionSize;
164      }
165    }
166    int targetRegionCount = -1;
167    long targetRegionSize = -1;
168    try {
169      TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
170      if(tableDescriptor != null) {
171        targetRegionCount =
172            tableDescriptor.getNormalizerTargetRegionCount();
173        targetRegionSize =
174            tableDescriptor.getNormalizerTargetRegionSize();
175        LOG.debug("Table {}:  target region count is {}, target region size is {}", table,
176            targetRegionCount, targetRegionSize);
177      }
178    } catch (IOException e) {
179      LOG.warn(
180        "cannot get the target number and target size of table {}, they will be default value -1.",
181        table);
182    }
183
184    double avgRegionSize;
185    if (targetRegionSize > 0) {
186      avgRegionSize = targetRegionSize;
187    } else if (targetRegionCount > 0) {
188      avgRegionSize = totalSizeMb / (double) targetRegionCount;
189    } else {
190      avgRegionSize = acutalRegionCnt == 0 ? 0 : totalSizeMb / (double) acutalRegionCnt;
191    }
192
193    LOG.debug("Table " + table + ", total aggregated regions size: " + totalSizeMb);
194    LOG.debug("Table " + table + ", average region size: " + avgRegionSize);
195
196    int candidateIdx = 0;
197    while (candidateIdx < tableRegions.size()) {
198      RegionInfo hri = tableRegions.get(candidateIdx);
199      long regionSize = getRegionSize(hri);
200      // if the region is > 2 times larger than average, we split it, split
201      // is more high priority normalization action than merge.
202      if (regionSize > 2 * avgRegionSize) {
203        if (splitEnabled) {
204          LOG.info("Table " + table + ", large region " + hri.getRegionNameAsString() + " has size "
205              + regionSize + ", more than twice avg size, splitting");
206          plans.add(new SplitNormalizationPlan(hri, null));
207        }
208      } else {
209        if (candidateIdx == tableRegions.size()-1) {
210          break;
211        }
212        if (mergeEnabled) {
213          RegionInfo hri2 = tableRegions.get(candidateIdx+1);
214          long regionSize2 = getRegionSize(hri2);
215          if (regionSize >= 0 && regionSize2 >= 0 && regionSize + regionSize2 < avgRegionSize) {
216            LOG.info("Table " + table + ", small region size: " + regionSize
217              + " plus its neighbor size: " + regionSize2
218              + ", less than the avg size " + avgRegionSize + ", merging them");
219            plans.add(new MergeNormalizationPlan(hri, hri2));
220            candidateIdx++;
221          }
222        }
223      }
224      candidateIdx++;
225    }
226    if (plans.isEmpty()) {
227      LOG.debug("No normalization needed, regions look good for table: " + table);
228      return null;
229    }
230    Collections.sort(plans, planComparator);
231    return plans;
232  }
233
234  private long getRegionSize(RegionInfo hri) {
235    ServerName sn = masterServices.getAssignmentManager().getRegionStates().
236      getRegionServerOfRegion(hri);
237    RegionMetrics regionLoad = masterServices.getServerManager().getLoad(sn).
238      getRegionMetrics().get(hri.getRegionName());
239    if (regionLoad == null) {
240      LOG.debug(hri.getRegionNameAsString() + " was not found in RegionsLoad");
241      return -1;
242    }
243    return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
244  }
245}