/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.normalizer;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.RateLimiter;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
 * Consumes normalization request targets ({@link TableName}s) off the
 * {@link RegionNormalizerWorkQueue}, dispatches them to the {@link RegionNormalizer},
 * and executes the resulting {@link NormalizationPlan}s.
 * <p>
 * Plan submission is throttled by a {@link RateLimiter} whose permits are expressed in megabytes
 * of region data per second (see {@link #RATE_LIMIT_BYTES_PER_SEC_KEY}).
 */
@InterfaceAudience.Private
class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnable {
  private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerWorker.class);

  /** Configuration key holding the maximum normalizer throughput, in bytes per second. */
  static final String RATE_LIMIT_BYTES_PER_SEC_KEY =
    "hbase.normalizer.throughput.max_bytes_per_sec";
  /** Default for the key above; reported in the logs as "unlimited". */
  private static final long RATE_UNLIMITED_BYTES = 1_000_000_000_000L; // 1TB/sec

  private final MasterServices masterServices;
  private final RegionNormalizer regionNormalizer;
  private final RegionNormalizerWorkQueue<TableName> workQueue;
  private final RateLimiter rateLimiter;

  /**
   * Number of skipped plans, indexed by {@link NormalizationPlan.PlanType#ordinal()}. Every read
   * and write must hold the monitor of this array.
   */
  private final long[] skippedCount;
  // Atomic so that values published through the getters are visible to (and never torn for)
  // callers running on a thread other than the one executing this worker.
  private final AtomicLong splitPlanCount = new AtomicLong();
  private final AtomicLong mergePlanCount = new AtomicLong();

  /**
   * @param configuration    source of the initial rate limit
   * @param masterServices   used to look up table descriptors and to submit merges and splits
   * @param regionNormalizer computes the plans for each table taken off the queue
   * @param workQueue        queue of tables awaiting normalization
   */
  RegionNormalizerWorker(
    final Configuration configuration,
    final MasterServices masterServices,
    final RegionNormalizer regionNormalizer,
    final RegionNormalizerWorkQueue<TableName> workQueue
  ) {
    this.masterServices = masterServices;
    this.regionNormalizer = regionNormalizer;
    this.workQueue = workQueue;
    this.skippedCount = new long[NormalizationPlan.PlanType.values().length];
    this.rateLimiter = loadRateLimiter(configuration);
  }

  /**
   * Registers the wrapped {@link RegionNormalizer} with {@code manager} when it is itself a
   * {@link ConfigurationObserver}.
   */
  @Override
  public void registerChildren(ConfigurationManager manager) {
    if (regionNormalizer instanceof ConfigurationObserver) {
      final ConfigurationObserver observer = (ConfigurationObserver) regionNormalizer;
      manager.registerObserver(observer);
    }
  }

  /**
   * Deregisters the wrapped {@link RegionNormalizer} from {@code manager} when it is itself a
   * {@link ConfigurationObserver}.
   */
  @Override
  public void deregisterChildren(ConfigurationManager manager) {
    if (regionNormalizer instanceof ConfigurationObserver) {
      final ConfigurationObserver observer = (ConfigurationObserver) regionNormalizer;
      manager.deregisterObserver(observer);
    }
  }

  /**
   * Re-reads the rate limit from {@code conf} and applies it to the existing limiter.
   */
  @Override
  public void onConfigurationChange(Configuration conf) {
    rateLimiter.setRate(loadRateLimit(conf));
  }

  /** Builds a limiter whose rate is the configured limit in MB/sec. */
  private static RateLimiter loadRateLimiter(final Configuration configuration) {
    return RateLimiter.create(loadRateLimit(configuration));
  }

  /**
   * Reads {@link #RATE_LIMIT_BYTES_PER_SEC_KEY} and converts it to whole megabytes per second
   * (1 MB = 1,000,000 bytes). Values below 1 MB would truncate to a rate of zero or less, so they
   * are replaced by the default.
   * @return the rate limit in MB/sec; always positive
   */
  private static long loadRateLimit(final Configuration configuration) {
    long rateLimitBytes =
      configuration.getLongBytes(RATE_LIMIT_BYTES_PER_SEC_KEY, RATE_UNLIMITED_BYTES);
    long rateLimitMbs = rateLimitBytes / 1_000_000L;
    if (rateLimitMbs <= 0) {
      // Exactly 1MB yields a rate of 1 and is accepted; only smaller values fall back.
      LOG.warn("Configured value {}={} is < 1MB. Falling back to default.",
        RATE_LIMIT_BYTES_PER_SEC_KEY, rateLimitBytes);
      rateLimitBytes = RATE_UNLIMITED_BYTES;
      rateLimitMbs = RATE_UNLIMITED_BYTES / 1_000_000L;
    }
    LOG.info("Normalizer rate limit set to {}",
      rateLimitBytes == RATE_UNLIMITED_BYTES ? "unlimited" : rateLimitMbs + " MB/sec");
    return rateLimitMbs;
  }

  /**
   * Records that a plan of the given type was skipped.
   * @see RegionNormalizerManager#planSkipped(NormalizationPlan.PlanType)
   */
  void planSkipped(NormalizationPlan.PlanType type) {
    synchronized (skippedCount) {
      // updates come here via procedure threads, so synchronize access to this counter.
      skippedCount[type.ordinal()]++;
    }
  }

  /**
   * @return the number of plans of the given type skipped so far
   * @see RegionNormalizerManager#getSkippedCount(NormalizationPlan.PlanType)
   */
  long getSkippedCount(NormalizationPlan.PlanType type) {
    // Take the same monitor as planSkipped so the latest increment is guaranteed to be visible.
    synchronized (skippedCount) {
      return skippedCount[type.ordinal()];
    }
  }

  /**
   * @return the number of split plans successfully submitted so far
   * @see RegionNormalizerManager#getSplitPlanCount()
   */
  long getSplitPlanCount() {
    return splitPlanCount.get();
  }

  /**
   * @return the number of merge plans successfully submitted so far
   * @see RegionNormalizerManager#getMergePlanCount()
   */
  long getMergePlanCount() {
    return mergePlanCount.get();
  }

  /**
   * Used in test only. This field is exposed to the test, as opposed to tracking the current
   * configuration value beside the RateLimiter instance and managing synchronization to keep the
   * two in sync.
   */
  RateLimiter getRateLimiter() {
    return rateLimiter;
  }

  /**
   * Takes tables off the work queue one at a time, computes their plans and submits them, until
   * the executing thread is interrupted. The thread's interrupt status is left set on exit so
   * that whoever owns the thread can observe the interruption.
   */
  @Override
  public void run() {
    while (true) {
      // isInterrupted() rather than interrupted(): observe the flag without clearing it.
      if (Thread.currentThread().isInterrupted()) {
        LOG.debug("interrupt detected. terminating.");
        break;
      }
      final TableName tableName;
      try {
        tableName = workQueue.take();
      } catch (InterruptedException e) {
        LOG.debug("interrupt detected. terminating.");
        // take() cleared the flag when it threw; re-assert it before terminating.
        Thread.currentThread().interrupt();
        break;
      }

      final List<NormalizationPlan> plans = calculatePlans(tableName);
      submitPlans(plans);
    }
  }

  /**
   * Computes the plans for one table.
   * @return the plans to execute, or an empty list when region management actions are being
   *         skipped, normalization is disabled for the table, the table descriptor cannot be
   *         read, or the normalizer produced nothing
   */
  private List<NormalizationPlan> calculatePlans(final TableName tableName) {
    if (masterServices.skipRegionManagementAction("region normalizer")) {
      return Collections.emptyList();
    }

    try {
      final TableDescriptor tblDesc = masterServices.getTableDescriptors().get(tableName);
      // A null descriptor is not treated as "disabled"; planning proceeds in that case.
      if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
        LOG.debug("Skipping table {} because normalization is disabled in its table properties.",
          tableName);
        return Collections.emptyList();
      }
    } catch (IOException e) {
      LOG.debug("Skipping table {} because unable to access its table descriptor.", tableName, e);
      return Collections.emptyList();
    }

    final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tableName);
    if (CollectionUtils.isEmpty(plans)) {
      LOG.debug("No normalization required for table {}.", tableName);
      return Collections.emptyList();
    }
    return plans;
  }

  /**
   * Dispatches each plan to the submit method matching its type; plans of type NONE or of an
   * unrecognized type are counted as skipped.
   */
  private void submitPlans(final List<NormalizationPlan> plans) {
    // as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to submit
    // task, so there's no artificial rate-limiting of merge/split requests due to this serial loop.
    for (NormalizationPlan plan : plans) {
      switch (plan.getType()) {
        case MERGE: {
          submitMergePlan((MergeNormalizationPlan) plan);
          break;
        }
        case SPLIT: {
          submitSplitPlan((SplitNormalizationPlan) plan);
          break;
        }
        case NONE:
          LOG.debug("Nothing to do for {} with PlanType=NONE. Ignoring.", plan);
          planSkipped(plan.getType());
          break;
        default:
          LOG.warn("Plan {} is of an unrecognized PlanType. Ignoring.", plan);
          planSkipped(plan.getType());
          break;
      }
    }
  }

  /**
   * Interacts with {@link MasterServices} in order to execute a plan.
   * <p>
   * The plan is counted as skipped when the combined size of its targets does not fit in an
   * {@code int} (the rate limiter's permit type) or when submission fails with an
   * {@link IOException}.
   */
  private void submitMergePlan(final MergeNormalizationPlan plan) {
    final int totalSizeMb;
    try {
      final long totalSizeMbLong = plan.getNormalizationTargets()
        .stream()
        .mapToLong(NormalizationTarget::getRegionSizeMb)
        .reduce(0, Math::addExact);
      totalSizeMb = Math.toIntExact(totalSizeMbLong);
    } catch (ArithmeticException e) {
      LOG.debug("Sum of merge request size overflows rate limiter data type. {}", plan);
      planSkipped(plan.getType());
      return;
    }

    final RegionInfo[] infos = plan.getNormalizationTargets()
      .stream()
      .map(NormalizationTarget::getRegionInfo)
      .toArray(RegionInfo[]::new);
    final long pid;
    try {
      pid = masterServices.mergeRegions(
        infos, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
    } catch (IOException e) {
      LOG.info("failed to submit plan {}.", plan, e);
      planSkipped(plan.getType());
      return;
    }
    mergePlanCount.incrementAndGet();
    LOG.info("Submitted {} resulting in pid {}", plan, pid);
    // NOTE(review): unlike the split path, permits are acquired after the merge has been
    // submitted, so the delay applies to the worker's next action - confirm this is intended.
    // At least one permit is requested because acquire() rejects non-positive values.
    final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb)));
    LOG.debug("Rate limiting delayed the worker by {}", Duration.ofSeconds(rateLimitedSecs));
  }

  /**
   * Interacts with {@link MasterServices} in order to execute a plan.
   * <p>
   * The plan is counted as skipped when the size of its target does not fit in an {@code int}
   * (the rate limiter's permit type) or when submission fails with an {@link IOException}.
   */
  private void submitSplitPlan(final SplitNormalizationPlan plan) {
    final int totalSizeMb;
    try {
      totalSizeMb = Math.toIntExact(plan.getSplitTarget().getRegionSizeMb());
    } catch (ArithmeticException e) {
      LOG.debug("Split request size overflows rate limiter data type. {}", plan);
      planSkipped(plan.getType());
      return;
    }
    final RegionInfo info = plan.getSplitTarget().getRegionInfo();
    // Permits are acquired before submission here; at least one permit is requested because
    // acquire() rejects non-positive values.
    final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb)));
    LOG.debug("Rate limiting delayed this operation by {}", Duration.ofSeconds(rateLimitedSecs));

    final long pid;
    try {
      pid = masterServices.splitRegion(
        info, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
    } catch (IOException e) {
      LOG.info("failed to submit plan {}.", plan, e);
      planSkipped(plan.getType());
      return;
    }
    splitPlanCount.incrementAndGet();
    LOG.info("Submitted {} resulting in pid {}", plan, pid);
  }
}