/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.normalizer;

import java.io.IOException;
import java.time.Instant;
import java.time.Period;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.BooleanSupplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 046 047/** 048 * Simple implementation of region normalizer. Logic in use: 049 * <ol> 050 * <li>Get all regions of a given table</li> 051 * <li>Get avg size S of the regions in the table (by total size of store files reported in 052 * RegionMetrics)</li> 053 * <li>For each region R0, if R0 is bigger than S * 2, it is kindly requested to split.</li> 054 * <li>Otherwise, for the next region in the chain R1, if R0 + R1 is smaller then S, R0 and R1 055 * are kindly requested to merge.</li> 056 * </ol> 057 * <p> 058 * The following parameters are configurable: 059 * <ol> 060 * <li>Whether to split a region as part of normalization. Configuration: 061 * {@value #SPLIT_ENABLED_KEY}, default: {@value #DEFAULT_SPLIT_ENABLED}.</li> 062 * <li>Whether to merge a region as part of normalization. Configuration: 063 * {@value #MERGE_ENABLED_KEY}, default: {@value #DEFAULT_MERGE_ENABLED}.</li> 064 * <li>The minimum number of regions in a table to consider it for merge normalization. 065 * Configuration: {@value #MIN_REGION_COUNT_KEY}, default: 066 * {@value #DEFAULT_MIN_REGION_COUNT}.</li> 067 * <li>The minimum age for a region to be considered for a merge, in days. Configuration: 068 * {@value #MERGE_MIN_REGION_AGE_DAYS_KEY}, default: 069 * {@value #DEFAULT_MERGE_MIN_REGION_AGE_DAYS}.</li> 070 * <li>The minimum size for a region to be considered for a merge, in whole MBs. Configuration: 071 * {@value #MERGE_MIN_REGION_SIZE_MB_KEY}, default: 072 * {@value #DEFAULT_MERGE_MIN_REGION_SIZE_MB}.</li> 073 * </ol> 074 * <p> 075 * To see detailed logging of the application of these configuration values, set the log level for 076 * this class to `TRACE`. 
077 */ 078@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) 079public class SimpleRegionNormalizer implements RegionNormalizer { 080 private static final Logger LOG = LoggerFactory.getLogger(SimpleRegionNormalizer.class); 081 082 static final String SPLIT_ENABLED_KEY = "hbase.normalizer.split.enabled"; 083 static final boolean DEFAULT_SPLIT_ENABLED = true; 084 static final String MERGE_ENABLED_KEY = "hbase.normalizer.merge.enabled"; 085 static final boolean DEFAULT_MERGE_ENABLED = true; 086 // TODO: after HBASE-24416, `min.region.count` only applies to merge plans; should 087 // deprecate/rename the configuration key. 088 static final String MIN_REGION_COUNT_KEY = "hbase.normalizer.min.region.count"; 089 static final int DEFAULT_MIN_REGION_COUNT = 3; 090 static final String MERGE_MIN_REGION_AGE_DAYS_KEY = "hbase.normalizer.merge.min_region_age.days"; 091 static final int DEFAULT_MERGE_MIN_REGION_AGE_DAYS = 3; 092 static final String MERGE_MIN_REGION_SIZE_MB_KEY = "hbase.normalizer.merge.min_region_size.mb"; 093 static final int DEFAULT_MERGE_MIN_REGION_SIZE_MB = 1; 094 095 private final long[] skippedCount; 096 private Configuration conf; 097 private MasterServices masterServices; 098 private boolean splitEnabled; 099 private boolean mergeEnabled; 100 private int minRegionCount; 101 private Period mergeMinRegionAge; 102 private int mergeMinRegionSizeMb; 103 104 public SimpleRegionNormalizer() { 105 skippedCount = new long[NormalizationPlan.PlanType.values().length]; 106 splitEnabled = DEFAULT_SPLIT_ENABLED; 107 mergeEnabled = DEFAULT_MERGE_ENABLED; 108 minRegionCount = DEFAULT_MIN_REGION_COUNT; 109 mergeMinRegionAge = Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS); 110 mergeMinRegionSizeMb = DEFAULT_MERGE_MIN_REGION_SIZE_MB; 111 } 112 113 @Override 114 public Configuration getConf() { 115 return conf; 116 } 117 118 @Override 119 public void setConf(final Configuration conf) { 120 if (conf == null) { 121 return; 122 } 123 this.conf = conf; 124 
splitEnabled = conf.getBoolean(SPLIT_ENABLED_KEY, DEFAULT_SPLIT_ENABLED); 125 mergeEnabled = conf.getBoolean(MERGE_ENABLED_KEY, DEFAULT_MERGE_ENABLED); 126 minRegionCount = parseMinRegionCount(conf); 127 mergeMinRegionAge = parseMergeMinRegionAge(conf); 128 mergeMinRegionSizeMb = parseMergeMinRegionSizeMb(conf); 129 } 130 131 private static int parseMinRegionCount(final Configuration conf) { 132 final int parsedValue = conf.getInt(MIN_REGION_COUNT_KEY, DEFAULT_MIN_REGION_COUNT); 133 final int settledValue = Math.max(1, parsedValue); 134 if (parsedValue != settledValue) { 135 warnInvalidValue(MIN_REGION_COUNT_KEY, parsedValue, settledValue); 136 } 137 return settledValue; 138 } 139 140 private static Period parseMergeMinRegionAge(final Configuration conf) { 141 final int parsedValue = 142 conf.getInt(MERGE_MIN_REGION_AGE_DAYS_KEY, DEFAULT_MERGE_MIN_REGION_AGE_DAYS); 143 final int settledValue = Math.max(0, parsedValue); 144 if (parsedValue != settledValue) { 145 warnInvalidValue(MERGE_MIN_REGION_AGE_DAYS_KEY, parsedValue, settledValue); 146 } 147 return Period.ofDays(settledValue); 148 } 149 150 private static int parseMergeMinRegionSizeMb(final Configuration conf) { 151 final int parsedValue = 152 conf.getInt(MERGE_MIN_REGION_SIZE_MB_KEY, DEFAULT_MERGE_MIN_REGION_SIZE_MB); 153 final int settledValue = Math.max(0, parsedValue); 154 if (parsedValue != settledValue) { 155 warnInvalidValue(MERGE_MIN_REGION_SIZE_MB_KEY, parsedValue, settledValue); 156 } 157 return settledValue; 158 } 159 160 private static <T> void warnInvalidValue(final String key, final T parsedValue, 161 final T settledValue) { 162 LOG.warn("Configured value {}={} is invalid. Setting value to {}.", 163 key, parsedValue, settledValue); 164 } 165 166 /** 167 * Return this instance's configured value for {@value #SPLIT_ENABLED_KEY}. 168 */ 169 public boolean isSplitEnabled() { 170 return splitEnabled; 171 } 172 173 /** 174 * Return this instance's configured value for {@value #MERGE_ENABLED_KEY}. 
175 */ 176 public boolean isMergeEnabled() { 177 return mergeEnabled; 178 } 179 180 /** 181 * Return this instance's configured value for {@value #MIN_REGION_COUNT_KEY}. 182 */ 183 public int getMinRegionCount() { 184 return minRegionCount; 185 } 186 187 /** 188 * Return this instance's configured value for {@value #MERGE_MIN_REGION_AGE_DAYS_KEY}. 189 */ 190 public Period getMergeMinRegionAge() { 191 return mergeMinRegionAge; 192 } 193 194 /** 195 * Return this instance's configured value for {@value #MERGE_MIN_REGION_SIZE_MB_KEY}. 196 */ 197 public int getMergeMinRegionSizeMb() { 198 return mergeMinRegionSizeMb; 199 } 200 201 @Override 202 public void setMasterServices(final MasterServices masterServices) { 203 this.masterServices = masterServices; 204 } 205 206 @Override 207 public void planSkipped(final RegionInfo hri, final PlanType type) { 208 skippedCount[type.ordinal()]++; 209 } 210 211 @Override 212 public long getSkippedCount(NormalizationPlan.PlanType type) { 213 return skippedCount[type.ordinal()]; 214 } 215 216 @Override 217 public List<NormalizationPlan> computePlansForTable(final TableName table) { 218 if (table == null) { 219 return Collections.emptyList(); 220 } 221 if (table.isSystemTable()) { 222 LOG.debug("Normalization of system table {} isn't allowed", table); 223 return Collections.emptyList(); 224 } 225 226 final boolean proceedWithSplitPlanning = proceedWithSplitPlanning(); 227 final boolean proceedWithMergePlanning = proceedWithMergePlanning(); 228 if (!proceedWithMergePlanning && !proceedWithSplitPlanning) { 229 LOG.debug("Both split and merge are disabled. 
Skipping normalization of table: {}", table); 230 return Collections.emptyList(); 231 } 232 233 final NormalizeContext ctx = new NormalizeContext(table); 234 if (CollectionUtils.isEmpty(ctx.getTableRegions())) { 235 return Collections.emptyList(); 236 } 237 238 LOG.debug("Computing normalization plan for table: {}, number of regions: {}", table, 239 ctx.getTableRegions().size()); 240 241 final List<NormalizationPlan> plans = new ArrayList<>(); 242 if (proceedWithSplitPlanning) { 243 plans.addAll(computeSplitNormalizationPlans(ctx)); 244 } 245 if (proceedWithMergePlanning) { 246 plans.addAll(computeMergeNormalizationPlans(ctx)); 247 } 248 249 LOG.debug("Computed {} normalization plans for table {}", plans.size(), table); 250 return plans; 251 } 252 253 /** 254 * @return size of region in MB and if region is not found than -1 255 */ 256 private long getRegionSizeMB(RegionInfo hri) { 257 ServerName sn = 258 masterServices.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri); 259 RegionMetrics regionLoad = 260 masterServices.getServerManager().getLoad(sn).getRegionMetrics().get(hri.getRegionName()); 261 if (regionLoad == null) { 262 LOG.debug("{} was not found in RegionsLoad", hri.getRegionNameAsString()); 263 return -1; 264 } 265 return (long) regionLoad.getStoreFileSize().get(Size.Unit.MEGABYTE); 266 } 267 268 private boolean isMasterSwitchEnabled(final MasterSwitchType masterSwitchType) { 269 return masterServices.isSplitOrMergeEnabled(masterSwitchType); 270 } 271 272 private boolean proceedWithSplitPlanning() { 273 return isSplitEnabled() && isMasterSwitchEnabled(MasterSwitchType.SPLIT); 274 } 275 276 private boolean proceedWithMergePlanning() { 277 return isMergeEnabled() && isMasterSwitchEnabled(MasterSwitchType.MERGE); 278 } 279 280 /** 281 * @param tableRegions regions of table to normalize 282 * @return average region size depending on 283 * @see org.apache.hadoop.hbase.client.TableDescriptor#getNormalizerTargetRegionCount() 284 * Also make 
sure tableRegions contains regions of the same table 285 */ 286 private double getAverageRegionSizeMb(final List<RegionInfo> tableRegions) { 287 if (CollectionUtils.isEmpty(tableRegions)) { 288 throw new IllegalStateException( 289 "Cannot calculate average size of a table without any regions."); 290 } 291 final int regionCount = tableRegions.size(); 292 final long totalSizeMb = tableRegions.stream() 293 .mapToLong(this::getRegionSizeMB) 294 .sum(); 295 TableName table = tableRegions.get(0).getTable(); 296 int targetRegionCount = -1; 297 long targetRegionSize = -1; 298 try { 299 TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table); 300 if (tableDescriptor != null && LOG.isDebugEnabled()) { 301 targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount(); 302 targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize(); 303 LOG.debug("Table {} configured with target region count {}, target region size {}", table, 304 targetRegionCount, targetRegionSize); 305 } 306 } catch (IOException e) { 307 LOG.warn("TableDescriptor for {} unavailable, table-level target region count and size" 308 + " configurations cannot be considered.", table, e); 309 } 310 311 double avgRegionSize; 312 if (targetRegionSize > 0) { 313 avgRegionSize = targetRegionSize; 314 } else if (targetRegionCount > 0) { 315 avgRegionSize = totalSizeMb / (double) targetRegionCount; 316 } else { 317 avgRegionSize = totalSizeMb / (double) regionCount; 318 } 319 320 LOG.debug("Table {}, total aggregated regions size: {} and average region size {}", table, 321 totalSizeMb, avgRegionSize); 322 return avgRegionSize; 323 } 324 325 /** 326 * Determine if a {@link RegionInfo} should be considered for a merge operation. 
327 */ 328 private boolean skipForMerge(final RegionStates regionStates, final RegionInfo regionInfo) { 329 final RegionState state = regionStates.getRegionState(regionInfo); 330 final String name = regionInfo.getEncodedName(); 331 return 332 logTraceReason( 333 () -> state == null, 334 "skipping merge of region {} because no state information is available.", name) 335 || logTraceReason( 336 () -> !Objects.equals(state.getState(), RegionState.State.OPEN), 337 "skipping merge of region {} because it is not open.", name) 338 || logTraceReason( 339 () -> !isOldEnoughForMerge(regionInfo), 340 "skipping merge of region {} because it is not old enough.", name) 341 || logTraceReason( 342 () -> !isLargeEnoughForMerge(regionInfo), 343 "skipping merge region {} because it is not large enough.", name); 344 } 345 346 /** 347 * Computes the merge plans that should be executed for this table to converge average region 348 * towards target average or target region count. 349 */ 350 private List<NormalizationPlan> computeMergeNormalizationPlans(final NormalizeContext ctx) { 351 if (ctx.getTableRegions().size() < minRegionCount) { 352 LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run" 353 + " is {}, not computing merge plans.", ctx.getTableName(), ctx.getTableRegions().size(), 354 minRegionCount); 355 return Collections.emptyList(); 356 } 357 358 final double avgRegionSizeMb = ctx.getAverageRegionSizeMb(); 359 LOG.debug("Computing normalization plan for table {}. 
average region size: {}, number of" 360 + " regions: {}.", ctx.getTableName(), avgRegionSizeMb, ctx.getTableRegions().size()); 361 362 final List<NormalizationPlan> plans = new ArrayList<>(); 363 for (int candidateIdx = 0; candidateIdx < ctx.getTableRegions().size() - 1; candidateIdx++) { 364 final RegionInfo current = ctx.getTableRegions().get(candidateIdx); 365 final RegionInfo next = ctx.getTableRegions().get(candidateIdx + 1); 366 if (skipForMerge(ctx.getRegionStates(), current) 367 || skipForMerge(ctx.getRegionStates(), next)) { 368 continue; 369 } 370 final long currentSizeMb = getRegionSizeMB(current); 371 final long nextSizeMb = getRegionSizeMB(next); 372 if (currentSizeMb + nextSizeMb < avgRegionSizeMb) { 373 plans.add(new MergeNormalizationPlan(current, next)); 374 candidateIdx++; 375 } 376 } 377 return plans; 378 } 379 380 /** 381 * Determine if a region in {@link RegionState} should be considered for a split operation. 382 */ 383 private static boolean skipForSplit(final RegionState state, final RegionInfo regionInfo) { 384 final String name = regionInfo.getEncodedName(); 385 return 386 logTraceReason( 387 () -> state == null, 388 "skipping split of region {} because no state information is available.", name) 389 || logTraceReason( 390 () -> !Objects.equals(state.getState(), RegionState.State.OPEN), 391 "skipping merge of region {} because it is not open.", name); 392 } 393 394 /** 395 * Computes the split plans that should be executed for this table to converge average region size 396 * towards target average or target region count. 397 * <br /> 398 * if the region is > 2 times larger than average, we split it. split 399 * is more high priority normalization action than merge. 
400 */ 401 private List<NormalizationPlan> computeSplitNormalizationPlans(final NormalizeContext ctx) { 402 final double avgRegionSize = ctx.getAverageRegionSizeMb(); 403 LOG.debug("Table {}, average region size: {}", ctx.getTableName(), avgRegionSize); 404 405 final List<NormalizationPlan> plans = new ArrayList<>(); 406 for (final RegionInfo hri : ctx.getTableRegions()) { 407 if (skipForSplit(ctx.getRegionStates().getRegionState(hri), hri)) { 408 continue; 409 } 410 final long regionSize = getRegionSizeMB(hri); 411 if (regionSize > 2 * avgRegionSize) { 412 LOG.info("Table {}, large region {} has size {}, more than twice avg size {}, splitting", 413 ctx.getTableName(), hri.getRegionNameAsString(), regionSize, avgRegionSize); 414 plans.add(new SplitNormalizationPlan(hri, null)); 415 } 416 } 417 return plans; 418 } 419 420 /** 421 * Return {@code true} when {@code regionInfo} has a creation date that is old 422 * enough to be considered for a merge operation, {@code false} otherwise. 423 */ 424 private boolean isOldEnoughForMerge(final RegionInfo regionInfo) { 425 final Instant currentTime = Instant.ofEpochMilli(EnvironmentEdgeManager.currentTime()); 426 final Instant regionCreateTime = Instant.ofEpochMilli(regionInfo.getRegionId()); 427 return currentTime.isAfter(regionCreateTime.plus(mergeMinRegionAge)); 428 } 429 430 /** 431 * Return {@code true} when {@code regionInfo} has a size that is sufficient 432 * to be considered for a merge operation, {@code false} otherwise. 433 */ 434 private boolean isLargeEnoughForMerge(final RegionInfo regionInfo) { 435 return getRegionSizeMB(regionInfo) >= mergeMinRegionSizeMb; 436 } 437 438 private static boolean logTraceReason(final BooleanSupplier predicate, final String fmtWhenTrue, 439 final Object... 
args) { 440 final boolean value = predicate.getAsBoolean(); 441 if (value) { 442 LOG.trace(fmtWhenTrue, args); 443 } 444 return value; 445 } 446 447 /** 448 * Inner class caries the state necessary to perform a single invocation of 449 * {@link #computePlansForTable(TableName)}. Grabbing this data from the assignment manager 450 * up-front allows any computed values to be realized just once. 451 */ 452 private class NormalizeContext { 453 private final TableName tableName; 454 private final RegionStates regionStates; 455 private final List<RegionInfo> tableRegions; 456 private final double averageRegionSizeMb; 457 458 public NormalizeContext(final TableName tableName) { 459 this.tableName = tableName; 460 regionStates = SimpleRegionNormalizer.this.masterServices 461 .getAssignmentManager() 462 .getRegionStates(); 463 tableRegions = regionStates.getRegionsOfTable(tableName); 464 // The list of regionInfo from getRegionsOfTable() is ordered by regionName. 465 // regionName does not necessary guarantee the order by STARTKEY (let's say 'aa1', 'aa1!', 466 // in order by regionName, it will be 'aa1!' followed by 'aa1'). 467 // This could result in normalizer merging non-adjacent regions into one and creates overlaps. 468 // In order to avoid that, sort the list by RegionInfo.COMPARATOR. 469 // See HBASE-24376 470 tableRegions.sort(RegionInfo.COMPARATOR); 471 averageRegionSizeMb = SimpleRegionNormalizer.this.getAverageRegionSizeMb(this.tableRegions); 472 } 473 474 public TableName getTableName() { 475 return tableName; 476 } 477 478 public RegionStates getRegionStates() { 479 return regionStates; 480 } 481 482 public List<RegionInfo> getTableRegions() { 483 return tableRegions; 484 } 485 486 public double getAverageRegionSizeMb() { 487 return averageRegionSizeMb; 488 } 489 } 490}