001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.normalizer; 019 020import edu.umd.cs.findbugs.annotations.NonNull; 021import edu.umd.cs.findbugs.annotations.Nullable; 022import java.util.List; 023import java.util.concurrent.ExecutorService; 024import java.util.concurrent.Executors; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.ScheduledChore; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.conf.ConfigurationManager; 029import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; 030import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.apache.zookeeper.KeeperException; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 037 038/** 039 * This class encapsulates the details of the {@link RegionNormalizer} subsystem. 040 */ 041@InterfaceAudience.Private 042public class RegionNormalizerManager implements PropagatingConfigurationObserver { 043 private static final Logger LOG = LoggerFactory.getLogger(RegionNormalizerManager.class); 044 045 private final RegionNormalizerTracker regionNormalizerTracker; 046 private final RegionNormalizerChore regionNormalizerChore; 047 private final RegionNormalizerWorkQueue<TableName> workQueue; 048 private final RegionNormalizerWorker worker; 049 private final ExecutorService pool; 050 051 private final Object startStopLock = new Object(); 052 private boolean started = false; 053 private boolean stopped = false; 054 055 RegionNormalizerManager(@NonNull final RegionNormalizerTracker regionNormalizerTracker, 056 @Nullable final RegionNormalizerChore regionNormalizerChore, 057 @Nullable final RegionNormalizerWorkQueue<TableName> workQueue, 058 @Nullable final RegionNormalizerWorker worker) { 059 this.regionNormalizerTracker = regionNormalizerTracker; 060 this.regionNormalizerChore = regionNormalizerChore; 061 this.workQueue = workQueue; 062 this.worker = worker; 063 this.pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() 064 .setDaemon(true).setNameFormat("normalizer-worker-%d").setUncaughtExceptionHandler((thread, 065 throwable) -> LOG.error("Uncaught exception, worker thread likely terminated.", throwable)) 066 .build()); 067 } 068 069 @Override 070 public void registerChildren(ConfigurationManager manager) { 071 if (worker != null) { 072 manager.registerObserver(worker); 073 } 074 } 075 076 @Override 077 public void deregisterChildren(ConfigurationManager manager) { 078 if (worker != null) { 079 manager.deregisterObserver(worker); 080 } 081 } 082 083 @Override 084 public void onConfigurationChange(Configuration conf) { 085 // no configuration managed here directly. 086 } 087 088 public void start() { 089 synchronized (startStopLock) { 090 if (started) { 091 return; 092 } 093 regionNormalizerTracker.start(); 094 if (worker != null) { 095 // worker will be null when master is in maintenance mode. 096 pool.submit(worker); 097 } 098 started = true; 099 } 100 } 101 102 public void stop() { 103 synchronized (startStopLock) { 104 if (!started) { 105 throw new IllegalStateException("calling `stop` without first calling `start`."); 106 } 107 if (stopped) { 108 return; 109 } 110 pool.shutdownNow(); // shutdownNow to interrupt the worker thread sitting on `take()` 111 regionNormalizerTracker.stop(); 112 stopped = true; 113 } 114 } 115 116 public ScheduledChore getRegionNormalizerChore() { 117 return regionNormalizerChore; 118 } 119 120 /** 121 * Return {@code true} if region normalizer is on, {@code false} otherwise 122 */ 123 public boolean isNormalizerOn() { 124 return regionNormalizerTracker.isNormalizerOn(); 125 } 126 127 /** 128 * Set region normalizer on/off 129 * @param normalizerOn whether normalizer should be on or off 130 */ 131 public void setNormalizerOn(boolean normalizerOn) { 132 try { 133 regionNormalizerTracker.setNormalizerOn(normalizerOn); 134 } catch (KeeperException e) { 135 LOG.warn("Error flipping normalizer switch", e); 136 } 137 } 138 139 /** 140 * Call-back for the case where plan couldn't be executed due to constraint violation, such as 141 * namespace quota. 142 * @param type type of plan that was skipped. 143 */ 144 public void planSkipped(NormalizationPlan.PlanType type) { 145 // TODO: this appears to be used only for testing. 146 if (worker != null) { 147 worker.planSkipped(type); 148 } 149 } 150 151 /** 152 * Retrieve a count of the number of times plans of type {@code type} were submitted but skipped. 153 * @param type type of plan for which skipped count is to be returned 154 */ 155 public long getSkippedCount(NormalizationPlan.PlanType type) { 156 // TODO: this appears to be used only for testing. 157 return worker == null ? 0 : worker.getSkippedCount(type); 158 } 159 160 /** 161 * Return the number of times a {@link SplitNormalizationPlan} has been submitted. 162 */ 163 public long getSplitPlanCount() { 164 return worker == null ? 0 : worker.getSplitPlanCount(); 165 } 166 167 /** 168 * Return the number of times a {@link MergeNormalizationPlan} has been submitted. 169 */ 170 public long getMergePlanCount() { 171 return worker == null ? 0 : worker.getMergePlanCount(); 172 } 173 174 /** 175 * Submit tables for normalization. 176 * @param tables a list of tables to submit. 177 * @param isHighPriority {@code true} when these requested tables should skip to the front of the 178 * queue. 179 * @return {@code true} when work was queued, {@code false} otherwise. 180 */ 181 public boolean normalizeRegions(List<TableName> tables, boolean isHighPriority) { 182 if (workQueue == null) { 183 return false; 184 } 185 if (isHighPriority) { 186 workQueue.putAllFirst(tables); 187 } else { 188 workQueue.putAll(tables); 189 } 190 return true; 191 } 192}