001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
021import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
022import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
023
024import com.google.protobuf.Descriptors;
025import com.google.protobuf.Service;
026import java.io.IOException;
027import java.io.InterruptedIOException;
028import java.lang.reflect.Constructor;
029import java.lang.reflect.InvocationTargetException;
030import java.net.InetAddress;
031import java.net.InetSocketAddress;
032import java.net.UnknownHostException;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Collection;
036import java.util.Collections;
037import java.util.Comparator;
038import java.util.EnumSet;
039import java.util.HashMap;
040import java.util.Iterator;
041import java.util.List;
042import java.util.Map;
043import java.util.Map.Entry;
044import java.util.Objects;
045import java.util.Optional;
046import java.util.Set;
047import java.util.concurrent.ExecutionException;
048import java.util.concurrent.Future;
049import java.util.concurrent.TimeUnit;
050import java.util.concurrent.TimeoutException;
051import java.util.concurrent.atomic.AtomicInteger;
052import java.util.function.Function;
053import java.util.regex.Pattern;
054import java.util.stream.Collectors;
055import javax.servlet.ServletException;
056import javax.servlet.http.HttpServlet;
057import javax.servlet.http.HttpServletRequest;
058import javax.servlet.http.HttpServletResponse;
059import org.apache.commons.lang3.StringUtils;
060import org.apache.hadoop.conf.Configuration;
061import org.apache.hadoop.fs.Path;
062import org.apache.hadoop.hbase.ChoreService;
063import org.apache.hadoop.hbase.ClusterId;
064import org.apache.hadoop.hbase.ClusterMetrics;
065import org.apache.hadoop.hbase.ClusterMetrics.Option;
066import org.apache.hadoop.hbase.ClusterMetricsBuilder;
067import org.apache.hadoop.hbase.DoNotRetryIOException;
068import org.apache.hadoop.hbase.HBaseIOException;
069import org.apache.hadoop.hbase.HBaseInterfaceAudience;
070import org.apache.hadoop.hbase.HConstants;
071import org.apache.hadoop.hbase.InvalidFamilyOperationException;
072import org.apache.hadoop.hbase.MasterNotRunningException;
073import org.apache.hadoop.hbase.MetaTableAccessor;
074import org.apache.hadoop.hbase.NamespaceDescriptor;
075import org.apache.hadoop.hbase.PleaseHoldException;
076import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
077import org.apache.hadoop.hbase.ServerName;
078import org.apache.hadoop.hbase.TableName;
079import org.apache.hadoop.hbase.TableNotDisabledException;
080import org.apache.hadoop.hbase.TableNotFoundException;
081import org.apache.hadoop.hbase.UnknownRegionException;
082import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
083import org.apache.hadoop.hbase.client.MasterSwitchType;
084import org.apache.hadoop.hbase.client.RegionInfo;
085import org.apache.hadoop.hbase.client.RegionInfoBuilder;
086import org.apache.hadoop.hbase.client.RegionStatesCount;
087import org.apache.hadoop.hbase.client.TableDescriptor;
088import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
089import org.apache.hadoop.hbase.client.TableState;
090import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
091import org.apache.hadoop.hbase.exceptions.DeserializationException;
092import org.apache.hadoop.hbase.executor.ExecutorType;
093import org.apache.hadoop.hbase.favored.FavoredNodesManager;
094import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
095import org.apache.hadoop.hbase.http.InfoServer;
096import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
097import org.apache.hadoop.hbase.ipc.RpcServer;
098import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
099import org.apache.hadoop.hbase.log.HBaseMarkers;
100import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
101import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
102import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
103import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
104import org.apache.hadoop.hbase.master.assignment.RegionStates;
105import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
106import org.apache.hadoop.hbase.master.balancer.BalancerChore;
107import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
108import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
109import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
110import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
111import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
112import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
113import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
114import org.apache.hadoop.hbase.master.cleaner.SnapshotCleanerChore;
115import org.apache.hadoop.hbase.master.locking.LockManager;
116import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
117import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
118import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
119import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
120import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
121import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
122import org.apache.hadoop.hbase.master.procedure.DeleteNamespaceProcedure;
123import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
124import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
125import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
126import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure;
127import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
128import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
129import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
130import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
131import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable;
132import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
133import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
134import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
135import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
136import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
137import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
138import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure;
139import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
140import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
141import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
142import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
143import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
144import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
145import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure;
146import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
147import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
148import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
149import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
150import org.apache.hadoop.hbase.mob.MobConstants;
151import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
152import org.apache.hadoop.hbase.monitoring.MonitoredTask;
153import org.apache.hadoop.hbase.monitoring.TaskMonitor;
154import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
155import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
156import org.apache.hadoop.hbase.procedure2.LockedResource;
157import org.apache.hadoop.hbase.procedure2.Procedure;
158import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
159import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
160import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
161import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
162import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
163import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
164import org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore;
165import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
166import org.apache.hadoop.hbase.quotas.MasterQuotasObserver;
167import org.apache.hadoop.hbase.quotas.QuotaObserverChore;
168import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
169import org.apache.hadoop.hbase.quotas.QuotaUtil;
170import org.apache.hadoop.hbase.quotas.SnapshotQuotaObserverChore;
171import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
172import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
173import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifier;
174import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifierFactory;
175import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
176import org.apache.hadoop.hbase.regionserver.HRegionServer;
177import org.apache.hadoop.hbase.regionserver.RSRpcServices;
178import org.apache.hadoop.hbase.replication.ReplicationException;
179import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
180import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
181import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
182import org.apache.hadoop.hbase.replication.ReplicationUtils;
183import org.apache.hadoop.hbase.replication.SyncReplicationState;
184import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
185import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
186import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader;
187import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
188import org.apache.hadoop.hbase.security.AccessDeniedException;
189import org.apache.hadoop.hbase.security.SecurityConstants;
190import org.apache.hadoop.hbase.security.UserProvider;
191import org.apache.hadoop.hbase.trace.TraceUtil;
192import org.apache.hadoop.hbase.util.Addressing;
193import org.apache.hadoop.hbase.util.Bytes;
194import org.apache.hadoop.hbase.util.FutureUtils;
195import org.apache.hadoop.hbase.util.HBaseFsck;
196import org.apache.hadoop.hbase.util.HFileArchiveUtil;
197import org.apache.hadoop.hbase.util.HasThread;
198import org.apache.hadoop.hbase.util.IdLock;
199import org.apache.hadoop.hbase.util.ModifyRegionUtils;
200import org.apache.hadoop.hbase.util.Pair;
201import org.apache.hadoop.hbase.util.RetryCounter;
202import org.apache.hadoop.hbase.util.RetryCounterFactory;
203import org.apache.hadoop.hbase.util.TableDescriptorChecker;
204import org.apache.hadoop.hbase.util.Threads;
205import org.apache.hadoop.hbase.util.VersionInfo;
206import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
207import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
208import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
209import org.apache.hadoop.hbase.zookeeper.SnapshotCleanupTracker;
210import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
211import org.apache.hadoop.hbase.zookeeper.ZKUtil;
212import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
213import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
214import org.apache.yetus.audience.InterfaceAudience;
215import org.apache.zookeeper.KeeperException;
216import org.eclipse.jetty.server.Server;
217import org.eclipse.jetty.server.ServerConnector;
218import org.eclipse.jetty.servlet.ServletHolder;
219import org.eclipse.jetty.webapp.WebAppContext;
220import org.slf4j.Logger;
221import org.slf4j.LoggerFactory;
222
223import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
224import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
225import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
226
227import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
228import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
229import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
230import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
231
232/**
233 * HMaster is the "master server" for HBase. An HBase cluster has one active
234 * master.  If many masters are started, all compete.  Whichever wins goes on to
235 * run the cluster.  All others park themselves in their constructor until
236 * master or cluster shutdown or until the active master loses its lease in
237 * zookeeper.  Thereafter, all running master jostle to take over master role.
238 *
239 * <p>The Master can be asked shutdown the cluster. See {@link #shutdown()}.  In
240 * this case it will tell all regionservers to go down and then wait on them
241 * all reporting in that they are down.  This master will then shut itself down.
242 *
243 * <p>You can also shutdown just this master.  Call {@link #stopMaster()}.
244 *
245 * @see org.apache.zookeeper.Watcher
246 */
247@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
248@SuppressWarnings("deprecation")
249public class HMaster extends HRegionServer implements MasterServices {
250  private static Logger LOG = LoggerFactory.getLogger(HMaster.class);
251
252  /**
253   * Protection against zombie master. Started once Master accepts active responsibility and
254   * starts taking over responsibilities. Allows a finite time window before giving up ownership.
255   */
256  private static class InitializationMonitor extends HasThread {
257    /** The amount of time in milliseconds to sleep before checking initialization status. */
258    public static final String TIMEOUT_KEY = "hbase.master.initializationmonitor.timeout";
259    public static final long TIMEOUT_DEFAULT = TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);
260
261    /**
262     * When timeout expired and initialization has not complete, call {@link System#exit(int)} when
263     * true, do nothing otherwise.
264     */
265    public static final String HALT_KEY = "hbase.master.initializationmonitor.haltontimeout";
266    public static final boolean HALT_DEFAULT = false;
267
268    private final HMaster master;
269    private final long timeout;
270    private final boolean haltOnTimeout;
271
272    /** Creates a Thread that monitors the {@link #isInitialized()} state. */
273    InitializationMonitor(HMaster master) {
274      super("MasterInitializationMonitor");
275      this.master = master;
276      this.timeout = master.getConfiguration().getLong(TIMEOUT_KEY, TIMEOUT_DEFAULT);
277      this.haltOnTimeout = master.getConfiguration().getBoolean(HALT_KEY, HALT_DEFAULT);
278      this.setDaemon(true);
279    }
280
281    @Override
282    public void run() {
283      try {
284        while (!master.isStopped() && master.isActiveMaster()) {
285          Thread.sleep(timeout);
286          if (master.isInitialized()) {
287            LOG.debug("Initialization completed within allotted tolerance. Monitor exiting.");
288          } else {
289            LOG.error("Master failed to complete initialization after " + timeout + "ms. Please"
290                + " consider submitting a bug report including a thread dump of this process.");
291            if (haltOnTimeout) {
292              LOG.error("Zombie Master exiting. Thread dump to stdout");
293              Threads.printThreadInfo(System.out, "Zombie HMaster");
294              System.exit(-1);
295            }
296          }
297        }
298      } catch (InterruptedException ie) {
299        LOG.trace("InitMonitor thread interrupted. Existing.");
300      }
301    }
302  }
303
  // MASTER is name of the webapp and the attribute name used stuffing this
  // instance into the web context.
  public static final String MASTER = "master";

  // Manager and zk listener for master election
  private final ActiveMasterManager activeMasterManager;
  // Region server tracker
  private RegionServerTracker regionServerTracker;
  // Draining region server tracker
  private DrainingServerTracker drainingServerTracker;
  // Tracker for load balancer state
  LoadBalancerTracker loadBalancerTracker;
  // Tracker for meta location, if any client ZK quorum specified
  MetaLocationSyncer metaLocationSyncer;
  // Tracker for active master location, if any client ZK quorum specified
  MasterAddressSyncer masterAddressSyncer;
  // Tracker for auto snapshot cleanup state
  SnapshotCleanupTracker snapshotCleanupTracker;

  // Tracker for split and merge state
  private SplitOrMergeTracker splitOrMergeTracker;

  // Tracker for region normalizer state
  private RegionNormalizerTracker regionNormalizerTracker;

  // Service handling namespace/table schema operations; started once active, stopped in run()'s
  // finally block.
  private ClusterSchemaService clusterSchemaService;

  // How long (seconds) to wait on dependent services (e.g. clusterSchemaService) to shut down.
  public static final String HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS =
    "hbase.master.wait.on.service.seconds";
  public static final int DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS = 5 * 60;

  public static final String HBASE_MASTER_CLEANER_INTERVAL = "hbase.master.cleaner.interval";

  // Default cleaner interval: 600 * 1000 ms = 10 minutes.
  public static final int DEFAULT_HBASE_MASTER_CLEANER_INTERVAL = 600 * 1000;

  // Metrics for the HMaster
  final MetricsMaster metricsMaster;
  // file system manager for the master FS operations
  private MasterFileSystem fileSystemManager;
  private MasterWalManager walManager;

  // manager to manage procedure-based WAL splitting, can be null if current
  // is zk-based WAL splitting. SplitWALManager will replace SplitLogManager
  // and MasterWalManager, which means zk-based WAL splitting code will be
  // useless after we switch to the procedure-based one. our eventual goal
  // is to remove all the zk-based WAL splitting code.
  private SplitWALManager splitWALManager;

  // server manager to deal with region server info
  private volatile ServerManager serverManager;

  // manager of assignment nodes in zookeeper
  private AssignmentManager assignmentManager;

  // manager of replication
  private ReplicationPeerManager replicationPeerManager;

  // Manager for replaying remote WALs during sync replication.
  private SyncReplicationReplayWALManager syncReplicationReplayWALManager;

  // buffer for "fatal error" notices from region servers
  // in the cluster. This is only used for assisting
  // operations/debugging.
  MemoryBoundedLogMessageBuffer rsFatals;

  // flag set after we become the active master (used for testing)
  private volatile boolean activeMaster = false;

  // flag set after we complete initialization once active
  private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized");

  // flag set after master services are started,
  // initialization may have not completed yet.
  volatile boolean serviceStarted = false;

  // Maximum time we should run balancer for
  // NOTE(review): field name has a typo ("Blancing"); kept as-is since it is referenced
  // elsewhere in this class.
  private final int maxBlancingTime;
  // Maximum percent of regions in transition when balancing
  private final double maxRitPercent;

  private final LockManager lockManager = new LockManager(this);

  private LoadBalancer balancer;
  private RegionNormalizer normalizer;
  private BalancerChore balancerChore;
  private RegionNormalizerChore normalizerChore;
  private ClusterStatusChore clusterStatusChore;
  private ClusterStatusPublisher clusterStatusPublisherChore = null;
  private SnapshotCleanerChore snapshotCleanerChore = null;

  private HbckChore hbckChore;
  CatalogJanitor catalogJanitorChore;
  // Shared scan pool used by the log/hfile cleaner chores below.
  private DirScanPool cleanerPool;
  private LogCleaner logCleaner;
  private HFileCleaner hfileCleaner;
  private ReplicationBarrierCleaner replicationBarrierCleaner;
  private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
  private MobCompactionChore mobCompactChore;
  private MasterMobCompactionThread mobCompactThread;
  // used to synchronize the mobCompactionStates
  private final IdLock mobCompactionLock = new IdLock();
  // save the information of mob compactions in tables.
  // the key is table name, the value is the number of compactions in that table.
  private Map<TableName, AtomicInteger> mobCompactionStates = Maps.newConcurrentMap();

  MasterCoprocessorHost cpHost;

  // Whether to load all table descriptors up front at startup (see constructor).
  private final boolean preLoadTableDescriptors;

  // Time stamps for when a hmaster became active
  private long masterActiveTime;

  // Time stamp for when HMaster finishes becoming Active Master
  private long masterFinishedInitializationTime;

  // Registered coprocessor endpoint services, keyed by service name.
  Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  // monitor for snapshot of hbase tables
  SnapshotManager snapshotManager;
  // monitor for distributed procedures
  private MasterProcedureManagerHost mpmHost;

  private RegionsRecoveryChore regionsRecoveryChore = null;

  private RegionsRecoveryConfigManager regionsRecoveryConfigManager = null;
  // it is assigned after 'initialized' guard set to true, so should be volatile
  private volatile MasterQuotaManager quotaManager;
  private SpaceQuotaSnapshotNotifier spaceQuotaSnapshotNotifier;
  private QuotaObserverChore quotaObserverChore;
  private SnapshotQuotaObserverChore snapshotQuotaChore;

  private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
  private ProcedureStore procedureStore;

  // handle table states
  private TableStateManager tableStateManager;

  // Counters of split/merge plans produced by the region normalizer.
  private long splitPlanCount;
  private long mergePlanCount;

  /* Handle favored nodes information */
  private FavoredNodesManager favoredNodesManager;

  /** jetty server for master to redirect requests to regionserver infoServer */
  private Server masterJettyServer;

  // Determine if we should do normal startup or minimal "single-user" mode with no region
  // servers and no user tables. Useful for repair and recovery of hbase:meta
  private final boolean maintenanceMode;
  static final String MAINTENANCE_MODE = "hbase.master.maintenance_mode";

  // Cached clusterId on stand by masters to serve clusterID requests from clients.
  private final CachedClusterId cachedClusterId;
456
457  public static class RedirectServlet extends HttpServlet {
458    private static final long serialVersionUID = 2894774810058302473L;
459    private final int regionServerInfoPort;
460    private final String regionServerHostname;
461
462    /**
463     * @param infoServer that we're trying to send all requests to
464     * @param hostname may be null. if given, will be used for redirects instead of host from client.
465     */
466    public RedirectServlet(InfoServer infoServer, String hostname) {
467       regionServerInfoPort = infoServer.getPort();
468       regionServerHostname = hostname;
469    }
470
471    @Override
472    public void doGet(HttpServletRequest request,
473        HttpServletResponse response) throws ServletException, IOException {
474      String redirectHost = regionServerHostname;
475      if(redirectHost == null) {
476        redirectHost = request.getServerName();
477        if(!Addressing.isLocalAddress(InetAddress.getByName(redirectHost))) {
478          LOG.warn("Couldn't resolve '" + redirectHost + "' as an address local to this node and '" +
479              MASTER_HOSTNAME_KEY + "' is not set; client will get an HTTP 400 response. If " +
480              "your HBase deployment relies on client accessible names that the region server process " +
481              "can't resolve locally, then you should set the previously mentioned configuration variable " +
482              "to an appropriate hostname.");
483          // no sending client provided input back to the client, so the goal host is just in the logs.
484          response.sendError(400, "Request was to a host that I can't resolve for any of the network interfaces on " +
485              "this node. If this is due to an intermediary such as an HTTP load balancer or other proxy, your HBase " +
486              "administrator can set '" + MASTER_HOSTNAME_KEY + "' to point to the correct hostname.");
487          return;
488        }
489      }
490      // TODO this scheme should come from looking at the scheme registered in the infoserver's http server for the
491      // host and port we're using, but it's buried way too deep to do that ATM.
492      String redirectUrl = request.getScheme() + "://"
493        + redirectHost + ":" + regionServerInfoPort
494        + request.getRequestURI();
495      response.sendRedirect(redirectUrl);
496    }
497  }
498
499  /**
500   * Initializes the HMaster. The steps are as follows:
501   * <p>
502   * <ol>
503   * <li>Initialize the local HRegionServer
504   * <li>Start the ActiveMasterManager.
505   * </ol>
506   * <p>
507   * Remaining steps of initialization occur in
508   * #finishActiveMasterInitialization(MonitoredTask) after
509   * the master becomes the active one.
510   */
511  public HMaster(final Configuration conf)
512      throws IOException, KeeperException {
513    super(conf);
514    TraceUtil.initTracer(conf);
515    try {
516      if (conf.getBoolean(MAINTENANCE_MODE, false)) {
517        LOG.info("Detected {}=true via configuration.", MAINTENANCE_MODE);
518        maintenanceMode = true;
519      } else if (Boolean.getBoolean(MAINTENANCE_MODE)) {
520        LOG.info("Detected {}=true via environment variables.", MAINTENANCE_MODE);
521        maintenanceMode = true;
522      } else {
523        maintenanceMode = false;
524      }
525
526      this.rsFatals = new MemoryBoundedLogMessageBuffer(
527          conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024));
528      LOG.info("hbase.rootdir={}, hbase.cluster.distributed={}", getDataRootDir(),
529          this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));
530
531      // Disable usage of meta replicas in the master
532      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
533
534      decorateMasterConfiguration(this.conf);
535
536      // Hack! Maps DFSClient => Master for logs.  HDFS made this
537      // config param for task trackers, but we can piggyback off of it.
538      if (this.conf.get("mapreduce.task.attempt.id") == null) {
539        this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString());
540      }
541
542      this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));
543
544      // preload table descriptor at startup
545      this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);
546
547      this.maxBlancingTime = getMaxBalancingTime();
548      this.maxRitPercent = conf.getDouble(HConstants.HBASE_MASTER_BALANCER_MAX_RIT_PERCENT,
549          HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT);
550
551      // Do we publish the status?
552
553      boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
554          HConstants.STATUS_PUBLISHED_DEFAULT);
555      Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
556          conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
557              ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
558              ClusterStatusPublisher.Publisher.class);
559
560      if (shouldPublish) {
561        if (publisherClass == null) {
562          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
563              ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
564              " is not set - not publishing status");
565        } else {
566          clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
567          getChoreService().scheduleChore(clusterStatusPublisherChore);
568        }
569      }
570
571      // Some unit tests don't need a cluster, so no zookeeper at all
572      if (!conf.getBoolean("hbase.testing.nocluster", false)) {
573        this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
574      } else {
575        this.activeMasterManager = null;
576      }
577      cachedClusterId = new CachedClusterId(conf);
578    } catch (Throwable t) {
579      // Make sure we log the exception. HMaster is often started via reflection and the
580      // cause of failed startup is lost.
581      LOG.error("Failed construction of Master", t);
582      throw t;
583    }
584  }
585
586  @Override
587  protected String getUseThisHostnameInstead(Configuration conf) {
588    return conf.get(MASTER_HOSTNAME_KEY);
589  }
590
  // Main run loop. Calls through to the regionserver run loop AFTER becoming active Master; will
  // block in here until then.
  @Override
  public void run() {
    try {
      if (!conf.getBoolean("hbase.testing.nocluster", false)) {
        // Compete for active-master status on a daemon thread so the regionserver run loop
        // below can proceed concurrently.
        Threads.setDaemonThreadRunning(new Thread(() -> {
          try {
            int infoPort = putUpJettyServer();
            startActiveMasterManager(infoPort);
          } catch (Throwable t) {
            // Make sure we log the exception.
            String error = "Failed to become Active Master";
            LOG.error(error, t);
            // Abort should have been called already.
            if (!isAborted()) {
              abort(error, t);
            }
          }
        }), getName() + ":becomeActiveMaster");
      }
      // Fall in here even if we have been aborted. Need to run the shutdown services and
      // the super run call will do this for us.
      super.run();
    } finally {
      // Shut down the schema service with a bounded wait; a hung service must not block
      // master shutdown indefinitely.
      if (this.clusterSchemaService != null) {
        // If on way out, then we are no longer active master.
        this.clusterSchemaService.stopAsync();
        try {
          this.clusterSchemaService.awaitTerminated(
              getConfiguration().getInt(HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS,
              DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS), TimeUnit.SECONDS);
        } catch (TimeoutException te) {
          LOG.warn("Failed shutdown of clusterSchemaService", te);
        }
      }
      this.activeMaster = false;
    }
  }
630
631  // return the actual infoPort, -1 means disable info server.
632  private int putUpJettyServer() throws IOException {
633    if (!conf.getBoolean("hbase.master.infoserver.redirect", true)) {
634      return -1;
635    }
636    final int infoPort = conf.getInt("hbase.master.info.port.orig",
637      HConstants.DEFAULT_MASTER_INFOPORT);
638    // -1 is for disabling info server, so no redirecting
639    if (infoPort < 0 || infoServer == null) {
640      return -1;
641    }
642    if(infoPort == infoServer.getPort()) {
643      return infoPort;
644    }
645    final String addr = conf.get("hbase.master.info.bindAddress", "0.0.0.0");
646    if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
647      String msg =
648          "Failed to start redirecting jetty server. Address " + addr
649              + " does not belong to this host. Correct configuration parameter: "
650              + "hbase.master.info.bindAddress";
651      LOG.error(msg);
652      throw new IOException(msg);
653    }
654
655    // TODO I'm pretty sure we could just add another binding to the InfoServer run by
656    // the RegionServer and have it run the RedirectServlet instead of standing up
657    // a second entire stack here.
658    masterJettyServer = new Server();
659    final ServerConnector connector = new ServerConnector(masterJettyServer);
660    connector.setHost(addr);
661    connector.setPort(infoPort);
662    masterJettyServer.addConnector(connector);
663    masterJettyServer.setStopAtShutdown(true);
664
665    final String redirectHostname =
666        StringUtils.isBlank(useThisHostnameInstead) ? null : useThisHostnameInstead;
667
668    final RedirectServlet redirect = new RedirectServlet(infoServer, redirectHostname);
669    final WebAppContext context = new WebAppContext(null, "/", null, null, null, null, WebAppContext.NO_SESSIONS);
670    context.addServlet(new ServletHolder(redirect), "/*");
671    context.setServer(masterJettyServer);
672
673    try {
674      masterJettyServer.start();
675    } catch (Exception e) {
676      throw new IOException("Failed to start redirecting jetty server", e);
677    }
678    return connector.getLocalPort();
679  }
680
681  @Override
682  protected Function<TableDescriptorBuilder, TableDescriptorBuilder> getMetaTableObserver() {
683    return builder -> builder.setRegionReplication(conf.getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM));
684  }
685  /**
686   * For compatibility, if failed with regionserver credentials, try the master one
687   */
688  @Override
689  protected void login(UserProvider user, String host) throws IOException {
690    try {
691      super.login(user, host);
692    } catch (IOException ie) {
693      user.login(SecurityConstants.MASTER_KRB_KEYTAB_FILE,
694              SecurityConstants.MASTER_KRB_PRINCIPAL, host);
695    }
696  }
697
698  /**
699   * If configured to put regions on active master,
700   * wait till a backup master becomes active.
701   * Otherwise, loop till the server is stopped or aborted.
702   */
703  @Override
704  protected void waitForMasterActive(){
705    if (maintenanceMode) {
706      return;
707    }
708    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(conf);
709    while (!(tablesOnMaster && activeMaster) && !isStopped() && !isAborted()) {
710      sleeper.sleep();
711    }
712  }
713
714  @VisibleForTesting
715  public MasterRpcServices getMasterRpcServices() {
716    return (MasterRpcServices)rpcServices;
717  }
718
719  public boolean balanceSwitch(final boolean b) throws IOException {
720    return getMasterRpcServices().switchBalancer(b, BalanceSwitchMode.ASYNC);
721  }
722
723  @Override
724  protected String getProcessName() {
725    return MASTER;
726  }
727
728  @Override
729  protected boolean canCreateBaseZNode() {
730    return true;
731  }
732
733  @Override
734  protected boolean canUpdateTableDescriptor() {
735    return true;
736  }
737
738  @Override
739  protected RSRpcServices createRpcServices() throws IOException {
740    return new MasterRpcServices(this);
741  }
742
743  @Override
744  protected void configureInfoServer() {
745    infoServer.addServlet("master-status", "/master-status", MasterStatusServlet.class);
746    infoServer.setAttribute(MASTER, this);
747    if (LoadBalancer.isTablesOnMaster(conf)) {
748      super.configureInfoServer();
749    }
750  }
751
752  @Override
753  protected Class<? extends HttpServlet> getDumpServlet() {
754    return MasterDumpServlet.class;
755  }
756
757  @Override
758  public MetricsMaster getMasterMetrics() {
759    return metricsMaster;
760  }
761
762  /**
763   * <p>
764   * Initialize all ZK based system trackers. But do not include {@link RegionServerTracker}, it
765   * should have already been initialized along with {@link ServerManager}.
766   * </p>
767   * <p>
768   * Will be overridden in tests.
769   * </p>
770   */
771  @VisibleForTesting
772  protected void initializeZKBasedSystemTrackers()
773      throws IOException, InterruptedException, KeeperException, ReplicationException {
774    this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
775    this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
776    this.normalizer.setMasterServices(this);
777    this.normalizer.setMasterRpcServices((MasterRpcServices)rpcServices);
778    this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
779    this.loadBalancerTracker.start();
780
781    this.regionNormalizerTracker = new RegionNormalizerTracker(zooKeeper, this);
782    this.regionNormalizerTracker.start();
783
784    this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
785    this.splitOrMergeTracker.start();
786
787    this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf);
788
789    this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
790    this.drainingServerTracker.start();
791
792    this.snapshotCleanupTracker = new SnapshotCleanupTracker(zooKeeper, this);
793    this.snapshotCleanupTracker.start();
794
795    String clientQuorumServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
796    boolean clientZkObserverMode = conf.getBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE,
797      HConstants.DEFAULT_CLIENT_ZOOKEEPER_OBSERVER_MODE);
798    if (clientQuorumServers != null && !clientZkObserverMode) {
799      // we need to take care of the ZK information synchronization
800      // if given client ZK are not observer nodes
801      ZKWatcher clientZkWatcher = new ZKWatcher(conf,
802          getProcessName() + ":" + rpcServices.getSocketAddress().getPort() + "-clientZK", this,
803          false, true);
804      this.metaLocationSyncer = new MetaLocationSyncer(zooKeeper, clientZkWatcher, this);
805      this.metaLocationSyncer.start();
806      this.masterAddressSyncer = new MasterAddressSyncer(zooKeeper, clientZkWatcher, this);
807      this.masterAddressSyncer.start();
808      // set cluster id is a one-go effort
809      ZKClusterId.setClusterId(clientZkWatcher, fileSystemManager.getClusterId());
810    }
811
812    // Set the cluster as up.  If new RSs, they'll be waiting on this before
813    // going ahead with their startup.
814    boolean wasUp = this.clusterStatusTracker.isClusterUp();
815    if (!wasUp) this.clusterStatusTracker.setClusterUp();
816
817    LOG.info("Active/primary master=" + this.serverName +
818        ", sessionid=0x" +
819        Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
820        ", setting cluster-up flag (Was=" + wasUp + ")");
821
822    // create/initialize the snapshot manager and other procedure managers
823    this.snapshotManager = new SnapshotManager();
824    this.mpmHost = new MasterProcedureManagerHost();
825    this.mpmHost.register(this.snapshotManager);
826    this.mpmHost.register(new MasterFlushTableProcedureManager());
827    this.mpmHost.loadProcedures(conf);
828    this.mpmHost.initialize(this, this.metricsMaster);
829  }
830
831  // Will be overriden in test to inject customized AssignmentManager
832  @VisibleForTesting
833  protected AssignmentManager createAssignmentManager(MasterServices master) {
834    return new AssignmentManager(master);
835  }
836
837  /**
838   * Finish initialization of HMaster after becoming the primary master.
839   * <p/>
840   * The startup order is a bit complicated but very important, do not change it unless you know
841   * what you are doing.
842   * <ol>
843   * <li>Initialize file system based components - file system manager, wal manager, table
844   * descriptors, etc</li>
845   * <li>Publish cluster id</li>
846   * <li>Here comes the most complicated part - initialize server manager, assignment manager and
847   * region server tracker
848   * <ol type='i'>
849   * <li>Create server manager</li>
850   * <li>Create procedure executor, load the procedures, but do not start workers. We will start it
851   * later after we finish scheduling SCPs to avoid scheduling duplicated SCPs for the same
852   * server</li>
853   * <li>Create assignment manager and start it, load the meta region state, but do not load data
854   * from meta region</li>
855   * <li>Start region server tracker, construct the online servers set and find out dead servers and
856   * schedule SCP for them. The online servers will be constructed by scanning zk, and we will also
857   * scan the wal directory to find out possible live region servers, and the differences between
858   * these two sets are the dead servers</li>
859   * </ol>
860   * </li>
861   * <li>If this is a new deploy, schedule a InitMetaProcedure to initialize meta</li>
862   * <li>Start necessary service threads - balancer, catalog janior, executor services, and also the
863   * procedure executor, etc. Notice that the balancer must be created first as assignment manager
864   * may use it when assigning regions.</li>
865   * <li>Wait for meta to be initialized if necesssary, start table state manager.</li>
866   * <li>Wait for enough region servers to check-in</li>
867   * <li>Let assignment manager load data from meta and construct region states</li>
868   * <li>Start all other things such as chore services, etc</li>
869   * </ol>
870   * <p/>
871   * Notice that now we will not schedule a special procedure to make meta online(unless the first
872   * time where meta has not been created yet), we will rely on SCP to bring meta online.
873   */
874  private void finishActiveMasterInitialization(MonitoredTask status) throws IOException,
875          InterruptedException, KeeperException, ReplicationException {
876    /*
877     * We are active master now... go initialize components we need to run.
878     */
879    status.setStatus("Initializing Master file system");
880
881    this.masterActiveTime = System.currentTimeMillis();
882    // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
883
884    // always initialize the MemStoreLAB as we use a region to store procedure now.
885    initializeMemStoreChunkCreator();
886    this.fileSystemManager = new MasterFileSystem(conf);
887    this.walManager = new MasterWalManager(this);
888
889    // enable table descriptors cache
890    this.tableDescriptors.setCacheOn();
891
892    // warm-up HTDs cache on master initialization
893    if (preLoadTableDescriptors) {
894      status.setStatus("Pre-loading table descriptors");
895      this.tableDescriptors.getAll();
896    }
897
898    // Publish cluster ID; set it in Master too. The superclass RegionServer does this later but
899    // only after it has checked in with the Master. At least a few tests ask Master for clusterId
900    // before it has called its run method and before RegionServer has done the reportForDuty.
901    ClusterId clusterId = fileSystemManager.getClusterId();
902    status.setStatus("Publishing Cluster ID " + clusterId + " in ZooKeeper");
903    ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
904    this.clusterId = clusterId.toString();
905
906    // Precaution. Put in place the old hbck1 lock file to fence out old hbase1s running their
907    // hbck1s against an hbase2 cluster; it could do damage. To skip this behavior, set
908    // hbase.write.hbck1.lock.file to false.
909    if (this.conf.getBoolean("hbase.write.hbck1.lock.file", true)) {
910      HBaseFsck.checkAndMarkRunningHbck(this.conf,
911          HBaseFsck.createLockRetryCounterFactory(this.conf).create());
912    }
913
914    status.setStatus("Initialize ServerManager and schedule SCP for crash servers");
915    // The below two managers must be created before loading procedures, as they will be used during
916    // loading.
917    this.serverManager = createServerManager(this);
918    this.syncReplicationReplayWALManager = new SyncReplicationReplayWALManager(this);
919    if (!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
920      DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
921      this.splitWALManager = new SplitWALManager(this);
922    }
923    createProcedureExecutor();
924    Map<Class<?>, List<Procedure<MasterProcedureEnv>>> procsByType =
925      procedureExecutor.getActiveProceduresNoCopy().stream()
926        .collect(Collectors.groupingBy(p -> p.getClass()));
927
928    // Create Assignment Manager
929    this.assignmentManager = createAssignmentManager(this);
930    this.assignmentManager.start();
931    // TODO: TRSP can perform as the sub procedure for other procedures, so even if it is marked as
932    // completed, it could still be in the procedure list. This is a bit strange but is another
933    // story, need to verify the implementation for ProcedureExecutor and ProcedureStore.
934    List<TransitRegionStateProcedure> ritList =
935      procsByType.getOrDefault(TransitRegionStateProcedure.class, Collections.emptyList()).stream()
936        .filter(p -> !p.isFinished()).map(p -> (TransitRegionStateProcedure) p)
937        .collect(Collectors.toList());
938    this.assignmentManager.setupRIT(ritList);
939
940    // Start RegionServerTracker with listing of servers found with exiting SCPs -- these should
941    // be registered in the deadServers set -- and with the list of servernames out on the
942    // filesystem that COULD BE 'alive' (we'll schedule SCPs for each and let SCP figure it out).
943    // We also pass dirs that are already 'splitting'... so we can do some checks down in tracker.
944    // TODO: Generate the splitting and live Set in one pass instead of two as we currently do.
945    this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
946    this.regionServerTracker.start(
947      procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream()
948        .map(p -> (ServerCrashProcedure) p).map(p -> p.getServerName()).collect(Collectors.toSet()),
949      walManager.getLiveServersFromWALDir(), walManager.getSplittingServersFromWALDir());
950    // This manager will be started AFTER hbase:meta is confirmed on line.
951    // hbase.mirror.table.state.to.zookeeper is so hbase1 clients can connect. They read table
952    // state from zookeeper while hbase2 reads it from hbase:meta. Disable if no hbase1 clients.
953    this.tableStateManager =
954      this.conf.getBoolean(MirroringTableStateManager.MIRROR_TABLE_STATE_TO_ZK_KEY, true)
955        ?
956        new MirroringTableStateManager(this):
957        new TableStateManager(this);
958
959    status.setStatus("Initializing ZK system trackers");
960    initializeZKBasedSystemTrackers();
961    status.setStatus("Loading last flushed sequence id of regions");
962    try {
963      this.serverManager.loadLastFlushedSequenceIds();
964    } catch (IOException e) {
965      LOG.info("Failed to load last flushed sequence id of regions"
966          + " from file system", e);
967    }
968    // Set ourselves as active Master now our claim has succeeded up in zk.
969    this.activeMaster = true;
970
971    // Start the Zombie master detector after setting master as active, see HBASE-21535
972    Thread zombieDetector = new Thread(new InitializationMonitor(this),
973        "ActiveMasterInitializationMonitor-" + System.currentTimeMillis());
974    zombieDetector.setDaemon(true);
975    zombieDetector.start();
976
977    // This is for backwards compatibility
978    // See HBASE-11393
979    status.setStatus("Update TableCFs node in ZNode");
980    ReplicationPeerConfigUpgrader tableCFsUpdater =
981        new ReplicationPeerConfigUpgrader(zooKeeper, conf);
982    tableCFsUpdater.copyTableCFs();
983
984    if (!maintenanceMode) {
985      // Add the Observer to delete quotas on table deletion before starting all CPs by
986      // default with quota support, avoiding if user specifically asks to not load this Observer.
987      if (QuotaUtil.isQuotaEnabled(conf)) {
988        updateConfigurationForQuotasObserver(conf);
989      }
990      // initialize master side coprocessors before we start handling requests
991      status.setStatus("Initializing master coprocessors");
992      this.cpHost = new MasterCoprocessorHost(this, this.conf);
993    }
994
995    // Checking if meta needs initializing.
996    status.setStatus("Initializing meta table if this is a new deploy");
997    InitMetaProcedure initMetaProc = null;
998    // Print out state of hbase:meta on startup; helps debugging.
999    RegionState rs = this.assignmentManager.getRegionStates().
1000        getRegionState(RegionInfoBuilder.FIRST_META_REGIONINFO);
1001    LOG.info("hbase:meta {}", rs);
1002    if (rs != null && rs.isOffline()) {
1003      Optional<InitMetaProcedure> optProc = procedureExecutor.getProcedures().stream()
1004        .filter(p -> p instanceof InitMetaProcedure).map(o -> (InitMetaProcedure) o).findAny();
1005      initMetaProc = optProc.orElseGet(() -> {
1006        // schedule an init meta procedure if meta has not been deployed yet
1007        InitMetaProcedure temp = new InitMetaProcedure();
1008        procedureExecutor.submitProcedure(temp);
1009        return temp;
1010      });
1011    }
1012    if (this.balancer instanceof FavoredNodesPromoter) {
1013      favoredNodesManager = new FavoredNodesManager(this);
1014    }
1015
1016    // initialize load balancer
1017    this.balancer.setMasterServices(this);
1018    this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
1019    this.balancer.initialize();
1020
1021    // start up all service threads.
1022    status.setStatus("Initializing master service threads");
1023    startServiceThreads();
1024    // wait meta to be initialized after we start procedure executor
1025    if (initMetaProc != null) {
1026      initMetaProc.await();
1027    }
1028    // Wake up this server to check in
1029    sleeper.skipSleepCycle();
1030
1031    // Wait for region servers to report in.
1032    // With this as part of master initialization, it precludes our being able to start a single
1033    // server that is both Master and RegionServer. Needs more thought. TODO.
1034    String statusStr = "Wait for region servers to report in";
1035    status.setStatus(statusStr);
1036    LOG.info(Objects.toString(status));
1037    waitForRegionServers(status);
1038
1039    // Check if master is shutting down because issue initializing regionservers or balancer.
1040    if (isStopped()) {
1041      return;
1042    }
1043
1044    status.setStatus("Starting assignment manager");
1045    // FIRST HBASE:META READ!!!!
1046    // The below cannot make progress w/o hbase:meta being online.
1047    // This is the FIRST attempt at going to hbase:meta. Meta on-lining is going on in background
1048    // as procedures run -- in particular SCPs for crashed servers... One should put up hbase:meta
1049    // if it is down. It may take a while to come online. So, wait here until meta if for sure
1050    // available. That's what waitForMetaOnline does.
1051    if (!waitForMetaOnline()) {
1052      return;
1053    }
1054    this.assignmentManager.joinCluster();
1055    // The below depends on hbase:meta being online.
1056    this.tableStateManager.start();
1057    // Below has to happen after tablestatemanager has started in the case where this hbase-2.x
1058    // is being started over an hbase-1.x dataset. tablestatemanager runs a migration as part
1059    // of its 'start' moving table state from zookeeper to hbase:meta. This migration needs to
1060    // complete before we do this next step processing offline regions else it fails reading
1061    // table states messing up master launch (namespace table, etc., are not assigned).
1062    this.assignmentManager.processOfflineRegions();
1063    // Initialize after meta is up as below scans meta
1064    if (favoredNodesManager != null && !maintenanceMode) {
1065      SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment =
1066          new SnapshotOfRegionAssignmentFromMeta(getConnection());
1067      snapshotOfRegionAssignment.initialize();
1068      favoredNodesManager.initialize(snapshotOfRegionAssignment);
1069    }
1070
1071    // set cluster status again after user regions are assigned
1072    this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
1073
1074    // Start balancer and meta catalog janitor after meta and regions have been assigned.
1075    status.setStatus("Starting balancer and catalog janitor");
1076    this.clusterStatusChore = new ClusterStatusChore(this, balancer);
1077    getChoreService().scheduleChore(clusterStatusChore);
1078    this.balancerChore = new BalancerChore(this);
1079    getChoreService().scheduleChore(balancerChore);
1080    this.normalizerChore = new RegionNormalizerChore(this);
1081    getChoreService().scheduleChore(normalizerChore);
1082    this.catalogJanitorChore = new CatalogJanitor(this);
1083    getChoreService().scheduleChore(catalogJanitorChore);
1084    this.hbckChore = new HbckChore(this);
1085    getChoreService().scheduleChore(hbckChore);
1086    this.serverManager.startChore();
1087
1088    // Only for rolling upgrade, where we need to migrate the data in namespace table to meta table.
1089    if (!waitForNamespaceOnline()) {
1090      return;
1091    }
1092    status.setStatus("Starting cluster schema service");
1093    initClusterSchemaService();
1094
1095    if (this.cpHost != null) {
1096      try {
1097        this.cpHost.preMasterInitialization();
1098      } catch (IOException e) {
1099        LOG.error("Coprocessor preMasterInitialization() hook failed", e);
1100      }
1101    }
1102
1103    status.markComplete("Initialization successful");
1104    LOG.info(String.format("Master has completed initialization %.3fsec",
1105       (System.currentTimeMillis() - masterActiveTime) / 1000.0f));
1106    this.masterFinishedInitializationTime = System.currentTimeMillis();
1107    configurationManager.registerObserver(this.balancer);
1108    configurationManager.registerObserver(this.cleanerPool);
1109    configurationManager.registerObserver(this.hfileCleaner);
1110    configurationManager.registerObserver(this.logCleaner);
1111    configurationManager.registerObserver(this.regionsRecoveryConfigManager);
1112    // Set master as 'initialized'.
1113    setInitialized(true);
1114
1115    if (maintenanceMode) {
1116      LOG.info("Detected repair mode, skipping final initialization steps.");
1117      return;
1118    }
1119
1120    assignmentManager.checkIfShouldMoveSystemRegionAsync();
1121    status.setStatus("Assign meta replicas");
1122    MasterMetaBootstrap metaBootstrap = createMetaBootstrap();
1123    metaBootstrap.assignMetaReplicas();
1124    status.setStatus("Starting quota manager");
1125    initQuotaManager();
1126    if (QuotaUtil.isQuotaEnabled(conf)) {
1127      // Create the quota snapshot notifier
1128      spaceQuotaSnapshotNotifier = createQuotaSnapshotNotifier();
1129      spaceQuotaSnapshotNotifier.initialize(getConnection());
1130      this.quotaObserverChore = new QuotaObserverChore(this, getMasterMetrics());
1131      // Start the chore to read the region FS space reports and act on them
1132      getChoreService().scheduleChore(quotaObserverChore);
1133
1134      this.snapshotQuotaChore = new SnapshotQuotaObserverChore(this, getMasterMetrics());
1135      // Start the chore to read snapshots and add their usage to table/NS quotas
1136      getChoreService().scheduleChore(snapshotQuotaChore);
1137    }
1138
1139    // clear the dead servers with same host name and port of online server because we are not
1140    // removing dead server with same hostname and port of rs which is trying to check in before
1141    // master initialization. See HBASE-5916.
1142    this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
1143
1144    // Check and set the znode ACLs if needed in case we are overtaking a non-secure configuration
1145    status.setStatus("Checking ZNode ACLs");
1146    zooKeeper.checkAndSetZNodeAcls();
1147
1148    status.setStatus("Initializing MOB Cleaner");
1149    initMobCleaner();
1150
1151    status.setStatus("Calling postStartMaster coprocessors");
1152    if (this.cpHost != null) {
1153      // don't let cp initialization errors kill the master
1154      try {
1155        this.cpHost.postStartMaster();
1156      } catch (IOException ioe) {
1157        LOG.error("Coprocessor postStartMaster() hook failed", ioe);
1158      }
1159    }
1160
1161    zombieDetector.interrupt();
1162
1163    /*
1164     * After master has started up, lets do balancer post startup initialization. Since this runs
1165     * in activeMasterManager thread, it should be fine.
1166     */
1167    long start = System.currentTimeMillis();
1168    this.balancer.postMasterStartupInitialize();
1169    if (LOG.isDebugEnabled()) {
1170      LOG.debug("Balancer post startup initialization complete, took " + (
1171          (System.currentTimeMillis() - start) / 1000) + " seconds");
1172    }
1173  }
1174
1175  /**
1176   * Check hbase:meta is up and ready for reading. For use during Master startup only.
1177   * @return True if meta is UP and online and startup can progress. Otherwise, meta is not online
1178   *   and we will hold here until operator intervention.
1179   */
1180  @VisibleForTesting
1181  public boolean waitForMetaOnline() throws InterruptedException {
1182    return isRegionOnline(RegionInfoBuilder.FIRST_META_REGIONINFO);
1183  }
1184
1185  /**
1186   * @return True if region is online and scannable else false if an error or shutdown (Otherwise
1187   *   we just block in here holding up all forward-progess).
1188   */
1189  private boolean isRegionOnline(RegionInfo ri) throws InterruptedException {
1190    RetryCounter rc = null;
1191    while (!isStopped()) {
1192      RegionState rs = this.assignmentManager.getRegionStates().getRegionState(ri);
1193      if (rs.isOpened()) {
1194        if (this.getServerManager().isServerOnline(rs.getServerName())) {
1195          return true;
1196        }
1197      }
1198      // Region is not OPEN.
1199      Optional<Procedure<MasterProcedureEnv>> optProc = this.procedureExecutor.getProcedures().
1200          stream().filter(p -> p instanceof ServerCrashProcedure).findAny();
1201      // TODO: Add a page to refguide on how to do repair. Have this log message point to it.
1202      // Page will talk about loss of edits, how to schedule at least the meta WAL recovery, and
1203      // then how to assign including how to break region lock if one held.
1204      LOG.warn("{} is NOT online; state={}; ServerCrashProcedures={}. Master startup cannot " +
1205          "progress, in holding-pattern until region onlined.",
1206          ri.getRegionNameAsString(), rs, optProc.isPresent());
1207      // Check once-a-minute.
1208      if (rc == null) {
1209        rc = new RetryCounterFactory(1000).create();
1210      }
1211      Threads.sleep(rc.getBackoffTimeAndIncrementAttempts());
1212    }
1213    return false;
1214  }
1215
1216  /**
1217   * Check hbase:namespace table is assigned. If not, startup will hang looking for the ns table
1218   * <p/>
1219   * This is for rolling upgrading, later we will migrate the data in ns table to the ns family of
1220   * meta table. And if this is a new clsuter, this method will return immediately as there will be
1221   * no namespace table/region.
1222   * @return True if namespace table is up/online.
1223   */
1224  private boolean waitForNamespaceOnline() throws InterruptedException, IOException {
1225    TableState nsTableState =
1226      MetaTableAccessor.getTableState(getConnection(), TableName.NAMESPACE_TABLE_NAME);
1227    if (nsTableState == null || nsTableState.isDisabled()) {
1228      // this means we have already migrated the data and disabled or deleted the namespace table,
1229      // or this is a new depliy which does not have a namespace table from the beginning.
1230      return true;
1231    }
1232    List<RegionInfo> ris =
1233      this.assignmentManager.getRegionStates().getRegionsOfTable(TableName.NAMESPACE_TABLE_NAME);
1234    if (ris.isEmpty()) {
1235      // maybe this will not happen any more, but anyway, no harm to add a check here...
1236      return true;
1237    }
1238    // Else there are namespace regions up in meta. Ensure they are assigned before we go on.
1239    for (RegionInfo ri : ris) {
1240      isRegionOnline(ri);
1241    }
1242    return true;
1243  }
1244
1245  /**
1246   * Adds the {@code MasterQuotasObserver} to the list of configured Master observers to
1247   * automatically remove quotas for a table when that table is deleted.
1248   */
1249  @VisibleForTesting
1250  public void updateConfigurationForQuotasObserver(Configuration conf) {
1251    // We're configured to not delete quotas on table deletion, so we don't need to add the obs.
1252    if (!conf.getBoolean(
1253          MasterQuotasObserver.REMOVE_QUOTA_ON_TABLE_DELETE,
1254          MasterQuotasObserver.REMOVE_QUOTA_ON_TABLE_DELETE_DEFAULT)) {
1255      return;
1256    }
1257    String[] masterCoprocs = conf.getStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY);
1258    final int length = null == masterCoprocs ? 0 : masterCoprocs.length;
1259    String[] updatedCoprocs = new String[length + 1];
1260    if (length > 0) {
1261      System.arraycopy(masterCoprocs, 0, updatedCoprocs, 0, masterCoprocs.length);
1262    }
1263    updatedCoprocs[length] = MasterQuotasObserver.class.getName();
1264    conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, updatedCoprocs);
1265  }
1266
1267  private void initMobCleaner() {
1268    this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this);
1269    getChoreService().scheduleChore(expiredMobFileCleanerChore);
1270
1271    int mobCompactionPeriod = conf.getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD,
1272        MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD);
1273    this.mobCompactChore = new MobCompactionChore(this, mobCompactionPeriod);
1274    getChoreService().scheduleChore(mobCompactChore);
1275    this.mobCompactThread = new MasterMobCompactionThread(this);
1276  }
1277
1278  /**
1279   * <p>
1280   * Create a {@link MasterMetaBootstrap} instance.
1281   * </p>
1282   * <p>
1283   * Will be overridden in tests.
1284   * </p>
1285   */
1286  @VisibleForTesting
1287  protected MasterMetaBootstrap createMetaBootstrap() {
1288    // We put this out here in a method so can do a Mockito.spy and stub it out
1289    // w/ a mocked up MasterMetaBootstrap.
1290    return new MasterMetaBootstrap(this);
1291  }
1292
1293  /**
1294   * <p>
1295   * Create a {@link ServerManager} instance.
1296   * </p>
1297   * <p>
1298   * Will be overridden in tests.
1299   * </p>
1300   */
1301  @VisibleForTesting
1302  protected ServerManager createServerManager(final MasterServices master) throws IOException {
1303    // We put this out here in a method so can do a Mockito.spy and stub it out
1304    // w/ a mocked up ServerManager.
1305    setupClusterConnection();
1306    return new ServerManager(master);
1307  }
1308
1309  private void waitForRegionServers(final MonitoredTask status)
1310      throws IOException, InterruptedException {
1311    this.serverManager.waitForRegionServers(status);
1312  }
1313
1314  // Will be overridden in tests
1315  @VisibleForTesting
1316  protected void initClusterSchemaService() throws IOException, InterruptedException {
1317    this.clusterSchemaService = new ClusterSchemaServiceImpl(this);
1318    this.clusterSchemaService.startAsync();
1319    try {
1320      this.clusterSchemaService.awaitRunning(getConfiguration().getInt(
1321        HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS,
1322        DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS), TimeUnit.SECONDS);
1323    } catch (TimeoutException toe) {
1324      throw new IOException("Timedout starting ClusterSchemaService", toe);
1325    }
1326  }
1327
1328  private void initQuotaManager() throws IOException {
1329    MasterQuotaManager quotaManager = new MasterQuotaManager(this);
1330    quotaManager.start();
1331    this.quotaManager = quotaManager;
1332  }
1333
1334  private SpaceQuotaSnapshotNotifier createQuotaSnapshotNotifier() {
1335    SpaceQuotaSnapshotNotifier notifier =
1336        SpaceQuotaSnapshotNotifierFactory.getInstance().create(getConfiguration());
1337    return notifier;
1338  }
1339
1340  boolean isCatalogJanitorEnabled() {
1341    return catalogJanitorChore != null ?
1342      catalogJanitorChore.getEnabled() : false;
1343  }
1344
1345  boolean isCleanerChoreEnabled() {
1346    boolean hfileCleanerFlag = true, logCleanerFlag = true;
1347
1348    if (hfileCleaner != null) {
1349      hfileCleanerFlag = hfileCleaner.getEnabled();
1350    }
1351
1352    if (logCleaner != null) {
1353      logCleanerFlag = logCleaner.getEnabled();
1354    }
1355
1356    return (hfileCleanerFlag && logCleanerFlag);
1357  }
1358
  /** @return this master's ServerManager */
  @Override
  public ServerManager getServerManager() {
    return this.serverManager;
  }

  /** @return this master's file system manager */
  @Override
  public MasterFileSystem getMasterFileSystem() {
    return this.fileSystemManager;
  }

  /** @return this master's WAL manager */
  @Override
  public MasterWalManager getMasterWalManager() {
    return this.walManager;
  }

  /** @return the split-WAL manager */
  @Override
  public SplitWALManager getSplitWALManager() {
    return splitWALManager;
  }

  /** @return the table state manager */
  @Override
  public TableStateManager getTableStateManager() {
    return tableStateManager;
  }
1383
1384  /*
1385   * Start up all services. If any of these threads gets an unhandled exception
1386   * then they just die with a logged message.  This should be fine because
1387   * in general, we do not expect the master to get such unhandled exceptions
1388   *  as OOMEs; it should be lightly loaded. See what HRegionServer does if
1389   *  need to install an unexpected exception handler.
1390   */
1391  private void startServiceThreads() throws IOException {
1392    // Start the executor service pools
1393    this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, conf.getInt(
1394      HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT));
1395    this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt(
1396      HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT));
1397    this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
1398      conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS,
1399        HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT));
1400    this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
1401      conf.getInt(HConstants.MASTER_META_SERVER_OPERATIONS_THREADS,
1402        HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT));
1403    this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt(
1404      HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT));
1405    this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, conf.getInt(
1406      SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT));
1407
1408    // We depend on there being only one instance of this executor running
1409    // at a time. To do concurrency, would need fencing of enable/disable of
1410    // tables.
1411    // Any time changing this maxThreads to > 1, pls see the comment at
1412    // AccessController#postCompletedCreateTableAction
1413    this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1414    startProcedureExecutor();
1415
1416    // Start log cleaner thread
1417    int cleanerInterval =
1418      conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
1419    this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
1420      getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool);
1421    getChoreService().scheduleChore(logCleaner);
1422
1423    // start the hfile archive cleaner thread
1424    Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
1425    Map<String, Object> params = new HashMap<>();
1426    params.put(MASTER, this);
1427    this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
1428      getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params);
1429    getChoreService().scheduleChore(hfileCleaner);
1430
1431    // Regions Reopen based on very high storeFileRefCount is considered enabled
1432    // only if hbase.regions.recovery.store.file.ref.count has value > 0
1433    final int maxStoreFileRefCount = conf.getInt(
1434      HConstants.STORE_FILE_REF_COUNT_THRESHOLD,
1435      HConstants.DEFAULT_STORE_FILE_REF_COUNT_THRESHOLD);
1436    if (maxStoreFileRefCount > 0) {
1437      this.regionsRecoveryChore = new RegionsRecoveryChore(this, conf, this);
1438      getChoreService().scheduleChore(this.regionsRecoveryChore);
1439    } else {
1440      LOG.info("Reopening regions with very high storeFileRefCount is disabled. " +
1441          "Provide threshold value > 0 for {} to enable it.",
1442        HConstants.STORE_FILE_REF_COUNT_THRESHOLD);
1443    }
1444
1445    this.regionsRecoveryConfigManager = new RegionsRecoveryConfigManager(this);
1446
1447    replicationBarrierCleaner = new ReplicationBarrierCleaner(conf, this, getConnection(),
1448      replicationPeerManager);
1449    getChoreService().scheduleChore(replicationBarrierCleaner);
1450
1451    final boolean isSnapshotChoreEnabled = this.snapshotCleanupTracker
1452        .isSnapshotCleanupEnabled();
1453    this.snapshotCleanerChore = new SnapshotCleanerChore(this, conf, getSnapshotManager());
1454    if (isSnapshotChoreEnabled) {
1455      getChoreService().scheduleChore(this.snapshotCleanerChore);
1456    } else {
1457      if (LOG.isTraceEnabled()) {
1458        LOG.trace("Snapshot Cleaner Chore is disabled. Not starting up the chore..");
1459      }
1460    }
1461    serviceStarted = true;
1462    if (LOG.isTraceEnabled()) {
1463      LOG.trace("Started service threads");
1464    }
1465  }
1466
  /**
   * Stop the service threads started in startServiceThreads, plus the jetty info server and
   * the procedure executor. The shutdown order here is deliberate (chores before managers,
   * procedure executor before the WAL/filesystem managers) — do not reorder casually.
   */
  @Override
  protected void stopServiceThreads() {
    if (masterJettyServer != null) {
      LOG.info("Stopping master jetty server");
      try {
        masterJettyServer.stop();
      } catch (Exception e) {
        LOG.error("Failed to stop master jetty server", e);
      }
    }
    stopChores();
    if (this.mobCompactThread != null) {
      this.mobCompactThread.close();
    }
    super.stopServiceThreads();
    if (cleanerPool != null) {
      cleanerPool.shutdownNow();
      cleanerPool = null;
    }

    LOG.debug("Stopping service threads");

    if (this.quotaManager != null) {
      this.quotaManager.stop();
    }

    if (this.activeMasterManager != null) {
      this.activeMasterManager.stop();
    }
    if (this.serverManager != null) {
      this.serverManager.stop();
    }
    if (this.assignmentManager != null) {
      this.assignmentManager.stop();
    }

    // Stop procedures before tearing down the managers they may reference below.
    stopProcedureExecutor();

    if (this.walManager != null) {
      this.walManager.stop();
    }
    if (this.fileSystemManager != null) {
      this.fileSystemManager.stop();
    }
    if (this.mpmHost != null) {
      this.mpmHost.stop("server shutting down.");
    }
    if (this.regionServerTracker != null) {
      this.regionServerTracker.stop();
    }
  }
1518
  /**
   * Create (but do not fully start) the procedure executor: builds the environment, cleaner
   * pool and region-based procedure store, wires an abort listener, starts the store, and
   * initializes the executor without starting its workers.
   */
  private void createProcedureExecutor() throws IOException {
    MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
    // Create cleaner thread pool
    cleanerPool = new DirScanPool(conf);
    procedureStore = new RegionProcedureStore(this, cleanerPool,
      new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
    // Abort the master if the store loses its lease: continuing could corrupt procedure state.
    procedureStore.registerListener(new ProcedureStoreListener() {

      @Override
      public void abortProcess() {
        abort("The Procedure Store lost the lease", null);
      }
    });
    MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler();
    procedureExecutor = new ProcedureExecutor<>(conf, procEnv, procedureStore, procedureScheduler);
    configurationManager.registerObserver(procEnv);

    // Default worker count scales with cores (cpus/4) but never below the configured minimum.
    int cpus = Runtime.getRuntime().availableProcessors();
    final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, Math.max(
      (cpus > 0 ? cpus / 4 : 0), MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
    final boolean abortOnCorruption =
      conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
        MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
    procedureStore.start(numThreads);
    // Just initialize it but do not start the workers, we will start the workers later by calling
    // startProcedureExecutor. See the javadoc for finishActiveMasterInitialization for more
    // details.
    procedureExecutor.init(numThreads, abortOnCorruption);
    procEnv.getRemoteDispatcher().start();
  }
1549
  /**
   * Start the procedure executor's worker threads. The executor itself is created and
   * initialized earlier by createProcedureExecutor; workers are started separately (see the
   * javadoc referenced there, finishActiveMasterInitialization).
   */
  private void startProcedureExecutor() throws IOException {
    procedureExecutor.startWorkers();
  }
1553
1554  /**
1555   * Turn on/off Snapshot Cleanup Chore
1556   *
1557   * @param on indicates whether Snapshot Cleanup Chore is to be run
1558   */
1559  void switchSnapshotCleanup(final boolean on, final boolean synchronous) {
1560    if (synchronous) {
1561      synchronized (this.snapshotCleanerChore) {
1562        switchSnapshotCleanup(on);
1563      }
1564    } else {
1565      switchSnapshotCleanup(on);
1566    }
1567  }
1568
1569  private void switchSnapshotCleanup(final boolean on) {
1570    try {
1571      snapshotCleanupTracker.setSnapshotCleanupEnabled(on);
1572      if (on) {
1573        if (!getChoreService().isChoreScheduled(this.snapshotCleanerChore)) {
1574          getChoreService().scheduleChore(this.snapshotCleanerChore);
1575        }
1576      } else {
1577        getChoreService().cancelChore(this.snapshotCleanerChore);
1578      }
1579    } catch (KeeperException e) {
1580      LOG.error("Error updating snapshot cleanup mode to {}", on, e);
1581    }
1582  }
1583
1584
  /**
   * Tear down the procedure executor and its store: deregister the env from configuration
   * updates, stop the remote dispatcher, stop and join the executor, then stop the store
   * (telling it whether we are aborting). Fields are nulled so a re-stop is a no-op.
   */
  private void stopProcedureExecutor() {
    if (procedureExecutor != null) {
      configurationManager.deregisterObserver(procedureExecutor.getEnvironment());
      procedureExecutor.getEnvironment().getRemoteDispatcher().stop();
      procedureExecutor.stop();
      procedureExecutor.join();
      procedureExecutor = null;
    }

    if (procedureStore != null) {
      procedureStore.stop(isAborted());
      procedureStore = null;
    }
  }
1599
  /**
   * Cancel all master background chores. The ChoreService may be null if it was never started.
   */
  private void stopChores() {
    ChoreService choreService = getChoreService();
    if (choreService != null) {
      // NOTE(review): several of these chores can be null when their feature is disabled
      // (e.g. regionsRecoveryChore, snapshotCleanerChore); presumably cancelChore tolerates
      // null arguments — confirm.
      choreService.cancelChore(this.expiredMobFileCleanerChore);
      choreService.cancelChore(this.mobCompactChore);
      choreService.cancelChore(this.balancerChore);
      choreService.cancelChore(this.normalizerChore);
      choreService.cancelChore(this.clusterStatusChore);
      choreService.cancelChore(this.catalogJanitorChore);
      choreService.cancelChore(this.clusterStatusPublisherChore);
      choreService.cancelChore(this.snapshotQuotaChore);
      choreService.cancelChore(this.logCleaner);
      choreService.cancelChore(this.hfileCleaner);
      choreService.cancelChore(this.replicationBarrierCleaner);
      choreService.cancelChore(this.snapshotCleanerChore);
      choreService.cancelChore(this.hbckChore);
      choreService.cancelChore(this.regionsRecoveryChore);
    }
  }
1619
1620  /**
1621   * @return Get remote side's InetAddress
1622   */
1623  InetAddress getRemoteInetAddress(final int port,
1624      final long serverStartCode) throws UnknownHostException {
1625    // Do it out here in its own little method so can fake an address when
1626    // mocking up in tests.
1627    InetAddress ia = RpcServer.getRemoteIp();
1628
1629    // The call could be from the local regionserver,
1630    // in which case, there is no remote address.
1631    if (ia == null && serverStartCode == startcode) {
1632      InetSocketAddress isa = rpcServices.getSocketAddress();
1633      if (isa != null && isa.getPort() == port) {
1634        ia = isa.getAddress();
1635      }
1636    }
1637    return ia;
1638  }
1639
1640  /**
1641   * @return Maximum time we should run balancer for
1642   */
1643  private int getMaxBalancingTime() {
1644    // if max balancing time isn't set, defaulting it to period time
1645    int maxBalancingTime = getConfiguration().getInt(HConstants.HBASE_BALANCER_MAX_BALANCING,
1646      getConfiguration()
1647        .getInt(HConstants.HBASE_BALANCER_PERIOD, HConstants.DEFAULT_HBASE_BALANCER_PERIOD));
1648    return maxBalancingTime;
1649  }
1650
1651  /**
1652   * @return Maximum number of regions in transition
1653   */
1654  private int getMaxRegionsInTransition() {
1655    int numRegions = this.assignmentManager.getRegionStates().getRegionAssignments().size();
1656    return Math.max((int) Math.floor(numRegions * this.maxRitPercent), 1);
1657  }
1658
1659  /**
1660   * It first sleep to the next balance plan start time. Meanwhile, throttling by the max
1661   * number regions in transition to protect availability.
1662   * @param nextBalanceStartTime The next balance plan start time
1663   * @param maxRegionsInTransition max number of regions in transition
1664   * @param cutoffTime when to exit balancer
1665   */
1666  private void balanceThrottling(long nextBalanceStartTime, int maxRegionsInTransition,
1667      long cutoffTime) {
1668    boolean interrupted = false;
1669
1670    // Sleep to next balance plan start time
1671    // But if there are zero regions in transition, it can skip sleep to speed up.
1672    while (!interrupted && System.currentTimeMillis() < nextBalanceStartTime
1673        && this.assignmentManager.getRegionStates().hasRegionsInTransition()) {
1674      try {
1675        Thread.sleep(100);
1676      } catch (InterruptedException ie) {
1677        interrupted = true;
1678      }
1679    }
1680
1681    // Throttling by max number regions in transition
1682    while (!interrupted
1683        && maxRegionsInTransition > 0
1684        && this.assignmentManager.getRegionStates().getRegionsInTransitionCount()
1685        >= maxRegionsInTransition && System.currentTimeMillis() <= cutoffTime) {
1686      try {
1687        // sleep if the number of regions in transition exceeds the limit
1688        Thread.sleep(100);
1689      } catch (InterruptedException ie) {
1690        interrupted = true;
1691      }
1692    }
1693
1694    if (interrupted) Thread.currentThread().interrupt();
1695  }
1696
  /**
   * Trigger a normal (non-forced) balancer run.
   * @see #balance(boolean)
   */
  public boolean balance() throws IOException {
    return balance(false);
  }
1700
  /**
   * Run one pass of the load balancer.
   * @param force if true, run even when (non-meta) regions are in transition
   * @return true if a balancing pass completed (including when no plans were generated);
   *   false if the run was skipped — master not initialized, maintenance mode, balancer
   *   switched off, regions in transition without force, dead servers in progress, or a
   *   coprocessor veto
   */
  public boolean balance(boolean force) throws IOException {
    // if master not initialized, don't run balancer.
    if (!isInitialized()) {
      LOG.debug("Master has not been initialized, don't run balancer.");
      return false;
    }

    if (isInMaintenanceMode()) {
      LOG.info("Master is in maintenanceMode mode, don't run balancer.");
      return false;
    }

    synchronized (this.balancer) {
      // If balance not true, don't run balancer.
      if (!this.loadBalancerTracker.isBalancerOn()) return false;
      // Only allow one balance run at a time.
      if (this.assignmentManager.hasRegionsInTransition()) {
        List<RegionStateNode> regionsInTransition = assignmentManager.getRegionsInTransition();
        // if hbase:meta region is in transition, result of assignment cannot be recorded
        // ignore the force flag in that case
        boolean metaInTransition = assignmentManager.isMetaRegionInTransition();
        // Log either "Running..." or "Not running..." depending on force/meta state.
        String prefix = force && !metaInTransition ? "R" : "Not r";
        List<RegionStateNode> toPrint = regionsInTransition;
        int max = 5;
        boolean truncated = false;
        if (regionsInTransition.size() > max) {
          toPrint = regionsInTransition.subList(0, max);
          truncated = true;
        }
        LOG.info(prefix + "unning balancer because " + regionsInTransition.size() +
          " region(s) in transition: " + toPrint + (truncated? "(truncated list)": ""));
        if (!force || metaInTransition) return false;
      }
      if (this.serverManager.areDeadServersInProgress()) {
        LOG.info("Not running balancer because processing dead regionserver(s): " +
          this.serverManager.getDeadServers());
        return false;
      }

      // Give coprocessors a chance to veto the run.
      if (this.cpHost != null) {
        try {
          if (this.cpHost.preBalance()) {
            LOG.debug("Coprocessor bypassing balancer request");
            return false;
          }
        } catch (IOException ioe) {
          LOG.error("Error invoking master coprocessor preBalance()", ioe);
          return false;
        }
      }

      boolean isByTable = getConfiguration().getBoolean("hbase.master.loadbalance.bytable", false);
      Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
        this.assignmentManager.getRegionStates()
          .getAssignmentsForBalancer(tableStateManager, this.serverManager.getOnlineServersList(),
            isByTable);
      // Draining servers must not receive regions, so drop them from every assignment map.
      for (Map<ServerName, List<RegionInfo>> serverMap : assignments.values()) {
        serverMap.keySet().removeAll(this.serverManager.getDrainingServersList());
      }

      //Give the balancer the current cluster state.
      this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
      this.balancer.setClusterLoad(assignments);

      List<RegionPlan> plans = new ArrayList<>();
      for (Entry<TableName, Map<ServerName, List<RegionInfo>>> e : assignments.entrySet()) {
        List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
        if (partialPlans != null) {
          plans.addAll(partialPlans);
        }
      }

      List<RegionPlan> sucRPs = executeRegionPlansWithThrottling(plans);

      if (this.cpHost != null) {
        try {
          this.cpHost.postBalance(sucRPs);
        } catch (IOException ioe) {
          // balancing already succeeded so don't change the result
          LOG.error("Error invoking master coprocessor postBalance()", ioe);
        }
      }
    }
    // If LoadBalancer did not generate any plans, it means the cluster is already balanced.
    // Return true indicating a success.
    return true;
  }
1788
  /**
   * Execute the given region plans with throttling: each plan is submitted asynchronously,
   * spacing submissions evenly across maxBlancingTime and pausing while the number of regions
   * in transition is at or above the configured maximum.
   * @param plans balancer plans to execute; may be null or empty
   * @return the plans recorded as successful (see the note on {@code sucRPs} in the body)
   */
  public List<RegionPlan> executeRegionPlansWithThrottling(List<RegionPlan> plans) {
    // NOTE(review): sucRPs is never added to, so this method always returns an empty list and
    // postBalance() consequently sees no plans — confirm whether that is intended.
    List<RegionPlan> sucRPs = new ArrayList<>();
    int maxRegionsInTransition = getMaxRegionsInTransition();
    long balanceStartTime = System.currentTimeMillis();
    long cutoffTime = balanceStartTime + this.maxBlancingTime;
    int rpCount = 0;  // number of RegionPlans balanced so far
    if (plans != null && !plans.isEmpty()) {
      // Spread plan submissions evenly over the maximum balancing time.
      int balanceInterval = this.maxBlancingTime / plans.size();
      LOG.info("Balancer plans size is " + plans.size() + ", the balance interval is "
          + balanceInterval + " ms, and the max number regions in transition is "
          + maxRegionsInTransition);

      for (RegionPlan plan: plans) {
        LOG.info("balance " + plan);
        //TODO: bulk assign
        try {
          this.assignmentManager.moveAsync(plan);
        } catch (HBaseIOException hioe) {
          //should ignore failed plans here, avoiding the whole balance plans be aborted
          //later calls of balance() can fetch up the failed and skipped plans
          LOG.warn("Failed balance plan {}, skipping...", plan, hioe);
        }
        //rpCount records balance plans processed, does not care if a plan succeeds
        rpCount++;

        if (this.maxBlancingTime > 0) {
          balanceThrottling(balanceStartTime + rpCount * balanceInterval, maxRegionsInTransition,
            cutoffTime);
        }

        // if performing next balance exceeds cutoff time, exit the loop
        if (this.maxBlancingTime > 0 && rpCount < plans.size()
          && System.currentTimeMillis() > cutoffTime) {
          // TODO: After balance, there should not be a cutoff time (keeping it as
          // a security net for now)
          LOG.debug("No more balancing till next balance run; maxBalanceTime="
              + this.maxBlancingTime);
          break;
        }
      }
    }
    return sucRPs;
  }
1832
  /** @return the RegionNormalizer used by this master */
  @Override
  @VisibleForTesting
  public RegionNormalizer getRegionNormalizer() {
    return this.normalizer;
  }
1838
1839  /**
1840   * Perform normalization of cluster (invoked by {@link RegionNormalizerChore}).
1841   *
1842   * @return true if normalization step was performed successfully, false otherwise
1843   *    (specifically, if HMaster hasn't been initialized properly or normalization
1844   *    is globally disabled)
1845   */
1846  public boolean normalizeRegions() throws IOException {
1847    if (!isInitialized()) {
1848      LOG.debug("Master has not been initialized, don't run region normalizer.");
1849      return false;
1850    }
1851    if (this.getServerManager().isClusterShutdown()) {
1852      LOG.info("Cluster is shutting down, don't run region normalizer.");
1853      return false;
1854    }
1855    if (isInMaintenanceMode()) {
1856      LOG.info("Master is in maintenance mode, don't run region normalizer.");
1857      return false;
1858    }
1859    if (!this.regionNormalizerTracker.isNormalizerOn()) {
1860      LOG.debug("Region normalization is disabled, don't run region normalizer.");
1861      return false;
1862    }
1863
1864    synchronized (this.normalizer) {
1865      // Don't run the normalizer concurrently
1866      List<TableName> allEnabledTables = new ArrayList<>(
1867        this.tableStateManager.getTablesInStates(TableState.State.ENABLED));
1868
1869      Collections.shuffle(allEnabledTables);
1870
1871      for (TableName table : allEnabledTables) {
1872        if (isInMaintenanceMode()) {
1873          LOG.debug("Master is in maintenance mode, stop running region normalizer.");
1874          return false;
1875        }
1876
1877        TableDescriptor tblDesc = getTableDescriptors().get(table);
1878        if (table.isSystemTable() || (tblDesc != null &&
1879            !tblDesc.isNormalizationEnabled())) {
1880          LOG.trace("Skipping normalization for {}, as it's either system"
1881              + " table or doesn't have auto normalization turned on", table);
1882          continue;
1883        }
1884        List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
1885        if (plans != null) {
1886          for (NormalizationPlan plan : plans) {
1887            plan.execute(asyncClusterConnection.toConnection().getAdmin());
1888            if (plan.getType() == PlanType.SPLIT) {
1889              splitPlanCount++;
1890            } else if (plan.getType() == PlanType.MERGE) {
1891              mergePlanCount++;
1892            }
1893          }
1894        }
1895      }
1896    }
1897    // If Region did not generate any plans, it means the cluster is already balanced.
1898    // Return true indicating a success.
1899    return true;
1900  }
1901
1902  /**
1903   * @return Client info for use as prefix on an audit log string; who did an action
1904   */
1905  @Override
1906  public String getClientIdAuditPrefix() {
1907    return "Client=" + RpcServer.getRequestUserName().orElse(null)
1908        + "/" + RpcServer.getRemoteAddress().orElse(null);
1909  }
1910
1911  /**
1912   * Switch for the background CatalogJanitor thread.
1913   * Used for testing.  The thread will continue to run.  It will just be a noop
1914   * if disabled.
1915   * @param b If false, the catalog janitor won't do anything.
1916   */
1917  public void setCatalogJanitorEnabled(final boolean b) {
1918    this.catalogJanitorChore.setEnabled(b);
1919  }
1920
1921  @Override
1922  public long mergeRegions(
1923      final RegionInfo[] regionsToMerge,
1924      final boolean forcible,
1925      final long ng,
1926      final long nonce) throws IOException {
1927    checkInitialized();
1928
1929    final String mergeRegionsStr = Arrays.stream(regionsToMerge).
1930      map(r -> RegionInfo.getShortNameToLog(r)).collect(Collectors.joining(", "));
1931    return MasterProcedureUtil.submitProcedure(new NonceProcedureRunnable(this, ng, nonce) {
1932      @Override
1933      protected void run() throws IOException {
1934        getMaster().getMasterCoprocessorHost().preMergeRegions(regionsToMerge);
1935        String aid = getClientIdAuditPrefix();
1936        LOG.info("{} merge regions {}", aid, mergeRegionsStr);
1937        submitProcedure(new MergeTableRegionsProcedure(procedureExecutor.getEnvironment(),
1938            regionsToMerge, forcible));
1939        getMaster().getMasterCoprocessorHost().postMergeRegions(regionsToMerge);
1940      }
1941
1942      @Override
1943      protected String getDescription() {
1944        return "MergeTableProcedure";
1945      }
1946    });
1947  }
1948
  /**
   * Submit a procedure to split the given region at the given split point.
   * @param regionInfo the region to split
   * @param splitRow the split point, passed through to the split procedure
   * @param nonceGroup nonce group of the caller, used for request de-duplication
   * @param nonce nonce of the caller, used for request de-duplication
   * @return the id of the submitted split procedure
   */
  @Override
  public long splitRegion(final RegionInfo regionInfo, final byte[] splitRow,
      final long nonceGroup, final long nonce)
  throws IOException {
    checkInitialized();
    return MasterProcedureUtil.submitProcedure(
        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
      @Override
      protected void run() throws IOException {
        getMaster().getMasterCoprocessorHost().preSplitRegion(regionInfo.getTable(), splitRow);
        LOG.info(getClientIdAuditPrefix() + " split " + regionInfo.getRegionNameAsString());

        // Execute the operation asynchronously
        submitProcedure(getAssignmentManager().createSplitProcedure(regionInfo, splitRow));
      }

      @Override
      protected String getDescription() {
        return "SplitTableProcedure";
      }
    });
  }
1971
  /**
   * Asynchronously ask the given region server to warm up the given region. Best-effort: a
   * failure is only logged, never propagated.
   */
  private void warmUpRegion(ServerName server, RegionInfo region) {
    FutureUtils.addListener(asyncClusterConnection.getRegionServerAdmin(server)
      .warmupRegion(RequestConverter.buildWarmupRegionRequest(region)), (r, e) -> {
        if (e != null) {
          LOG.warn("Failed to warm up region {} on server {}", region, server, e);
        }
      });
  }
1980
  /**
   * Move a region to a chosen (or randomly assigned) destination server. Public so it can be
   * accessed by tests. Blocks until the move is done.
   * <p>TODO: Replace with an async implementation from which you can get a success/failure
   * result.
   * @param encodedRegionName encoded name of the region to move
   * @param destServerName destination server; if null/empty, a server is picked at random
   * @throws HBaseIOException if the region is unknown or the move procedure fails
   */
  @VisibleForTesting
  public void move(final byte[] encodedRegionName, byte[] destServerName) throws HBaseIOException {
    RegionState regionState = assignmentManager.getRegionStates().
      getRegionState(Bytes.toString(encodedRegionName));

    RegionInfo hri;
    if (regionState != null) {
      hri = regionState.getRegion();
    } else {
      throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
    }

    ServerName dest;
    // System-table regions must avoid the excluded servers; for user tables the exclude list
    // starts empty and only gains the region's current server below.
    List<ServerName> exclude = hri.getTable().isSystemTable() ? assignmentManager.getExcludedServersForSystemTable()
        : new ArrayList<>(1);
    if (destServerName != null && exclude.contains(ServerName.valueOf(Bytes.toString(destServerName)))) {
      LOG.info(
          Bytes.toString(encodedRegionName) + " can not move to " + Bytes.toString(destServerName)
              + " because the server is in exclude list");
      destServerName = null;
    }
    if (destServerName == null || destServerName.length == 0) {
      LOG.info("Passed destination servername is null/empty so " +
        "choosing a server at random");
      // Never pick the server the region is already on.
      exclude.add(regionState.getServerName());
      final List<ServerName> destServers = this.serverManager.createDestinationServersList(exclude);
      dest = balancer.randomAssignment(hri, destServers);
      if (dest == null) {
        LOG.debug("Unable to determine a plan to assign " + hri);
        return;
      }
    } else {
      ServerName candidate = ServerName.valueOf(Bytes.toString(destServerName));
      dest = balancer.randomAssignment(hri, Lists.newArrayList(candidate));
      if (dest == null) {
        LOG.debug("Unable to determine a plan to assign " + hri);
        return;
      }
      // TODO: What is this? I don't get it.
      if (dest.equals(serverName) && balancer instanceof BaseLoadBalancer
          && !((BaseLoadBalancer)balancer).shouldBeOnMaster(hri)) {
        // To avoid unnecessary region moving later by balancer. Don't put user
        // regions on master.
        LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
          + " to avoid unnecessary region moving later by load balancer,"
          + " because it should not be on master");
        return;
      }
    }

    if (dest.equals(regionState.getServerName())) {
      LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
        + " because region already assigned to the same server " + dest + ".");
      return;
    }

    // Now we can do the move
    RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
    assert rp.getDestination() != null: rp.toString() + " " + dest;

    try {
      checkInitialized();
      if (this.cpHost != null) {
        this.cpHost.preMove(hri, rp.getSource(), rp.getDestination());
      }

      TransitRegionStateProcedure proc =
        this.assignmentManager.createMoveRegionProcedure(rp.getRegionInfo(), rp.getDestination());
      // Warmup the region on the destination before initiating the move.
      // A region server could reject the close request because it either does not
      // have the specified region or the region is being split.
      warmUpRegion(rp.getDestination(), hri);

      LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
      Future<byte[]> future = ProcedureSyncWait.submitProcedure(this.procedureExecutor, proc);
      try {
        // Is this going to work? Will we throw exception on error?
        // TODO: CompletableFuture rather than this stunted Future.
        future.get();
      } catch (InterruptedException | ExecutionException e) {
        throw new HBaseIOException(e);
      }
      if (this.cpHost != null) {
        this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
      }
    } catch (IOException ioe) {
      // Re-wrap: callers only expect HBaseIOException from this method.
      if (ioe instanceof HBaseIOException) {
        throw (HBaseIOException)ioe;
      }
      throw new HBaseIOException(ioe);
    }
  }
2076
  /**
   * Creates a new table by submitting a {@link CreateTableProcedure} under nonce protection.
   * <p>
   * Flow: run the preCreateTableRegionsInfos coprocessor hook (which may veto or rewrite the
   * descriptor), verify the target namespace exists, compute the region layout from the split
   * keys, sanity-check the descriptor, then submit the procedure. A blocking prepare latch is
   * used so "prepare"-phase failures are surfaced to the caller (HBASE-19953).
   * @param tableDescriptor descriptor of the table to create
   * @param splitKeys pre-split boundaries, or null for a single-region table
   * @param nonceGroup client nonce group for duplicate-request detection
   * @param nonce client nonce for duplicate-request detection
   * @return the procedure id of the submitted create-table procedure
   * @throws IOException if a coprocessor cancels creation, the namespace is unknown, or the
   *   descriptor fails sanity checks
   */
  @Override
  public long createTable(final TableDescriptor tableDescriptor, final byte[][] splitKeys,
      final long nonceGroup, final long nonce) throws IOException {
    checkInitialized();
    TableDescriptor desc = getMasterCoprocessorHost().preCreateTableRegionsInfos(tableDescriptor);
    if (desc == null) {
      // A coprocessor returned null to cancel table creation.
      throw new IOException("Creation for " + tableDescriptor + " is canceled by CP");
    }
    String namespace = desc.getTableName().getNamespaceAsString();
    // Throws if the namespace does not exist.
    this.clusterSchemaService.getNamespace(namespace);

    RegionInfo[] newRegions = ModifyRegionUtils.createRegionInfos(desc, splitKeys);
    TableDescriptorChecker.sanityCheck(conf, desc);

    return MasterProcedureUtil
      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
        @Override
        protected void run() throws IOException {
          getMaster().getMasterCoprocessorHost().preCreateTable(desc, newRegions);

          LOG.info(getClientIdAuditPrefix() + " create " + desc);

          // TODO: We can handle/merge duplicate requests, and differentiate the case of
          // TableExistsException by saying if the schema is the same or not.
          //
          // We need to wait for the procedure to potentially fail due to "prepare" sanity
          // checks. This will block only the beginning of the procedure. See HBASE-19953.
          ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
          submitProcedure(
            new CreateTableProcedure(procedureExecutor.getEnvironment(), desc, newRegions, latch));
          latch.await();

          getMaster().getMasterCoprocessorHost().postCreateTable(desc, newRegions);
        }

        @Override
        protected String getDescription() {
          return "CreateTableProcedure";
        }
      });
  }
2118
2119  @Override
2120  public long createSystemTable(final TableDescriptor tableDescriptor) throws IOException {
2121    if (isStopped()) {
2122      throw new MasterNotRunningException();
2123    }
2124
2125    TableName tableName = tableDescriptor.getTableName();
2126    if (!(tableName.isSystemTable())) {
2127      throw new IllegalArgumentException(
2128        "Only system table creation can use this createSystemTable API");
2129    }
2130
2131    RegionInfo[] newRegions = ModifyRegionUtils.createRegionInfos(tableDescriptor, null);
2132
2133    LOG.info(getClientIdAuditPrefix() + " create " + tableDescriptor);
2134
2135    // This special create table is called locally to master.  Therefore, no RPC means no need
2136    // to use nonce to detect duplicated RPC call.
2137    long procId = this.procedureExecutor.submitProcedure(
2138      new CreateTableProcedure(procedureExecutor.getEnvironment(), tableDescriptor, newRegions));
2139
2140    return procId;
2141  }
2142
2143  private void startActiveMasterManager(int infoPort) throws KeeperException {
2144    String backupZNode = ZNodePaths.joinZNode(
2145      zooKeeper.getZNodePaths().backupMasterAddressesZNode, serverName.toString());
2146    /*
2147    * Add a ZNode for ourselves in the backup master directory since we
2148    * may not become the active master. If so, we want the actual active
2149    * master to know we are backup masters, so that it won't assign
2150    * regions to us if so configured.
2151    *
2152    * If we become the active master later, ActiveMasterManager will delete
2153    * this node explicitly.  If we crash before then, ZooKeeper will delete
2154    * this node for us since it is ephemeral.
2155    */
2156    LOG.info("Adding backup master ZNode " + backupZNode);
2157    if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode, serverName, infoPort)) {
2158      LOG.warn("Failed create of " + backupZNode + " by " + serverName);
2159    }
2160    this.activeMasterManager.setInfoPort(infoPort);
2161    int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
2162    // If we're a backup master, stall until a primary to write this address
2163    if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP, HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
2164      LOG.debug("HMaster started in backup mode. Stalling until master znode is written.");
2165      // This will only be a minute or so while the cluster starts up,
2166      // so don't worry about setting watches on the parent znode
2167      while (!activeMasterManager.hasActiveMaster()) {
2168        LOG.debug("Waiting for master address and cluster state znode to be written.");
2169        Threads.sleep(timeout);
2170      }
2171    }
2172    MonitoredTask status = TaskMonitor.get().createStatus("Master startup");
2173    status.setDescription("Master startup");
2174    try {
2175      if (activeMasterManager.blockUntilBecomingActiveMaster(timeout, status)) {
2176        finishActiveMasterInitialization(status);
2177      }
2178    } catch (Throwable t) {
2179      status.setStatus("Failed to become active: " + t.getMessage());
2180      LOG.error(HBaseMarkers.FATAL, "Failed to become active master", t);
2181      // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
2182      if (t instanceof NoClassDefFoundError && t.getMessage().
2183          contains("org/apache/hadoop/hdfs/protocol/HdfsConstants$SafeModeAction")) {
2184        // improved error message for this special case
2185        abort("HBase is having a problem with its Hadoop jars.  You may need to recompile " +
2186          "HBase against Hadoop version " + org.apache.hadoop.util.VersionInfo.getVersion() +
2187          " or change your hadoop jars to start properly", t);
2188      } else {
2189        abort("Unhandled exception. Starting shutdown.", t);
2190      }
2191    } finally {
2192      status.cleanup();
2193    }
2194  }
2195
  /**
   * @return true if the given table name is the hbase:meta catalog table. Note: throws NPE on a
   *   null argument (tableName is the receiver of equals), preserving historical behavior.
   */
  private static boolean isCatalogTable(final TableName tableName) {
    return tableName.equals(TableName.META_TABLE_NAME);
  }
2199
  /**
   * Deletes a table by submitting a {@link DeleteTableProcedure} under nonce protection. The
   * pre/postDeleteTable coprocessor hooks run inside the nonce runnable, and a blocking
   * prepare latch surfaces "prepare"-phase failures to the caller (HBASE-19953).
   * @param tableName the table to delete
   * @param nonceGroup client nonce group for duplicate-request detection
   * @param nonce client nonce for duplicate-request detection
   * @return the procedure id of the submitted delete-table procedure
   * @throws IOException if the procedure prepare phase or a coprocessor hook fails
   */
  @Override
  public long deleteTable(
      final TableName tableName,
      final long nonceGroup,
      final long nonce) throws IOException {
    checkInitialized();

    return MasterProcedureUtil.submitProcedure(
        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
      @Override
      protected void run() throws IOException {
        getMaster().getMasterCoprocessorHost().preDeleteTable(tableName);

        LOG.info(getClientIdAuditPrefix() + " delete " + tableName);

        // TODO: We can handle/merge duplicate request
        //
        // We need to wait for the procedure to potentially fail due to "prepare" sanity
        // checks. This will block only the beginning of the procedure. See HBASE-19953.
        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
        submitProcedure(new DeleteTableProcedure(procedureExecutor.getEnvironment(),
            tableName, latch));
        latch.await();

        getMaster().getMasterCoprocessorHost().postDeleteTable(tableName);
      }

      @Override
      protected String getDescription() {
        return "DeleteTableProcedure";
      }
    });
  }
2233
  /**
   * Truncates a table (drops all data, optionally preserving the current split points) by
   * submitting a {@link TruncateTableProcedure} under nonce protection.
   * @param tableName the table to truncate
   * @param preserveSplits true to recreate the table with its existing region boundaries
   * @param nonceGroup client nonce group for duplicate-request detection
   * @param nonce client nonce for duplicate-request detection
   * @return the procedure id of the submitted truncate-table procedure
   * @throws IOException if the procedure prepare phase or a coprocessor hook fails
   */
  @Override
  public long truncateTable(
      final TableName tableName,
      final boolean preserveSplits,
      final long nonceGroup,
      final long nonce) throws IOException {
    checkInitialized();

    return MasterProcedureUtil.submitProcedure(
        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
      @Override
      protected void run() throws IOException {
        getMaster().getMasterCoprocessorHost().preTruncateTable(tableName);

        LOG.info(getClientIdAuditPrefix() + " truncate " + tableName);
        // NOTE(review): latch created with createLatch(2, 0) rather than a blocking latch as in
        // the sibling operations — presumably tied to client compat versioning; confirm intent.
        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
        submitProcedure(new TruncateTableProcedure(procedureExecutor.getEnvironment(),
            tableName, preserveSplits, latch));
        latch.await();

        getMaster().getMasterCoprocessorHost().postTruncateTable(tableName);
      }

      @Override
      protected String getDescription() {
        return "TruncateTableProcedure";
      }
    });
  }
2263
2264  @Override
2265  public long addColumn(final TableName tableName, final ColumnFamilyDescriptor column,
2266      final long nonceGroup, final long nonce) throws IOException {
2267    checkInitialized();
2268    checkTableExists(tableName);
2269
2270    return modifyTable(tableName, new TableDescriptorGetter() {
2271
2272      @Override
2273      public TableDescriptor get() throws IOException {
2274        TableDescriptor old = getTableDescriptors().get(tableName);
2275        if (old.hasColumnFamily(column.getName())) {
2276          throw new InvalidFamilyOperationException("Column family '" + column.getNameAsString()
2277              + "' in table '" + tableName + "' already exists so cannot be added");
2278        }
2279
2280        return TableDescriptorBuilder.newBuilder(old).setColumnFamily(column).build();
2281      }
2282    }, nonceGroup, nonce, true);
2283  }
2284
2285  /**
2286   * Implement to return TableDescriptor after pre-checks
2287   */
2288  protected interface TableDescriptorGetter {
2289    TableDescriptor get() throws IOException;
2290  }
2291
2292  @Override
2293  public long modifyColumn(final TableName tableName, final ColumnFamilyDescriptor descriptor,
2294      final long nonceGroup, final long nonce) throws IOException {
2295    checkInitialized();
2296    checkTableExists(tableName);
2297    return modifyTable(tableName, new TableDescriptorGetter() {
2298
2299      @Override
2300      public TableDescriptor get() throws IOException {
2301        TableDescriptor old = getTableDescriptors().get(tableName);
2302        if (!old.hasColumnFamily(descriptor.getName())) {
2303          throw new InvalidFamilyOperationException("Family '" + descriptor.getNameAsString()
2304              + "' does not exist, so it cannot be modified");
2305        }
2306
2307        return TableDescriptorBuilder.newBuilder(old).modifyColumnFamily(descriptor).build();
2308      }
2309    }, nonceGroup, nonce, true);
2310  }
2311
2312  @Override
2313  public long deleteColumn(final TableName tableName, final byte[] columnName,
2314      final long nonceGroup, final long nonce) throws IOException {
2315    checkInitialized();
2316    checkTableExists(tableName);
2317
2318    return modifyTable(tableName, new TableDescriptorGetter() {
2319
2320      @Override
2321      public TableDescriptor get() throws IOException {
2322        TableDescriptor old = getTableDescriptors().get(tableName);
2323
2324        if (!old.hasColumnFamily(columnName)) {
2325          throw new InvalidFamilyOperationException("Family '" + Bytes.toString(columnName)
2326              + "' does not exist, so it cannot be deleted");
2327        }
2328        if (old.getColumnFamilyCount() == 1) {
2329          throw new InvalidFamilyOperationException("Family '" + Bytes.toString(columnName)
2330              + "' is the only column family in the table, so it cannot be deleted");
2331        }
2332        return TableDescriptorBuilder.newBuilder(old).removeColumnFamily(columnName).build();
2333      }
2334    }, nonceGroup, nonce, true);
2335  }
2336
  /**
   * Enables a table by submitting an {@link EnableTableProcedure} under nonce protection.
   * Before enabling, checks the space-quota state: a table disabled by a violated DISABLE
   * space-quota policy may not be re-enabled.
   * @param tableName the table to enable
   * @param nonceGroup client nonce group for duplicate-request detection
   * @param nonce client nonce for duplicate-request detection
   * @return the procedure id of the submitted enable-table procedure
   * @throws AccessDeniedException if a violated DISABLE space quota forbids enabling
   * @throws IOException if the procedure prepare phase or a coprocessor hook fails
   */
  @Override
  public long enableTable(final TableName tableName, final long nonceGroup, final long nonce)
      throws IOException {
    checkInitialized();

    return MasterProcedureUtil.submitProcedure(
        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
      @Override
      protected void run() throws IOException {
        getMaster().getMasterCoprocessorHost().preEnableTable(tableName);

        // Normally, it would make sense for this authorization check to exist inside
        // AccessController, but because the authorization check is done based on internal state
        // (rather than explicit permissions) we'll do the check here instead of in the
        // coprocessor.
        MasterQuotaManager quotaManager = getMasterQuotaManager();
        if (quotaManager != null) {
          if (quotaManager.isQuotaInitialized()) {
              SpaceQuotaSnapshot currSnapshotOfTable =
                  QuotaTableUtil.getCurrentSnapshotFromQuotaTable(getConnection(), tableName);
              if (currSnapshotOfTable != null) {
                SpaceQuotaStatus quotaStatus = currSnapshotOfTable.getQuotaStatus();
                // Refuse to enable when the table is in violation of a DISABLE policy quota.
                if (quotaStatus.isInViolation()
                    && SpaceViolationPolicy.DISABLE == quotaStatus.getPolicy().orElse(null)) {
                throw new AccessDeniedException("Enabling the table '" + tableName
                    + "' is disallowed due to a violated space quota.");
              }
            }
          } else if (LOG.isTraceEnabled()) {
            LOG.trace("Unable to check for space quotas as the MasterQuotaManager is not enabled");
          }
        }

        LOG.info(getClientIdAuditPrefix() + " enable " + tableName);

        // Execute the operation asynchronously - client will check the progress of the operation
        // In case the request is from a <1.1 client before returning,
        // we want to make sure that the table is prepared to be
        // enabled (the table is locked and the table state is set).
        // Note: if the procedure throws exception, we will catch it and rethrow.
        final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
        submitProcedure(new EnableTableProcedure(procedureExecutor.getEnvironment(),
            tableName, prepareLatch));
        prepareLatch.await();

        getMaster().getMasterCoprocessorHost().postEnableTable(tableName);
      }

      @Override
      protected String getDescription() {
        return "EnableTableProcedure";
      }
    });
  }
2391
  /**
   * Disables a table by submitting a {@link DisableTableProcedure} under nonce protection. A
   * blocking prepare latch surfaces "prepare"-phase failures to the caller (HBASE-19953).
   * @param tableName the table to disable
   * @param nonceGroup client nonce group for duplicate-request detection
   * @param nonce client nonce for duplicate-request detection
   * @return the procedure id of the submitted disable-table procedure
   * @throws IOException if the procedure prepare phase or a coprocessor hook fails
   */
  @Override
  public long disableTable(final TableName tableName, final long nonceGroup, final long nonce)
      throws IOException {
    checkInitialized();

    return MasterProcedureUtil.submitProcedure(
        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
      @Override
      protected void run() throws IOException {
        getMaster().getMasterCoprocessorHost().preDisableTable(tableName);

        LOG.info(getClientIdAuditPrefix() + " disable " + tableName);

        // Execute the operation asynchronously - client will check the progress of the operation
        // In case the request is from a <1.1 client before returning,
        // we want to make sure that the table is prepared to be
        // disabled (the table is locked and the table state is set).
        // Note: if the procedure throws exception, we will catch it and rethrow.
        //
        // We need to wait for the procedure to potentially fail due to "prepare" sanity
        // checks. This will block only the beginning of the procedure. See HBASE-19953.
        final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createBlockingLatch();
        submitProcedure(new DisableTableProcedure(procedureExecutor.getEnvironment(),
            tableName, false, prepareLatch));
        prepareLatch.await();

        getMaster().getMasterCoprocessorHost().postDisableTable(tableName);
      }

      @Override
      protected String getDescription() {
        return "DisableTableProcedure";
      }
    });
  }
2427
  /**
   * Common implementation for all table-modification operations (modify table, add/modify/delete
   * column family). Submits a {@link ModifyTableProcedure} under nonce protection; the new
   * descriptor is obtained from the getter (after its pre-checks), passed through the
   * preModifyTable coprocessor hook, and sanity-checked before submission.
   * @param tableName the table being modified
   * @param newDescriptorGetter supplier of the new descriptor; may throw to abort
   * @param nonceGroup client nonce group for duplicate-request detection
   * @param nonce client nonce for duplicate-request detection
   * @param shouldCheckDescriptor whether the procedure should verify the current descriptor
   *   still matches the captured oldDescriptor before applying the change
   * @return the procedure id of the submitted modify-table procedure
   * @throws IOException if any pre-check, sanity check, or coprocessor hook fails
   */
  private long modifyTable(final TableName tableName,
      final TableDescriptorGetter newDescriptorGetter, final long nonceGroup, final long nonce,
      final boolean shouldCheckDescriptor) throws IOException {
    return MasterProcedureUtil
        .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
          @Override
          protected void run() throws IOException {
            TableDescriptor oldDescriptor = getMaster().getTableDescriptors().get(tableName);
            TableDescriptor newDescriptor = getMaster().getMasterCoprocessorHost()
                .preModifyTable(tableName, oldDescriptor, newDescriptorGetter.get());
            TableDescriptorChecker.sanityCheck(conf, newDescriptor);
            LOG.info("{} modify table {} from {} to {}", getClientIdAuditPrefix(), tableName,
                oldDescriptor, newDescriptor);

            // Execute the operation synchronously - wait for the operation completes before
            // continuing.
            //
            // We need to wait for the procedure to potentially fail due to "prepare" sanity
            // checks. This will block only the beginning of the procedure. See HBASE-19953.
            ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
            submitProcedure(new ModifyTableProcedure(procedureExecutor.getEnvironment(),
                newDescriptor, latch, oldDescriptor, shouldCheckDescriptor));
            latch.await();

            getMaster().getMasterCoprocessorHost().postModifyTable(tableName, oldDescriptor,
              newDescriptor);
          }

          @Override
          protected String getDescription() {
            return "ModifyTableProcedure";
          }
        });

  }
2463
2464  @Override
2465  public long modifyTable(final TableName tableName, final TableDescriptor newDescriptor,
2466      final long nonceGroup, final long nonce) throws IOException {
2467    checkInitialized();
2468    return modifyTable(tableName, new TableDescriptorGetter() {
2469      @Override
2470      public TableDescriptor get() throws IOException {
2471        return newDescriptor;
2472      }
2473    }, nonceGroup, nonce, false);
2474
2475  }
2476
  /**
   * Restores (or clones) a snapshot onto its target table via the {@link SnapshotManager},
   * under nonce protection.
   * @param snapshotDesc description of the snapshot to restore or clone
   * @param nonceGroup client nonce group for duplicate-request detection
   * @param nonce client nonce for duplicate-request detection
   * @param restoreAcl whether to also restore the snapshot's ACLs
   * @return the procedure id of the restore/clone procedure
   * @throws IOException if snapshots are unsupported, the namespace is unknown, or the
   *   restore fails to start
   */
  public long restoreSnapshot(final SnapshotDescription snapshotDesc,
      final long nonceGroup, final long nonce, final boolean restoreAcl) throws IOException {
    checkInitialized();
    getSnapshotManager().checkSnapshotSupport();

    // Ensure namespace exists. Will throw exception if non-known NS.
    final TableName dstTable = TableName.valueOf(snapshotDesc.getTable());
    getClusterSchema().getNamespace(dstTable.getNamespaceAsString());

    return MasterProcedureUtil.submitProcedure(
        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
      @Override
      protected void run() throws IOException {
          setProcId(
            getSnapshotManager().restoreOrCloneSnapshot(snapshotDesc, getNonceKey(), restoreAcl));
      }

      @Override
      protected String getDescription() {
        return "RestoreSnapshotProcedure";
      }
    });
  }
2500
2501  private void checkTableExists(final TableName tableName)
2502      throws IOException, TableNotFoundException {
2503    if (!MetaTableAccessor.tableExists(getConnection(), tableName)) {
2504      throw new TableNotFoundException(tableName);
2505    }
2506  }
2507
  /**
   * Ensures the given table may be modified: it must not be a catalog table, it must exist,
   * and it must currently be disabled. Checks run in that order.
   * @param tableName the table to validate
   * @throws IOException if the table is the hbase:meta catalog table
   * @throws TableNotFoundException if the table does not exist
   * @throws TableNotDisabledException if the table is not in DISABLED state
   */
  @Override
  public void checkTableModifiable(final TableName tableName)
      throws IOException, TableNotFoundException, TableNotDisabledException {
    if (isCatalogTable(tableName)) {
      throw new IOException("Can't modify catalog tables");
    }
    checkTableExists(tableName);
    TableState ts = getTableStateManager().getTableState(tableName);
    if (!ts.isDisabled()) {
      throw new TableNotDisabledException("Not DISABLED; " + ts);
    }
  }
2520
  /**
   * @return cluster metrics for all {@link Option}s, bypassing coprocessor pre/post hooks
   * @throws InterruptedIOException if interrupted while gathering metrics
   */
  public ClusterMetrics getClusterMetricsWithoutCoprocessor() throws InterruptedIOException {
    return getClusterMetricsWithoutCoprocessor(EnumSet.allOf(Option.class));
  }
2524
  /**
   * Builds cluster metrics for the requested options without invoking coprocessor hooks.
   * Each option is gathered defensively: manager fields that may not be initialized yet
   * (serverManager, assignmentManager, trackers, infoServer) are null-checked and simply
   * skipped when unavailable.
   * @param options which metric sections to populate; an empty set means "everything"
   *   (older hbase1 clients cannot send options)
   * @return the assembled cluster metrics
   * @throws InterruptedIOException if interrupted while gathering metrics
   */
  public ClusterMetrics getClusterMetricsWithoutCoprocessor(EnumSet<Option> options)
      throws InterruptedIOException {
    ClusterMetricsBuilder builder = ClusterMetricsBuilder.newBuilder();
    // given that hbase1 can't submit the request with Option,
    // we return all information to client if the list of Option is empty.
    if (options.isEmpty()) {
      options = EnumSet.allOf(Option.class);
    }

    for (Option opt : options) {
      switch (opt) {
        case HBASE_VERSION: builder.setHBaseVersion(VersionInfo.getVersion()); break;
        case CLUSTER_ID: builder.setClusterId(getClusterId()); break;
        case MASTER: builder.setMasterName(getServerName()); break;
        case BACKUP_MASTERS: builder.setBackerMasterNames(getBackupMasters()); break;
        case LIVE_SERVERS: {
          if (serverManager != null) {
            builder.setLiveServerMetrics(serverManager.getOnlineServers().entrySet().stream()
              .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));
          }
          break;
        }
        case DEAD_SERVERS: {
          if (serverManager != null) {
            builder.setDeadServerNames(new ArrayList<>(
              serverManager.getDeadServers().copyServerNames()));
          }
          break;
        }
        case MASTER_COPROCESSORS: {
          if (cpHost != null) {
            builder.setMasterCoprocessorNames(Arrays.asList(getMasterCoprocessors()));
          }
          break;
        }
        case REGIONS_IN_TRANSITION: {
          if (assignmentManager != null) {
            builder.setRegionsInTransition(assignmentManager.getRegionStates()
                .getRegionsStateInTransition());
          }
          break;
        }
        case BALANCER_ON: {
          if (loadBalancerTracker != null) {
            builder.setBalancerOn(loadBalancerTracker.isBalancerOn());
          }
          break;
        }
        case MASTER_INFO_PORT: {
          if (infoServer != null) {
            builder.setMasterInfoPort(infoServer.getPort());
          }
          break;
        }
        case SERVERS_NAME: {
          if (serverManager != null) {
            builder.setServerNames(serverManager.getOnlineServersList());
          }
          break;
        }
        case TABLE_TO_REGIONS_COUNT: {
          // Region counts per table are only meaningful once this master is active and fully
          // initialized; otherwise the section is omitted.
          if (isActiveMaster() && isInitialized() && assignmentManager != null) {
            try {
              Map<TableName, RegionStatesCount> tableRegionStatesCountMap = new HashMap<>();
              Map<String, TableDescriptor> tableDescriptorMap = getTableDescriptors().getAll();
              for (TableDescriptor tableDescriptor : tableDescriptorMap.values()) {
                TableName tableName = tableDescriptor.getTableName();
                RegionStatesCount regionStatesCount = assignmentManager
                  .getRegionStatesCount(tableName);
                tableRegionStatesCountMap.put(tableName, regionStatesCount);
              }
              builder.setTableRegionStatesCount(tableRegionStatesCountMap);
            } catch (IOException e) {
              LOG.error("Error while populating TABLE_TO_REGIONS_COUNT for Cluster Metrics..", e);
            }
          }
          break;
        }
      }
    }
    return builder.build();
  }
2607
2608  /**
2609   * @return cluster status
2610   */
2611  public ClusterMetrics getClusterMetrics() throws IOException {
2612    return getClusterMetrics(EnumSet.allOf(Option.class));
2613  }
2614
2615  public ClusterMetrics getClusterMetrics(EnumSet<Option> options) throws IOException {
2616    if (cpHost != null) {
2617      cpHost.preGetClusterMetrics();
2618    }
2619    ClusterMetrics status = getClusterMetricsWithoutCoprocessor(options);
2620    if (cpHost != null) {
2621      cpHost.postGetClusterMetrics(status);
2622    }
2623    return status;
2624  }
2625
2626  private List<ServerName> getBackupMasters() throws InterruptedIOException {
2627    // Build Set of backup masters from ZK nodes
2628    List<String> backupMasterStrings;
2629    try {
2630      backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
2631        this.zooKeeper.getZNodePaths().backupMasterAddressesZNode);
2632    } catch (KeeperException e) {
2633      LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
2634      backupMasterStrings = null;
2635    }
2636
2637    List<ServerName> backupMasters = Collections.emptyList();
2638    if (backupMasterStrings != null && !backupMasterStrings.isEmpty()) {
2639      backupMasters = new ArrayList<>(backupMasterStrings.size());
2640      for (String s: backupMasterStrings) {
2641        try {
2642          byte [] bytes;
2643          try {
2644            bytes = ZKUtil.getData(this.zooKeeper, ZNodePaths.joinZNode(
2645                this.zooKeeper.getZNodePaths().backupMasterAddressesZNode, s));
2646          } catch (InterruptedException e) {
2647            throw new InterruptedIOException();
2648          }
2649          if (bytes != null) {
2650            ServerName sn;
2651            try {
2652              sn = ProtobufUtil.parseServerNameFrom(bytes);
2653            } catch (DeserializationException e) {
2654              LOG.warn("Failed parse, skipping registering backup server", e);
2655              continue;
2656            }
2657            backupMasters.add(sn);
2658          }
2659        } catch (KeeperException e) {
2660          LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
2661                   "backup servers"), e);
2662        }
2663      }
2664      Collections.sort(backupMasters, new Comparator<ServerName>() {
2665        @Override
2666        public int compare(ServerName s1, ServerName s2) {
2667          return s1.getServerName().compareTo(s2.getServerName());
2668        }});
2669    }
2670    return backupMasters;
2671  }
2672
2673  /**
2674   * The set of loaded coprocessors is stored in a static set. Since it's
2675   * statically allocated, it does not require that HMaster's cpHost be
2676   * initialized prior to accessing it.
2677   * @return a String representation of the set of names of the loaded coprocessors.
2678   */
2679  public static String getLoadedCoprocessors() {
2680    return CoprocessorHost.getLoadedCoprocessors().toString();
2681  }
2682
2683  /**
2684   * @return timestamp in millis when HMaster was started.
2685   */
2686  public long getMasterStartTime() {
2687    return startcode;
2688  }
2689
2690  /**
2691   * @return timestamp in millis when HMaster became the active master.
2692   */
2693  public long getMasterActiveTime() {
2694    return masterActiveTime;
2695  }
2696
2697  /**
2698   * @return timestamp in millis when HMaster finished becoming the active master
2699   */
2700  public long getMasterFinishedInitializationTime() {
2701    return masterFinishedInitializationTime;
2702  }
2703
2704  public int getNumWALFiles() {
2705    return 0;
2706  }
2707
2708  public ProcedureStore getProcedureStore() {
2709    return procedureStore;
2710  }
2711
2712  public int getRegionServerInfoPort(final ServerName sn) {
2713    int port = this.serverManager.getInfoPort(sn);
2714    return port == 0 ? conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2715      HConstants.DEFAULT_REGIONSERVER_INFOPORT) : port;
2716  }
2717
  /**
   * @return the version string of the given region server as tracked by the ServerManager.
   */
  @Override
  public String getRegionServerVersion(ServerName sn) {
    // Will return "0.0.0" if the server is not online to prevent move system region to unknown
    // version RS.
    return this.serverManager.getVersion(sn);
  }

  /**
   * Asks the assignment manager to (asynchronously) evaluate whether system regions should be
   * moved, e.g. after region server membership or version changes.
   */
  @Override
  public void checkIfShouldMoveSystemRegionAsync() {
    assignmentManager.checkIfShouldMoveSystemRegionAsync();
  }
2729
2730  /**
2731   * @return array of coprocessor SimpleNames.
2732   */
2733  public String[] getMasterCoprocessors() {
2734    Set<String> masterCoprocessors = getMasterCoprocessorHost().getCoprocessors();
2735    return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
2736  }
2737
  /**
   * Aborts this master: logs the reason (and loaded coprocessors, to aid debugging per
   * HBASE-4014) and then stops the master. No-op if already aborted or stopped.
   * @param reason human-readable reason for the abort
   * @param cause optional throwable that triggered the abort; may be null
   */
  @Override
  public void abort(String reason, Throwable cause) {
    if (isAborted() || isStopped()) {
      // Already going down; avoid double-abort.
      return;
    }
    setAbortRequested();
    if (cpHost != null) {
      // HBASE-4014: dump a list of loaded coprocessors.
      LOG.error(HBaseMarkers.FATAL, "Master server abort: loaded coprocessors are: " +
          getLoadedCoprocessors());
    }
    String msg = "***** ABORTING master " + this + ": " + reason + " *****";
    if (cause != null) {
      LOG.error(HBaseMarkers.FATAL, msg, cause);
    } else {
      LOG.error(HBaseMarkers.FATAL, msg);
    }

    try {
      stopMaster();
    } catch (IOException e) {
      LOG.error("Exception occurred while stopping master", e);
    }
  }
2762
  /** @return the ZooKeeper watcher for this master. */
  @Override
  public ZKWatcher getZooKeeper() {
    return zooKeeper;
  }

  /** @return the master coprocessor host, or null if not yet initialized. */
  @Override
  public MasterCoprocessorHost getMasterCoprocessorHost() {
    return cpHost;
  }

  /** @return the master quota manager, or null if not yet initialized. */
  @Override
  public MasterQuotaManager getMasterQuotaManager() {
    return quotaManager;
  }

  /** @return the executor that runs master procedures. */
  @Override
  public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
    return procedureExecutor;
  }

  /** @return this master's server name. */
  @Override
  public ServerName getServerName() {
    return this.serverName;
  }

  /** @return the assignment manager responsible for region placement. */
  @Override
  public AssignmentManager getAssignmentManager() {
    return this.assignmentManager;
  }

  /** @return the catalog janitor chore. */
  @Override
  public CatalogJanitor getCatalogJanitor() {
    return this.catalogJanitorChore;
  }

  /** @return buffer of fatal log messages reported by region servers. */
  public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
    return rsFatals;
  }
2801
2802  /**
2803   * Shutdown the cluster.
2804   * Master runs a coordinated stop of all RegionServers and then itself.
2805   */
2806  public void shutdown() throws IOException {
2807    if (cpHost != null) {
2808      cpHost.preShutdown();
2809    }
2810    // Tell the servermanager cluster shutdown has been called. This makes it so when Master is
2811    // last running server, it'll stop itself. Next, we broadcast the cluster shutdown by setting
2812    // the cluster status as down. RegionServers will notice this change in state and will start
2813    // shutting themselves down. When last has exited, Master can go down.
2814    if (this.serverManager != null) {
2815      this.serverManager.shutdownCluster();
2816    }
2817    if (this.clusterStatusTracker != null) {
2818      try {
2819        this.clusterStatusTracker.setClusterDown();
2820      } catch (KeeperException e) {
2821        LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
2822      }
2823    }
2824    // Stop the procedure executor. Will stop any ongoing assign, unassign, server crash etc.,
2825    // processing so we can go down.
2826    if (this.procedureExecutor != null) {
2827      this.procedureExecutor.stop();
2828    }
2829    // Shutdown our cluster connection. This will kill any hosted RPCs that might be going on;
2830    // this is what we want especially if the Master is in startup phase doing call outs to
2831    // hbase:meta, etc. when cluster is down. Without ths connection close, we'd have to wait on
2832    // the rpc to timeout.
2833    if (this.asyncClusterConnection != null) {
2834      this.asyncClusterConnection.close();
2835    }
2836  }
2837
2838  public void stopMaster() throws IOException {
2839    if (cpHost != null) {
2840      cpHost.preStopMaster();
2841    }
2842    stop("Stopped by " + Thread.currentThread().getName());
2843  }
2844
2845  @Override
2846  public void stop(String msg) {
2847    if (!isStopped()) {
2848      super.stop(msg);
2849      if (this.activeMasterManager != null) {
2850        this.activeMasterManager.stop();
2851      }
2852    }
2853  }
2854
2855  @VisibleForTesting
2856  protected void checkServiceStarted() throws ServerNotRunningYetException {
2857    if (!serviceStarted) {
2858      throw new ServerNotRunningYetException("Server is not running yet");
2859    }
2860  }
2861
  /**
   * Thrown when a request arrives after this master has been stopped. Extends
   * {@link DoNotRetryIOException} so clients do not retry against a dead master.
   */
  public static class MasterStoppedException extends DoNotRetryIOException {
    MasterStoppedException() {
      super();
    }
  }
2867
  /**
   * Verifies this master is running, fully initialized and not stopped, in that order.
   * @throws ServerNotRunningYetException if master services have not been started yet
   * @throws PleaseHoldException if the master has not finished initializing
   * @throws MasterStoppedException if the master has been stopped
   */
  void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException,
      MasterNotRunningException, MasterStoppedException {
    checkServiceStarted();
    if (!isInitialized()) {
      throw new PleaseHoldException("Master is initializing");
    }
    if (isStopped()) {
      throw new MasterStoppedException();
    }
  }
2878
2879  /**
2880   * Report whether this master is currently the active master or not.
2881   * If not active master, we are parked on ZK waiting to become active.
2882   *
2883   * This method is used for testing.
2884   *
2885   * @return true if active master, false if not.
2886   */
2887  @Override
2888  public boolean isActiveMaster() {
2889    return activeMaster;
2890  }
2891
2892  /**
2893   * Report whether this master has completed with its initialization and is
2894   * ready.  If ready, the master is also the active master.  A standby master
2895   * is never ready.
2896   *
2897   * This method is used for testing.
2898   *
2899   * @return true if master is ready to go, false if not.
2900   */
2901  @Override
2902  public boolean isInitialized() {
2903    return initialized.isReady();
2904  }
2905
2906  /**
2907   * Report whether this master is in maintenance mode.
2908   *
2909   * @return true if master is in maintenanceMode
2910   */
2911  @Override
2912  public boolean isInMaintenanceMode() {
2913    return maintenanceMode;
2914  }
2915
2916  @VisibleForTesting
2917  public void setInitialized(boolean isInitialized) {
2918    procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
2919  }
2920
2921  @Override
2922  public ProcedureEvent<?> getInitializedEvent() {
2923    return initialized;
2924  }
2925
2926  /**
2927   * Compute the average load across all region servers.
2928   * Currently, this uses a very naive computation - just uses the number of
2929   * regions being served, ignoring stats about number of requests.
2930   * @return the average load
2931   */
2932  public double getAverageLoad() {
2933    if (this.assignmentManager == null) {
2934      return 0;
2935    }
2936
2937    RegionStates regionStates = this.assignmentManager.getRegionStates();
2938    if (regionStates == null) {
2939      return 0;
2940    }
2941    return regionStates.getAverageLoad();
2942  }
2943
2944  /*
2945   * @return the count of region split plans executed
2946   */
2947  public long getSplitPlanCount() {
2948    return splitPlanCount;
2949  }
2950
2951  /*
2952   * @return the count of region merge plans executed
2953   */
2954  public long getMergePlanCount() {
2955    return mergePlanCount;
2956  }
2957
2958  @Override
2959  public boolean registerService(Service instance) {
2960    /*
2961     * No stacking of instances is allowed for a single service name
2962     */
2963    Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
2964    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
2965    if (coprocessorServiceHandlers.containsKey(serviceName)) {
2966      LOG.error("Coprocessor service "+serviceName+
2967          " already registered, rejecting request from "+instance
2968      );
2969      return false;
2970    }
2971
2972    coprocessorServiceHandlers.put(serviceName, instance);
2973    if (LOG.isDebugEnabled()) {
2974      LOG.debug("Registered master coprocessor service: service="+serviceName);
2975    }
2976    return true;
2977  }
2978
2979  /**
2980   * Utility for constructing an instance of the passed HMaster class.
2981   * @param masterClass
2982   * @return HMaster instance.
2983   */
2984  public static HMaster constructMaster(Class<? extends HMaster> masterClass,
2985      final Configuration conf)  {
2986    try {
2987      Constructor<? extends HMaster> c = masterClass.getConstructor(Configuration.class);
2988      return c.newInstance(conf);
2989    } catch(Exception e) {
2990      Throwable error = e;
2991      if (e instanceof InvocationTargetException &&
2992          ((InvocationTargetException)e).getTargetException() != null) {
2993        error = ((InvocationTargetException)e).getTargetException();
2994      }
2995      throw new RuntimeException("Failed construction of Master: " + masterClass.toString() + ". "
2996        , error);
2997    }
2998  }
2999
3000  /**
3001   * @see org.apache.hadoop.hbase.master.HMasterCommandLine
3002   */
3003  public static void main(String [] args) {
3004    LOG.info("STARTING service " + HMaster.class.getSimpleName());
3005    VersionInfo.logVersion();
3006    new HMasterCommandLine(HMaster.class).doMain(args);
3007  }
3008
3009  public HFileCleaner getHFileCleaner() {
3010    return this.hfileCleaner;
3011  }
3012
3013  public LogCleaner getLogCleaner() {
3014    return this.logCleaner;
3015  }
3016
3017  /**
3018   * @return the underlying snapshot manager
3019   */
3020  @Override
3021  public SnapshotManager getSnapshotManager() {
3022    return this.snapshotManager;
3023  }
3024
3025  /**
3026   * @return the underlying MasterProcedureManagerHost
3027   */
3028  @Override
3029  public MasterProcedureManagerHost getMasterProcedureManagerHost() {
3030    return mpmHost;
3031  }
3032
3033  @Override
3034  public ClusterSchema getClusterSchema() {
3035    return this.clusterSchemaService;
3036  }
3037
3038  /**
3039   * Create a new Namespace.
3040   * @param namespaceDescriptor descriptor for new Namespace
3041   * @param nonceGroup Identifier for the source of the request, a client or process.
3042   * @param nonce A unique identifier for this operation from the client or process identified by
3043   * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
3044   * @return procedure id
3045   */
3046  long createNamespace(final NamespaceDescriptor namespaceDescriptor, final long nonceGroup,
3047      final long nonce) throws IOException {
3048    checkInitialized();
3049
3050    TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName()));
3051
3052    return MasterProcedureUtil.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this,
3053          nonceGroup, nonce) {
3054      @Override
3055      protected void run() throws IOException {
3056        getMaster().getMasterCoprocessorHost().preCreateNamespace(namespaceDescriptor);
3057        // We need to wait for the procedure to potentially fail due to "prepare" sanity
3058        // checks. This will block only the beginning of the procedure. See HBASE-19953.
3059        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
3060        LOG.info(getClientIdAuditPrefix() + " creating " + namespaceDescriptor);
3061        // Execute the operation synchronously - wait for the operation to complete before
3062        // continuing.
3063        setProcId(getClusterSchema().createNamespace(namespaceDescriptor, getNonceKey(), latch));
3064        latch.await();
3065        getMaster().getMasterCoprocessorHost().postCreateNamespace(namespaceDescriptor);
3066      }
3067
3068      @Override
3069      protected String getDescription() {
3070        return "CreateNamespaceProcedure";
3071      }
3072    });
3073  }
3074
3075  /**
3076   * Modify an existing Namespace.
3077   * @param nonceGroup Identifier for the source of the request, a client or process.
3078   * @param nonce A unique identifier for this operation from the client or process identified by
3079   * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
3080   * @return procedure id
3081   */
3082  long modifyNamespace(final NamespaceDescriptor newNsDescriptor, final long nonceGroup,
3083      final long nonce) throws IOException {
3084    checkInitialized();
3085
3086    TableName.isLegalNamespaceName(Bytes.toBytes(newNsDescriptor.getName()));
3087
3088    return MasterProcedureUtil.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this,
3089          nonceGroup, nonce) {
3090      @Override
3091      protected void run() throws IOException {
3092        NamespaceDescriptor oldNsDescriptor = getNamespace(newNsDescriptor.getName());
3093        getMaster().getMasterCoprocessorHost().preModifyNamespace(oldNsDescriptor, newNsDescriptor);
3094        // We need to wait for the procedure to potentially fail due to "prepare" sanity
3095        // checks. This will block only the beginning of the procedure. See HBASE-19953.
3096        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
3097        LOG.info(getClientIdAuditPrefix() + " modify " + newNsDescriptor);
3098        // Execute the operation synchronously - wait for the operation to complete before
3099        // continuing.
3100        setProcId(getClusterSchema().modifyNamespace(newNsDescriptor, getNonceKey(), latch));
3101        latch.await();
3102        getMaster().getMasterCoprocessorHost().postModifyNamespace(oldNsDescriptor,
3103          newNsDescriptor);
3104      }
3105
3106      @Override
3107      protected String getDescription() {
3108        return "ModifyNamespaceProcedure";
3109      }
3110    });
3111  }
3112
3113  /**
3114   * Delete an existing Namespace. Only empty Namespaces (no tables) can be removed.
3115   * @param nonceGroup Identifier for the source of the request, a client or process.
3116   * @param nonce A unique identifier for this operation from the client or process identified by
3117   * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
3118   * @return procedure id
3119   */
3120  long deleteNamespace(final String name, final long nonceGroup, final long nonce)
3121      throws IOException {
3122    checkInitialized();
3123
3124    return MasterProcedureUtil.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this,
3125          nonceGroup, nonce) {
3126      @Override
3127      protected void run() throws IOException {
3128        getMaster().getMasterCoprocessorHost().preDeleteNamespace(name);
3129        LOG.info(getClientIdAuditPrefix() + " delete " + name);
3130        // Execute the operation synchronously - wait for the operation to complete before
3131        // continuing.
3132        //
3133        // We need to wait for the procedure to potentially fail due to "prepare" sanity
3134        // checks. This will block only the beginning of the procedure. See HBASE-19953.
3135        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
3136        setProcId(submitProcedure(
3137              new DeleteNamespaceProcedure(procedureExecutor.getEnvironment(), name, latch)));
3138        latch.await();
3139        // Will not be invoked in the face of Exception thrown by the Procedure's execution
3140        getMaster().getMasterCoprocessorHost().postDeleteNamespace(name);
3141      }
3142
3143      @Override
3144      protected String getDescription() {
3145        return "DeleteNamespaceProcedure";
3146      }
3147    });
3148  }
3149
3150  /**
3151   * Get a Namespace
3152   * @param name Name of the Namespace
3153   * @return Namespace descriptor for <code>name</code>
3154   */
3155  NamespaceDescriptor getNamespace(String name) throws IOException {
3156    checkInitialized();
3157    if (this.cpHost != null) this.cpHost.preGetNamespaceDescriptor(name);
3158    NamespaceDescriptor nsd = this.clusterSchemaService.getNamespace(name);
3159    if (this.cpHost != null) this.cpHost.postGetNamespaceDescriptor(nsd);
3160    return nsd;
3161  }
3162
3163  /**
3164   * Get all Namespaces
3165   * @return All Namespace descriptors
3166   */
3167  List<NamespaceDescriptor> getNamespaces() throws IOException {
3168    checkInitialized();
3169    final List<NamespaceDescriptor> nsds = new ArrayList<>();
3170    if (cpHost != null) {
3171      cpHost.preListNamespaceDescriptors(nsds);
3172    }
3173    nsds.addAll(this.clusterSchemaService.getNamespaces());
3174    if (this.cpHost != null) {
3175      this.cpHost.postListNamespaceDescriptors(nsds);
3176    }
3177    return nsds;
3178  }
3179
3180  /**
3181   * List namespace names
3182   * @return All namespace names
3183   */
3184  public List<String> listNamespaces() throws IOException {
3185    checkInitialized();
3186    List<String> namespaces = new ArrayList<>();
3187    if (cpHost != null) {
3188      cpHost.preListNamespaces(namespaces);
3189    }
3190    for (NamespaceDescriptor namespace : clusterSchemaService.getNamespaces()) {
3191      namespaces.add(namespace.getName());
3192    }
3193    if (cpHost != null) {
3194      cpHost.postListNamespaces(namespaces);
3195    }
3196    return namespaces;
3197  }
3198
3199  @Override
3200  public List<TableName> listTableNamesByNamespace(String name) throws IOException {
3201    checkInitialized();
3202    return listTableNames(name, null, true);
3203  }
3204
3205  @Override
3206  public List<TableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
3207    checkInitialized();
3208    return listTableDescriptors(name, null, null, true);
3209  }
3210
3211  @Override
3212  public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
3213      throws IOException {
3214    if (cpHost != null) {
3215      cpHost.preAbortProcedure(this.procedureExecutor, procId);
3216    }
3217
3218    final boolean result = this.procedureExecutor.abort(procId, mayInterruptIfRunning);
3219
3220    if (cpHost != null) {
3221      cpHost.postAbortProcedure();
3222    }
3223
3224    return result;
3225  }
3226
3227  @Override
3228  public List<Procedure<?>> getProcedures() throws IOException {
3229    if (cpHost != null) {
3230      cpHost.preGetProcedures();
3231    }
3232
3233    @SuppressWarnings({ "unchecked", "rawtypes" })
3234    List<Procedure<?>> procList = (List) this.procedureExecutor.getProcedures();
3235
3236    if (cpHost != null) {
3237      cpHost.postGetProcedures(procList);
3238    }
3239
3240    return procList;
3241  }
3242
3243  @Override
3244  public List<LockedResource> getLocks() throws IOException {
3245    if (cpHost != null) {
3246      cpHost.preGetLocks();
3247    }
3248
3249    MasterProcedureScheduler procedureScheduler =
3250      procedureExecutor.getEnvironment().getProcedureScheduler();
3251
3252    final List<LockedResource> lockedResources = procedureScheduler.getLocks();
3253
3254    if (cpHost != null) {
3255      cpHost.postGetLocks(lockedResources);
3256    }
3257
3258    return lockedResources;
3259  }
3260
3261  /**
3262   * Returns the list of table descriptors that match the specified request
3263   * @param namespace the namespace to query, or null if querying for all
3264   * @param regex The regular expression to match against, or null if querying for all
3265   * @param tableNameList the list of table names, or null if querying for all
3266   * @param includeSysTables False to match only against userspace tables
3267   * @return the list of table descriptors
3268   */
3269  public List<TableDescriptor> listTableDescriptors(final String namespace, final String regex,
3270      final List<TableName> tableNameList, final boolean includeSysTables)
3271  throws IOException {
3272    List<TableDescriptor> htds = new ArrayList<>();
3273    if (cpHost != null) {
3274      cpHost.preGetTableDescriptors(tableNameList, htds, regex);
3275    }
3276    htds = getTableDescriptors(htds, namespace, regex, tableNameList, includeSysTables);
3277    if (cpHost != null) {
3278      cpHost.postGetTableDescriptors(tableNameList, htds, regex);
3279    }
3280    return htds;
3281  }
3282
3283  /**
3284   * Returns the list of table names that match the specified request
3285   * @param regex The regular expression to match against, or null if querying for all
3286   * @param namespace the namespace to query, or null if querying for all
3287   * @param includeSysTables False to match only against userspace tables
3288   * @return the list of table names
3289   */
3290  public List<TableName> listTableNames(final String namespace, final String regex,
3291      final boolean includeSysTables) throws IOException {
3292    List<TableDescriptor> htds = new ArrayList<>();
3293    if (cpHost != null) {
3294      cpHost.preGetTableNames(htds, regex);
3295    }
3296    htds = getTableDescriptors(htds, namespace, regex, null, includeSysTables);
3297    if (cpHost != null) {
3298      cpHost.postGetTableNames(htds, regex);
3299    }
3300    List<TableName> result = new ArrayList<>(htds.size());
3301    for (TableDescriptor htd: htds) result.add(htd.getTableName());
3302    return result;
3303  }
3304
3305  /**
3306   * @return list of table table descriptors after filtering by regex and whether to include system
3307   *    tables, etc.
3308   * @throws IOException
3309   */
3310  private List<TableDescriptor> getTableDescriptors(final List<TableDescriptor> htds,
3311      final String namespace, final String regex, final List<TableName> tableNameList,
3312      final boolean includeSysTables)
3313  throws IOException {
3314    if (tableNameList == null || tableNameList.isEmpty()) {
3315      // request for all TableDescriptors
3316      Collection<TableDescriptor> allHtds;
3317      if (namespace != null && namespace.length() > 0) {
3318        // Do a check on the namespace existence. Will fail if does not exist.
3319        this.clusterSchemaService.getNamespace(namespace);
3320        allHtds = tableDescriptors.getByNamespace(namespace).values();
3321      } else {
3322        allHtds = tableDescriptors.getAll().values();
3323      }
3324      for (TableDescriptor desc: allHtds) {
3325        if (tableStateManager.isTablePresent(desc.getTableName())
3326            && (includeSysTables || !desc.getTableName().isSystemTable())) {
3327          htds.add(desc);
3328        }
3329      }
3330    } else {
3331      for (TableName s: tableNameList) {
3332        if (tableStateManager.isTablePresent(s)) {
3333          TableDescriptor desc = tableDescriptors.get(s);
3334          if (desc != null) {
3335            htds.add(desc);
3336          }
3337        }
3338      }
3339    }
3340
3341    // Retains only those matched by regular expression.
3342    if (regex != null) filterTablesByRegex(htds, Pattern.compile(regex));
3343    return htds;
3344  }
3345
3346  /**
3347   * Removes the table descriptors that don't match the pattern.
3348   * @param descriptors list of table descriptors to filter
3349   * @param pattern the regex to use
3350   */
3351  private static void filterTablesByRegex(final Collection<TableDescriptor> descriptors,
3352      final Pattern pattern) {
3353    final String defaultNS = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
3354    Iterator<TableDescriptor> itr = descriptors.iterator();
3355    while (itr.hasNext()) {
3356      TableDescriptor htd = itr.next();
3357      String tableName = htd.getTableName().getNameAsString();
3358      boolean matched = pattern.matcher(tableName).matches();
3359      if (!matched && htd.getTableName().getNamespaceAsString().equals(defaultNS)) {
3360        matched = pattern.matcher(defaultNS + TableName.NAMESPACE_DELIM + tableName).matches();
3361      }
3362      if (!matched) {
3363        itr.remove();
3364      }
3365    }
3366  }
3367
3368  @Override
3369  public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
3370    return getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
3371        .getLastMajorCompactionTimestamp(table);
3372  }
3373
3374  @Override
3375  public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException {
3376    return getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
3377        .getLastMajorCompactionTimestamp(regionName);
3378  }
3379
3380  /**
3381   * Gets the mob file compaction state for a specific table.
3382   * Whether all the mob files are selected is known during the compaction execution, but
3383   * the statistic is done just before compaction starts, it is hard to know the compaction
3384   * type at that time, so the rough statistics are chosen for the mob file compaction. Only two
3385   * compaction states are available, CompactionState.MAJOR_AND_MINOR and CompactionState.NONE.
3386   * @param tableName The current table name.
3387   * @return If a given table is in mob file compaction now.
3388   */
3389  public CompactionState getMobCompactionState(TableName tableName) {
3390    AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
3391    if (compactionsCount != null && compactionsCount.get() != 0) {
3392      return CompactionState.MAJOR_AND_MINOR;
3393    }
3394    return CompactionState.NONE;
3395  }
3396
3397  public void reportMobCompactionStart(TableName tableName) throws IOException {
3398    IdLock.Entry lockEntry = null;
3399    try {
3400      lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
3401      AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
3402      if (compactionsCount == null) {
3403        compactionsCount = new AtomicInteger(0);
3404        mobCompactionStates.put(tableName, compactionsCount);
3405      }
3406      compactionsCount.incrementAndGet();
3407    } finally {
3408      if (lockEntry != null) {
3409        mobCompactionLock.releaseLockEntry(lockEntry);
3410      }
3411    }
3412  }
3413
3414  public void reportMobCompactionEnd(TableName tableName) throws IOException {
3415    IdLock.Entry lockEntry = null;
3416    try {
3417      lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
3418      AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
3419      if (compactionsCount != null) {
3420        int count = compactionsCount.decrementAndGet();
3421        // remove the entry if the count is 0.
3422        if (count == 0) {
3423          mobCompactionStates.remove(tableName);
3424        }
3425      }
3426    } finally {
3427      if (lockEntry != null) {
3428        mobCompactionLock.releaseLockEntry(lockEntry);
3429      }
3430    }
3431  }
3432
3433  /**
3434   * Requests mob compaction.
3435   * @param tableName The table the compact.
3436   * @param columns The compacted columns.
3437   * @param allFiles Whether add all mob files into the compaction.
3438   */
3439  public void requestMobCompaction(TableName tableName,
3440                                   List<ColumnFamilyDescriptor> columns, boolean allFiles) throws IOException {
3441    mobCompactThread.requestMobCompaction(conf, getFileSystem(), tableName, columns, allFiles);
3442  }
3443
3444  /**
3445   * Queries the state of the {@link LoadBalancerTracker}. If the balancer is not initialized,
3446   * false is returned.
3447   *
3448   * @return The state of the load balancer, or false if the load balancer isn't defined.
3449   */
3450  public boolean isBalancerOn() {
3451    return !isInMaintenanceMode()
3452        && loadBalancerTracker != null
3453        && loadBalancerTracker.isBalancerOn();
3454  }
3455
3456  /**
3457   * Queries the state of the {@link RegionNormalizerTracker}. If it's not initialized,
3458   * false is returned.
3459   */
3460  public boolean isNormalizerOn() {
3461    return !isInMaintenanceMode()
3462        && regionNormalizerTracker != null
3463        && regionNormalizerTracker.isNormalizerOn();
3464  }
3465
3466  /**
3467   * Queries the state of the {@link SplitOrMergeTracker}. If it is not initialized,
3468   * false is returned. If switchType is illegal, false will return.
3469   * @param switchType see {@link org.apache.hadoop.hbase.client.MasterSwitchType}
3470   * @return The state of the switch
3471   */
3472  @Override
3473  public boolean isSplitOrMergeEnabled(MasterSwitchType switchType) {
3474    return !isInMaintenanceMode()
3475        && splitOrMergeTracker != null
3476        && splitOrMergeTracker.isSplitOrMergeEnabled(switchType);
3477  }
3478
3479  /**
3480   * Fetch the configured {@link LoadBalancer} class name. If none is set, a default is returned.
3481   *
3482   * @return The name of the {@link LoadBalancer} in use.
3483   */
3484  public String getLoadBalancerClassName() {
3485    return conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, LoadBalancerFactory
3486        .getDefaultLoadBalancerClass().getName());
3487  }
3488
3489  /**
3490   * @return RegionNormalizerTracker instance
3491   */
3492  public RegionNormalizerTracker getRegionNormalizerTracker() {
3493    return regionNormalizerTracker;
3494  }
3495
3496  public SplitOrMergeTracker getSplitOrMergeTracker() {
3497    return splitOrMergeTracker;
3498  }
3499
3500  @Override
3501  public LoadBalancer getLoadBalancer() {
3502    return balancer;
3503  }
3504
3505  @Override
3506  public FavoredNodesManager getFavoredNodesManager() {
3507    return favoredNodesManager;
3508  }
3509
3510  private long executePeerProcedure(AbstractPeerProcedure<?> procedure) throws IOException {
3511    long procId = procedureExecutor.submitProcedure(procedure);
3512    procedure.getLatch().await();
3513    return procId;
3514  }
3515
3516  @Override
3517  public long addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
3518      throws ReplicationException, IOException {
3519    LOG.info(getClientIdAuditPrefix() + " creating replication peer, id=" + peerId + ", config=" +
3520      peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"));
3521    return executePeerProcedure(new AddPeerProcedure(peerId, peerConfig, enabled));
3522  }
3523
3524  @Override
3525  public long removeReplicationPeer(String peerId) throws ReplicationException, IOException {
3526    LOG.info(getClientIdAuditPrefix() + " removing replication peer, id=" + peerId);
3527    return executePeerProcedure(new RemovePeerProcedure(peerId));
3528  }
3529
3530  @Override
3531  public long enableReplicationPeer(String peerId) throws ReplicationException, IOException {
3532    LOG.info(getClientIdAuditPrefix() + " enable replication peer, id=" + peerId);
3533    return executePeerProcedure(new EnablePeerProcedure(peerId));
3534  }
3535
3536  @Override
3537  public long disableReplicationPeer(String peerId) throws ReplicationException, IOException {
3538    LOG.info(getClientIdAuditPrefix() + " disable replication peer, id=" + peerId);
3539    return executePeerProcedure(new DisablePeerProcedure(peerId));
3540  }
3541
3542  @Override
3543  public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
3544      throws ReplicationException, IOException {
3545    if (cpHost != null) {
3546      cpHost.preGetReplicationPeerConfig(peerId);
3547    }
3548    LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + peerId);
3549    ReplicationPeerConfig peerConfig = this.replicationPeerManager.getPeerConfig(peerId)
3550        .orElseThrow(() -> new ReplicationPeerNotFoundException(peerId));
3551    if (cpHost != null) {
3552      cpHost.postGetReplicationPeerConfig(peerId);
3553    }
3554    return peerConfig;
3555  }
3556
3557  @Override
3558  public long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig)
3559      throws ReplicationException, IOException {
3560    LOG.info(getClientIdAuditPrefix() + " update replication peer config, id=" + peerId +
3561      ", config=" + peerConfig);
3562    return executePeerProcedure(new UpdatePeerConfigProcedure(peerId, peerConfig));
3563  }
3564
3565  @Override
3566  public List<ReplicationPeerDescription> listReplicationPeers(String regex)
3567      throws ReplicationException, IOException {
3568    if (cpHost != null) {
3569      cpHost.preListReplicationPeers(regex);
3570    }
3571    LOG.debug("{} list replication peers, regex={}", getClientIdAuditPrefix(), regex);
3572    Pattern pattern = regex == null ? null : Pattern.compile(regex);
3573    List<ReplicationPeerDescription> peers =
3574      this.replicationPeerManager.listPeers(pattern);
3575    if (cpHost != null) {
3576      cpHost.postListReplicationPeers(regex);
3577    }
3578    return peers;
3579  }
3580
3581  @Override
3582  public long transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState state)
3583    throws ReplicationException, IOException {
3584    LOG.info(
3585      getClientIdAuditPrefix() +
3586        " transit current cluster state to {} in a synchronous replication peer id={}",
3587      state, peerId);
3588    return executePeerProcedure(new TransitPeerSyncReplicationStateProcedure(peerId, state));
3589  }
3590
3591  /**
3592   * Mark region server(s) as decommissioned (previously called 'draining') to prevent additional
3593   * regions from getting assigned to them. Also unload the regions on the servers asynchronously.0
3594   * @param servers Region servers to decommission.
3595   */
3596  public void decommissionRegionServers(final List<ServerName> servers, final boolean offload)
3597      throws HBaseIOException {
3598    List<ServerName> serversAdded = new ArrayList<>(servers.size());
3599    // Place the decommission marker first.
3600    String parentZnode = getZooKeeper().getZNodePaths().drainingZNode;
3601    for (ServerName server : servers) {
3602      try {
3603        String node = ZNodePaths.joinZNode(parentZnode, server.getServerName());
3604        ZKUtil.createAndFailSilent(getZooKeeper(), node);
3605      } catch (KeeperException ke) {
3606        throw new HBaseIOException(
3607          this.zooKeeper.prefix("Unable to decommission '" + server.getServerName() + "'."), ke);
3608      }
3609      if (this.serverManager.addServerToDrainList(server)) {
3610        serversAdded.add(server);
3611      }
3612    }
3613    // Move the regions off the decommissioned servers.
3614    if (offload) {
3615      final List<ServerName> destServers = this.serverManager.createDestinationServersList();
3616      for (ServerName server : serversAdded) {
3617        final List<RegionInfo> regionsOnServer = this.assignmentManager.getRegionsOnServer(server);
3618        for (RegionInfo hri : regionsOnServer) {
3619          ServerName dest = balancer.randomAssignment(hri, destServers);
3620          if (dest == null) {
3621            throw new HBaseIOException("Unable to determine a plan to move " + hri);
3622          }
3623          RegionPlan rp = new RegionPlan(hri, server, dest);
3624          this.assignmentManager.moveAsync(rp);
3625        }
3626      }
3627    }
3628  }
3629
3630  /**
3631   * List region servers marked as decommissioned (previously called 'draining') to not get regions
3632   * assigned to them.
3633   * @return List of decommissioned servers.
3634   */
3635  public List<ServerName> listDecommissionedRegionServers() {
3636    return this.serverManager.getDrainingServersList();
3637  }
3638
3639  /**
3640   * Remove decommission marker (previously called 'draining') from a region server to allow regions
3641   * assignments. Load regions onto the server asynchronously if a list of regions is given
3642   * @param server Region server to remove decommission marker from.
3643   */
3644  public void recommissionRegionServer(final ServerName server,
3645      final List<byte[]> encodedRegionNames) throws IOException {
3646    // Remove the server from decommissioned (draining) server list.
3647    String parentZnode = getZooKeeper().getZNodePaths().drainingZNode;
3648    String node = ZNodePaths.joinZNode(parentZnode, server.getServerName());
3649    try {
3650      ZKUtil.deleteNodeFailSilent(getZooKeeper(), node);
3651    } catch (KeeperException ke) {
3652      throw new HBaseIOException(
3653        this.zooKeeper.prefix("Unable to recommission '" + server.getServerName() + "'."), ke);
3654    }
3655    this.serverManager.removeServerFromDrainList(server);
3656
3657    // Load the regions onto the server if we are given a list of regions.
3658    if (encodedRegionNames == null || encodedRegionNames.isEmpty()) {
3659      return;
3660    }
3661    if (!this.serverManager.isServerOnline(server)) {
3662      return;
3663    }
3664    for (byte[] encodedRegionName : encodedRegionNames) {
3665      RegionState regionState =
3666        assignmentManager.getRegionStates().getRegionState(Bytes.toString(encodedRegionName));
3667      if (regionState == null) {
3668        LOG.warn("Unknown region " + Bytes.toStringBinary(encodedRegionName));
3669        continue;
3670      }
3671      RegionInfo hri = regionState.getRegion();
3672      if (server.equals(regionState.getServerName())) {
3673        LOG.info("Skipping move of region " + hri.getRegionNameAsString() +
3674          " because region already assigned to the same server " + server + ".");
3675        continue;
3676      }
3677      RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), server);
3678      this.assignmentManager.moveAsync(rp);
3679    }
3680  }
3681
3682  @Override
3683  public LockManager getLockManager() {
3684    return lockManager;
3685  }
3686
3687  public QuotaObserverChore getQuotaObserverChore() {
3688    return this.quotaObserverChore;
3689  }
3690
3691  public SpaceQuotaSnapshotNotifier getSpaceQuotaSnapshotNotifier() {
3692    return this.spaceQuotaSnapshotNotifier;
3693  }
3694
3695  @SuppressWarnings("unchecked")
3696  private RemoteProcedure<MasterProcedureEnv, ?> getRemoteProcedure(long procId) {
3697    Procedure<?> procedure = procedureExecutor.getProcedure(procId);
3698    if (procedure == null) {
3699      return null;
3700    }
3701    assert procedure instanceof RemoteProcedure;
3702    return (RemoteProcedure<MasterProcedureEnv, ?>) procedure;
3703  }
3704
3705  public void remoteProcedureCompleted(long procId) {
3706    LOG.debug("Remote procedure done, pid={}", procId);
3707    RemoteProcedure<MasterProcedureEnv, ?> procedure = getRemoteProcedure(procId);
3708    if (procedure != null) {
3709      procedure.remoteOperationCompleted(procedureExecutor.getEnvironment());
3710    }
3711  }
3712
3713  public void remoteProcedureFailed(long procId, RemoteProcedureException error) {
3714    LOG.debug("Remote procedure failed, pid={}", procId, error);
3715    RemoteProcedure<MasterProcedureEnv, ?> procedure = getRemoteProcedure(procId);
3716    if (procedure != null) {
3717      procedure.remoteOperationFailed(procedureExecutor.getEnvironment(), error);
3718    }
3719  }
3720
3721  /**
3722   * Reopen regions provided in the argument
3723   *
3724   * @param tableName The current table name
3725   * @param regionNames The region names of the regions to reopen
3726   * @param nonceGroup Identifier for the source of the request, a client or process
3727   * @param nonce A unique identifier for this operation from the client or process identified by
3728   *   <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
3729   * @return procedure Id
3730   * @throws IOException if reopening region fails while running procedure
3731   */
3732  long reopenRegions(final TableName tableName, final List<byte[]> regionNames,
3733      final long nonceGroup, final long nonce)
3734      throws IOException {
3735
3736    return MasterProcedureUtil
3737      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
3738
3739        @Override
3740        protected void run() throws IOException {
3741          submitProcedure(new ReopenTableRegionsProcedure(tableName, regionNames));
3742        }
3743
3744        @Override
3745        protected String getDescription() {
3746          return "ReopenTableRegionsProcedure";
3747        }
3748
3749      });
3750
3751  }
3752
3753  @Override
3754  public ReplicationPeerManager getReplicationPeerManager() {
3755    return replicationPeerManager;
3756  }
3757
3758  public HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>>
3759      getReplicationLoad(ServerName[] serverNames) {
3760    List<ReplicationPeerDescription> peerList = this.getReplicationPeerManager().listPeers(null);
3761    if (peerList == null) {
3762      return null;
3763    }
3764    HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoadSourceMap =
3765        new HashMap<>(peerList.size());
3766    peerList.stream()
3767        .forEach(peer -> replicationLoadSourceMap.put(peer.getPeerId(), new ArrayList<>()));
3768    for (ServerName serverName : serverNames) {
3769      List<ReplicationLoadSource> replicationLoadSources =
3770          getServerManager().getLoad(serverName).getReplicationLoadSourceList();
3771      for (ReplicationLoadSource replicationLoadSource : replicationLoadSources) {
3772        replicationLoadSourceMap.get(replicationLoadSource.getPeerID())
3773            .add(new Pair<>(serverName, replicationLoadSource));
3774      }
3775    }
3776    for (List<Pair<ServerName, ReplicationLoadSource>> loads : replicationLoadSourceMap.values()) {
3777      if (loads.size() > 0) {
3778        loads.sort(Comparator.comparingLong(load -> (-1) * load.getSecond().getReplicationLag()));
3779      }
3780    }
3781    return replicationLoadSourceMap;
3782  }
3783
3784  /**
3785   * This method modifies the master's configuration in order to inject replication-related features
3786   */
3787  @VisibleForTesting
3788  public static void decorateMasterConfiguration(Configuration conf) {
3789    String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
3790    String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
3791    if (!plugins.contains(cleanerClass)) {
3792      conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
3793    }
3794    if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
3795      plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
3796      cleanerClass = ReplicationHFileCleaner.class.getCanonicalName();
3797      if (!plugins.contains(cleanerClass)) {
3798        conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass);
3799      }
3800    }
3801  }
3802
3803  public SnapshotQuotaObserverChore getSnapshotQuotaObserverChore() {
3804    return this.snapshotQuotaChore;
3805  }
3806
3807  @Override
3808  public SyncReplicationReplayWALManager getSyncReplicationReplayWALManager() {
3809    return this.syncReplicationReplayWALManager;
3810  }
3811
3812  @Override
3813  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
3814    if (!this.isOnline() || !LoadBalancer.isMasterCanHostUserRegions(conf)) {
3815      return new HashMap<>();
3816    }
3817    return super.getWalGroupsReplicationStatus();
3818  }
3819
3820  public HbckChore getHbckChore() {
3821    return this.hbckChore;
3822  }
3823
3824  @Override
3825  public String getClusterId() {
3826    if (activeMaster) {
3827      return super.getClusterId();
3828    }
3829    return cachedClusterId.getFromCacheOrFetch();
3830  }
3831
3832  @Override
3833  public void runReplicationBarrierCleaner() {
3834    ReplicationBarrierCleaner rbc = this.replicationBarrierCleaner;
3835    if (rbc != null) {
3836      rbc.chore();
3837    }
3838  }
3839}