001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.normalizer;
019
020import static org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils.isEmpty;
021import java.io.IOException;
022import java.time.Instant;
023import java.time.Period;
024import java.util.ArrayList;
025import java.util.Collections;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Objects;
029import java.util.function.BooleanSupplier;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.HBaseInterfaceAudience;
032import org.apache.hadoop.hbase.RegionMetrics;
033import org.apache.hadoop.hbase.ServerName;
034import org.apache.hadoop.hbase.Size;
035import org.apache.hadoop.hbase.TableName;
036import org.apache.hadoop.hbase.client.MasterSwitchType;
037import org.apache.hadoop.hbase.client.RegionInfo;
038import org.apache.hadoop.hbase.client.TableDescriptor;
039import org.apache.hadoop.hbase.conf.ConfigurationObserver;
040import org.apache.hadoop.hbase.master.MasterServices;
041import org.apache.hadoop.hbase.master.RegionState;
042import org.apache.hadoop.hbase.master.assignment.RegionStates;
043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
044import org.apache.yetus.audience.InterfaceAudience;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * Simple implementation of region normalizer. Logic in use:
050 * <ol>
051 *   <li>Get all regions of a given table</li>
052 *   <li>Get avg size S of the regions in the table (by total size of store files reported in
053 *     RegionMetrics)</li>
054 *   <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li>
 *   <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller than S, R0 and R1
056 *     are kindly requested to merge.</li>
057 * </ol>
058 */
059@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
060class SimpleRegionNormalizer implements RegionNormalizer, ConfigurationObserver {
  private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class);

  /** Configuration key controlling whether this normalizer computes split plans. */
  static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled";
  /** Value assumed for {@link #SPLIT_ENABLED_KEY} when it is not configured. */
  static final boolean DEFAULT_SPLIT_ENABLED = true;
  /** Configuration key controlling whether this normalizer computes merge plans. */
  static final String MERGE_ENABLED_KEY = "hbase.normalizer.merge.enabled";
  /** Value assumed for {@link #MERGE_ENABLED_KEY} when it is not configured. */
  static final boolean DEFAULT_MERGE_ENABLED = true;
  // TODO: after HBASE-24416, `min.region.count` only applies to merge plans; should
  //  deprecate/rename the configuration key.
  /**
   * Configuration key for the minimum number of regions a table must have before merge plans
   * are computed for it. Configured values below 1 are raised to 1.
   */
  static final String MIN_REGION_COUNT_KEY = "hbase.normalizer.min.region.count";
  /** Value assumed for {@link #MIN_REGION_COUNT_KEY} when it is not configured. */
  static final int DEFAULT_MIN_REGION_COUNT = 3;
  /**
   * Configuration key for the minimum age, in days, a region must have reached before it is
   * considered for a merge. Negative configured values are raised to 0.
   */
  static final String MERGE_MIN_REGION_AGE_DAYS_KEY = "hbase.normalizer.merge.min_region_age.days";
  /** Value assumed for {@link #MERGE_MIN_REGION_AGE_DAYS_KEY} when it is not configured. */
  static final int DEFAULT_MERGE_MIN_REGION_AGE_DAYS = 3;
  /**
   * Configuration key for the minimum size, in MB, a region must have before it is considered
   * for a merge. Negative configured values are raised to 0.
   */
  static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb";
  /** Value assumed for {@link #MERGE_MIN_REGION_SIZE_MB_KEY} when it is not configured. */
  static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1;

  // Stays null until setMasterServices(MasterServices) is called.
  private MasterServices masterServices;
  // Snapshot of the settings in effect; setConf(Configuration) swaps in a whole new instance
  // instead of mutating this one.
  private NormalizerConfiguration normalizerConfiguration;
078
079  public SimpleRegionNormalizer() {
080    masterServices = null;
081    normalizerConfiguration = new NormalizerConfiguration();
082  }
083
084  @Override
085  public Configuration getConf() {
086    return normalizerConfiguration.getConf();
087  }
088
089  @Override
090  public void setConf(final Configuration conf) {
091    if (conf == null) {
092      return;
093    }
094    normalizerConfiguration = new NormalizerConfiguration(conf, normalizerConfiguration);
095  }
096
097  @Override
098  public void onConfigurationChange(Configuration conf) {
099    LOG.debug("Updating configuration parameters according to new configuration instance.");
100    setConf(conf);
101  }
102
103  private static int parseMinRegionCount(final Configuration conf) {
104    final int parsedValue = conf.getInt(MIN_REGION_COUNT_KEY, DEFAULT_MIN_REGION_COUNT);
105    final int settledValue = Math.max(1, parsedValue);
106    if (parsedValue != settledValue) {
107      warnInvalidValue(MIN_REGION_COUNT_KEY, parsedValue, settledValue);
108    }
109    return settledValue;
110  }
111
112  private static Period parseMergeMinRegionAge(final Configuration conf) {
113    final int parsedValue =
114      conf.getInt(MERGE_MIN_REGION_AGE_DAYS_KEY, DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
115    final int settledValue = Math.max(0, parsedValue);
116    if (parsedValue != settledValue) {
117      warnInvalidValue(MERGE_MIN_REGION_AGE_DAYS_KEY, parsedValue, settledValue);
118    }
119    return Period.ofDays(settledValue);
120  }
121
122  private static long parseMergeMinRegionSizeMb(final Configuration conf) {
123    final long parsedValue =
124      conf.getLong(MERGE_MIN_REGION_SIZE_MB_KEY, DEFAULT_MERGE_MIN_REGION_SIZE_MB);
125    final long settledValue = Math.max(0, parsedValue);
126    if (parsedValue != settledValue) {
127      warnInvalidValue(MERGE_MIN_REGION_SIZE_MB_KEY, parsedValue, settledValue);
128    }
129    return settledValue;
130  }
131
132  private static <T> void warnInvalidValue(final String key, final T parsedValue,
133    final T settledValue) {
134    LOG.warn("Configured value {}={} is invalid. Setting value to {}.",
135      key, parsedValue, settledValue);
136  }
137
138  private static <T> void logConfigurationUpdated(final String key, final T oldValue,
139    final T newValue) {
140    if (!Objects.equals(oldValue, newValue)) {
141      LOG.info("Updated configuration for key '{}' from {} to {}", key, oldValue, newValue);
142    }
143  }
144
145  /**
146   * Return this instance's configured value for {@value #SPLIT_ENABLED_KEY}.
147   */
148  public boolean isSplitEnabled() {
149    return normalizerConfiguration.isSplitEnabled();
150  }
151
152  /**
153   * Return this instance's configured value for {@value #MERGE_ENABLED_KEY}.
154   */
155  public boolean isMergeEnabled() {
156    return normalizerConfiguration.isMergeEnabled();
157  }
158
159  /**
160   * Return this instance's configured value for {@value #MIN_REGION_COUNT_KEY}.
161   */
162  public int getMinRegionCount() {
163    return normalizerConfiguration.getMinRegionCount();
164  }
165
166  /**
167   * Return this instance's configured value for {@value #MERGE_MIN_REGION_AGE_DAYS_KEY}.
168   */
169  public Period getMergeMinRegionAge() {
170    return normalizerConfiguration.getMergeMinRegionAge();
171  }
172
173  /**
174   * Return this instance's configured value for {@value #MERGE_MIN_REGION_SIZE_MB_KEY}.
175   */
176  public long getMergeMinRegionSizeMb() {
177    return normalizerConfiguration.getMergeMinRegionSizeMb();
178  }
179
180  @Override
181  public void setMasterServices(final MasterServices masterServices) {
182    this.masterServices = masterServices;
183  }
184
185  @Override
186  public List<NormalizationPlan> computePlansForTable(final TableName table) {
187    if (table == null) {
188      return Collections.emptyList();
189    }
190    if (table.isSystemTable()) {
191      LOG.debug("Normalization of system table {} isn't allowed", table);
192      return Collections.emptyList();
193    }
194
195    final boolean proceedWithSplitPlanning = proceedWithSplitPlanning();
196    final boolean proceedWithMergePlanning = proceedWithMergePlanning();
197    if (!proceedWithMergePlanning && !proceedWithSplitPlanning) {
198      LOG.debug("Both split and merge are disabled. Skipping normalization of table: {}", table);
199      return Collections.emptyList();
200    }
201
202    final NormalizeContext ctx = new NormalizeContext(table);
203    if (isEmpty(ctx.getTableRegions())) {
204      return Collections.emptyList();
205    }
206
207    LOG.debug("Computing normalization plan for table:  {}, number of regions: {}", table,
208      ctx.getTableRegions().size());
209
210    final List<NormalizationPlan> plans = new ArrayList<>();
211    if (proceedWithSplitPlanning) {
212      plans.addAll(computeSplitNormalizationPlans(ctx));
213    }
214    if (proceedWithMergePlanning) {
215      plans.addAll(computeMergeNormalizationPlans(ctx));
216    }
217
218    LOG.debug("Computed {} normalization plans for table {}", plans.size(), table);
219    return plans;
220  }
221
222  /**
223   * @return size of region in MB and if region is not found than -1
224   */
225  private long getRegionSizeMB(RegionInfo hri) {
226    ServerName sn =
227      masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
228    RegionMetrics regionLoad =
229      masterServices.getServerManager().getLoad(sn).getRegionMetrics().get(hri.getRegionName());
230    if (regionLoad == null) {
231      LOG.debug("{} was not found in RegionsLoad", hri.getRegionNameAsString());
232      return -1;
233    }
234    return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE);
235  }
236
237  private boolean isMasterSwitchEnabled(final MasterSwitchType masterSwitchType) {
238    return masterServices.isSplitOrMergeEnabled(masterSwitchType);
239  }
240
241  private boolean proceedWithSplitPlanning() {
242    return isSplitEnabled() && isMasterSwitchEnabled(MasterSwitchType.SPLIT);
243  }
244
245  private boolean proceedWithMergePlanning() {
246    return isMergeEnabled() && isMasterSwitchEnabled(MasterSwitchType.MERGE);
247  }
248
249  /**
250   * @param tableRegions regions of table to normalize
251   * @return average region size depending on
252   * @see org.apache.hadoop.hbase.client.TableDescriptor#getNormalizerTargetRegionCount()
253   * Also make sure tableRegions contains regions of the same table
254   */
255  private double getAverageRegionSizeMb(final List<RegionInfo> tableRegions) {
256    if (isEmpty(tableRegions)) {
257      throw new IllegalStateException(
258        "Cannot calculate average size of a table without any regions.");
259    }
260    final int regionCount = tableRegions.size();
261    final long totalSizeMb = tableRegions.stream()
262      .mapToLong(this::getRegionSizeMB)
263      .sum();
264    TableName table = tableRegions.get(0).getTable();
265    int targetRegionCount = -1;
266    long targetRegionSize = -1;
267    try {
268      TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
269      if (tableDescriptor != null && LOG.isDebugEnabled()) {
270        targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
271        targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
272        LOG.debug("Table {} configured with target region count {}, target region size {}", table,
273          targetRegionCount, targetRegionSize);
274      }
275    } catch (IOException e) {
276      LOG.warn("TableDescriptor for {} unavailable, table-level target region count and size"
277        + " configurations cannot be considered.", table, e);
278    }
279
280    double avgRegionSize;
281    if (targetRegionSize > 0) {
282      avgRegionSize = targetRegionSize;
283    } else if (targetRegionCount > 0) {
284      avgRegionSize = totalSizeMb / (double) targetRegionCount;
285    } else {
286      avgRegionSize = totalSizeMb / (double) regionCount;
287    }
288
289    LOG.debug("Table {}, total aggregated regions size: {} and average region size {}", table,
290      totalSizeMb, avgRegionSize);
291    return avgRegionSize;
292  }
293
294  /**
295   * Determine if a {@link RegionInfo} should be considered for a merge operation.
296   * </p>
297   * Callers beware: for safe concurrency, be sure to pass in the local instance of
298   * {@link NormalizerConfiguration}, don't use {@code this}'s instance.
299   */
300  private boolean skipForMerge(
301    final NormalizerConfiguration normalizerConfiguration,
302    final RegionStates regionStates,
303    final RegionInfo regionInfo
304  ) {
305    final RegionState state = regionStates.getRegionState(regionInfo);
306    final String name = regionInfo.getEncodedName();
307    return
308      logTraceReason(
309        () -> state == null,
310        "skipping merge of region {} because no state information is available.", name)
311        || logTraceReason(
312          () -> !Objects.equals(state.getState(), RegionState.State.OPEN),
313          "skipping merge of region {} because it is not open.", name)
314        || logTraceReason(
315          () -> !isOldEnoughForMerge(normalizerConfiguration, regionInfo),
316          "skipping merge of region {} because it is not old enough.", name)
317        || logTraceReason(
318          () -> !isLargeEnoughForMerge(normalizerConfiguration, regionInfo),
319          "skipping merge region {} because it is not large enough.", name);
320  }
321
322  /**
323   * Computes the merge plans that should be executed for this table to converge average region
324   * towards target average or target region count.
325   */
326  private List<NormalizationPlan> computeMergeNormalizationPlans(final NormalizeContext ctx) {
327    final NormalizerConfiguration configuration = normalizerConfiguration;
328    if (ctx.getTableRegions().size() < configuration.getMinRegionCount()) {
329      LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run"
330          + " is {}, not computing merge plans.", ctx.getTableName(),
331        ctx.getTableRegions().size(), configuration.getMinRegionCount());
332      return Collections.emptyList();
333    }
334
335    final long avgRegionSizeMb = (long) ctx.getAverageRegionSizeMb();
336    if (avgRegionSizeMb < configuration.getMergeMinRegionSizeMb()) {
337      return Collections.emptyList();
338    }
339    LOG.debug("Computing normalization plan for table {}. average region size: {}, number of"
340      + " regions: {}.", ctx.getTableName(), avgRegionSizeMb, ctx.getTableRegions().size());
341
342    // this nested loop walks the table's region chain once, looking for contiguous sequences of
343    // regions that meet the criteria for merge. The outer loop tracks the starting point of the
344    // next sequence, the inner loop looks for the end of that sequence. A single sequence becomes
345    // an instance of MergeNormalizationPlan.
346
347    final List<NormalizationPlan> plans = new LinkedList<>();
348    final List<NormalizationTarget> rangeMembers = new LinkedList<>();
349    long sumRangeMembersSizeMb;
350    int current = 0;
351    for (int rangeStart = 0;
352         rangeStart < ctx.getTableRegions().size() - 1 && current < ctx.getTableRegions().size();) {
353      // walk the region chain looking for contiguous sequences of regions that can be merged.
354      rangeMembers.clear();
355      sumRangeMembersSizeMb = 0;
356      for (current = rangeStart; current < ctx.getTableRegions().size(); current++) {
357        final RegionInfo regionInfo = ctx.getTableRegions().get(current);
358        final long regionSizeMb = getRegionSizeMB(regionInfo);
359        if (skipForMerge(configuration, ctx.getRegionStates(), regionInfo)) {
360          // this region cannot participate in a range. resume the outer loop.
361          rangeStart = Math.max(current, rangeStart + 1);
362          break;
363        }
364        if (rangeMembers.isEmpty() // when there are no range members, seed the range with whatever
365                                   // we have. this way we're prepared in case the next region is
366                                   // 0-size.
367          || regionSizeMb == 0 // always add an empty region to the current range.
368          || (regionSizeMb + sumRangeMembersSizeMb <= avgRegionSizeMb)) { // add the current region
369                                                                          // to the range when
370                                                                          // there's capacity
371                                                                          // remaining.
372          rangeMembers.add(new NormalizationTarget(regionInfo, regionSizeMb));
373          sumRangeMembersSizeMb += regionSizeMb;
374          continue;
375        }
376        // we have accumulated enough regions to fill a range. resume the outer loop.
377        rangeStart = Math.max(current, rangeStart + 1);
378        break;
379      }
380      if (rangeMembers.size() > 1) {
381        plans.add(new MergeNormalizationPlan.Builder().setTargets(rangeMembers).build());
382      }
383    }
384    return plans;
385  }
386
387  /**
388   * Determine if a region in {@link RegionState} should be considered for a split operation.
389   */
390  private static boolean skipForSplit(final RegionState state, final RegionInfo regionInfo) {
391    final String name = regionInfo.getEncodedName();
392    return
393      logTraceReason(
394        () -> state == null,
395        "skipping split of region {} because no state information is available.", name)
396        || logTraceReason(
397          () -> !Objects.equals(state.getState(), RegionState.State.OPEN),
398          "skipping merge of region {} because it is not open.", name);
399  }
400
401  /**
402   * Computes the split plans that should be executed for this table to converge average region size
403   * towards target average or target region count.
404   * <br />
405   * if the region is > 2 times larger than average, we split it. split
406   * is more high priority normalization action than merge.
407   */
408  private List<NormalizationPlan> computeSplitNormalizationPlans(final NormalizeContext ctx) {
409    final double avgRegionSize = ctx.getAverageRegionSizeMb();
410    LOG.debug("Table {}, average region size: {}", ctx.getTableName(), avgRegionSize);
411
412    final List<NormalizationPlan> plans = new ArrayList<>();
413    for (final RegionInfo hri : ctx.getTableRegions()) {
414      if (skipForSplit(ctx.getRegionStates().getRegionState(hri), hri)) {
415        continue;
416      }
417      final long regionSizeMb = getRegionSizeMB(hri);
418      if (regionSizeMb > 2 * avgRegionSize) {
419        LOG.info("Table {}, large region {} has size {}, more than twice avg size {}, splitting",
420          ctx.getTableName(), hri.getRegionNameAsString(), regionSizeMb, avgRegionSize);
421        plans.add(new SplitNormalizationPlan(hri, regionSizeMb));
422      }
423    }
424    return plans;
425  }
426
427  /**
428   * Return {@code true} when {@code regionInfo} has a creation date that is old
429   * enough to be considered for a merge operation, {@code false} otherwise.
430   */
431  private static boolean isOldEnoughForMerge(
432    final NormalizerConfiguration normalizerConfiguration,
433    final RegionInfo regionInfo
434  ) {
435    final Instant currentTime = Instant.ofEpochMilli(EnvironmentEdgeManager.currentTime());
436    final Instant regionCreateTime = Instant.ofEpochMilli(regionInfo.getRegionId());
437    return currentTime.isAfter(
438      regionCreateTime.plus(normalizerConfiguration.getMergeMinRegionAge()));
439  }
440
441  /**
442   * Return {@code true} when {@code regionInfo} has a size that is sufficient
443   * to be considered for a merge operation, {@code false} otherwise.
444   * </p>
445   * Callers beware: for safe concurrency, be sure to pass in the local instance of
446   * {@link NormalizerConfiguration}, don't use {@code this}'s instance.
447   */
448  private boolean isLargeEnoughForMerge(
449    final NormalizerConfiguration normalizerConfiguration,
450    final RegionInfo regionInfo
451  ) {
452    return getRegionSizeMB(regionInfo) >= normalizerConfiguration.getMergeMinRegionSizeMb();
453  }
454
455  private static boolean logTraceReason(final BooleanSupplier predicate, final String fmtWhenTrue,
456    final Object... args) {
457    final boolean value = predicate.getAsBoolean();
458    if (value) {
459      LOG.trace(fmtWhenTrue, args);
460    }
461    return value;
462  }
463
464  /**
465   * Holds the configuration values read from {@link Configuration}. Encapsulation in a POJO
466   * enables atomic hot-reloading of configs without locks.
467   */
468  private static final class NormalizerConfiguration {
469    private final Configuration conf;
470    private final boolean splitEnabled;
471    private final boolean mergeEnabled;
472    private final int minRegionCount;
473    private final Period mergeMinRegionAge;
474    private final long mergeMinRegionSizeMb;
475
476    private NormalizerConfiguration() {
477      conf = null;
478      splitEnabled = DEFAULT_SPLIT_ENABLED;
479      mergeEnabled = DEFAULT_MERGE_ENABLED;
480      minRegionCount = DEFAULT_MIN_REGION_COUNT;
481      mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS);
482      mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB;
483    }
484
485    private NormalizerConfiguration(
486      final Configuration conf,
487      final NormalizerConfiguration currentConfiguration
488    ) {
489      this.conf = conf;
490      splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED);
491      mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED);
492      minRegionCount = parseMinRegionCount(conf);
493      mergeMinRegionAge = parseMergeMinRegionAge(conf);
494      mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf);
495      logConfigurationUpdated(SPLIT_ENABLED_KEY, currentConfiguration.isSplitEnabled(),
496        splitEnabled);
497      logConfigurationUpdated(MERGE_ENABLED_KEY, currentConfiguration.isMergeEnabled(),
498        mergeEnabled);
499      logConfigurationUpdated(MIN_REGION_COUNT_KEY, currentConfiguration.getMinRegionCount(),
500        minRegionCount);
501      logConfigurationUpdated(MERGE_MIN_REGION_AGE_DAYS_KEY,
502        currentConfiguration.getMergeMinRegionAge(), mergeMinRegionAge);
503      logConfigurationUpdated(MERGE_MIN_REGION_SIZE_MB_KEY,
504        currentConfiguration.getMergeMinRegionSizeMb(), mergeMinRegionSizeMb);
505    }
506
507    public Configuration getConf() {
508      return conf;
509    }
510
511    public boolean isSplitEnabled() {
512      return splitEnabled;
513    }
514
515    public boolean isMergeEnabled() {
516      return mergeEnabled;
517    }
518
519    public int getMinRegionCount() {
520      return minRegionCount;
521    }
522
523    public Period getMergeMinRegionAge() {
524      return mergeMinRegionAge;
525    }
526
527    public long getMergeMinRegionSizeMb() {
528      return mergeMinRegionSizeMb;
529    }
530  }
531
532  /**
533   * Inner class caries the state necessary to perform a single invocation of
534   * {@link #computePlansForTable(TableName)}. Grabbing this data from the assignment manager
535   * up-front allows any computed values to be realized just once.
536   */
537  private class NormalizeContext {
538    private final TableName tableName;
539    private final RegionStates regionStates;
540    private final List<RegionInfo> tableRegions;
541    private final double averageRegionSizeMb;
542
543    public NormalizeContext(final TableName tableName) {
544      this.tableName = tableName;
545      regionStates = SimpleRegionNormalizer.this.masterServices
546        .getAssignmentManager()
547        .getRegionStates();
548      tableRegions = regionStates.getRegionsOfTable(tableName);
549      // The list of regionInfo from getRegionsOfTable() is ordered by regionName.
550      // regionName does not necessary guarantee the order by STARTKEY (let's say 'aa1', 'aa1!',
551      // in order by regionName, it will be 'aa1!' followed by 'aa1').
552      // This could result in normalizer merging non-adjacent regions into one and creates overlaps.
553      // In order to avoid that, sort the list by RegionInfo.COMPARATOR.
554      // See HBASE-24376
555      tableRegions.sort(RegionInfo.COMPARATOR);
556      averageRegionSizeMb = SimpleRegionNormalizer.this.getAverageRegionSizeMb(this.tableRegions);
557    }
558
559    public TableName getTableName() {
560      return tableName;
561    }
562
563    public RegionStates getRegionStates() {
564      return regionStates;
565    }
566
567    public List<RegionInfo> getTableRegions() {
568      return tableRegions;
569    }
570
571    public double getAverageRegionSizeMb() {
572      return averageRegionSizeMb;
573    }
574  }
575}