001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.normalizer; 019 020import edu.umd.cs.findbugs.annotations.NonNull; 021import edu.umd.cs.findbugs.annotations.Nullable; 022import java.io.IOException; 023import java.util.List; 024import java.util.concurrent.ExecutorService; 025import java.util.concurrent.Executors; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.ScheduledChore; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.conf.ConfigurationManager; 030import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 036 037/** 038 * This class encapsulates the details of the {@link RegionNormalizer} subsystem. 039 */ 040@InterfaceAudience.Private 041public class RegionNormalizerManager implements PropagatingConfigurationObserver { 042 private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerManager.class); 043 044 private final RegionNormalizerStateStore regionNormalizerStateStore; 045 private final RegionNormalizerChore regionNormalizerChore; 046 private final RegionNormalizerWorkQueue<TableName> workQueue; 047 private final RegionNormalizerWorker worker; 048 private final ExecutorService pool; 049 050 private final Object startStopLock = new Object(); 051 private boolean started = false; 052 private boolean stopped = false; 053 054 RegionNormalizerManager(@NonNull final RegionNormalizerStateStore regionNormalizerStateStore, 055 @Nullable final RegionNormalizerChore regionNormalizerChore, 056 @Nullable final RegionNormalizerWorkQueue<TableName> workQueue, 057 @Nullable final RegionNormalizerWorker worker) { 058 this.regionNormalizerStateStore = regionNormalizerStateStore; 059 this.regionNormalizerChore = regionNormalizerChore; 060 this.workQueue = workQueue; 061 this.worker = worker; 062 this.pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() 063 .setDaemon(true).setNameFormat("normalizer-worker-%d").setUncaughtExceptionHandler((thread, 064 throwable) -> LOG.error("Uncaught exception, worker thread likely terminated.", throwable)) 065 .build()); 066 } 067 068 @Override 069 public void registerChildren(ConfigurationManager manager) { 070 if (worker != null) { 071 manager.registerObserver(worker); 072 } 073 } 074 075 @Override 076 public void deregisterChildren(ConfigurationManager manager) { 077 if (worker != null) { 078 manager.deregisterObserver(worker); 079 } 080 } 081 082 @Override 083 public void onConfigurationChange(Configuration conf) { 084 // no configuration managed here directly. 085 } 086 087 public void start() { 088 synchronized (startStopLock) { 089 if (started) { 090 return; 091 } 092 if (worker != null) { 093 // worker will be null when master is in maintenance mode. 094 pool.submit(worker); 095 } 096 started = true; 097 } 098 } 099 100 public void stop() { 101 synchronized (startStopLock) { 102 if (!started) { 103 throw new IllegalStateException("calling `stop` without first calling `start`."); 104 } 105 if (stopped) { 106 return; 107 } 108 pool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()` 109 stopped = true; 110 } 111 } 112 113 public ScheduledChore getRegionNormalizerChore() { 114 return regionNormalizerChore; 115 } 116 117 /** 118 * Return {@code true} if region normalizer is on, {@code false} otherwise 119 */ 120 public boolean isNormalizerOn() { 121 return regionNormalizerStateStore.get(); 122 } 123 124 /** 125 * Set region normalizer on/off 126 * @param normalizerOn whether normalizer should be on or off 127 */ 128 public void setNormalizerOn(boolean normalizerOn) throws IOException { 129 regionNormalizerStateStore.set(normalizerOn); 130 } 131 132 /** 133 * Call-back for the case where plan couldn't be executed due to constraint violation, such as 134 * namespace quota. 135 * @param type type of plan that was skipped. 136 */ 137 public void planSkipped(NormalizationPlan.PlanType type) { 138 // TODO: this appears to be used only for testing. 139 if (worker != null) { 140 worker.planSkipped(type); 141 } 142 } 143 144 /** 145 * Retrieve a count of the number of times plans of type {@code type} were submitted but skipped. 146 * @param type type of plan for which skipped count is to be returned 147 */ 148 public long getSkippedCount(NormalizationPlan.PlanType type) { 149 // TODO: this appears to be used only for testing. 150 return worker == null ? 0 : worker.getSkippedCount(type); 151 } 152 153 /** 154 * Return the number of times a {@link SplitNormalizationPlan} has been submitted. 155 */ 156 public long getSplitPlanCount() { 157 return worker == null ? 0 : worker.getSplitPlanCount(); 158 } 159 160 /** 161 * Return the number of times a {@link MergeNormalizationPlan} has been submitted. 162 */ 163 public long getMergePlanCount() { 164 return worker == null ? 0 : worker.getMergePlanCount(); 165 } 166 167 /** 168 * Submit tables for normalization. 169 * @param tables a list of tables to submit. 170 * @param isHighPriority {@code true} when these requested tables should skip to the front of the 171 * queue. 172 * @return {@code true} when work was queued, {@code false} otherwise. 173 */ 174 public boolean normalizeRegions(List<TableName> tables, boolean isHighPriority) { 175 if (workQueue == null) { 176 return false; 177 } 178 if (isHighPriority) { 179 workQueue.putAllFirst(tables); 180 } else { 181 workQueue.putAll(tables); 182 } 183 return true; 184 } 185}