001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.normalizer;
019
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.RateLimiter;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
038
039/**
040 * Consumes normalization request targets ({@link TableName}s) off the
041 * {@link RegionNormalizerWorkQueue}, dispatches them to the {@link RegionNormalizer},
042 * and executes the resulting {@link NormalizationPlan}s.
043 */
044@InterfaceAudience.Private
045class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnable {
046  private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerWorker.class);
047
048  static final String RATE_LIMIT_BYTES_PER_SEC_KEY =
049    "hbase.normalizer.throughput.max_bytes_per_sec";
050  private static final long RATE_UNLIMITED_BYTES = 1_000_000_000_000L; // 1TB/sec
051
052  private final MasterServices masterServices;
053  private final RegionNormalizer regionNormalizer;
054  private final RegionNormalizerWorkQueue<TableName> workQueue;
055  private final RateLimiter rateLimiter;
056
057  private final long[] skippedCount;
058  private long splitPlanCount;
059  private long mergePlanCount;
060
061  RegionNormalizerWorker(
062    final Configuration configuration,
063    final MasterServices masterServices,
064    final RegionNormalizer regionNormalizer,
065    final RegionNormalizerWorkQueue<TableName> workQueue
066  ) {
067    this.masterServices = masterServices;
068    this.regionNormalizer = regionNormalizer;
069    this.workQueue = workQueue;
070    this.skippedCount = new long[NormalizationPlan.PlanType.values().length];
071    this.splitPlanCount = 0;
072    this.mergePlanCount = 0;
073    this.rateLimiter = loadRateLimiter(configuration);
074  }
075
076  @Override
077  public void registerChildren(ConfigurationManager manager) {
078    if (regionNormalizer instanceof ConfigurationObserver) {
079      final ConfigurationObserver observer = (ConfigurationObserver)  regionNormalizer;
080      manager.registerObserver(observer);
081    }
082  }
083
084  @Override
085  public void deregisterChildren(ConfigurationManager manager) {
086    if (regionNormalizer instanceof ConfigurationObserver) {
087      final ConfigurationObserver observer = (ConfigurationObserver)  regionNormalizer;
088      manager.deregisterObserver(observer);
089    }
090  }
091
092  @Override
093  public void onConfigurationChange(Configuration conf) {
094    rateLimiter.setRate(loadRateLimit(conf));
095  }
096
097  private static RateLimiter loadRateLimiter(final Configuration configuration) {
098    return RateLimiter.create(loadRateLimit(configuration));
099  }
100
101  private static long loadRateLimit(final Configuration configuration) {
102    long rateLimitBytes =
103      configuration.getLongBytes(RATE_LIMIT_BYTES_PER_SEC_KEY, RATE_UNLIMITED_BYTES);
104    long rateLimitMbs = rateLimitBytes / 1_000_000L;
105    if (rateLimitMbs <= 0) {
106      LOG.warn("Configured value {}={} is <= 1MB. Falling back to default.",
107        RATE_LIMIT_BYTES_PER_SEC_KEY, rateLimitBytes);
108      rateLimitBytes = RATE_UNLIMITED_BYTES;
109      rateLimitMbs = RATE_UNLIMITED_BYTES / 1_000_000L;
110    }
111    LOG.info("Normalizer rate limit set to {}",
112      rateLimitBytes == RATE_UNLIMITED_BYTES ? "unlimited" : rateLimitMbs + " MB/sec");
113    return rateLimitMbs;
114  }
115
116  /**
117   * @see RegionNormalizerManager#planSkipped(NormalizationPlan.PlanType)
118   */
119  void planSkipped(NormalizationPlan.PlanType type) {
120    synchronized (skippedCount) {
121      // updates come here via procedure threads, so synchronize access to this counter.
122      skippedCount[type.ordinal()]++;
123    }
124  }
125
126  /**
127   * @see RegionNormalizerManager#getSkippedCount(NormalizationPlan.PlanType)
128   */
129  long getSkippedCount(NormalizationPlan.PlanType type) {
130    return skippedCount[type.ordinal()];
131  }
132
133  /**
134   * @see RegionNormalizerManager#getSplitPlanCount()
135   */
136  long getSplitPlanCount() {
137    return splitPlanCount;
138  }
139
140  /**
141   * @see RegionNormalizerManager#getMergePlanCount()
142   */
143  long getMergePlanCount() {
144    return mergePlanCount;
145  }
146
147  /**
148   * Used in test only. This field is exposed to the test, as opposed to tracking the current
149   * configuration value beside the RateLimiter instance and managing synchronization to keep the
150   * two in sync.
151   */
152  RateLimiter getRateLimiter() {
153    return rateLimiter;
154  }
155
156  @Override
157  public void run() {
158    while (true) {
159      if (Thread.interrupted()) {
160        LOG.debug("interrupt detected. terminating.");
161        break;
162      }
163      final TableName tableName;
164      try {
165        tableName = workQueue.take();
166      } catch (InterruptedException e) {
167        LOG.debug("interrupt detected. terminating.");
168        break;
169      }
170
171      final List<NormalizationPlan> plans = calculatePlans(tableName);
172      submitPlans(plans);
173    }
174  }
175
176  private List<NormalizationPlan> calculatePlans(final TableName tableName) {
177    if (masterServices.skipRegionManagementAction("region normalizer")) {
178      return Collections.emptyList();
179    }
180
181    try {
182      final TableDescriptor tblDesc = masterServices.getTableDescriptors().get(tableName);
183      if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
184        LOG.debug("Skipping table {} because normalization is disabled in its table properties.",
185          tableName);
186        return Collections.emptyList();
187      }
188    } catch (IOException e) {
189      LOG.debug("Skipping table {} because unable to access its table descriptor.", tableName, e);
190      return Collections.emptyList();
191    }
192
193    final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tableName);
194    if (CollectionUtils.isEmpty(plans)) {
195      LOG.debug("No normalization required for table {}.", tableName);
196      return Collections.emptyList();
197    }
198    return plans;
199  }
200
201  private void submitPlans(final List<NormalizationPlan> plans) {
202    // as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to submit
203    // task, so there's no artificial rate-limiting of merge/split requests due to this serial loop.
204    for (NormalizationPlan plan : plans) {
205      switch (plan.getType()) {
206        case MERGE: {
207          submitMergePlan((MergeNormalizationPlan) plan);
208          break;
209        }
210        case SPLIT: {
211          submitSplitPlan((SplitNormalizationPlan) plan);
212          break;
213        }
214        case NONE:
215          LOG.debug("Nothing to do for {} with PlanType=NONE. Ignoring.", plan);
216          planSkipped(plan.getType());
217          break;
218        default:
219          LOG.warn("Plan {} is of an unrecognized PlanType. Ignoring.", plan);
220          planSkipped(plan.getType());
221          break;
222      }
223    }
224  }
225
226  /**
227   * Interacts with {@link MasterServices} in order to execute a plan.
228   */
229  private void submitMergePlan(final MergeNormalizationPlan plan) {
230    final int totalSizeMb;
231    try {
232      final long totalSizeMbLong = plan.getNormalizationTargets()
233        .stream()
234        .mapToLong(NormalizationTarget::getRegionSizeMb)
235        .reduce(0, Math::addExact);
236      totalSizeMb = Math.toIntExact(totalSizeMbLong);
237    } catch (ArithmeticException e) {
238      LOG.debug("Sum of merge request size overflows rate limiter data type. {}", plan);
239      planSkipped(plan.getType());
240      return;
241    }
242
243    final RegionInfo[] infos = plan.getNormalizationTargets()
244      .stream()
245      .map(NormalizationTarget::getRegionInfo)
246      .toArray(RegionInfo[]::new);
247    final long pid;
248    try {
249      pid = masterServices.mergeRegions(
250        infos, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
251    } catch (IOException e) {
252      LOG.info("failed to submit plan {}.", plan, e);
253      planSkipped(plan.getType());
254      return;
255    }
256    mergePlanCount++;
257    LOG.info("Submitted {} resulting in pid {}", plan, pid);
258    final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb)));
259    LOG.debug("Rate limiting delayed the worker by {}", Duration.ofSeconds(rateLimitedSecs));
260  }
261
262  /**
263   * Interacts with {@link MasterServices} in order to execute a plan.
264   */
265  private void submitSplitPlan(final SplitNormalizationPlan plan) {
266    final int totalSizeMb;
267    try {
268      totalSizeMb = Math.toIntExact(plan.getSplitTarget().getRegionSizeMb());
269    } catch (ArithmeticException e) {
270      LOG.debug("Split request size overflows rate limiter data type. {}", plan);
271      planSkipped(plan.getType());
272      return;
273    }
274    final RegionInfo info = plan.getSplitTarget().getRegionInfo();
275    final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb)));
276    LOG.debug("Rate limiting delayed this operation by {}", Duration.ofSeconds(rateLimitedSecs));
277
278    final long pid;
279    try {
280      pid = masterServices.splitRegion(
281        info, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
282    } catch (IOException e) {
283      LOG.info("failed to submit plan {}.", plan, e);
284      planSkipped(plan.getType());
285      return;
286    }
287    splitPlanCount++;
288    LOG.info("Submitted {} resulting in pid {}", plan, pid);
289  }
290}