001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.normalizer;
019
020import edu.umd.cs.findbugs.annotations.NonNull;
021import edu.umd.cs.findbugs.annotations.Nullable;
022import java.util.List;
023import java.util.concurrent.ExecutorService;
024import java.util.concurrent.Executors;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.ScheduledChore;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.conf.ConfigurationManager;
029import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
030import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.apache.zookeeper.KeeperException;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
037
038/**
039 * This class encapsulates the details of the {@link RegionNormalizer} subsystem.
040 */
041@InterfaceAudience.Private
042public class RegionNormalizerManager implements PropagatingConfigurationObserver {
043  private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerManager.class);
044
045  private final RegionNormalizerTracker regionNormalizerTracker;
046  private final RegionNormalizerChore regionNormalizerChore;
047  private final RegionNormalizerWorkQueue<TableName> workQueue;
048  private final RegionNormalizerWorker worker;
049  private final ExecutorService pool;
050
051  private final Object startStopLock = new Object();
052  private boolean started = false;
053  private boolean stopped = false;
054
055  RegionNormalizerManager(@NonNull final RegionNormalizerTracker regionNormalizerTracker,
056    @Nullable final RegionNormalizerChore regionNormalizerChore,
057    @Nullable final RegionNormalizerWorkQueue<TableName> workQueue,
058    @Nullable final RegionNormalizerWorker worker) {
059    this.regionNormalizerTracker = regionNormalizerTracker;
060    this.regionNormalizerChore = regionNormalizerChore;
061    this.workQueue = workQueue;
062    this.worker = worker;
063    this.pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
064      .setDaemon(true).setNameFormat("normalizer-worker-%d").setUncaughtExceptionHandler((thread,
065        throwable) -> LOG.error("Uncaught exception, worker thread likely terminated.", throwable))
066      .build());
067  }
068
069  @Override
070  public void registerChildren(ConfigurationManager manager) {
071    if (worker != null) {
072      manager.registerObserver(worker);
073    }
074  }
075
076  @Override
077  public void deregisterChildren(ConfigurationManager manager) {
078    if (worker != null) {
079      manager.deregisterObserver(worker);
080    }
081  }
082
083  @Override
084  public void onConfigurationChange(Configuration conf) {
085    // no configuration managed here directly.
086  }
087
088  public void start() {
089    synchronized (startStopLock) {
090      if (started) {
091        return;
092      }
093      regionNormalizerTracker.start();
094      if (worker != null) {
095        // worker will be null when master is in maintenance mode.
096        pool.submit(worker);
097      }
098      started = true;
099    }
100  }
101
102  public void stop() {
103    synchronized (startStopLock) {
104      if (!started) {
105        throw new IllegalStateException("calling `stop` without first calling `start`.");
106      }
107      if (stopped) {
108        return;
109      }
110      pool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()`
111      regionNormalizerTracker.stop();
112      stopped = true;
113    }
114  }
115
116  public ScheduledChore getRegionNormalizerChore() {
117    return regionNormalizerChore;
118  }
119
120  /**
121   * Return {@code true} if region normalizer is on, {@code false} otherwise
122   */
123  public boolean isNormalizerOn() {
124    return regionNormalizerTracker.isNormalizerOn();
125  }
126
127  /**
128   * Set region normalizer on/off
129   * @param normalizerOn whether normalizer should be on or off
130   */
131  public void setNormalizerOn(boolean normalizerOn) {
132    try {
133      regionNormalizerTracker.setNormalizerOn(normalizerOn);
134    } catch (KeeperException e) {
135      LOG.warn("Error flipping normalizer switch", e);
136    }
137  }
138
139  /**
140   * Call-back for the case where plan couldn't be executed due to constraint violation, such as
141   * namespace quota.
142   * @param type type of plan that was skipped.
143   */
144  public void planSkipped(NormalizationPlan.PlanType type) {
145    // TODO: this appears to be used only for testing.
146    if (worker != null) {
147      worker.planSkipped(type);
148    }
149  }
150
151  /**
152   * Retrieve a count of the number of times plans of type {@code type} were submitted but skipped.
153   * @param type type of plan for which skipped count is to be returned
154   */
155  public long getSkippedCount(NormalizationPlan.PlanType type) {
156    // TODO: this appears to be used only for testing.
157    return worker == null ? 0 : worker.getSkippedCount(type);
158  }
159
160  /**
161   * Return the number of times a {@link SplitNormalizationPlan} has been submitted.
162   */
163  public long getSplitPlanCount() {
164    return worker == null ? 0 : worker.getSplitPlanCount();
165  }
166
167  /**
168   * Return the number of times a {@link MergeNormalizationPlan} has been submitted.
169   */
170  public long getMergePlanCount() {
171    return worker == null ? 0 : worker.getMergePlanCount();
172  }
173
174  /**
175   * Submit tables for normalization.
176   * @param tables         a list of tables to submit.
177   * @param isHighPriority {@code true} when these requested tables should skip to the front of the
178   *                       queue.
179   * @return {@code true} when work was queued, {@code false} otherwise.
180   */
181  public boolean normalizeRegions(List<TableName> tables, boolean isHighPriority) {
182    if (workQueue == null) {
183      return false;
184    }
185    if (isHighPriority) {
186      workQueue.putAllFirst(tables);
187    } else {
188      workQueue.putAll(tables);
189    }
190    return true;
191  }
192}