001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.normalizer;
019
020import edu.umd.cs.findbugs.annotations.NonNull;
021import edu.umd.cs.findbugs.annotations.Nullable;
022import java.io.IOException;
023import java.util.List;
024import java.util.concurrent.ExecutorService;
025import java.util.concurrent.Executors;
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.ScheduledChore;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.conf.ConfigurationManager;
030import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
031import org.apache.yetus.audience.InterfaceAudience;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
036
037/**
038 * This class encapsulates the details of the {@link RegionNormalizer} subsystem.
039 */
040@InterfaceAudience.Private
041public class RegionNormalizerManager implements PropagatingConfigurationObserver {
042  private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerManager.class);
043
044  private final RegionNormalizerStateStore regionNormalizerStateStore;
045  private final RegionNormalizerChore regionNormalizerChore;
046  private final RegionNormalizerWorkQueue<TableName> workQueue;
047  private final RegionNormalizerWorker worker;
048  private final ExecutorService pool;
049
050  private final Object startStopLock = new Object();
051  private boolean started = false;
052  private boolean stopped = false;
053
054  RegionNormalizerManager(@NonNull final RegionNormalizerStateStore regionNormalizerStateStore,
055    @Nullable final RegionNormalizerChore regionNormalizerChore,
056    @Nullable final RegionNormalizerWorkQueue<TableName> workQueue,
057    @Nullable final RegionNormalizerWorker worker) {
058    this.regionNormalizerStateStore = regionNormalizerStateStore;
059    this.regionNormalizerChore = regionNormalizerChore;
060    this.workQueue = workQueue;
061    this.worker = worker;
062    this.pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
063      .setDaemon(true).setNameFormat("normalizer-worker-%d").setUncaughtExceptionHandler((thread,
064        throwable) -> LOG.error("Uncaught exception, worker thread likely terminated.", throwable))
065      .build());
066  }
067
068  @Override
069  public void registerChildren(ConfigurationManager manager) {
070    if (worker != null) {
071      manager.registerObserver(worker);
072    }
073  }
074
075  @Override
076  public void deregisterChildren(ConfigurationManager manager) {
077    if (worker != null) {
078      manager.deregisterObserver(worker);
079    }
080  }
081
082  @Override
083  public void onConfigurationChange(Configuration conf) {
084    // no configuration managed here directly.
085  }
086
087  public void start() {
088    synchronized (startStopLock) {
089      if (started) {
090        return;
091      }
092      if (worker != null) {
093        // worker will be null when master is in maintenance mode.
094        pool.submit(worker);
095      }
096      started = true;
097    }
098  }
099
100  public void stop() {
101    synchronized (startStopLock) {
102      if (!started) {
103        throw new IllegalStateException("calling `stop` without first calling `start`.");
104      }
105      if (stopped) {
106        return;
107      }
108      pool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()`
109      stopped = true;
110    }
111  }
112
113  public ScheduledChore getRegionNormalizerChore() {
114    return regionNormalizerChore;
115  }
116
117  /**
118   * Return {@code true} if region normalizer is on, {@code false} otherwise
119   */
120  public boolean isNormalizerOn() {
121    return regionNormalizerStateStore.get();
122  }
123
124  /**
125   * Set region normalizer on/off
126   * @param normalizerOn whether normalizer should be on or off
127   */
128  public void setNormalizerOn(boolean normalizerOn) throws IOException {
129    regionNormalizerStateStore.set(normalizerOn);
130  }
131
132  /**
133   * Call-back for the case where plan couldn't be executed due to constraint violation, such as
134   * namespace quota.
135   * @param type type of plan that was skipped.
136   */
137  public void planSkipped(NormalizationPlan.PlanType type) {
138    // TODO: this appears to be used only for testing.
139    if (worker != null) {
140      worker.planSkipped(type);
141    }
142  }
143
144  /**
145   * Retrieve a count of the number of times plans of type {@code type} were submitted but skipped.
146   * @param type type of plan for which skipped count is to be returned
147   */
148  public long getSkippedCount(NormalizationPlan.PlanType type) {
149    // TODO: this appears to be used only for testing.
150    return worker == null ? 0 : worker.getSkippedCount(type);
151  }
152
153  /**
154   * Return the number of times a {@link SplitNormalizationPlan} has been submitted.
155   */
156  public long getSplitPlanCount() {
157    return worker == null ? 0 : worker.getSplitPlanCount();
158  }
159
160  /**
161   * Return the number of times a {@link MergeNormalizationPlan} has been submitted.
162   */
163  public long getMergePlanCount() {
164    return worker == null ? 0 : worker.getMergePlanCount();
165  }
166
167  /**
168   * Submit tables for normalization.
169   * @param tables         a list of tables to submit.
170   * @param isHighPriority {@code true} when these requested tables should skip to the front of the
171   *                       queue.
172   * @return {@code true} when work was queued, {@code false} otherwise.
173   */
174  public boolean normalizeRegions(List<TableName> tables, boolean isHighPriority) {
175    if (workQueue == null) {
176      return false;
177    }
178    if (isHighPriority) {
179      workQueue.putAllFirst(tables);
180    } else {
181      workQueue.putAll(tables);
182    }
183    return true;
184  }
185}