001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.normalizer;
019
020import edu.umd.cs.findbugs.annotations.NonNull;
021import edu.umd.cs.findbugs.annotations.Nullable;
022import java.util.List;
023import java.util.concurrent.ExecutorService;
024import java.util.concurrent.Executors;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.ScheduledChore;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.conf.ConfigurationManager;
029import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
030import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.apache.zookeeper.KeeperException;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
036
037/**
038 * This class encapsulates the details of the {@link RegionNormalizer} subsystem.
039 */
040@InterfaceAudience.Private
041public class RegionNormalizerManager implements PropagatingConfigurationObserver {
042  private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerManager.class);
043
044  private final RegionNormalizerTracker regionNormalizerTracker;
045  private final RegionNormalizerChore regionNormalizerChore;
046  private final RegionNormalizerWorkQueue<TableName> workQueue;
047  private final RegionNormalizerWorker worker;
048  private final ExecutorService pool;
049
050  private final Object startStopLock = new Object();
051  private boolean started = false;
052  private boolean stopped = false;
053
054  RegionNormalizerManager(
055    @NonNull  final RegionNormalizerTracker regionNormalizerTracker,
056    @Nullable final RegionNormalizerChore regionNormalizerChore,
057    @Nullable final RegionNormalizerWorkQueue<TableName> workQueue,
058    @Nullable final RegionNormalizerWorker worker
059  ) {
060    this.regionNormalizerTracker = regionNormalizerTracker;
061    this.regionNormalizerChore = regionNormalizerChore;
062    this.workQueue = workQueue;
063    this.worker = worker;
064    this.pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
065      .setDaemon(true)
066      .setNameFormat("normalizer-worker-%d")
067      .setUncaughtExceptionHandler(
068        (thread, throwable) ->
069          LOG.error("Uncaught exception, worker thread likely terminated.", throwable))
070      .build());
071  }
072
073  @Override
074  public void registerChildren(ConfigurationManager manager) {
075    if (worker != null) {
076      manager.registerObserver(worker);
077    }
078  }
079
080  @Override
081  public void deregisterChildren(ConfigurationManager manager) {
082    if (worker != null) {
083      manager.deregisterObserver(worker);
084    }
085  }
086
087  @Override
088  public void onConfigurationChange(Configuration conf) {
089    // no configuration managed here directly.
090  }
091
092  public void start() {
093    synchronized (startStopLock) {
094      if (started) {
095        return;
096      }
097      regionNormalizerTracker.start();
098      if (worker != null) {
099        // worker will be null when master is in maintenance mode.
100        pool.submit(worker);
101      }
102      started = true;
103    }
104  }
105
106  public void stop() {
107    synchronized (startStopLock) {
108      if (!started) {
109        throw new IllegalStateException("calling `stop` without first calling `start`.");
110      }
111      if (stopped) {
112        return;
113      }
114      pool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()`
115      regionNormalizerTracker.stop();
116      stopped = true;
117    }
118  }
119
120  public ScheduledChore getRegionNormalizerChore() {
121    return regionNormalizerChore;
122  }
123
124  /**
125   * Return {@code true} if region normalizer is on, {@code false} otherwise
126   */
127  public boolean isNormalizerOn() {
128    return regionNormalizerTracker.isNormalizerOn();
129  }
130
131  /**
132   * Set region normalizer on/off
133   * @param normalizerOn whether normalizer should be on or off
134   */
135  public void setNormalizerOn(boolean normalizerOn) {
136    try {
137      regionNormalizerTracker.setNormalizerOn(normalizerOn);
138    } catch (KeeperException e) {
139      LOG.warn("Error flipping normalizer switch", e);
140    }
141  }
142
143  /**
144   * Call-back for the case where plan couldn't be executed due to constraint violation,
145   * such as namespace quota.
146   * @param type type of plan that was skipped.
147   */
148  public void planSkipped(NormalizationPlan.PlanType type) {
149    // TODO: this appears to be used only for testing.
150    if (worker != null) {
151      worker.planSkipped(type);
152    }
153  }
154
155  /**
156   * Retrieve a count of the number of times plans of type {@code type} were submitted but skipped.
157   * @param type type of plan for which skipped count is to be returned
158   */
159  public long getSkippedCount(NormalizationPlan.PlanType type) {
160    // TODO: this appears to be used only for testing.
161    return worker == null ? 0 : worker.getSkippedCount(type);
162  }
163
164  /**
165   * Return the number of times a {@link SplitNormalizationPlan} has been submitted.
166   */
167  public long getSplitPlanCount() {
168    return worker == null ? 0 : worker.getSplitPlanCount();
169  }
170
171  /**
172   * Return the number of times a {@link MergeNormalizationPlan} has been submitted.
173   */
174  public long getMergePlanCount() {
175    return worker == null ? 0 : worker.getMergePlanCount();
176  }
177
178  /**
179   * Submit tables for normalization.
180   * @param tables   a list of tables to submit.
181   * @param isHighPriority {@code true} when these requested tables should skip to the front of
182   *   the queue.
183   * @return {@code true} when work was queued, {@code false} otherwise.
184   */
185  public boolean normalizeRegions(List<TableName> tables, boolean isHighPriority) {
186    if (workQueue == null) {
187      return false;
188    }
189    if (isHighPriority) {
190      workQueue.putAllFirst(tables);
191    } else {
192      workQueue.putAll(tables);
193    }
194    return true;
195  }
196}