001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.normalizer; 019 020import edu.umd.cs.findbugs.annotations.NonNull; 021import edu.umd.cs.findbugs.annotations.Nullable; 022import java.util.List; 023import java.util.concurrent.ExecutorService; 024import java.util.concurrent.Executors; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.ScheduledChore; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.conf.ConfigurationManager; 029import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; 030import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.apache.zookeeper.KeeperException; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 036 037/** 038 * This class encapsulates the details of the {@link RegionNormalizer} subsystem. 039 */ 040@InterfaceAudience.Private 041public class RegionNormalizerManager implements PropagatingConfigurationObserver { 042 private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerManager.class); 043 044 private final RegionNormalizerTracker regionNormalizerTracker; 045 private final RegionNormalizerChore regionNormalizerChore; 046 private final RegionNormalizerWorkQueue<TableName> workQueue; 047 private final RegionNormalizerWorker worker; 048 private final ExecutorService pool; 049 050 private final Object startStopLock = new Object(); 051 private boolean started = false; 052 private boolean stopped = false; 053 054 RegionNormalizerManager( 055 @NonNull final RegionNormalizerTracker regionNormalizerTracker, 056 @Nullable final RegionNormalizerChore regionNormalizerChore, 057 @Nullable final RegionNormalizerWorkQueue<TableName> workQueue, 058 @Nullable final RegionNormalizerWorker worker 059 ) { 060 this.regionNormalizerTracker = regionNormalizerTracker; 061 this.regionNormalizerChore = regionNormalizerChore; 062 this.workQueue = workQueue; 063 this.worker = worker; 064 this.pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() 065 .setDaemon(true) 066 .setNameFormat("normalizer-worker-%d") 067 .setUncaughtExceptionHandler( 068 (thread, throwable) -> 069 LOG.error("Uncaught exception, worker thread likely terminated.", throwable)) 070 .build()); 071 } 072 073 @Override 074 public void registerChildren(ConfigurationManager manager) { 075 if (worker != null) { 076 manager.registerObserver(worker); 077 } 078 } 079 080 @Override 081 public void deregisterChildren(ConfigurationManager manager) { 082 if (worker != null) { 083 manager.deregisterObserver(worker); 084 } 085 } 086 087 @Override 088 public void onConfigurationChange(Configuration conf) { 089 // no configuration managed here directly. 090 } 091 092 public void start() { 093 synchronized (startStopLock) { 094 if (started) { 095 return; 096 } 097 regionNormalizerTracker.start(); 098 if (worker != null) { 099 // worker will be null when master is in maintenance mode. 100 pool.submit(worker); 101 } 102 started = true; 103 } 104 } 105 106 public void stop() { 107 synchronized (startStopLock) { 108 if (!started) { 109 throw new IllegalStateException("calling `stop` without first calling `start`."); 110 } 111 if (stopped) { 112 return; 113 } 114 pool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()` 115 regionNormalizerTracker.stop(); 116 stopped = true; 117 } 118 } 119 120 public ScheduledChore getRegionNormalizerChore() { 121 return regionNormalizerChore; 122 } 123 124 /** 125 * Return {@code true} if region normalizer is on, {@code false} otherwise 126 */ 127 public boolean isNormalizerOn() { 128 return regionNormalizerTracker.isNormalizerOn(); 129 } 130 131 /** 132 * Set region normalizer on/off 133 * @param normalizerOn whether normalizer should be on or off 134 */ 135 public void setNormalizerOn(boolean normalizerOn) { 136 try { 137 regionNormalizerTracker.setNormalizerOn(normalizerOn); 138 } catch (KeeperException e) { 139 LOG.warn("Error flipping normalizer switch", e); 140 } 141 } 142 143 /** 144 * Call-back for the case where plan couldn't be executed due to constraint violation, 145 * such as namespace quota. 146 * @param type type of plan that was skipped. 147 */ 148 public void planSkipped(NormalizationPlan.PlanType type) { 149 // TODO: this appears to be used only for testing. 150 if (worker != null) { 151 worker.planSkipped(type); 152 } 153 } 154 155 /** 156 * Retrieve a count of the number of times plans of type {@code type} were submitted but skipped. 157 * @param type type of plan for which skipped count is to be returned 158 */ 159 public long getSkippedCount(NormalizationPlan.PlanType type) { 160 // TODO: this appears to be used only for testing. 161 return worker == null ? 0 : worker.getSkippedCount(type); 162 } 163 164 /** 165 * Return the number of times a {@link SplitNormalizationPlan} has been submitted. 166 */ 167 public long getSplitPlanCount() { 168 return worker == null ? 0 : worker.getSplitPlanCount(); 169 } 170 171 /** 172 * Return the number of times a {@link MergeNormalizationPlan} has been submitted. 173 */ 174 public long getMergePlanCount() { 175 return worker == null ? 0 : worker.getMergePlanCount(); 176 } 177 178 /** 179 * Submit tables for normalization. 180 * @param tables a list of tables to submit. 181 * @param isHighPriority {@code true} when these requested tables should skip to the front of 182 * the queue. 183 * @return {@code true} when work was queued, {@code false} otherwise. 184 */ 185 public boolean normalizeRegions(List<TableName> tables, boolean isHighPriority) { 186 if (workQueue == null) { 187 return false; 188 } 189 if (isHighPriority) { 190 workQueue.putAllFirst(tables); 191 } else { 192 workQueue.putAll(tables); 193 } 194 return true; 195 } 196}