001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.normalizer; 019 020import java.io.IOException; 021import java.time.Duration; 022import java.util.Collections; 023import java.util.List; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.HConstants; 026import org.apache.hadoop.hbase.TableName; 027import org.apache.hadoop.hbase.client.RegionInfo; 028import org.apache.hadoop.hbase.client.TableDescriptor; 029import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 030import org.apache.hadoop.hbase.conf.ConfigurationManager; 031import org.apache.hadoop.hbase.conf.ConfigurationObserver; 032import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; 033import org.apache.hadoop.hbase.master.MasterServices; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hbase.thirdparty.com.google.common.util.concurrent.RateLimiter; 039import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 040 041/** 042 * Consumes normalization request targets ({@link TableName}s) off the 043 * {@link RegionNormalizerWorkQueue}, dispatches them to the {@link RegionNormalizer}, and executes 044 * the resulting {@link NormalizationPlan}s. 045 */ 046@InterfaceAudience.Private 047class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnable { 048 public static final String HBASE_TABLE_NORMALIZATION_ENABLED = 049 "hbase.table.normalization.enabled"; 050 private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerWorker.class); 051 052 static final String RATE_LIMIT_BYTES_PER_SEC_KEY = 053 "hbase.normalizer.throughput.max_bytes_per_sec"; 054 private static final long RATE_UNLIMITED_BYTES = 1_000_000_000_000L; // 1TB/sec 055 056 private final MasterServices masterServices; 057 private final RegionNormalizer regionNormalizer; 058 private final RegionNormalizerWorkQueue<TableName> workQueue; 059 private final RateLimiter rateLimiter; 060 061 private final long[] skippedCount; 062 private final boolean defaultNormalizerTableLevel; 063 private long splitPlanCount; 064 private long mergePlanCount; 065 066 RegionNormalizerWorker(final Configuration configuration, final MasterServices masterServices, 067 final RegionNormalizer regionNormalizer, final RegionNormalizerWorkQueue<TableName> workQueue) { 068 this.masterServices = masterServices; 069 this.regionNormalizer = regionNormalizer; 070 this.workQueue = workQueue; 071 this.skippedCount = new long[NormalizationPlan.PlanType.values().length]; 072 this.splitPlanCount = 0; 073 this.mergePlanCount = 0; 074 this.rateLimiter = loadRateLimiter(configuration); 075 this.defaultNormalizerTableLevel = extractDefaultNormalizerValue(configuration); 076 } 077 078 private boolean extractDefaultNormalizerValue(final Configuration configuration) { 079 String s = configuration.get(HBASE_TABLE_NORMALIZATION_ENABLED); 080 return Boolean.parseBoolean(s); 081 } 082 083 @Override 084 public void registerChildren(ConfigurationManager manager) { 085 if (regionNormalizer instanceof ConfigurationObserver) { 086 final ConfigurationObserver observer = (ConfigurationObserver) regionNormalizer; 087 manager.registerObserver(observer); 088 } 089 } 090 091 @Override 092 public void deregisterChildren(ConfigurationManager manager) { 093 if (regionNormalizer instanceof ConfigurationObserver) { 094 final ConfigurationObserver observer = (ConfigurationObserver) regionNormalizer; 095 manager.deregisterObserver(observer); 096 } 097 } 098 099 @Override 100 public void onConfigurationChange(Configuration conf) { 101 rateLimiter.setRate(loadRateLimit(conf)); 102 } 103 104 private static RateLimiter loadRateLimiter(final Configuration configuration) { 105 return RateLimiter.create(loadRateLimit(configuration)); 106 } 107 108 private static long loadRateLimit(final Configuration configuration) { 109 long rateLimitBytes = 110 configuration.getLongBytes(RATE_LIMIT_BYTES_PER_SEC_KEY, RATE_UNLIMITED_BYTES); 111 long rateLimitMbs = rateLimitBytes / 1_000_000L; 112 if (rateLimitMbs <= 0) { 113 LOG.warn("Configured value {}={} is <= 1MB. Falling back to default.", 114 RATE_LIMIT_BYTES_PER_SEC_KEY, rateLimitBytes); 115 rateLimitBytes = RATE_UNLIMITED_BYTES; 116 rateLimitMbs = RATE_UNLIMITED_BYTES / 1_000_000L; 117 } 118 LOG.info("Normalizer rate limit set to {}", 119 rateLimitBytes == RATE_UNLIMITED_BYTES ? "unlimited" : rateLimitMbs + " MB/sec"); 120 return rateLimitMbs; 121 } 122 123 /** 124 * @see RegionNormalizerManager#planSkipped(NormalizationPlan.PlanType) 125 */ 126 void planSkipped(NormalizationPlan.PlanType type) { 127 synchronized (skippedCount) { 128 // updates come here via procedure threads, so synchronize access to this counter. 129 skippedCount[type.ordinal()]++; 130 } 131 } 132 133 /** 134 * @see RegionNormalizerManager#getSkippedCount(NormalizationPlan.PlanType) 135 */ 136 long getSkippedCount(NormalizationPlan.PlanType type) { 137 return skippedCount[type.ordinal()]; 138 } 139 140 /** 141 * @see RegionNormalizerManager#getSplitPlanCount() 142 */ 143 long getSplitPlanCount() { 144 return splitPlanCount; 145 } 146 147 /** 148 * @see RegionNormalizerManager#getMergePlanCount() 149 */ 150 long getMergePlanCount() { 151 return mergePlanCount; 152 } 153 154 /** 155 * Used in test only. This field is exposed to the test, as opposed to tracking the current 156 * configuration value beside the RateLimiter instance and managing synchronization to keep the 157 * two in sync. 158 */ 159 RateLimiter getRateLimiter() { 160 return rateLimiter; 161 } 162 163 @Override 164 public void run() { 165 while (true) { 166 if (Thread.interrupted()) { 167 LOG.debug("interrupt detected. terminating."); 168 break; 169 } 170 final TableName tableName; 171 try { 172 tableName = workQueue.take(); 173 } catch (InterruptedException e) { 174 LOG.debug("interrupt detected. terminating."); 175 break; 176 } 177 178 final List<NormalizationPlan> plans = calculatePlans(tableName); 179 submitPlans(plans); 180 } 181 } 182 183 private List<NormalizationPlan> calculatePlans(final TableName tableName) { 184 if (masterServices.skipRegionManagementAction("region normalizer")) { 185 return Collections.emptyList(); 186 } 187 188 final TableDescriptor tblDesc; 189 try { 190 tblDesc = masterServices.getTableDescriptors().get(tableName); 191 boolean normalizationEnabled; 192 if (tblDesc != null) { 193 String defined = tblDesc.getValue(TableDescriptorBuilder.NORMALIZATION_ENABLED); 194 if (defined != null) { 195 normalizationEnabled = tblDesc.isNormalizationEnabled(); 196 } else { 197 normalizationEnabled = this.defaultNormalizerTableLevel; 198 } 199 if (!normalizationEnabled) { 200 LOG.debug("Skipping table {} because normalization is disabled in its table properties " 201 + "and normalization is also disabled at table level by default", tableName); 202 return Collections.emptyList(); 203 } 204 } 205 } catch (IOException e) { 206 LOG.debug("Skipping table {} because unable to access its table descriptor.", tableName, e); 207 return Collections.emptyList(); 208 } 209 210 final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc); 211 if (CollectionUtils.isEmpty(plans)) { 212 LOG.debug("No normalization required for table {}.", tableName); 213 return Collections.emptyList(); 214 } 215 return plans; 216 } 217 218 private void submitPlans(final List<NormalizationPlan> plans) { 219 // as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to submit 220 // task, so there's no artificial rate-limiting of merge/split requests due to this serial loop. 221 for (NormalizationPlan plan : plans) { 222 switch (plan.getType()) { 223 case MERGE: { 224 submitMergePlan((MergeNormalizationPlan) plan); 225 break; 226 } 227 case SPLIT: { 228 submitSplitPlan((SplitNormalizationPlan) plan); 229 break; 230 } 231 case NONE: 232 LOG.debug("Nothing to do for {} with PlanType=NONE. Ignoring.", plan); 233 planSkipped(plan.getType()); 234 break; 235 default: 236 LOG.warn("Plan {} is of an unrecognized PlanType. Ignoring.", plan); 237 planSkipped(plan.getType()); 238 break; 239 } 240 } 241 } 242 243 /** 244 * Interacts with {@link MasterServices} in order to execute a plan. 245 */ 246 private void submitMergePlan(final MergeNormalizationPlan plan) { 247 final int totalSizeMb; 248 try { 249 final long totalSizeMbLong = plan.getNormalizationTargets().stream() 250 .mapToLong(NormalizationTarget::getRegionSizeMb).reduce(0, Math::addExact); 251 totalSizeMb = Math.toIntExact(totalSizeMbLong); 252 } catch (ArithmeticException e) { 253 LOG.debug("Sum of merge request size overflows rate limiter data type. {}", plan); 254 planSkipped(plan.getType()); 255 return; 256 } 257 258 final RegionInfo[] infos = plan.getNormalizationTargets().stream() 259 .map(NormalizationTarget::getRegionInfo).toArray(RegionInfo[]::new); 260 final long pid; 261 try { 262 pid = masterServices.mergeRegions(infos, false, HConstants.NO_NONCE, HConstants.NO_NONCE); 263 } catch (IOException e) { 264 LOG.info("failed to submit plan {}.", plan, e); 265 planSkipped(plan.getType()); 266 return; 267 } 268 mergePlanCount++; 269 LOG.info("Submitted {} resulting in pid {}", plan, pid); 270 final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb))); 271 LOG.debug("Rate limiting delayed the worker by {}", Duration.ofSeconds(rateLimitedSecs)); 272 } 273 274 /** 275 * Interacts with {@link MasterServices} in order to execute a plan. 276 */ 277 private void submitSplitPlan(final SplitNormalizationPlan plan) { 278 final int totalSizeMb; 279 try { 280 totalSizeMb = Math.toIntExact(plan.getSplitTarget().getRegionSizeMb()); 281 } catch (ArithmeticException e) { 282 LOG.debug("Split request size overflows rate limiter data type. {}", plan); 283 planSkipped(plan.getType()); 284 return; 285 } 286 final RegionInfo info = plan.getSplitTarget().getRegionInfo(); 287 final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb))); 288 LOG.debug("Rate limiting delayed this operation by {}", Duration.ofSeconds(rateLimitedSecs)); 289 290 final long pid; 291 try { 292 pid = masterServices.splitRegion(info, null, HConstants.NO_NONCE, HConstants.NO_NONCE); 293 } catch (IOException e) { 294 LOG.info("failed to submit plan {}.", plan, e); 295 planSkipped(plan.getType()); 296 return; 297 } 298 splitPlanCount++; 299 LOG.info("Submitted {} resulting in pid {}", plan, pid); 300 } 301}