001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master.normalizer;
020
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.Comparator;
024import java.util.List;
025import org.apache.hadoop.hbase.HBaseIOException;
026import org.apache.hadoop.hbase.RegionMetrics;
027import org.apache.hadoop.hbase.ServerName;
028import org.apache.hadoop.hbase.Size;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.client.MasterSwitchType;
031import org.apache.hadoop.hbase.client.RegionInfo;
032import org.apache.hadoop.hbase.master.MasterRpcServices;
033import org.apache.hadoop.hbase.master.MasterServices;
034import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
035import org.apache.yetus.audience.InterfaceAudience;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
040
041/**
042 * Simple implementation of region normalizer.
043 *
044 * Logic in use:
045 *
046 *  <ol>
047 *  <li> Get all regions of a given table
048 *  <li> Get avg size S of each region (by total size of store files reported in RegionMetrics)
049 *  <li> Seek every single region one by one. If a region R0 is bigger than S * 2, it is
050 *  kindly requested to split. Thereon evaluate the next region R1
051 *  <li> Otherwise, if R0 + R1 is smaller than S, R0 and R1 are kindly requested to merge.
052 *  Thereon evaluate the next region R2
053 *  <li> Otherwise, R1 is evaluated
054 * </ol>
055 * <p>
056 * Region sizes are coarse and approximate on the order of megabytes. Additionally,
057 * "empty" regions (less than 1MB, with the previous note) are not merged away. This
058 * is by design to prevent normalization from undoing the pre-splitting of a table.
059 */
060@InterfaceAudience.Private
061public class SimpleRegionNormalizer implements RegionNormalizer {
062
063  private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
064  private static final int MIN_REGION_COUNT = 3;
065  private MasterServices masterServices;
066  private MasterRpcServices masterRpcServices;
067  private static long[] skippedCount = new long[NormalizationPlan.PlanType.values().length];
068
069  /**
070   * Set the master service.
071   * @param masterServices inject instance of MasterServices
072   */
073  @Override
074  public void setMasterServices(MasterServices masterServices) {
075    this.masterServices = masterServices;
076  }
077
078  @Override
079  public void setMasterRpcServices(MasterRpcServices masterRpcServices) {
080    this.masterRpcServices = masterRpcServices;
081  }
082
083  @Override
084  public void planSkipped(RegionInfo hri, PlanType type) {
085    skippedCount[type.ordinal()]++;
086  }
087
088  @Override
089  public long getSkippedCount(NormalizationPlan.PlanType type) {
090    return skippedCount[type.ordinal()];
091  }
092
093  // Comparator that gives higher priority to region Split plan
094  private Comparator<NormalizationPlan> planComparator =
095      new Comparator<NormalizationPlan>() {
096    @Override
097    public int compare(NormalizationPlan plan, NormalizationPlan plan2) {
098      if (plan instanceof SplitNormalizationPlan) {
099        return -1;
100      }
101      if (plan2 instanceof SplitNormalizationPlan) {
102        return 1;
103      }
104      return 0;
105    }
106  };
107
108  /**
109   * Computes next most "urgent" normalization action on the table.
110   * Action may be either a split, or a merge, or no action.
111   *
112   * @param table table to normalize
113   * @return normalization plan to execute
114   */
115  @Override
116  public List<NormalizationPlan> computePlanForTable(TableName table) throws HBaseIOException {
117    if (table == null || table.isSystemTable()) {
118      LOG.debug("Normalization of system table " + table + " isn't allowed");
119      return null;
120    }
121
122    List<NormalizationPlan> plans = new ArrayList<>();
123    List<RegionInfo> tableRegions = masterServices.getAssignmentManager().getRegionStates().
124      getRegionsOfTable(table);
125
126    //TODO: should we make min number of regions a config param?
127    if (tableRegions == null || tableRegions.size() < MIN_REGION_COUNT) {
128      int nrRegions = tableRegions == null ? 0 : tableRegions.size();
129      LOG.debug("Table " + table + " has " + nrRegions + " regions, required min number"
130        + " of regions for normalizer to run is " + MIN_REGION_COUNT + ", not running normalizer");
131      return null;
132    }
133
134    LOG.debug("Computing normalization plan for table: " + table +
135      ", number of regions: " + tableRegions.size());
136
137    long totalSizeMb = 0;
138    int acutalRegionCnt = 0;
139
140    for (int i = 0; i < tableRegions.size(); i++) {
141      RegionInfo hri = tableRegions.get(i);
142      long regionSize = getRegionSize(hri);
143      if (regionSize > 0) {
144        acutalRegionCnt++;
145        totalSizeMb += regionSize;
146      }
147    }
148
149    double avgRegionSize = acutalRegionCnt == 0 ? 0 : totalSizeMb / (double) acutalRegionCnt;
150
151    LOG.debug("Table " + table + ", total aggregated regions size: " + totalSizeMb);
152    LOG.debug("Table " + table + ", average region size: " + avgRegionSize);
153
154    int candidateIdx = 0;
155    boolean splitEnabled = true, mergeEnabled = true;
156    try {
157      splitEnabled = masterRpcServices.isSplitOrMergeEnabled(null,
158        RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.SPLIT)).getEnabled();
159    } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
160      LOG.debug("Unable to determine whether split is enabled", e);
161    }
162    try {
163      mergeEnabled = masterRpcServices.isSplitOrMergeEnabled(null,
164        RequestConverter.buildIsSplitOrMergeEnabledRequest(MasterSwitchType.MERGE)).getEnabled();
165    } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
166      LOG.debug("Unable to determine whether split is enabled", e);
167    }
168    while (candidateIdx < tableRegions.size()) {
169      RegionInfo hri = tableRegions.get(candidateIdx);
170      long regionSize = getRegionSize(hri);
171      // if the region is > 2 times larger than average, we split it, split
172      // is more high priority normalization action than merge.
173      if (regionSize > 2 * avgRegionSize) {
174        if (splitEnabled) {
175          LOG.info("Table " + table + ", large region " + hri.getRegionNameAsString() + " has size "
176              + regionSize + ", more than twice avg size, splitting");
177          plans.add(new SplitNormalizationPlan(hri, null));
178        }
179      } else {
180        if (candidateIdx == tableRegions.size()-1) {
181          break;
182        }
183        if (mergeEnabled) {
184          RegionInfo hri2 = tableRegions.get(candidateIdx+1);
185          long regionSize2 = getRegionSize(hri2);
186          if (regionSize >= 0 && regionSize2 >= 0 && regionSize + regionSize2 < avgRegionSize) {
187            LOG.info("Table " + table + ", small region size: " + regionSize
188              + " plus its neighbor size: " + regionSize2
189              + ", less than the avg size " + avgRegionSize + ", merging them");
190            plans.add(new MergeNormalizationPlan(hri, hri2));
191            candidateIdx++;
192          }
193        }
194      }
195      candidateIdx++;
196    }
197    if (plans.isEmpty()) {
198      LOG.debug("No normalization needed, regions look good for table: " + table);
199      return null;
200    }
201    Collections.sort(plans, planComparator);
202    return plans;
203  }
204
205  private long getRegionSize(RegionInfo hri) {
206    ServerName sn = masterServices.getAssignmentManager().getRegionStates().
207      getRegionServerOfRegion(hri);
208    RegionMetrics regionLoad = masterServices.getServerManager().getLoad(sn).
209      getRegionMetrics().get(hri.getRegionName());
210    if (regionLoad == null) {
211      LOG.debug(hri.getRegionNameAsString() + " was not found in RegionsLoad");
212      return -1;
213    }
214    return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
215  }
216}