001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.normalizer;
019
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.RateLimiter;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
040
041/**
042 * Consumes normalization request targets ({@link TableName}s) off the
043 * {@link RegionNormalizerWorkQueue}, dispatches them to the {@link RegionNormalizer}, and executes
044 * the resulting {@link NormalizationPlan}s.
045 */
046@InterfaceAudience.Private
047class RegionNormalizerWorker implements PropagatingConfigurationObserver, Runnable {
048  public static final String HBASE_TABLE_NORMALIZATION_ENABLED =
049    "hbase.table.normalization.enabled";
050  private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerWorker.class);
051
052  static final String RATE_LIMIT_BYTES_PER_SEC_KEY =
053    "hbase.normalizer.throughput.max_bytes_per_sec";
054  private static final long RATE_UNLIMITED_BYTES = 1_000_000_000_000L; // 1TB/sec
055
056  private final MasterServices masterServices;
057  private final RegionNormalizer regionNormalizer;
058  private final RegionNormalizerWorkQueue<TableName> workQueue;
059  private final RateLimiter rateLimiter;
060
061  private final long[] skippedCount;
062  private final boolean defaultNormalizerTableLevel;
063  private long splitPlanCount;
064  private long mergePlanCount;
065
066  RegionNormalizerWorker(final Configuration configuration, final MasterServices masterServices,
067    final RegionNormalizer regionNormalizer, final RegionNormalizerWorkQueue<TableName> workQueue) {
068    this.masterServices = masterServices;
069    this.regionNormalizer = regionNormalizer;
070    this.workQueue = workQueue;
071    this.skippedCount = new long[NormalizationPlan.PlanType.values().length];
072    this.splitPlanCount = 0;
073    this.mergePlanCount = 0;
074    this.rateLimiter = loadRateLimiter(configuration);
075    this.defaultNormalizerTableLevel = extractDefaultNormalizerValue(configuration);
076  }
077
078  private boolean extractDefaultNormalizerValue(final Configuration configuration) {
079    String s = configuration.get(HBASE_TABLE_NORMALIZATION_ENABLED);
080    return Boolean.parseBoolean(s);
081  }
082
083  @Override
084  public void registerChildren(ConfigurationManager manager) {
085    if (regionNormalizer instanceof ConfigurationObserver) {
086      final ConfigurationObserver observer = (ConfigurationObserver) regionNormalizer;
087      manager.registerObserver(observer);
088    }
089  }
090
091  @Override
092  public void deregisterChildren(ConfigurationManager manager) {
093    if (regionNormalizer instanceof ConfigurationObserver) {
094      final ConfigurationObserver observer = (ConfigurationObserver) regionNormalizer;
095      manager.deregisterObserver(observer);
096    }
097  }
098
099  @Override
100  public void onConfigurationChange(Configuration conf) {
101    rateLimiter.setRate(loadRateLimit(conf));
102  }
103
104  private static RateLimiter loadRateLimiter(final Configuration configuration) {
105    return RateLimiter.create(loadRateLimit(configuration));
106  }
107
108  private static long loadRateLimit(final Configuration configuration) {
109    long rateLimitBytes =
110      configuration.getLongBytes(RATE_LIMIT_BYTES_PER_SEC_KEY, RATE_UNLIMITED_BYTES);
111    long rateLimitMbs = rateLimitBytes / 1_000_000L;
112    if (rateLimitMbs <= 0) {
113      LOG.warn("Configured value {}={} is <= 1MB. Falling back to default.",
114        RATE_LIMIT_BYTES_PER_SEC_KEY, rateLimitBytes);
115      rateLimitBytes = RATE_UNLIMITED_BYTES;
116      rateLimitMbs = RATE_UNLIMITED_BYTES / 1_000_000L;
117    }
118    LOG.info("Normalizer rate limit set to {}",
119      rateLimitBytes == RATE_UNLIMITED_BYTES ? "unlimited" : rateLimitMbs + " MB/sec");
120    return rateLimitMbs;
121  }
122
123  /**
124   * @see RegionNormalizerManager#planSkipped(NormalizationPlan.PlanType)
125   */
126  void planSkipped(NormalizationPlan.PlanType type) {
127    synchronized (skippedCount) {
128      // updates come here via procedure threads, so synchronize access to this counter.
129      skippedCount[type.ordinal()]++;
130    }
131  }
132
133  /**
134   * @see RegionNormalizerManager#getSkippedCount(NormalizationPlan.PlanType)
135   */
136  long getSkippedCount(NormalizationPlan.PlanType type) {
137    return skippedCount[type.ordinal()];
138  }
139
140  /**
141   * @see RegionNormalizerManager#getSplitPlanCount()
142   */
143  long getSplitPlanCount() {
144    return splitPlanCount;
145  }
146
147  /**
148   * @see RegionNormalizerManager#getMergePlanCount()
149   */
150  long getMergePlanCount() {
151    return mergePlanCount;
152  }
153
154  /**
155   * Used in test only. This field is exposed to the test, as opposed to tracking the current
156   * configuration value beside the RateLimiter instance and managing synchronization to keep the
157   * two in sync.
158   */
159  RateLimiter getRateLimiter() {
160    return rateLimiter;
161  }
162
163  @Override
164  public void run() {
165    while (true) {
166      if (Thread.interrupted()) {
167        LOG.debug("interrupt detected. terminating.");
168        break;
169      }
170      final TableName tableName;
171      try {
172        tableName = workQueue.take();
173      } catch (InterruptedException e) {
174        LOG.debug("interrupt detected. terminating.");
175        break;
176      }
177
178      final List<NormalizationPlan> plans = calculatePlans(tableName);
179      submitPlans(plans);
180    }
181  }
182
183  private List<NormalizationPlan> calculatePlans(final TableName tableName) {
184    if (masterServices.skipRegionManagementAction("region normalizer")) {
185      return Collections.emptyList();
186    }
187
188    final TableDescriptor tblDesc;
189    try {
190      tblDesc = masterServices.getTableDescriptors().get(tableName);
191      boolean normalizationEnabled;
192      if (tblDesc != null) {
193        String defined = tblDesc.getValue(TableDescriptorBuilder.NORMALIZATION_ENABLED);
194        if (defined != null) {
195          normalizationEnabled = tblDesc.isNormalizationEnabled();
196        } else {
197          normalizationEnabled = this.defaultNormalizerTableLevel;
198        }
199        if (!normalizationEnabled) {
200          LOG.debug("Skipping table {} because normalization is disabled in its table properties "
201            + "and normalization is also disabled at table level by default", tableName);
202          return Collections.emptyList();
203        }
204      }
205    } catch (IOException e) {
206      LOG.debug("Skipping table {} because unable to access its table descriptor.", tableName, e);
207      return Collections.emptyList();
208    }
209
210    final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc);
211    if (CollectionUtils.isEmpty(plans)) {
212      LOG.debug("No normalization required for table {}.", tableName);
213      return Collections.emptyList();
214    }
215    return plans;
216  }
217
218  private void submitPlans(final List<NormalizationPlan> plans) {
219    // as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to submit
220    // task, so there's no artificial rate-limiting of merge/split requests due to this serial loop.
221    for (NormalizationPlan plan : plans) {
222      switch (plan.getType()) {
223        case MERGE: {
224          submitMergePlan((MergeNormalizationPlan) plan);
225          break;
226        }
227        case SPLIT: {
228          submitSplitPlan((SplitNormalizationPlan) plan);
229          break;
230        }
231        case NONE:
232          LOG.debug("Nothing to do for {} with PlanType=NONE. Ignoring.", plan);
233          planSkipped(plan.getType());
234          break;
235        default:
236          LOG.warn("Plan {} is of an unrecognized PlanType. Ignoring.", plan);
237          planSkipped(plan.getType());
238          break;
239      }
240    }
241  }
242
243  /**
244   * Interacts with {@link MasterServices} in order to execute a plan.
245   */
246  private void submitMergePlan(final MergeNormalizationPlan plan) {
247    final int totalSizeMb;
248    try {
249      final long totalSizeMbLong = plan.getNormalizationTargets().stream()
250        .mapToLong(NormalizationTarget::getRegionSizeMb).reduce(0, Math::addExact);
251      totalSizeMb = Math.toIntExact(totalSizeMbLong);
252    } catch (ArithmeticException e) {
253      LOG.debug("Sum of merge request size overflows rate limiter data type. {}", plan);
254      planSkipped(plan.getType());
255      return;
256    }
257
258    final RegionInfo[] infos = plan.getNormalizationTargets().stream()
259      .map(NormalizationTarget::getRegionInfo).toArray(RegionInfo[]::new);
260    final long pid;
261    try {
262      pid = masterServices.mergeRegions(infos, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
263    } catch (IOException e) {
264      LOG.info("failed to submit plan {}.", plan, e);
265      planSkipped(plan.getType());
266      return;
267    }
268    mergePlanCount++;
269    LOG.info("Submitted {} resulting in pid {}", plan, pid);
270    final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb)));
271    LOG.debug("Rate limiting delayed the worker by {}", Duration.ofSeconds(rateLimitedSecs));
272  }
273
274  /**
275   * Interacts with {@link MasterServices} in order to execute a plan.
276   */
277  private void submitSplitPlan(final SplitNormalizationPlan plan) {
278    final int totalSizeMb;
279    try {
280      totalSizeMb = Math.toIntExact(plan.getSplitTarget().getRegionSizeMb());
281    } catch (ArithmeticException e) {
282      LOG.debug("Split request size overflows rate limiter data type. {}", plan);
283      planSkipped(plan.getType());
284      return;
285    }
286    final RegionInfo info = plan.getSplitTarget().getRegionInfo();
287    final long rateLimitedSecs = Math.round(rateLimiter.acquire(Math.max(1, totalSizeMb)));
288    LOG.debug("Rate limiting delayed this operation by {}", Duration.ofSeconds(rateLimitedSecs));
289
290    final long pid;
291    try {
292      pid = masterServices.splitRegion(info, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
293    } catch (IOException e) {
294      LOG.info("failed to submit plan {}.", plan, e);
295      planSkipped(plan.getType());
296      return;
297    }
298    splitPlanCount++;
299    LOG.info("Submitted {} resulting in pid {}", plan, pid);
300  }
301}