001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.normalizer;
019
import java.io.IOException;
import java.time.Instant;
import java.time.Period;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.BooleanSupplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
046
047/**
048 * Simple implementation of region normalizer. Logic in use:
049 * <ol>
050 *   <li>Get all regions of a given table</li>
051 *   <li>Get avg size S of the regions in the table (by total size of store files reported in
052 *     RegionMetrics)</li>
053 *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
054 *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller then S, R0 and R1
055 *     are kindly requested to merge.</li>
056 * </ol>
057 * <p>
058 * The following parameters are configurable:
059 * <ol>
060 *   <li>Whether to split a region as part of normalization. Configuration:
061 *     {@value #SPLIT_ENABLED_KEY}, default: {@value #DEFAULT_SPLIT_ENABLED}.</li>
062 *   <li>Whether to merge a region as part of normalization. Configuration:
063 *     {@value #MERGE_ENABLED_KEY}, default: {@value #DEFAULT_MERGE_ENABLED}.</li>
064 *   <li>The minimum number of regions in a table to consider it for merge normalization.
065 *     Configuration: {@value #MIN_REGION_COUNT_KEY}, default:
066 *     {@value #DEFAULT_MIN_REGION_COUNT}.</li>
067 *   <li>The minimum age for a region to be considered for a merge, in days. Configuration:
068 *     {@value #MERGE_MIN_REGION_AGE_DAYS_KEY}, default:
069 *     {@value #DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li>
070 *   <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration:
071 *     {@value #MERGE_MIN_REGION_SIZE_MB_KEY}, default:
072 *     {@value #DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li>
073 * </ol>
074 * <p>
075 * To see detailed logging of the application of these configuration values, set the log level for
076 * this class to `TRACE`.
077 */
078@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
079public class SimpleRegionNormalizer implements RegionNormalizer {
080  private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);
081
082  static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
083  static final boolean DEFAULT_SPLIT_ENABLED = true;
084  static final String MERGE_ENABLED_KEY = "hbase.normalizer.merge.enabled";
085  static final boolean DEFAULT_MERGE_ENABLED = true;
086  // TODO: after HBASE-24416, `min.region.count` only applies to merge plans; should
087  //  deprecate/rename the configuration key.
088  static final String MIN_REGION_COUNT_KEY = "hbase.normalizer.min.region.count";
089  static final int DEFAULT_MIN_REGION_COUNT = 3;
090  static final String MERGE_MIN_REGION_AGE_DAYS_KEY = "hbase.normalizer.merge.min_region_age.days";
091  static final int DEFAULT_MERGE_MIN_REGION_AGE_DAYS = 3;
092  static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
093  static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;
094
095  private final long[] skippedCount;
096  private Configuration conf;
097  private MasterServices masterServices;
098  private boolean splitEnabled;
099  private boolean mergeEnabled;
100  private int minRegionCount;
101  private Period mergeMinRegionAge;
102  private int mergeMinRegionSizeMb;
103
104  public SimpleRegionNormalizer() {
105    skippedCount = new long[NormalizationPlan.PlanType.values().length];
106    splitEnabled = DEFAULT_SPLIT_ENABLED;
107    mergeEnabled = DEFAULT_MERGE_ENABLED;
108    minRegionCount = DEFAULT_MIN_REGION_COUNT;
109    mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
110    mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
111  }
112
113  @Override
114  public Configuration getConf() {
115    return conf;
116  }
117
118  @Override
119  public void setConf(final Configuration conf) {
120    if (conf == null) {
121      return;
122    }
123    this.conf = conf;
124    splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
125    mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
126    minRegionCount = parseMinRegionCount(conf);
127    mergeMinRegionAge = parseMergeMinRegionAge(conf);
128    mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
129  }
130
131  private static int parseMinRegionCount(final Configuration conf) {
132    final int parsedValue = conf.getInt(MIN_REGION_COUNT_KEY, DEFAULT_MIN_REGION_COUNT);
133    final int settledValue = Math.max(1, parsedValue);
134    if (parsedValue != settledValue) {
135      warnInvalidValue(MIN_REGION_COUNT_KEY, parsedValue, settledValue);
136    }
137    return settledValue;
138  }
139
140  private static Period parseMergeMinRegionAge(final Configuration conf) {
141    final int parsedValue =
142      conf.getInt(MERGE_MIN_REGION_AGE_DAYS_KEY, DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
143    final int settledValue = Math.max(0, parsedValue);
144    if (parsedValue != settledValue) {
145      warnInvalidValue(MERGE_MIN_REGION_AGE_DAYS_KEY, parsedValue, settledValue);
146    }
147    return Period.ofDays(settledValue);
148  }
149
150  private static int parseMergeMinRegionSizeMb(final Configuration conf) {
151    final int parsedValue =
152      conf.getInt(MERGE_MIN_REGION_SIZE_MB_KEY, DEFAULT_MERGE_MIN_REGION_SIZE_MB);
153    final int settledValue = Math.max(0, parsedValue);
154    if (parsedValue != settledValue) {
155      warnInvalidValue(MERGE_MIN_REGION_SIZE_MB_KEY, parsedValue, settledValue);
156    }
157    return settledValue;
158  }
159
160  private static <T> void warnInvalidValue(final String key, final T parsedValue,
161    final T settledValue) {
162    LOG.warn("Configured value {}={} is invalid. Setting value to {}.",
163      key, parsedValue, settledValue);
164  }
165
166  /**
167   * Return this instance's configured value for {@value #SPLIT_ENABLED_KEY}.
168   */
169  public boolean isSplitEnabled() {
170    return splitEnabled;
171  }
172
173  /**
174   * Return this instance's configured value for {@value #MERGE_ENABLED_KEY}.
175   */
176  public boolean isMergeEnabled() {
177    return mergeEnabled;
178  }
179
180  /**
181   * Return this instance's configured value for {@value #MIN_REGION_COUNT_KEY}.
182   */
183  public int getMinRegionCount() {
184    return minRegionCount;
185  }
186
187  /**
188   * Return this instance's configured value for {@value #MERGE_MIN_REGION_AGE_DAYS_KEY}.
189   */
190  public Period getMergeMinRegionAge() {
191    return mergeMinRegionAge;
192  }
193
194  /**
195   * Return this instance's configured value for {@value #MERGE_MIN_REGION_SIZE_MB_KEY}.
196   */
197  public int getMergeMinRegionSizeMb() {
198    return mergeMinRegionSizeMb;
199  }
200
201  @Override
202  public void setMasterServices(final MasterServices masterServices) {
203    this.masterServices = masterServices;
204  }
205
206  @Override
207  public void planSkipped(final RegionInfo hri, final PlanType type) {
208    skippedCount[type.ordinal()]++;
209  }
210
211  @Override
212  public long getSkippedCount(NormalizationPlan.PlanType type) {
213    return skippedCount[type.ordinal()];
214  }
215
216  @Override
217  public List<NormalizationPlan> computePlansForTable(final TableName table) {
218    if (table == null) {
219      return Collections.emptyList();
220    }
221    if (table.isSystemTable()) {
222      LOG.debug("Normalization of system table {} isn't allowed", table);
223      return Collections.emptyList();
224    }
225
226    final boolean proceedWithSplitPlanning = proceedWithSplitPlanning();
227    final boolean proceedWithMergePlanning = proceedWithMergePlanning();
228    if (!proceedWithMergePlanning && !proceedWithSplitPlanning) {
229      LOG.debug("Both split and merge are disabled. Skipping normalization of table: {}", table);
230      return Collections.emptyList();
231    }
232
233    final NormalizeContext ctx = new NormalizeContext(table);
234    if (CollectionUtils.isEmpty(ctx.getTableRegions())) {
235      return Collections.emptyList();
236    }
237
238    LOG.debug("Computing normalization plan for table:  {}, number of regions: {}", table,
239      ctx.getTableRegions().size());
240
241    final List<NormalizationPlan> plans = new ArrayList<>();
242    if (proceedWithSplitPlanning) {
243      plans.addAll(computeSplitNormalizationPlans(ctx));
244    }
245    if (proceedWithMergePlanning) {
246      plans.addAll(computeMergeNormalizationPlans(ctx));
247    }
248
249    LOG.debug("Computed {} normalization plans for table {}", plans.size(), table);
250    return plans;
251  }
252
253  /**
254   * @return size of region in MB and if region is not found than -1
255   */
256  private long getRegionSizeMB(RegionInfo hri) {
257    ServerName sn =
258      masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
259    RegionMetrics regionLoad =
260      masterServices.getServerManager().getLoad(sn).getRegionMetrics().get(hri.getRegionName());
261    if (regionLoad == null) {
262      LOG.debug("{} was not found in RegionsLoad", hri.getRegionNameAsString());
263      return -1;
264    }
265    return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
266  }
267
268  private boolean isMasterSwitchEnabled(final MasterSwitchType masterSwitchType) {
269    return masterServices.isSplitOrMergeEnabled(masterSwitchType);
270  }
271
272  private boolean proceedWithSplitPlanning() {
273    return isSplitEnabled() && isMasterSwitchEnabled(MasterSwitchType.SPLIT);
274  }
275
276  private boolean proceedWithMergePlanning() {
277    return isMergeEnabled() && isMasterSwitchEnabled(MasterSwitchType.MERGE);
278  }
279
280  /**
281   * @param tableRegions regions of table to normalize
282   * @return average region size depending on
283   * @see org.apache.hadoop.hbase.client.TableDescriptor#getNormalizerTargetRegionCount()
284   * Also make sure tableRegions contains regions of the same table
285   */
286  private double getAverageRegionSizeMb(final List<RegionInfo> tableRegions) {
287    if (CollectionUtils.isEmpty(tableRegions)) {
288      throw new IllegalStateException(
289        "Cannot calculate average size of a table without any regions.");
290    }
291    final int regionCount = tableRegions.size();
292    final long totalSizeMb = tableRegions.stream()
293      .mapToLong(this::getRegionSizeMB)
294      .sum();
295    TableName table = tableRegions.get(0).getTable();
296    int targetRegionCount = -1;
297    long targetRegionSize = -1;
298    try {
299      TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
300      if (tableDescriptor != null && LOG.isDebugEnabled()) {
301        targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
302        targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
303        LOG.debug("Table {} configured with target region count {}, target region size {}", table,
304          targetRegionCount, targetRegionSize);
305      }
306    } catch (IOException e) {
307      LOG.warn("TableDescriptor for {} unavailable, table-level target region count and size"
308        + " configurations cannot be considered.", table, e);
309    }
310
311    double avgRegionSize;
312    if (targetRegionSize > 0) {
313      avgRegionSize = targetRegionSize;
314    } else if (targetRegionCount > 0) {
315      avgRegionSize = totalSizeMb / (double) targetRegionCount;
316    } else {
317      avgRegionSize = totalSizeMb / (double) regionCount;
318    }
319
320    LOG.debug("Table {}, total aggregated regions size: {} and average region size {}", table,
321      totalSizeMb, avgRegionSize);
322    return avgRegionSize;
323  }
324
325  /**
326   * Determine if a {@link RegionInfo} should be considered for a merge operation.
327   */
328  private boolean skipForMerge(final RegionStates regionStates, final RegionInfo regionInfo) {
329    final RegionState state = regionStates.getRegionState(regionInfo);
330    final String name = regionInfo.getEncodedName();
331    return
332      logTraceReason(
333        () -> state == null,
334        "skipping merge of region {} because no state information is available.", name)
335        || logTraceReason(
336          () -> !Objects.equals(state.getState(), RegionState.State.OPEN),
337          "skipping merge of region {} because it is not open.", name)
338        || logTraceReason(
339          () -> !isOldEnoughForMerge(regionInfo),
340          "skipping merge of region {} because it is not old enough.", name)
341        || logTraceReason(
342          () -> !isLargeEnoughForMerge(regionInfo),
343          "skipping merge region {} because it is not large enough.", name);
344  }
345
346  /**
347   * Computes the merge plans that should be executed for this table to converge average region
348   * towards target average or target region count.
349   */
350  private List<NormalizationPlan> computeMergeNormalizationPlans(final NormalizeContext ctx) {
351    if (ctx.getTableRegions().size() < minRegionCount) {
352      LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run"
353        + " is {}, not computing merge plans.", ctx.getTableName(), ctx.getTableRegions().size(),
354        minRegionCount);
355      return Collections.emptyList();
356    }
357
358    final double avgRegionSizeMb = ctx.getAverageRegionSizeMb();
359    LOG.debug("Computing normalization plan for table {}. average region size: {}, number of"
360      + " regions: {}.", ctx.getTableName(), avgRegionSizeMb, ctx.getTableRegions().size());
361
362    final List<NormalizationPlan> plans = new ArrayList<>();
363    for (int candidateIdx = 0; candidateIdx < ctx.getTableRegions().size() - 1; candidateIdx++) {
364      final RegionInfo current = ctx.getTableRegions().get(candidateIdx);
365      final RegionInfo next = ctx.getTableRegions().get(candidateIdx + 1);
366      if (skipForMerge(ctx.getRegionStates(), current)
367        || skipForMerge(ctx.getRegionStates(), next)) {
368        continue;
369      }
370      final long currentSizeMb = getRegionSizeMB(current);
371      final long nextSizeMb = getRegionSizeMB(next);
372      if (currentSizeMb + nextSizeMb < avgRegionSizeMb) {
373        plans.add(new MergeNormalizationPlan(current, next));
374        candidateIdx++;
375      }
376    }
377    return plans;
378  }
379
380  /**
381   * Determine if a region in {@link RegionState} should be considered for a split operation.
382   */
383  private static boolean skipForSplit(final RegionState state, final RegionInfo regionInfo) {
384    final String name = regionInfo.getEncodedName();
385    return
386      logTraceReason(
387        () -> state == null,
388        "skipping split of region {} because no state information is available.", name)
389        || logTraceReason(
390          () -> !Objects.equals(state.getState(), RegionState.State.OPEN),
391          "skipping merge of region {} because it is not open.", name);
392  }
393
394  /**
395   * Computes the split plans that should be executed for this table to converge average region size
396   * towards target average or target region count.
397   * <br />
398   * if the region is > 2 times larger than average, we split it. split
399   * is more high priority normalization action than merge.
400   */
401  private List<NormalizationPlan> computeSplitNormalizationPlans(final NormalizeContext ctx) {
402    final double avgRegionSize = ctx.getAverageRegionSizeMb();
403    LOG.debug("Table {}, average region size: {}", ctx.getTableName(), avgRegionSize);
404
405    final List<NormalizationPlan> plans = new ArrayList<>();
406    for (final RegionInfo hri : ctx.getTableRegions()) {
407      if (skipForSplit(ctx.getRegionStates().getRegionState(hri), hri)) {
408        continue;
409      }
410      final long regionSize = getRegionSizeMB(hri);
411      if (regionSize > 2 * avgRegionSize) {
412        LOG.info("Table {}, large region {} has size {}, more than twice avg size {}, splitting",
413          ctx.getTableName(), hri.getRegionNameAsString(), regionSize, avgRegionSize);
414        plans.add(new SplitNormalizationPlan(hri, null));
415      }
416    }
417    return plans;
418  }
419
420  /**
421   * Return {@code true} when {@code regionInfo} has a creation date that is old
422   * enough to be considered for a merge operation, {@code false} otherwise.
423   */
424  private boolean isOldEnoughForMerge(final RegionInfo regionInfo) {
425    final Instant currentTime = Instant.ofEpochMilli(EnvironmentEdgeManager.currentTime());
426    final Instant regionCreateTime = Instant.ofEpochMilli(regionInfo.getRegionId());
427    return currentTime.isAfter(regionCreateTime.plus(mergeMinRegionAge));
428  }
429
430  /**
431   * Return {@code true} when {@code regionInfo} has a size that is sufficient
432   * to be considered for a merge operation, {@code false} otherwise.
433   */
434  private boolean isLargeEnoughForMerge(final RegionInfo regionInfo) {
435    return getRegionSizeMB(regionInfo) >= mergeMinRegionSizeMb;
436  }
437
438  private static boolean logTraceReason(final BooleanSupplier predicate, final String fmtWhenTrue,
439    final Object... args) {
440    final boolean value = predicate.getAsBoolean();
441    if (value) {
442      LOG.trace(fmtWhenTrue, args);
443    }
444    return value;
445  }
446
447  /**
448   * Inner class caries the state necessary to perform a single invocation of
449   * {@link #computePlansForTable(TableName)}. Grabbing this data from the assignment manager
450   * up-front allows any computed values to be realized just once.
451   */
452  private class NormalizeContext {
453    private final TableName tableName;
454    private final RegionStates regionStates;
455    private final List<RegionInfo> tableRegions;
456    private final double averageRegionSizeMb;
457
458    public NormalizeContext(final TableName tableName) {
459      this.tableName = tableName;
460      regionStates = SimpleRegionNormalizer.this.masterServices
461        .getAssignmentManager()
462        .getRegionStates();
463      tableRegions = regionStates.getRegionsOfTable(tableName);
464      // The list of regionInfo from getRegionsOfTable() is ordered by regionName.
465      // regionName does not necessary guarantee the order by STARTKEY (let's say 'aa1', 'aa1!',
466      // in order by regionName, it will be 'aa1!' followed by 'aa1').
467      // This could result in normalizer merging non-adjacent regions into one and creates overlaps.
468      // In order to avoid that, sort the list by RegionInfo.COMPARATOR.
469      // See HBASE-24376
470      tableRegions.sort(RegionInfo.COMPARATOR);
471      averageRegionSizeMb = SimpleRegionNormalizer.this.getAverageRegionSizeMb(this.tableRegions);
472    }
473
474    public TableName getTableName() {
475      return tableName;
476    }
477
478    public RegionStates getRegionStates() {
479      return regionStates;
480    }
481
482    public List<RegionInfo> getTableRegions() {
483      return tableRegions;
484    }
485
486    public double getAverageRegionSizeMb() {
487      return averageRegionSizeMb;
488    }
489  }
490}