001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.normalizer; 019 020import java.io.IOException; 021import java.time.Duration; 022import java.util.ArrayList; 023import java.util.Collections; 024import java.util.List; 025import java.util.concurrent.atomic.AtomicLong; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.HConstants; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.client.RegionInfo; 030import org.apache.hadoop.hbase.client.TableDescriptor; 031import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 032import org.apache.hadoop.hbase.conf.ConfigurationManager; 033import org.apache.hadoop.hbase.conf.ConfigurationObserver; 034import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; 035import org.apache.hadoop.hbase.master.MasterServices; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040import org.apache.hbase.thirdparty.com.google.common.util.concurrent.RateLimiter; 041import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 042 043/** 044 * Consumes normalization request targets ({@link TableName}s) off the 045 * {@link RegionNormalizerWorkQueue}, dispatches them to the {@link RegionNormalizer}, and executes 046 * the resulting {@link NormalizationPlan}s. 047 */ 048@InterfaceAudience.Private 049class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnable { 050 public static final String HBASE_TABLE_NORMALIZATION_ENABLED = 051 "hbase.table.normalization.enabled"; 052 private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerWorker.class); 053 054 static final String RATE_LIMIT_BYTES_PER_SEC_KEY = 055 "hbase.normalizer.throughput.max_bytes_per_sec"; 056 private static final long RATE_UNLIMITED_BYTES = 1_000_000_000_000L; // 1TB/sec 057 058 static final String CUMULATIVE_SIZE_LIMIT_MB_KEY = "hbase.normalizer.plans_size_limit.mb"; 059 static final long DEFAULT_CUMULATIVE_SIZE_LIMIT_MB = Long.MAX_VALUE; 060 061 private final MasterServices masterServices; 062 private final RegionNormalizer regionNormalizer; 063 private final RegionNormalizerWorkQueue<TableName> workQueue; 064 private final RateLimiter rateLimiter; 065 066 private final long[] skippedCount; 067 private final boolean defaultNormalizerTableLevel; 068 private long splitPlanCount; 069 private long mergePlanCount; 070 private final AtomicLong cumulativePlansSizeLimitMb; 071 072 RegionNormalizerWorker(final Configuration configuration, final MasterServices masterServices, 073 final RegionNormalizer regionNormalizer, final RegionNormalizerWorkQueue<TableName> workQueue) { 074 this.masterServices = masterServices; 075 this.regionNormalizer = regionNormalizer; 076 this.workQueue = workQueue; 077 this.skippedCount = new long[NormalizationPlan.PlanType.values().length]; 078 this.splitPlanCount = 0; 079 this.mergePlanCount = 0; 080 this.rateLimiter = loadRateLimiter(configuration); 081 this.defaultNormalizerTableLevel = extractDefaultNormalizerValue(configuration); 082 this.cumulativePlansSizeLimitMb = new AtomicLong( 083 configuration.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB)); 084 } 085 086 private boolean extractDefaultNormalizerValue(final Configuration configuration) { 087 String s = configuration.get(HBASE_TABLE_NORMALIZATION_ENABLED); 088 return Boolean.parseBoolean(s); 089 } 090 091 @Override 092 public void registerChildren(ConfigurationManager manager) { 093 if (regionNormalizer instanceof ConfigurationObserver) { 094 final ConfigurationObserver observer = (ConfigurationObserver) regionNormalizer; 095 manager.registerObserver(observer); 096 } 097 } 098 099 @Override 100 public void deregisterChildren(ConfigurationManager manager) { 101 if (regionNormalizer instanceof ConfigurationObserver) { 102 final ConfigurationObserver observer = (ConfigurationObserver) regionNormalizer; 103 manager.deregisterObserver(observer); 104 } 105 } 106 107 private static long logLongConfigurationUpdated(final String key, final long oldValue, 108 final long newValue) { 109 if (oldValue != newValue) { 110 LOG.info("Updated configuration for key '{}' from {} to {}", key, oldValue, newValue); 111 } 112 return newValue; 113 } 114 115 @Override 116 public void onConfigurationChange(Configuration conf) { 117 rateLimiter.setRate(loadRateLimit(conf)); 118 cumulativePlansSizeLimitMb.set( 119 logLongConfigurationUpdated(CUMULATIVE_SIZE_LIMIT_MB_KEY, cumulativePlansSizeLimitMb.get(), 120 conf.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB))); 121 } 122 123 private static RateLimiter loadRateLimiter(final Configuration configuration) { 124 return RateLimiter.create(loadRateLimit(configuration)); 125 } 126 127 private static long loadRateLimit(final Configuration configuration) { 128 long rateLimitBytes = 129 configuration.getLongBytes(RATE_LIMIT_BYTES_PER_SEC_KEY, RATE_UNLIMITED_BYTES); 130 long rateLimitMbs = rateLimitBytes / 1_000_000L; 131 if (rateLimitMbs <= 0) { 132 LOG.warn("Configured value {}={} is <= 1MB. Falling back to default.", 133 RATE_LIMIT_BYTES_PER_SEC_KEY, rateLimitBytes); 134 rateLimitBytes = RATE_UNLIMITED_BYTES; 135 rateLimitMbs = RATE_UNLIMITED_BYTES / 1_000_000L; 136 } 137 LOG.info("Normalizer rate limit set to {}", 138 rateLimitBytes == RATE_UNLIMITED_BYTES ? "unlimited" : rateLimitMbs + " MB/sec"); 139 return rateLimitMbs; 140 } 141 142 /** 143 * @see RegionNormalizerManager#planSkipped(NormalizationPlan.PlanType) 144 */ 145 void planSkipped(NormalizationPlan.PlanType type) { 146 synchronized (skippedCount) { 147 // updates come here via procedure threads, so synchronize access to this counter. 148 skippedCount[type.ordinal()]++; 149 } 150 } 151 152 /** 153 * @see RegionNormalizerManager#getSkippedCount(NormalizationPlan.PlanType) 154 */ 155 long getSkippedCount(NormalizationPlan.PlanType type) { 156 return skippedCount[type.ordinal()]; 157 } 158 159 /** 160 * @see RegionNormalizerManager#getSplitPlanCount() 161 */ 162 long getSplitPlanCount() { 163 return splitPlanCount; 164 } 165 166 /** 167 * @see RegionNormalizerManager#getMergePlanCount() 168 */ 169 long getMergePlanCount() { 170 return mergePlanCount; 171 } 172 173 /** 174 * Used in test only. This field is exposed to the test, as opposed to tracking the current 175 * configuration value beside the RateLimiter instance and managing synchronization to keep the 176 * two in sync. 177 */ 178 RateLimiter getRateLimiter() { 179 return rateLimiter; 180 } 181 182 @Override 183 public void run() { 184 while (true) { 185 if (Thread.interrupted()) { 186 LOG.debug("interrupt detected. terminating."); 187 break; 188 } 189 final TableName tableName; 190 try { 191 tableName = workQueue.take(); 192 } catch (InterruptedException e) { 193 LOG.debug("interrupt detected. terminating."); 194 break; 195 } 196 197 final List<NormalizationPlan> plans = calculatePlans(tableName); 198 submitPlans(plans); 199 } 200 } 201 202 private List<NormalizationPlan> calculatePlans(final TableName tableName) { 203 if (masterServices.skipRegionManagementAction("region normalizer")) { 204 return Collections.emptyList(); 205 } 206 207 final TableDescriptor tblDesc; 208 try { 209 tblDesc = masterServices.getTableDescriptors().get(tableName); 210 boolean normalizationEnabled; 211 if (tblDesc != null) { 212 String defined = tblDesc.getValue(TableDescriptorBuilder.NORMALIZATION_ENABLED); 213 if (defined != null) { 214 normalizationEnabled = tblDesc.isNormalizationEnabled(); 215 } else { 216 normalizationEnabled = this.defaultNormalizerTableLevel; 217 } 218 if (!normalizationEnabled) { 219 LOG.debug("Skipping table {} because normalization is disabled in its table properties " 220 + "and normalization is also disabled at table level by default", tableName); 221 return Collections.emptyList(); 222 } 223 } 224 } catch (IOException e) { 225 LOG.debug("Skipping table {} because unable to access its table descriptor.", tableName, e); 226 return Collections.emptyList(); 227 } 228 229 List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc); 230 231 plans = truncateForSize(plans); 232 233 if (CollectionUtils.isEmpty(plans)) { 234 LOG.debug("No normalization required for table {}.", tableName); 235 return Collections.emptyList(); 236 } 237 return plans; 238 } 239 240 private List<NormalizationPlan> truncateForSize(List<NormalizationPlan> plans) { 241 if (cumulativePlansSizeLimitMb.get() != DEFAULT_CUMULATIVE_SIZE_LIMIT_MB) { 242 List<NormalizationPlan> maybeTruncatedPlans = new ArrayList<>(plans.size()); 243 long totalCumulativeSizeMb = 0; 244 long truncatedCumulativeSizeMb = 0; 245 for (NormalizationPlan plan : plans) { 246 totalCumulativeSizeMb += plan.getPlanSizeMb(); 247 if (totalCumulativeSizeMb <= cumulativePlansSizeLimitMb.get()) { 248 truncatedCumulativeSizeMb += plan.getPlanSizeMb(); 249 maybeTruncatedPlans.add(plan); 250 } 251 } 252 if (maybeTruncatedPlans.size() != plans.size()) { 253 LOG.debug( 254 "Truncating list of normalization plans that RegionNormalizerWorker will process " 255 + "because of {}. Original list had {} plan(s), new list has {} plan(s). " 256 + "Original list covered regions with cumulative size {} mb, " 257 + "new list covers regions with cumulative size {} mb.", 258 CUMULATIVE_SIZE_LIMIT_MB_KEY, plans.size(), maybeTruncatedPlans.size(), 259 totalCumulativeSizeMb, truncatedCumulativeSizeMb); 260 } 261 return maybeTruncatedPlans; 262 } else { 263 return plans; 264 } 265 } 266 267 private void submitPlans(final List<NormalizationPlan> plans) { 268 // as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to submit 269 // task, so there's no artificial rate-limiting of merge/split requests due to this serial loop. 270 for (NormalizationPlan plan : plans) { 271 switch (plan.getType()) { 272 case MERGE: { 273 submitMergePlan((MergeNormalizationPlan) plan); 274 break; 275 } 276 case SPLIT: { 277 submitSplitPlan((SplitNormalizationPlan) plan); 278 break; 279 } 280 case NONE: 281 LOG.debug("Nothing to do for {} with PlanType=NONE. Ignoring.", plan); 282 planSkipped(plan.getType()); 283 break; 284 default: 285 LOG.warn("Plan {} is of an unrecognized PlanType. Ignoring.", plan); 286 planSkipped(plan.getType()); 287 break; 288 } 289 } 290 } 291 292 /** 293 * Interacts with {@link MasterServices} in order to execute a plan. 294 */ 295 private void submitMergePlan(final MergeNormalizationPlan plan) { 296 final int totalSizeMb; 297 try { 298 final long totalSizeMbLong = plan.getNormalizationTargets().stream() 299 .mapToLong(NormalizationTarget::getRegionSizeMb).reduce(0, Math::addExact); 300 totalSizeMb = Math.toIntExact(totalSizeMbLong); 301 } catch (ArithmeticException e) { 302 LOG.debug("Sum of merge request size overflows rate limiter data type. {}", plan); 303 planSkipped(plan.getType()); 304 return; 305 } 306 307 final RegionInfo[] infos = plan.getNormalizationTargets().stream() 308 .map(NormalizationTarget::getRegionInfo).toArray(RegionInfo[]::new); 309 final long pid; 310 try { 311 pid = masterServices.mergeRegions(infos, false, HConstants.NO_NONCE, HConstants.NO_NONCE); 312 } catch (IOException e) { 313 LOG.info("failed to submit plan {}.", plan, e); 314 planSkipped(plan.getType()); 315 return; 316 } 317 mergePlanCount++; 318 LOG.info("Submitted {} resulting in pid {}", plan, pid); 319 final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb))); 320 LOG.debug("Rate limiting delayed the worker by {}", Duration.ofSeconds(rateLimitedSecs)); 321 } 322 323 /** 324 * Interacts with {@link MasterServices} in order to execute a plan. 325 */ 326 private void submitSplitPlan(final SplitNormalizationPlan plan) { 327 final int totalSizeMb; 328 try { 329 totalSizeMb = Math.toIntExact(plan.getSplitTarget().getRegionSizeMb()); 330 } catch (ArithmeticException e) { 331 LOG.debug("Split request size overflows rate limiter data type. {}", plan); 332 planSkipped(plan.getType()); 333 return; 334 } 335 final RegionInfo info = plan.getSplitTarget().getRegionInfo(); 336 final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb))); 337 LOG.debug("Rate limiting delayed this operation by {}", Duration.ofSeconds(rateLimitedSecs)); 338 339 final long pid; 340 try { 341 pid = masterServices.splitRegion(info, null, HConstants.NO_NONCE, HConstants.NO_NONCE); 342 } catch (IOException e) { 343 LOG.info("failed to submit plan {}.", plan, e); 344 planSkipped(plan.getType()); 345 return; 346 } 347 splitPlanCount++; 348 LOG.info("Submitted {} resulting in pid {}", plan, pid); 349 } 350}