/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.normalizer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.RateLimiter;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;

/**
 * Consumes normalization request targets ({@link TableName}s) off the
 * {@link RegionNormalizerWorkQueue}, dispatches them to the {@link RegionNormalizer}, and executes
 * the resulting {@link NormalizationPlan}s.
 */
@InterfaceAudience.Private
class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnable {
  public static final String HBASE_TABLE_NORMALIZATION_ENABLED =
    "hbase.table.normalization.enabled";
  private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerWorker.class);

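  // The rate limit below is configured in bytes/sec but applied in whole MB/sec; see
  // loadRateLimit(). Values are read via Configuration#getLongBytes, so human-readable suffixes
  // such as "100m" or "1g" are accepted. Anything resolving to less than 1 MB/sec falls back to
  // the "unlimited" default.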
  static final String RATE_LIMIT_BYTES_PER_SEC_KEY =
    "hbase.normalizer.throughput.max_bytes_per_sec";
  private static final long RATE_UNLIMITED_BYTES = 1_000_000_000_000L; // 1TB/sec

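  // Caps the combined size (in MB) of the regions covered by the plans produced for a single
  // table; the Long.MAX_VALUE default disables the cap entirely. Enforced in truncateForSize().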
  static final String CUMULATIVE_SIZE_LIMIT_MB_KEY = "hbase.normalizer.plans_size_limit.mb";
  static final long DEFAULT_CUMULATIVE_SIZE_LIMIT_MB = Long.MAX_VALUE;

  private final MasterServices masterServices;
  private final RegionNormalizer regionNormalizer;
  private final RegionNormalizerWorkQueue<TableName> workQueue;
  private final RateLimiter rateLimiter;

  private final long[] skippedCount;
  private final boolean defaultNormalizerTableLevel;
  private long splitPlanCount;
  private long mergePlanCount;
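  // Refreshed at runtime via onConfigurationChange, so it is held in an AtomicLong.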
  private final AtomicLong cumulativePlansSizeLimitMb;

  RegionNormalizerWorker(final Configuration configuration, final MasterServices masterServices,
    final RegionNormalizer regionNormalizer, final RegionNormalizerWorkQueue<TableName> workQueue) {
    this.masterServices = masterServices;
    this.regionNormalizer = regionNormalizer;
    this.workQueue = workQueue;
    this.skippedCount = new long[NormalizationPlan.PlanType.values().length];
    this.splitPlanCount = 0;
    this.mergePlanCount = 0;
    this.rateLimiter = loadRateLimiter(configuration);
    this.defaultNormalizerTableLevel = extractDefaultNormalizerValue(configuration);
    this.cumulativePlansSizeLimitMb = new AtomicLong(
      configuration.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB));
  }

  private boolean extractDefaultNormalizerValue(final Configuration configuration) {
    String s = configuration.get(HBASE_TABLE_NORMALIZATION_ENABLED);
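    // Boolean.parseBoolean(null) returns false, so when this key is unset, tables that do not
    // set NORMALIZATION_ENABLED themselves are skipped by default.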
    return Boolean.parseBoolean(s);
  }

  @Override
  public void registerChildren(ConfigurationManager manager) {
    if (regionNormalizer instanceof ConfigurationObserver) {
      final ConfigurationObserver observer = (ConfigurationObserver) regionNormalizer;
      manager.registerObserver(observer);
    }
  }

  @Override
  public void deregisterChildren(ConfigurationManager manager) {
    if (regionNormalizer instanceof ConfigurationObserver) {
      final ConfigurationObserver observer = (ConfigurationObserver) regionNormalizer;
      manager.deregisterObserver(observer);
    }
  }

  private static long logLongConfigurationUpdated(final String key, final long oldValue,
    final long newValue) {
    if (oldValue != newValue) {
      LOG.info("Updated configuration for key '{}' from {} to {}", key, oldValue, newValue);
    }
    return newValue;
  }

  @Override
  public void onConfigurationChange(Configuration conf) {
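    // Only the rate limit and the cumulative plan-size limit are refreshed at runtime; the
    // table-level normalization default is read once in the constructor.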
    rateLimiter.setRate(loadRateLimit(conf));
    cumulativePlansSizeLimitMb.set(
      logLongConfigurationUpdated(CUMULATIVE_SIZE_LIMIT_MB_KEY, cumulativePlansSizeLimitMb.get(),
        conf.getLong(CUMULATIVE_SIZE_LIMIT_MB_KEY, DEFAULT_CUMULATIVE_SIZE_LIMIT_MB)));
  }

  private static RateLimiter loadRateLimiter(final Configuration configuration) {
    return RateLimiter.create(loadRateLimit(configuration));
  }

  private static long loadRateLimit(final Configuration configuration) {
    long rateLimitBytes =
      configuration.getLongBytes(RATE_LIMIT_BYTES_PER_SEC_KEY, RATE_UNLIMITED_BYTES);
    long rateLimitMbs = rateLimitBytes / 1_000_000L;
    if (rateLimitMbs <= 0) {
      LOG.warn("Configured value {}={} resolves to less than 1 MB/sec. Falling back to default.",
        RATE_LIMIT_BYTES_PER_SEC_KEY, rateLimitBytes);
      rateLimitBytes = RATE_UNLIMITED_BYTES;
      rateLimitMbs = RATE_UNLIMITED_BYTES / 1_000_000L;
    }
    LOG.info("Normalizer rate limit set to {}",
      rateLimitBytes == RATE_UNLIMITED_BYTES ? "unlimited" : rateLimitMbs + " MB/sec");
    return rateLimitMbs;
  }

  /**
   * @see RegionNormalizerManager#planSkipped(NormalizationPlan.PlanType)
   */
  void planSkipped(NormalizationPlan.PlanType type) {
    synchronized (skippedCount) {
      // updates come here via procedure threads, so synchronize access to this counter.
      skippedCount[type.ordinal()]++;
    }
  }

  /**
   * @see RegionNormalizerManager#getSkippedCount(NormalizationPlan.PlanType)
   */
  long getSkippedCount(NormalizationPlan.PlanType type) {
    return skippedCount[type.ordinal()];
  }

  /**
   * @see RegionNormalizerManager#getSplitPlanCount()
   */
  long getSplitPlanCount() {
    return splitPlanCount;
  }

  /**
   * @see RegionNormalizerManager#getMergePlanCount()
   */
  long getMergePlanCount() {
    return mergePlanCount;
  }

  /**
   * Used in test only. The RateLimiter instance itself is exposed to the test, as opposed to
   * tracking the current configuration value beside the RateLimiter and managing synchronization
   * to keep the two in sync.
   */
  RateLimiter getRateLimiter() {
    return rateLimiter;
  }

  @Override
  public void run() {
    while (true) {
      if (Thread.interrupted()) {
        LOG.debug("interrupt detected. terminating.");
        break;
      }
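      // Block until a table is enqueued for normalization. An interrupt while waiting also
      // terminates the worker.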
      final TableName tableName;
      try {
        tableName = workQueue.take();
      } catch (InterruptedException e) {
        LOG.debug("interrupt detected. terminating.");
        break;
      }

      final List<NormalizationPlan> plans = calculatePlans(tableName);
      submitPlans(plans);
    }
  }

  private List<NormalizationPlan> calculatePlans(final TableName tableName) {
    if (masterServices.skipRegionManagementAction("region normalizer")) {
      return Collections.emptyList();
    }

    final TableDescriptor tblDesc;
    try {
      tblDesc = masterServices.getTableDescriptors().get(tableName);
      boolean normalizationEnabled;
      if (tblDesc != null) {
        String defined = tblDesc.getValue(TableDescriptorBuilder.NORMALIZATION_ENABLED);
        if (defined != null) {
          normalizationEnabled = tblDesc.isNormalizationEnabled();
        } else {
          normalizationEnabled = this.defaultNormalizerTableLevel;
        }
        if (!normalizationEnabled) {
          LOG.debug("Skipping table {} because normalization is disabled in its table descriptor, "
            + "or is unset there and disabled by the table-level default", tableName);
          return Collections.emptyList();
        }
      }
    } catch (IOException e) {
      LOG.debug("Skipping table {} because unable to access its table descriptor.", tableName, e);
      return Collections.emptyList();
    }

    List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc);

    plans = truncateForSize(plans);

    if (CollectionUtils.isEmpty(plans)) {
      LOG.debug("No normalization required for table {}.", tableName);
      return Collections.emptyList();
    }
    return plans;
  }

  private List<NormalizationPlan> truncateForSize(List<NormalizationPlan> plans) {
    if (cumulativePlansSizeLimitMb.get() != DEFAULT_CUMULATIVE_SIZE_LIMIT_MB) {
      List<NormalizationPlan> maybeTruncatedPlans = new ArrayList<>(plans.size());
      long totalCumulativeSizeMb = 0;
      long truncatedCumulativeSizeMb = 0;
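      // Plans are admitted in their original order until the running total of plan sizes exceeds
      // the limit; since the total only grows, this keeps a prefix of the list.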
      for (NormalizationPlan plan : plans) {
        totalCumulativeSizeMb += plan.getPlanSizeMb();
        if (totalCumulativeSizeMb <= cumulativePlansSizeLimitMb.get()) {
          truncatedCumulativeSizeMb += plan.getPlanSizeMb();
          maybeTruncatedPlans.add(plan);
        }
      }
      if (maybeTruncatedPlans.size() != plans.size()) {
        LOG.debug(
          "Truncating list of normalization plans that RegionNormalizerWorker will process "
            + "because of {}. Original list had {} plan(s), new list has {} plan(s). "
            + "Original list covered regions with cumulative size {} mb, "
            + "new list covers regions with cumulative size {} mb.",
          CUMULATIVE_SIZE_LIMIT_MB_KEY, plans.size(), maybeTruncatedPlans.size(),
          totalCumulativeSizeMb, truncatedCumulativeSizeMb);
      }
      return maybeTruncatedPlans;
    } else {
      return plans;
    }
  }

  private void submitPlans(final List<NormalizationPlan> plans) {
    // as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to submit the
    // task, so there's no artificial rate-limiting of merge/split requests due to this serial
    // loop.
    for (NormalizationPlan plan : plans) {
      switch (plan.getType()) {
        case MERGE: {
          submitMergePlan((MergeNormalizationPlan) plan);
          break;
        }
        case SPLIT: {
          submitSplitPlan((SplitNormalizationPlan) plan);
          break;
        }
        case NONE:
          LOG.debug("Nothing to do for {} with PlanType=NONE. Ignoring.", plan);
          planSkipped(plan.getType());
          break;
        default:
          LOG.warn("Plan {} is of an unrecognized PlanType. Ignoring.", plan);
          planSkipped(plan.getType());
          break;
      }
    }
  }

  /**
   * Interacts with {@link MasterServices} in order to execute a plan.
   */
  private void submitMergePlan(final MergeNormalizationPlan plan) {
    final int totalSizeMb;
    try {
      final long totalSizeMbLong = plan.getNormalizationTargets().stream()
        .mapToLong(NormalizationTarget::getRegionSizeMb).reduce(0, Math::addExact);
      totalSizeMb = Math.toIntExact(totalSizeMbLong);
    } catch (ArithmeticException e) {
      LOG.debug("Sum of merge request size overflows rate limiter data type. {}", plan);
      planSkipped(plan.getType());
      return;
    }

    final RegionInfo[] infos = plan.getNormalizationTargets().stream()
      .map(NormalizationTarget::getRegionInfo).toArray(RegionInfo[]::new);
    final long pid;
    try {
      pid = masterServices.mergeRegions(infos, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
    } catch (IOException e) {
      LOG.info("failed to submit plan {}.", plan, e);
      planSkipped(plan.getType());
      return;
    }
    mergePlanCount++;
    LOG.info("Submitted {} resulting in pid {}", plan, pid);
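    // Acquire permits proportional to the size of the regions just merged. This does not delay
    // the merge already submitted above; it paces how quickly the worker moves on to its next
    // plan.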
    final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb)));
    LOG.debug("Rate limiting delayed the worker by {}", Duration.ofSeconds(rateLimitedSecs));
  }

  /**
   * Interacts with {@link MasterServices} in order to execute a plan.
   */
  private void submitSplitPlan(final SplitNormalizationPlan plan) {
    final int totalSizeMb;
    try {
      totalSizeMb = Math.toIntExact(plan.getSplitTarget().getRegionSizeMb());
    } catch (ArithmeticException e) {
      LOG.debug("Split request size overflows rate limiter data type. {}", plan);
      planSkipped(plan.getType());
      return;
    }
    final RegionInfo info = plan.getSplitTarget().getRegionInfo();
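    // Unlike the merge path, permits are acquired before the split is submitted, so the rate
    // limiter delays the submission itself.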
    final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb)));
    LOG.debug("Rate limiting delayed this operation by {}", Duration.ofSeconds(rateLimitedSecs));

    final long pid;
    try {
      pid = masterServices.splitRegion(info, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
    } catch (IOException e) {
      LOG.info("failed to submit plan {}.", plan, e);
      planSkipped(plan.getType());
      return;
    }
    splitPlanCount++;
    LOG.info("Submitted {} resulting in pid {}", plan, pid);
  }
}