001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
021import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
022import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
023import static org.apache.hadoop.hbase.util.DNS.MASTER_HOSTNAME_KEY;
024
025import java.io.IOException;
026import java.io.InterruptedIOException;
027import java.lang.reflect.Constructor;
028import java.lang.reflect.InvocationTargetException;
029import java.net.InetAddress;
030import java.net.InetSocketAddress;
031import java.net.UnknownHostException;
032import java.time.Duration;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Collection;
036import java.util.Collections;
037import java.util.Comparator;
038import java.util.EnumSet;
039import java.util.HashMap;
040import java.util.Iterator;
041import java.util.LinkedList;
042import java.util.List;
043import java.util.Map;
044import java.util.Objects;
045import java.util.Optional;
046import java.util.Set;
047import java.util.concurrent.ExecutionException;
048import java.util.concurrent.Future;
049import java.util.concurrent.TimeUnit;
050import java.util.concurrent.TimeoutException;
051import java.util.concurrent.atomic.AtomicInteger;
052import java.util.concurrent.locks.ReentrantLock;
053import java.util.regex.Pattern;
054import java.util.stream.Collectors;
055import javax.servlet.ServletException;
056import javax.servlet.http.HttpServlet;
057import javax.servlet.http.HttpServletRequest;
058import javax.servlet.http.HttpServletResponse;
059import org.apache.commons.io.IOUtils;
060import org.apache.commons.lang3.StringUtils;
061import org.apache.hadoop.conf.Configuration;
062import org.apache.hadoop.fs.FSDataOutputStream;
063import org.apache.hadoop.fs.Path;
064import org.apache.hadoop.hbase.ChoreService;
065import org.apache.hadoop.hbase.ClusterId;
066import org.apache.hadoop.hbase.ClusterMetrics;
067import org.apache.hadoop.hbase.ClusterMetrics.Option;
068import org.apache.hadoop.hbase.ClusterMetricsBuilder;
069import org.apache.hadoop.hbase.DoNotRetryIOException;
070import org.apache.hadoop.hbase.HBaseIOException;
071import org.apache.hadoop.hbase.HBaseInterfaceAudience;
072import org.apache.hadoop.hbase.HConstants;
073import org.apache.hadoop.hbase.InvalidFamilyOperationException;
074import org.apache.hadoop.hbase.MasterNotRunningException;
075import org.apache.hadoop.hbase.MetaTableAccessor;
076import org.apache.hadoop.hbase.NamespaceDescriptor;
077import org.apache.hadoop.hbase.PleaseHoldException;
078import org.apache.hadoop.hbase.RegionMetrics;
079import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
080import org.apache.hadoop.hbase.ServerMetrics;
081import org.apache.hadoop.hbase.ServerName;
082import org.apache.hadoop.hbase.TableName;
083import org.apache.hadoop.hbase.TableNotDisabledException;
084import org.apache.hadoop.hbase.TableNotFoundException;
085import org.apache.hadoop.hbase.UnknownRegionException;
086import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
087import org.apache.hadoop.hbase.client.CompactionState;
088import org.apache.hadoop.hbase.client.MasterSwitchType;
089import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
090import org.apache.hadoop.hbase.client.RegionInfo;
091import org.apache.hadoop.hbase.client.RegionInfoBuilder;
092import org.apache.hadoop.hbase.client.RegionStatesCount;
093import org.apache.hadoop.hbase.client.TableDescriptor;
094import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
095import org.apache.hadoop.hbase.client.TableState;
096import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
097import org.apache.hadoop.hbase.executor.ExecutorType;
098import org.apache.hadoop.hbase.favored.FavoredNodesManager;
099import org.apache.hadoop.hbase.http.InfoServer;
100import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
101import org.apache.hadoop.hbase.ipc.RpcServer;
102import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
103import org.apache.hadoop.hbase.log.HBaseMarkers;
104import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
105import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
106import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
107import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
108import org.apache.hadoop.hbase.master.assignment.RegionStates;
109import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
110import org.apache.hadoop.hbase.master.balancer.BalancerChore;
111import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
112import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
113import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
114import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
115import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
116import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
117import org.apache.hadoop.hbase.master.cleaner.SnapshotCleanerChore;
118import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
119import org.apache.hadoop.hbase.master.locking.LockManager;
120import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
121import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
122import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
123import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
124import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
125import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
126import org.apache.hadoop.hbase.master.procedure.DeleteNamespaceProcedure;
127import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
128import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
129import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
130import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure;
131import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
132import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
133import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
134import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
135import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable;
136import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
137import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
138import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
139import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
140import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
141import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
142import org.apache.hadoop.hbase.master.region.MasterRegion;
143import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
144import org.apache.hadoop.hbase.master.replication.AbstractPeerProcedure;
145import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
146import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
147import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
148import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
149import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
150import org.apache.hadoop.hbase.master.replication.SyncReplicationReplayWALManager;
151import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStateProcedure;
152import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
153import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService;
154import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
155import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
156import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
157import org.apache.hadoop.hbase.mob.MobFileCleanerChore;
158import org.apache.hadoop.hbase.mob.MobFileCompactionChore;
159import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
160import org.apache.hadoop.hbase.monitoring.MonitoredTask;
161import org.apache.hadoop.hbase.monitoring.TaskMonitor;
162import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
163import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
164import org.apache.hadoop.hbase.procedure2.LockedResource;
165import org.apache.hadoop.hbase.procedure2.Procedure;
166import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
167import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
168import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
169import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
170import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
171import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
172import org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore;
173import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
174import org.apache.hadoop.hbase.quotas.MasterQuotasObserver;
175import org.apache.hadoop.hbase.quotas.QuotaObserverChore;
176import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
177import org.apache.hadoop.hbase.quotas.QuotaUtil;
178import org.apache.hadoop.hbase.quotas.SnapshotQuotaObserverChore;
179import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
180import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
181import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifier;
182import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifierFactory;
183import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
184import org.apache.hadoop.hbase.regionserver.HRegionServer;
185import org.apache.hadoop.hbase.regionserver.RSRpcServices;
186import org.apache.hadoop.hbase.replication.ReplicationException;
187import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
188import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
189import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
190import org.apache.hadoop.hbase.replication.ReplicationUtils;
191import org.apache.hadoop.hbase.replication.SyncReplicationState;
192import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
193import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
194import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
195import org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint;
196import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
197import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
198import org.apache.hadoop.hbase.rsgroup.RSGroupUtil;
199import org.apache.hadoop.hbase.security.AccessDeniedException;
200import org.apache.hadoop.hbase.security.SecurityConstants;
201import org.apache.hadoop.hbase.security.UserProvider;
202import org.apache.hadoop.hbase.trace.TraceUtil;
203import org.apache.hadoop.hbase.util.Addressing;
204import org.apache.hadoop.hbase.util.Bytes;
205import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
206import org.apache.hadoop.hbase.util.FutureUtils;
207import org.apache.hadoop.hbase.util.HBaseFsck;
208import org.apache.hadoop.hbase.util.HFileArchiveUtil;
209import org.apache.hadoop.hbase.util.IdLock;
210import org.apache.hadoop.hbase.util.ModifyRegionUtils;
211import org.apache.hadoop.hbase.util.Pair;
212import org.apache.hadoop.hbase.util.RetryCounter;
213import org.apache.hadoop.hbase.util.RetryCounterFactory;
214import org.apache.hadoop.hbase.util.TableDescriptorChecker;
215import org.apache.hadoop.hbase.util.Threads;
216import org.apache.hadoop.hbase.util.VersionInfo;
217import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
218import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
219import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
220import org.apache.hadoop.hbase.zookeeper.SnapshotCleanupTracker;
221import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
222import org.apache.hadoop.hbase.zookeeper.ZKUtil;
223import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
224import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
225import org.apache.yetus.audience.InterfaceAudience;
226import org.apache.zookeeper.KeeperException;
227import org.slf4j.Logger;
228import org.slf4j.LoggerFactory;
229
230import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
231import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
232import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
233import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
234import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
235import org.apache.hbase.thirdparty.com.google.protobuf.Service;
236import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
237import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
238import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector;
239import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder;
240import org.apache.hbase.thirdparty.org.eclipse.jetty.webapp.WebAppContext;
241
242import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
243import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
244import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
245
246/**
247 * HMaster is the "master server" for HBase. An HBase cluster has one active
248 * master.  If many masters are started, all compete.  Whichever wins goes on to
249 * run the cluster.  All others park themselves in their constructor until
250 * master or cluster shutdown or until the active master loses its lease in
 * zookeeper.  Thereafter, all running masters jostle to take over the master role.
252 *
 * <p>The Master can be asked to shut down the cluster. See {@link #shutdown()}.  In
254 * this case it will tell all regionservers to go down and then wait on them
255 * all reporting in that they are down.  This master will then shut itself down.
256 *
257 * <p>You can also shutdown just this master.  Call {@link #stopMaster()}.
258 *
259 * @see org.apache.zookeeper.Watcher
260 */
261@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
262@SuppressWarnings("deprecation")
263public class HMaster extends HRegionServer implements MasterServices {
264  private static Logger LOG = LoggerFactory.getLogger(HMaster.class);
265
266  /**
267   * Protection against zombie master. Started once Master accepts active responsibility and
268   * starts taking over responsibilities. Allows a finite time window before giving up ownership.
269   */
270  private static class InitializationMonitor extends Thread {
271    /** The amount of time in milliseconds to sleep before checking initialization status. */
272    public static final String TIMEOUT_KEY = "hbase.master.initializationmonitor.timeout";
273    public static final long TIMEOUT_DEFAULT = TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);
274
275    /**
276     * When timeout expired and initialization has not complete, call {@link System#exit(int)} when
277     * true, do nothing otherwise.
278     */
279    public static final String HALT_KEY = "hbase.master.initializationmonitor.haltontimeout";
280    public static final boolean HALT_DEFAULT = false;
281
282    private final HMaster master;
283    private final long timeout;
284    private final boolean haltOnTimeout;
285
286    /** Creates a Thread that monitors the {@link #isInitialized()} state. */
287    InitializationMonitor(HMaster master) {
288      super("MasterInitializationMonitor");
289      this.master = master;
290      this.timeout = master.getConfiguration().getLong(TIMEOUT_KEY, TIMEOUT_DEFAULT);
291      this.haltOnTimeout = master.getConfiguration().getBoolean(HALT_KEY, HALT_DEFAULT);
292      this.setDaemon(true);
293    }
294
295    @Override
296    public void run() {
297      try {
298        while (!master.isStopped() && master.isActiveMaster()) {
299          Thread.sleep(timeout);
300          if (master.isInitialized()) {
301            LOG.debug("Initialization completed within allotted tolerance. Monitor exiting.");
302          } else {
303            LOG.error("Master failed to complete initialization after " + timeout + "ms. Please"
304                + " consider submitting a bug report including a thread dump of this process.");
305            if (haltOnTimeout) {
306              LOG.error("Zombie Master exiting. Thread dump to stdout");
307              Threads.printThreadInfo(System.out, "Zombie HMaster");
308              System.exit(-1);
309            }
310          }
311        }
312      } catch (InterruptedException ie) {
313        LOG.trace("InitMonitor thread interrupted. Existing.");
314      }
315    }
316  }
317
318  // MASTER is name of the webapp and the attribute name used stuffing this
319  //instance into web context.
320  public static final String MASTER = "master";
321
322  // Manager and zk listener for master election
323  private final ActiveMasterManager activeMasterManager;
324  // Region server tracker
325  private RegionServerTracker regionServerTracker;
326  // Draining region server tracker
327  private DrainingServerTracker drainingServerTracker;
328  // Tracker for load balancer state
329  LoadBalancerTracker loadBalancerTracker;
330  // Tracker for meta location, if any client ZK quorum specified
331  MetaLocationSyncer metaLocationSyncer;
332  // Tracker for active master location, if any client ZK quorum specified
333  MasterAddressSyncer masterAddressSyncer;
334  // Tracker for auto snapshot cleanup state
335  SnapshotCleanupTracker snapshotCleanupTracker;
336
337  // Tracker for split and merge state
338  private SplitOrMergeTracker splitOrMergeTracker;
339
340  // Tracker for region normalizer state
341  private RegionNormalizerTracker regionNormalizerTracker;
342
343  private ClusterSchemaService clusterSchemaService;
344
345  public static final String HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS =
346    "hbase.master.wait.on.service.seconds";
347  public static final int DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS = 5 * 60;
348
349  public static final String HBASE_MASTER_CLEANER_INTERVAL = "hbase.master.cleaner.interval";
350
351  public static final int DEFAULT_HBASE_MASTER_CLEANER_INTERVAL = 600 * 1000;
352
353  // Metrics for the HMaster
354  final MetricsMaster metricsMaster;
355  // file system manager for the master FS operations
356  private MasterFileSystem fileSystemManager;
357  private MasterWalManager walManager;
358
359  // manager to manage procedure-based WAL splitting, can be null if current
360  // is zk-based WAL splitting. SplitWALManager will replace SplitLogManager
361  // and MasterWalManager, which means zk-based WAL splitting code will be
362  // useless after we switch to the procedure-based one. our eventual goal
363  // is to remove all the zk-based WAL splitting code.
364  private SplitWALManager splitWALManager;
365
366  // server manager to deal with region server info
367  private volatile ServerManager serverManager;
368
369  // manager of assignment nodes in zookeeper
370  private AssignmentManager assignmentManager;
371
372
373  /**
374   * Cache for the meta region replica's locations. Also tracks their changes to avoid stale
375   * cache entries.
376   */
377  private final MetaRegionLocationCache metaRegionLocationCache;
378
379  private RSGroupInfoManager rsGroupInfoManager;
380
381  // manager of replication
382  private ReplicationPeerManager replicationPeerManager;
383
384  private SyncReplicationReplayWALManager syncReplicationReplayWALManager;
385
386  // buffer for "fatal error" notices from region servers
387  // in the cluster. This is only used for assisting
388  // operations/debugging.
389  MemoryBoundedLogMessageBuffer rsFatals;
390
391  // flag set after we become the active master (used for testing)
392  private volatile boolean activeMaster = false;
393
394  // flag set after we complete initialization once active
395  private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized");
396
397  // flag set after master services are started,
398  // initialization may have not completed yet.
399  volatile boolean serviceStarted = false;
400
401  // Maximum time we should run balancer for
402  private final int maxBalancingTime;
403  // Maximum percent of regions in transition when balancing
404  private final double maxRitPercent;
405
406  private final LockManager lockManager = new LockManager(this);
407
408  private RSGroupBasedLoadBalancer balancer;
409  // a lock to prevent concurrent normalization actions.
410  private final ReentrantLock normalizationInProgressLock = new ReentrantLock();
411  private RegionNormalizer normalizer;
412  private BalancerChore balancerChore;
413  private RegionNormalizerChore normalizerChore;
414  private ClusterStatusChore clusterStatusChore;
415  private ClusterStatusPublisher clusterStatusPublisherChore = null;
416  private SnapshotCleanerChore snapshotCleanerChore = null;
417
418  private HbckChore hbckChore;
419  CatalogJanitor catalogJanitorChore;
420  private DirScanPool cleanerPool;
421  private LogCleaner logCleaner;
422  private HFileCleaner hfileCleaner;
423  private ReplicationBarrierCleaner replicationBarrierCleaner;
424  private MobFileCleanerChore mobFileCleanerChore;
425  private MobFileCompactionChore mobFileCompactionChore;
426  // used to synchronize the mobCompactionStates
427  private final IdLock mobCompactionLock = new IdLock();
428  // save the information of mob compactions in tables.
429  // the key is table name, the value is the number of compactions in that table.
430  private Map<TableName, AtomicInteger> mobCompactionStates = Maps.newConcurrentMap();
431
432  MasterCoprocessorHost cpHost;
433
434  private final boolean preLoadTableDescriptors;
435
436  // Time stamps for when a hmaster became active
437  private long masterActiveTime;
438
439  // Time stamp for when HMaster finishes becoming Active Master
440  private long masterFinishedInitializationTime;
441
442  Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
443
444  // monitor for snapshot of hbase tables
445  SnapshotManager snapshotManager;
446  // monitor for distributed procedures
447  private MasterProcedureManagerHost mpmHost;
448
449  private RegionsRecoveryChore regionsRecoveryChore = null;
450
451  private RegionsRecoveryConfigManager regionsRecoveryConfigManager = null;
452  // it is assigned after 'initialized' guard set to true, so should be volatile
453  private volatile MasterQuotaManager quotaManager;
454  private SpaceQuotaSnapshotNotifier spaceQuotaSnapshotNotifier;
455  private QuotaObserverChore quotaObserverChore;
456  private SnapshotQuotaObserverChore snapshotQuotaChore;
457
458  private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
459  private ProcedureStore procedureStore;
460
461  // the master local storage to store procedure data, etc.
462  private MasterRegion masterRegion;
463
464  // handle table states
465  private TableStateManager tableStateManager;
466
467  private long splitPlanCount;
468  private long mergePlanCount;
469
470  /** jetty server for master to redirect requests to regionserver infoServer */
471  private Server masterJettyServer;
472
473  // Determine if we should do normal startup or minimal "single-user" mode with no region
474  // servers and no user tables. Useful for repair and recovery of hbase:meta
475  private final boolean maintenanceMode;
476  static final String MAINTENANCE_MODE = "hbase.master.maintenance_mode";
477
478  // Cached clusterId on stand by masters to serve clusterID requests from clients.
479  private final CachedClusterId cachedClusterId;
480
481  public static class RedirectServlet extends HttpServlet {
482    private static final long serialVersionUID = 2894774810058302473L;
483    private final int regionServerInfoPort;
484    private final String regionServerHostname;
485
486    /**
487     * @param infoServer that we're trying to send all requests to
488     * @param hostname may be null. if given, will be used for redirects instead of host from client.
489     */
490    public RedirectServlet(InfoServer infoServer, String hostname) {
491       regionServerInfoPort = infoServer.getPort();
492       regionServerHostname = hostname;
493    }
494
495    @Override
496    public void doGet(HttpServletRequest request,
497        HttpServletResponse response) throws ServletException, IOException {
498      String redirectHost = regionServerHostname;
499      if(redirectHost == null) {
500        redirectHost = request.getServerName();
501        if(!Addressing.isLocalAddress(InetAddress.getByName(redirectHost))) {
502          LOG.warn("Couldn't resolve '" + redirectHost + "' as an address local to this node and '" +
503              MASTER_HOSTNAME_KEY + "' is not set; client will get an HTTP 400 response. If " +
504              "your HBase deployment relies on client accessible names that the region server process " +
505              "can't resolve locally, then you should set the previously mentioned configuration variable " +
506              "to an appropriate hostname.");
507          // no sending client provided input back to the client, so the goal host is just in the logs.
508          response.sendError(400, "Request was to a host that I can't resolve for any of the network interfaces on " +
509              "this node. If this is due to an intermediary such as an HTTP load balancer or other proxy, your HBase " +
510              "administrator can set '" + MASTER_HOSTNAME_KEY + "' to point to the correct hostname.");
511          return;
512        }
513      }
514      // TODO this scheme should come from looking at the scheme registered in the infoserver's http server for the
515      // host and port we're using, but it's buried way too deep to do that ATM.
516      String redirectUrl = request.getScheme() + "://"
517        + redirectHost + ":" + regionServerInfoPort
518        + request.getRequestURI();
519      response.sendRedirect(redirectUrl);
520    }
521  }
522
523  /**
524   * Initializes the HMaster. The steps are as follows:
525   * <p>
526   * <ol>
527   * <li>Initialize the local HRegionServer
528   * <li>Start the ActiveMasterManager.
529   * </ol>
530   * <p>
531   * Remaining steps of initialization occur in
532   * {@link #finishActiveMasterInitialization(MonitoredTask)} after the master becomes the
533   * active one.
534   */
535  public HMaster(final Configuration conf) throws IOException {
536    super(conf);
537    TraceUtil.initTracer(conf);
538    try {
539      if (conf.getBoolean(MAINTENANCE_MODE, false)) {
540        LOG.info("Detected {}=true via configuration.", MAINTENANCE_MODE);
541        maintenanceMode = true;
542      } else if (Boolean.getBoolean(MAINTENANCE_MODE)) {
543        LOG.info("Detected {}=true via environment variables.", MAINTENANCE_MODE);
544        maintenanceMode = true;
545      } else {
546        maintenanceMode = false;
547      }
548      this.rsFatals = new MemoryBoundedLogMessageBuffer(
549          conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024));
550      LOG.info("hbase.rootdir={}, hbase.cluster.distributed={}", getDataRootDir(),
551          this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));
552
553      // Disable usage of meta replicas in the master
554      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
555
556      decorateMasterConfiguration(this.conf);
557
558      // Hack! Maps DFSClient => Master for logs.  HDFS made this
559      // config param for task trackers, but we can piggyback off of it.
560      if (this.conf.get("mapreduce.task.attempt.id") == null) {
561        this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString());
562      }
563
564      this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));
565
566      // preload table descriptor at startup
567      this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);
568
569      this.maxBalancingTime = getMaxBalancingTime();
570      this.maxRitPercent = conf.getDouble(HConstants.HBASE_MASTER_BALANCER_MAX_RIT_PERCENT,
571          HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT);
572
573      // Do we publish the status?
574
575      boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
576          HConstants.STATUS_PUBLISHED_DEFAULT);
577      Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
578          conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
579              ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
580              ClusterStatusPublisher.Publisher.class);
581
582      if (shouldPublish) {
583        if (publisherClass == null) {
584          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
585              ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
586              " is not set - not publishing status");
587        } else {
588          clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
589          LOG.debug("Created {}", this.clusterStatusPublisherChore);
590          getChoreService().scheduleChore(clusterStatusPublisherChore);
591        }
592      }
593
594      this.metaRegionLocationCache = new MetaRegionLocationCache(this.zooKeeper);
595      this.activeMasterManager = createActiveMasterManager(zooKeeper, serverName, this);
596
597      cachedClusterId = new CachedClusterId(this, conf);
598    } catch (Throwable t) {
599      // Make sure we log the exception. HMaster is often started via reflection and the
600      // cause of failed startup is lost.
601      LOG.error("Failed construction of Master", t);
602      throw t;
603    }
604  }
605
606  /**
607   * Protected to have custom implementations in tests override the default ActiveMaster
608   * implementation.
609   */
610  protected ActiveMasterManager createActiveMasterManager(ZKWatcher zk, ServerName sn,
611      org.apache.hadoop.hbase.Server server) throws InterruptedIOException {
612    return new ActiveMasterManager(zk, sn, server);
613  }
614
615  @Override
616  protected String getUseThisHostnameInstead(Configuration conf) {
617    return conf.get(MASTER_HOSTNAME_KEY);
618  }
619
620  // Main run loop. Calls through to the regionserver run loop AFTER becoming active Master; will
621  // block in here until then.
622  @Override
623  public void run() {
624    try {
625      Threads.setDaemonThreadRunning(new Thread(() -> {
626        try {
627          int infoPort = putUpJettyServer();
628          startActiveMasterManager(infoPort);
629        } catch (Throwable t) {
630          // Make sure we log the exception.
631          String error = "Failed to become Active Master";
632          LOG.error(error, t);
633          // Abort should have been called already.
634          if (!isAborted()) {
635            abort(error, t);
636          }
637        }
638      }), getName() + ":becomeActiveMaster");
639      // Fall in here even if we have been aborted. Need to run the shutdown services and
640      // the super run call will do this for us.
641      super.run();
642    } finally {
643      if (this.clusterSchemaService != null) {
644        // If on way out, then we are no longer active master.
645        this.clusterSchemaService.stopAsync();
646        try {
647          this.clusterSchemaService.awaitTerminated(
648              getConfiguration().getInt(HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS,
649              DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS), TimeUnit.SECONDS);
650        } catch (TimeoutException te) {
651          LOG.warn("Failed shutdown of clusterSchemaService", te);
652        }
653      }
654      this.activeMaster = false;
655    }
656  }
657
658  // return the actual infoPort, -1 means disable info server.
659  private int putUpJettyServer() throws IOException {
660    if (!conf.getBoolean("hbase.master.infoserver.redirect", true)) {
661      return -1;
662    }
663    final int infoPort = conf.getInt("hbase.master.info.port.orig",
664      HConstants.DEFAULT_MASTER_INFOPORT);
665    // -1 is for disabling info server, so no redirecting
666    if (infoPort < 0 || infoServer == null) {
667      return -1;
668    }
669    if(infoPort == infoServer.getPort()) {
670      return infoPort;
671    }
672    final String addr = conf.get("hbase.master.info.bindAddress", "0.0.0.0");
673    if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
674      String msg =
675          "Failed to start redirecting jetty server. Address " + addr
676              + " does not belong to this host. Correct configuration parameter: "
677              + "hbase.master.info.bindAddress";
678      LOG.error(msg);
679      throw new IOException(msg);
680    }
681
682    // TODO I'm pretty sure we could just add another binding to the InfoServer run by
683    // the RegionServer and have it run the RedirectServlet instead of standing up
684    // a second entire stack here.
685    masterJettyServer = new Server();
686    final ServerConnector connector = new ServerConnector(masterJettyServer);
687    connector.setHost(addr);
688    connector.setPort(infoPort);
689    masterJettyServer.addConnector(connector);
690    masterJettyServer.setStopAtShutdown(true);
691
692    final String redirectHostname =
693        StringUtils.isBlank(useThisHostnameInstead) ? null : useThisHostnameInstead;
694
695    final RedirectServlet redirect = new RedirectServlet(infoServer, redirectHostname);
696    final WebAppContext context = new WebAppContext(null, "/", null, null, null, null, WebAppContext.NO_SESSIONS);
697    context.addServlet(new ServletHolder(redirect), "/*");
698    context.setServer(masterJettyServer);
699
700    try {
701      masterJettyServer.start();
702    } catch (Exception e) {
703      throw new IOException("Failed to start redirecting jetty server", e);
704    }
705    return connector.getLocalPort();
706  }
707
708  /**
709   * For compatibility, if failed with regionserver credentials, try the master one
710   */
711  @Override
712  protected void login(UserProvider user, String host) throws IOException {
713    try {
714      super.login(user, host);
715    } catch (IOException ie) {
716      user.login(SecurityConstants.MASTER_KRB_KEYTAB_FILE,
717              SecurityConstants.MASTER_KRB_PRINCIPAL, host);
718    }
719  }
720
721  /**
722   * If configured to put regions on active master,
723   * wait till a backup master becomes active.
724   * Otherwise, loop till the server is stopped or aborted.
725   */
726  @Override
727  protected void waitForMasterActive(){
728    if (maintenanceMode) {
729      return;
730    }
731    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(conf);
732    while (!(tablesOnMaster && activeMaster) && !isStopped() && !isAborted()) {
733      sleeper.sleep();
734    }
735  }
736
737  @VisibleForTesting
738  public MasterRpcServices getMasterRpcServices() {
739    return (MasterRpcServices)rpcServices;
740  }
741
742  public boolean balanceSwitch(final boolean b) throws IOException {
743    return getMasterRpcServices().switchBalancer(b, BalanceSwitchMode.ASYNC);
744  }
745
746  @Override
747  protected String getProcessName() {
748    return MASTER;
749  }
750
751  @Override
752  protected boolean canCreateBaseZNode() {
753    return true;
754  }
755
756  @Override
757  protected boolean canUpdateTableDescriptor() {
758    return true;
759  }
760
761  @Override
762  protected boolean cacheTableDescriptor() {
763    return true;
764  }
765
766  @Override
767  protected RSRpcServices createRpcServices() throws IOException {
768    return new MasterRpcServices(this);
769  }
770
771  @Override
772  protected void configureInfoServer() {
773    infoServer.addUnprivilegedServlet("master-status", "/master-status", MasterStatusServlet.class);
774    infoServer.setAttribute(MASTER, this);
775    if (LoadBalancer.isTablesOnMaster(conf)) {
776      super.configureInfoServer();
777    }
778  }
779
780  @Override
781  protected Class<? extends HttpServlet> getDumpServlet() {
782    return MasterDumpServlet.class;
783  }
784
785  @Override
786  public MetricsMaster getMasterMetrics() {
787    return metricsMaster;
788  }
789
790  /**
791   * <p>
792   * Initialize all ZK based system trackers. But do not include {@link RegionServerTracker}, it
793   * should have already been initialized along with {@link ServerManager}.
794   * </p>
795   * <p>
796   * Will be overridden in tests.
797   * </p>
798   */
799  @VisibleForTesting
800  protected void initializeZKBasedSystemTrackers()
801      throws IOException, InterruptedException, KeeperException, ReplicationException {
802    this.balancer = new RSGroupBasedLoadBalancer();
803    this.balancer.setConf(conf);
804    this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
805    this.loadBalancerTracker.start();
806
807    this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
808    this.normalizer.setMasterServices(this);
809    this.regionNormalizerTracker = new RegionNormalizerTracker(zooKeeper, this);
810    this.regionNormalizerTracker.start();
811
812    this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
813    this.splitOrMergeTracker.start();
814
815    // This is for backwards compatible. We do not need the CP for rs group now but if user want to
816    // load it, we need to enable rs group.
817    String[] cpClasses = conf.getStrings(MasterCoprocessorHost.MASTER_COPROCESSOR_CONF_KEY);
818    if (cpClasses != null) {
819      for (String cpClass : cpClasses) {
820        if (RSGroupAdminEndpoint.class.getName().equals(cpClass)) {
821          RSGroupUtil.enableRSGroup(conf);
822          break;
823        }
824      }
825    }
826    this.rsGroupInfoManager = RSGroupInfoManager.create(this);
827
828    this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf, clusterId);
829
830    this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
831    this.drainingServerTracker.start();
832
833    this.snapshotCleanupTracker = new SnapshotCleanupTracker(zooKeeper, this);
834    this.snapshotCleanupTracker.start();
835
836    String clientQuorumServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
837    boolean clientZkObserverMode = conf.getBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE,
838      HConstants.DEFAULT_CLIENT_ZOOKEEPER_OBSERVER_MODE);
839    if (clientQuorumServers != null && !clientZkObserverMode) {
840      // we need to take care of the ZK information synchronization
841      // if given client ZK are not observer nodes
842      ZKWatcher clientZkWatcher = new ZKWatcher(conf,
843          getProcessName() + ":" + rpcServices.getSocketAddress().getPort() + "-clientZK", this,
844          false, true);
845      this.metaLocationSyncer = new MetaLocationSyncer(zooKeeper, clientZkWatcher, this);
846      this.metaLocationSyncer.start();
847      this.masterAddressSyncer = new MasterAddressSyncer(zooKeeper, clientZkWatcher, this);
848      this.masterAddressSyncer.start();
849      // set cluster id is a one-go effort
850      ZKClusterId.setClusterId(clientZkWatcher, fileSystemManager.getClusterId());
851    }
852
853    // Set the cluster as up.  If new RSs, they'll be waiting on this before
854    // going ahead with their startup.
855    boolean wasUp = this.clusterStatusTracker.isClusterUp();
856    if (!wasUp) this.clusterStatusTracker.setClusterUp();
857
858    LOG.info("Active/primary master=" + this.serverName +
859        ", sessionid=0x" +
860        Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
861        ", setting cluster-up flag (Was=" + wasUp + ")");
862
863    // create/initialize the snapshot manager and other procedure managers
864    this.snapshotManager = new SnapshotManager();
865    this.mpmHost = new MasterProcedureManagerHost();
866    this.mpmHost.register(this.snapshotManager);
867    this.mpmHost.register(new MasterFlushTableProcedureManager());
868    this.mpmHost.loadProcedures(conf);
869    this.mpmHost.initialize(this, this.metricsMaster);
870  }
871
872  // Will be overriden in test to inject customized AssignmentManager
873  @VisibleForTesting
874  protected AssignmentManager createAssignmentManager(MasterServices master) {
875    return new AssignmentManager(master);
876  }
877
  /**
   * Finish initialization of HMaster after becoming the primary master.
   * <p>
   * The startup order is a bit complicated but very important, do not change it unless you know
   * what you are doing.
   * <ol>
   * <li>Initialize file system based components - file system manager, wal manager, table
   * descriptors, etc</li>
   * <li>Publish cluster id</li>
   * <li>Here comes the most complicated part - initialize server manager, assignment manager and
   * region server tracker
   * <ol type='i'>
   * <li>Create server manager</li>
   * <li>Create procedure executor, load the procedures, but do not start workers. We will start it
   * later after we finish scheduling SCPs to avoid scheduling duplicated SCPs for the same
   * server</li>
   * <li>Create assignment manager and start it, load the meta region state, but do not load data
   * from meta region</li>
   * <li>Start region server tracker, construct the online servers set and find out dead servers and
   * schedule SCP for them. The online servers will be constructed by scanning zk, and we will also
   * scan the wal directory to find out possible live region servers, and the differences between
   * these two sets are the dead servers</li>
   * </ol>
   * </li>
   * <li>If this is a new deploy, schedule an InitMetaProcedure to initialize meta</li>
   * <li>Start necessary service threads - balancer, catalog janitor, executor services, and also
   * the procedure executor, etc. Notice that the balancer must be created first as assignment
   * manager may use it when assigning regions.</li>
   * <li>Wait for meta to be initialized if necessary, start table state manager.</li>
   * <li>Wait for enough region servers to check-in</li>
   * <li>Let assignment manager load data from meta and construct region states</li>
   * <li>Start all other things such as chore services, etc</li>
   * </ol>
   * <p>
   * Notice that now we will not schedule a special procedure to make meta online(unless the first
   * time where meta has not been created yet), we will rely on SCP to bring meta online.
   * <p>
   * The method returns early, without marking the master initialized, when the master is found
   * stopped after waiting for region servers, or when waiting for hbase:meta or the namespace
   * table reports failure. In maintenance mode it returns right after marking the master
   * initialized, skipping the final steps.
   * @param status task on which progress of the initialization is reported
   * @throws IOException declared; propagated from the initialization steps
   * @throws InterruptedException declared; propagated from the initialization steps
   * @throws KeeperException declared; propagated from the initialization steps
   * @throws ReplicationException declared; propagated from the initialization steps
   */
  private void finishActiveMasterInitialization(MonitoredTask status) throws IOException,
          InterruptedException, KeeperException, ReplicationException {
    /*
     * We are active master now... go initialize components we need to run.
     */
    status.setStatus("Initializing Master file system");

    // Remember when we became active; used below to report how long initialization took.
    this.masterActiveTime = System.currentTimeMillis();
    // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.

    // always initialize the MemStoreLAB as we use a region to store data in master now, see
    // localStore.
    initializeMemStoreChunkCreator();
    this.fileSystemManager = new MasterFileSystem(conf);
    this.walManager = new MasterWalManager(this);

    // warm-up HTDs cache on master initialization
    if (preLoadTableDescriptors) {
      status.setStatus("Pre-loading table descriptors");
      this.tableDescriptors.getAll();
    }

    // Publish cluster ID; set it in Master too. The superclass RegionServer does this later but
    // only after it has checked in with the Master. At least a few tests ask Master for clusterId
    // before it has called its run method and before RegionServer has done the reportForDuty.
    ClusterId clusterId = fileSystemManager.getClusterId();
    status.setStatus("Publishing Cluster ID " + clusterId + " in ZooKeeper");
    ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
    this.clusterId = clusterId.toString();

    // Precaution. Put in place the old hbck1 lock file to fence out old hbase1s running their
    // hbck1s against an hbase2 cluster; it could do damage. To skip this behavior, set
    // hbase.write.hbck1.lock.file to false.
    if (this.conf.getBoolean("hbase.write.hbck1.lock.file", true)) {
      Pair<Path, FSDataOutputStream> result = null;
      try {
        result = HBaseFsck.checkAndMarkRunningHbck(this.conf,
            HBaseFsck.createLockRetryCounterFactory(this.conf).create());
      } finally {
        // Only the returned stream is closed here; the lock file itself is left in place.
        if (result != null) {
          IOUtils.closeQuietly(result.getSecond());
        }
      }
    }

    status.setStatus("Initialize ServerManager and schedule SCP for crash servers");
    // The below two managers must be created before loading procedures, as they will be used during
    // loading.
    this.serverManager = createServerManager(this);
    this.syncReplicationReplayWALManager = new SyncReplicationReplayWALManager(this);
    // The SplitWALManager is only created when WAL splitting is NOT coordinated by ZK.
    if (!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
      DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
      this.splitWALManager = new SplitWALManager(this);
    }

    // initialize master local region
    masterRegion = MasterRegionFactory.create(this);
    createProcedureExecutor();
    // Group the active procedures by their concrete class so the TRSPs and SCPs can be picked
    // out below.
    Map<Class<?>, List<Procedure<MasterProcedureEnv>>> procsByType =
      procedureExecutor.getActiveProceduresNoCopy().stream()
        .collect(Collectors.groupingBy(p -> p.getClass()));

    // Create Assignment Manager
    this.assignmentManager = createAssignmentManager(this);
    this.assignmentManager.start();
    // TODO: TRSP can perform as the sub procedure for other procedures, so even if it is marked as
    // completed, it could still be in the procedure list. This is a bit strange but is another
    // story, need to verify the implementation for ProcedureExecutor and ProcedureStore.
    List<TransitRegionStateProcedure> ritList =
      procsByType.getOrDefault(TransitRegionStateProcedure.class, Collections.emptyList()).stream()
        .filter(p -> !p.isFinished()).map(p -> (TransitRegionStateProcedure) p)
        .collect(Collectors.toList());
    this.assignmentManager.setupRIT(ritList);

    // Start RegionServerTracker with listing of servers found with existing SCPs -- these should
    // be registered in the deadServers set -- and with the list of servernames out on the
    // filesystem that COULD BE 'alive' (we'll schedule SCPs for each and let SCP figure it out).
    // We also pass dirs that are already 'splitting'... so we can do some checks down in tracker.
    // TODO: Generate the splitting and live Set in one pass instead of two as we currently do.
    this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
    this.regionServerTracker.start(
      procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream()
        .map(p -> (ServerCrashProcedure) p).map(p -> p.getServerName()).collect(Collectors.toSet()),
      walManager.getLiveServersFromWALDir(), walManager.getSplittingServersFromWALDir());
    // This manager will be started AFTER hbase:meta is confirmed on line.
    // hbase.mirror.table.state.to.zookeeper is so hbase1 clients can connect. They read table
    // state from zookeeper while hbase2 reads it from hbase:meta. Disable if no hbase1 clients.
    this.tableStateManager =
      this.conf.getBoolean(MirroringTableStateManager.MIRROR_TABLE_STATE_TO_ZK_KEY, true)
        ?
        new MirroringTableStateManager(this):
        new TableStateManager(this);

    status.setStatus("Initializing ZK system trackers");
    initializeZKBasedSystemTrackers();
    status.setStatus("Loading last flushed sequence id of regions");
    try {
      this.serverManager.loadLastFlushedSequenceIds();
    } catch (IOException e) {
      // Best effort: a failure here is only logged and startup carries on.
      LOG.info("Failed to load last flushed sequence id of regions"
          + " from file system", e);
    }
    // Set ourselves as active Master now our claim has succeeded up in zk.
    this.activeMaster = true;

    // Start the Zombie master detector after setting master as active, see HBASE-21535
    Thread zombieDetector = new Thread(new InitializationMonitor(this),
        "ActiveMasterInitializationMonitor-" + System.currentTimeMillis());
    zombieDetector.setDaemon(true);
    zombieDetector.start();

    if (!maintenanceMode) {
      // Add the Observer to delete quotas on table deletion before starting all CPs by
      // default with quota support, avoiding if user specifically asks to not load this Observer.
      if (QuotaUtil.isQuotaEnabled(conf)) {
        updateConfigurationForQuotasObserver(conf);
      }
      // initialize master side coprocessors before we start handling requests
      status.setStatus("Initializing master coprocessors");
      this.cpHost = new MasterCoprocessorHost(this, this.conf);
    }

    // Checking if meta needs initializing.
    status.setStatus("Initializing meta table if this is a new deploy");
    InitMetaProcedure initMetaProc = null;
    // Print out state of hbase:meta on startup; helps debugging.
    RegionState rs = this.assignmentManager.getRegionStates().
        getRegionState(RegionInfoBuilder.FIRST_META_REGIONINFO);
    LOG.info("hbase:meta {}", rs);
    if (rs != null && rs.isOffline()) {
      // Reuse an InitMetaProcedure that is already known to the executor, if any; otherwise
      // submit a new one.
      Optional<InitMetaProcedure> optProc = procedureExecutor.getProcedures().stream()
        .filter(p -> p instanceof InitMetaProcedure).map(o -> (InitMetaProcedure) o).findAny();
      initMetaProc = optProc.orElseGet(() -> {
        // schedule an init meta procedure if meta has not been deployed yet
        InitMetaProcedure temp = new InitMetaProcedure();
        procedureExecutor.submitProcedure(temp);
        return temp;
      });
    }

    // initialize load balancer
    this.balancer.setMasterServices(this);
    this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
    this.balancer.initialize();

    // start up all service threads.
    status.setStatus("Initializing master service threads");
    startServiceThreads();
    // wait meta to be initialized after we start procedure executor
    if (initMetaProc != null) {
      initMetaProc.await();
    }
    // Wake up this server to check in
    sleeper.skipSleepCycle();

    // Wait for region servers to report in.
    // With this as part of master initialization, it precludes our being able to start a single
    // server that is both Master and RegionServer. Needs more thought. TODO.
    String statusStr = "Wait for region servers to report in";
    status.setStatus(statusStr);
    LOG.info(Objects.toString(status));
    waitForRegionServers(status);

    // Check if master is shutting down because issue initializing regionservers or balancer.
    if (isStopped()) {
      return;
    }

    status.setStatus("Starting assignment manager");
    // FIRST HBASE:META READ!!!!
    // The below cannot make progress w/o hbase:meta being online.
    // This is the FIRST attempt at going to hbase:meta. Meta on-lining is going on in background
    // as procedures run -- in particular SCPs for crashed servers... One should put up hbase:meta
    // if it is down. It may take a while to come online. So, wait here until meta is for sure
    // available. That's what waitForMetaOnline does.
    if (!waitForMetaOnline()) {
      return;
    }
    this.assignmentManager.joinCluster();
    // The below depends on hbase:meta being online.
    this.tableStateManager.start();
    // Below has to happen after tablestatemanager has started in the case where this hbase-2.x
    // is being started over an hbase-1.x dataset. tablestatemanager runs a migration as part
    // of its 'start' moving table state from zookeeper to hbase:meta. This migration needs to
    // complete before we do this next step processing offline regions else it fails reading
    // table states messing up master launch (namespace table, etc., are not assigned).
    this.assignmentManager.processOfflineRegions();
    // Initialize after meta is up as below scans meta
    if (getFavoredNodesManager() != null && !maintenanceMode) {
      SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment =
          new SnapshotOfRegionAssignmentFromMeta(getConnection());
      snapshotOfRegionAssignment.initialize();
      getFavoredNodesManager().initialize(snapshotOfRegionAssignment);
    }

    // set cluster status again after user regions are assigned
    this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());

    // Start balancer and meta catalog janitor after meta and regions have been assigned.
    status.setStatus("Starting balancer and catalog janitor");
    this.clusterStatusChore = new ClusterStatusChore(this, balancer);
    getChoreService().scheduleChore(clusterStatusChore);
    this.balancerChore = new BalancerChore(this);
    getChoreService().scheduleChore(balancerChore);
    this.normalizerChore = new RegionNormalizerChore(this);
    getChoreService().scheduleChore(normalizerChore);
    this.catalogJanitorChore = new CatalogJanitor(this);
    getChoreService().scheduleChore(catalogJanitorChore);
    this.hbckChore = new HbckChore(this);
    getChoreService().scheduleChore(hbckChore);
    this.serverManager.startChore();

    // Only for rolling upgrade, where we need to migrate the data in namespace table to meta table.
    if (!waitForNamespaceOnline()) {
      return;
    }
    status.setStatus("Starting cluster schema service");
    initClusterSchemaService();

    if (this.cpHost != null) {
      // A failing coprocessor hook is logged only; it does not stop initialization.
      try {
        this.cpHost.preMasterInitialization();
      } catch (IOException e) {
        LOG.error("Coprocessor preMasterInitialization() hook failed", e);
      }
    }

    status.markComplete("Initialization successful");
    LOG.info(String.format("Master has completed initialization %.3fsec",
       (System.currentTimeMillis() - masterActiveTime) / 1000.0f));
    this.masterFinishedInitializationTime = System.currentTimeMillis();
    configurationManager.registerObserver(this.balancer);
    configurationManager.registerObserver(this.cleanerPool);
    configurationManager.registerObserver(this.hfileCleaner);
    configurationManager.registerObserver(this.logCleaner);
    configurationManager.registerObserver(this.regionsRecoveryConfigManager);
    // Set master as 'initialized'.
    setInitialized(true);

    if (maintenanceMode) {
      LOG.info("Detected repair mode, skipping final initialization steps.");
      return;
    }

    assignmentManager.checkIfShouldMoveSystemRegionAsync();
    status.setStatus("Assign meta replicas");
    MasterMetaBootstrap metaBootstrap = createMetaBootstrap();
    try {
      metaBootstrap.assignMetaReplicas();
    } catch (IOException | KeeperException e){
      // Logged only; the remaining startup steps still run.
      LOG.error("Assigning meta replica failed: ", e);
    }
    status.setStatus("Starting quota manager");
    initQuotaManager();
    if (QuotaUtil.isQuotaEnabled(conf)) {
      // Create the quota snapshot notifier
      spaceQuotaSnapshotNotifier = createQuotaSnapshotNotifier();
      spaceQuotaSnapshotNotifier.initialize(getConnection());
      this.quotaObserverChore = new QuotaObserverChore(this, getMasterMetrics());
      // Start the chore to read the region FS space reports and act on them
      getChoreService().scheduleChore(quotaObserverChore);

      this.snapshotQuotaChore = new SnapshotQuotaObserverChore(this, getMasterMetrics());
      // Start the chore to read snapshots and add their usage to table/NS quotas
      getChoreService().scheduleChore(snapshotQuotaChore);
    }
    final SlowLogMasterService slowLogMasterService = new SlowLogMasterService(conf, this);
    slowLogMasterService.init();

    // clear the dead servers with same host name and port of online server because we are not
    // removing dead server with same hostname and port of rs which is trying to check in before
    // master initialization. See HBASE-5916.
    this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();

    // Check and set the znode ACLs if needed in case we are overtaking a non-secure configuration
    status.setStatus("Checking ZNode ACLs");
    zooKeeper.checkAndSetZNodeAcls();

    status.setStatus("Initializing MOB Cleaner");
    initMobCleaner();

    status.setStatus("Calling postStartMaster coprocessors");
    if (this.cpHost != null) {
      // don't let cp initialization errors kill the master
      try {
        this.cpHost.postStartMaster();
      } catch (IOException ioe) {
        LOG.error("Coprocessor postStartMaster() hook failed", ioe);
      }
    }

    // Initialization got this far, so interrupt the monitor thread started above.
    // NOTE(review): presumably this ends the InitializationMonitor -- confirm against its run().
    zombieDetector.interrupt();

    /*
     * After master has started up, lets do balancer post startup initialization. Since this runs
     * in activeMasterManager thread, it should be fine.
     */
    long start = System.currentTimeMillis();
    this.balancer.postMasterStartupInitialize();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Balancer post startup initialization complete, took " + (
          (System.currentTimeMillis() - start) / 1000) + " seconds");
    }
  }
1219
1220  /**
1221   * Check hbase:meta is up and ready for reading. For use during Master startup only.
1222   * @return True if meta is UP and online and startup can progress. Otherwise, meta is not online
1223   *   and we will hold here until operator intervention.
1224   */
1225  @VisibleForTesting
1226  public boolean waitForMetaOnline() {
1227    return isRegionOnline(RegionInfoBuilder.FIRST_META_REGIONINFO);
1228  }
1229
1230  /**
1231   * @return True if region is online and scannable else false if an error or shutdown (Otherwise
1232   *   we just block in here holding up all forward-progess).
1233   */
1234  private boolean isRegionOnline(RegionInfo ri) {
1235    RetryCounter rc = null;
1236    while (!isStopped()) {
1237      RegionState rs = this.assignmentManager.getRegionStates().getRegionState(ri);
1238      if (rs.isOpened()) {
1239        if (this.getServerManager().isServerOnline(rs.getServerName())) {
1240          return true;
1241        }
1242      }
1243      // Region is not OPEN.
1244      Optional<Procedure<MasterProcedureEnv>> optProc = this.procedureExecutor.getProcedures().
1245          stream().filter(p -> p instanceof ServerCrashProcedure).findAny();
1246      // TODO: Add a page to refguide on how to do repair. Have this log message point to it.
1247      // Page will talk about loss of edits, how to schedule at least the meta WAL recovery, and
1248      // then how to assign including how to break region lock if one held.
1249      LOG.warn("{} is NOT online; state={}; ServerCrashProcedures={}. Master startup cannot " +
1250          "progress, in holding-pattern until region onlined.",
1251          ri.getRegionNameAsString(), rs, optProc.isPresent());
1252      // Check once-a-minute.
1253      if (rc == null) {
1254        rc = new RetryCounterFactory(1000).create();
1255      }
1256      Threads.sleep(rc.getBackoffTimeAndIncrementAttempts());
1257    }
1258    return false;
1259  }
1260
1261  /**
1262   * Check hbase:namespace table is assigned. If not, startup will hang looking for the ns table
1263   * <p/>
1264   * This is for rolling upgrading, later we will migrate the data in ns table to the ns family of
1265   * meta table. And if this is a new cluster, this method will return immediately as there will be
1266   * no namespace table/region.
1267   * @return True if namespace table is up/online.
1268   */
1269  private boolean waitForNamespaceOnline() throws IOException {
1270    TableState nsTableState =
1271      MetaTableAccessor.getTableState(getConnection(), TableName.NAMESPACE_TABLE_NAME);
1272    if (nsTableState == null || nsTableState.isDisabled()) {
1273      // this means we have already migrated the data and disabled or deleted the namespace table,
1274      // or this is a new deploy which does not have a namespace table from the beginning.
1275      return true;
1276    }
1277    List<RegionInfo> ris =
1278      this.assignmentManager.getRegionStates().getRegionsOfTable(TableName.NAMESPACE_TABLE_NAME);
1279    if (ris.isEmpty()) {
1280      // maybe this will not happen any more, but anyway, no harm to add a check here...
1281      return true;
1282    }
1283    // Else there are namespace regions up in meta. Ensure they are assigned before we go on.
1284    for (RegionInfo ri : ris) {
1285      if (!isRegionOnline(ri)) {
1286        return false;
1287      }
1288    }
1289    return true;
1290  }
1291
1292  /**
1293   * Adds the {@code MasterQuotasObserver} to the list of configured Master observers to
1294   * automatically remove quotas for a table when that table is deleted.
1295   */
1296  @VisibleForTesting
1297  public void updateConfigurationForQuotasObserver(Configuration conf) {
1298    // We're configured to not delete quotas on table deletion, so we don't need to add the obs.
1299    if (!conf.getBoolean(
1300          MasterQuotasObserver.REMOVE_QUOTA_ON_TABLE_DELETE,
1301          MasterQuotasObserver.REMOVE_QUOTA_ON_TABLE_DELETE_DEFAULT)) {
1302      return;
1303    }
1304    String[] masterCoprocs = conf.getStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY);
1305    final int length = null == masterCoprocs ? 0 : masterCoprocs.length;
1306    String[] updatedCoprocs = new String[length + 1];
1307    if (length > 0) {
1308      System.arraycopy(masterCoprocs, 0, updatedCoprocs, 0, masterCoprocs.length);
1309    }
1310    updatedCoprocs[length] = MasterQuotasObserver.class.getName();
1311    conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, updatedCoprocs);
1312  }
1313
1314  private void initMobCleaner() {
1315    this.mobFileCleanerChore = new MobFileCleanerChore(this);
1316    getChoreService().scheduleChore(mobFileCleanerChore);
1317    this.mobFileCompactionChore = new MobFileCompactionChore(this);
1318    getChoreService().scheduleChore(mobFileCompactionChore);
1319  }
1320
1321  /**
1322   * <p>
1323   * Create a {@link MasterMetaBootstrap} instance.
1324   * </p>
1325   * <p>
1326   * Will be overridden in tests.
1327   * </p>
1328   */
1329  @VisibleForTesting
1330  protected MasterMetaBootstrap createMetaBootstrap() {
1331    // We put this out here in a method so can do a Mockito.spy and stub it out
1332    // w/ a mocked up MasterMetaBootstrap.
1333    return new MasterMetaBootstrap(this);
1334  }
1335
1336  /**
1337   * <p>
1338   * Create a {@link ServerManager} instance.
1339   * </p>
1340   * <p>
1341   * Will be overridden in tests.
1342   * </p>
1343   */
1344  @VisibleForTesting
1345  protected ServerManager createServerManager(final MasterServices master) throws IOException {
1346    // We put this out here in a method so can do a Mockito.spy and stub it out
1347    // w/ a mocked up ServerManager.
1348    setupClusterConnection();
1349    return new ServerManager(master);
1350  }
1351
1352  private void waitForRegionServers(final MonitoredTask status)
1353      throws IOException, InterruptedException {
1354    this.serverManager.waitForRegionServers(status);
1355  }
1356
1357  // Will be overridden in tests
1358  @VisibleForTesting
1359  protected void initClusterSchemaService() throws IOException, InterruptedException {
1360    this.clusterSchemaService = new ClusterSchemaServiceImpl(this);
1361    this.clusterSchemaService.startAsync();
1362    try {
1363      this.clusterSchemaService.awaitRunning(getConfiguration().getInt(
1364        HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS,
1365        DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS), TimeUnit.SECONDS);
1366    } catch (TimeoutException toe) {
1367      throw new IOException("Timedout starting ClusterSchemaService", toe);
1368    }
1369  }
1370
1371  private void initQuotaManager() throws IOException {
1372    MasterQuotaManager quotaManager = new MasterQuotaManager(this);
1373    quotaManager.start();
1374    this.quotaManager = quotaManager;
1375  }
1376
1377  private SpaceQuotaSnapshotNotifier createQuotaSnapshotNotifier() {
1378    SpaceQuotaSnapshotNotifier notifier =
1379        SpaceQuotaSnapshotNotifierFactory.getInstance().create(getConfiguration());
1380    return notifier;
1381  }
1382
1383  boolean isCatalogJanitorEnabled() {
1384    return catalogJanitorChore != null ? catalogJanitorChore.getEnabled() : false;
1385  }
1386
1387  boolean isCleanerChoreEnabled() {
1388    boolean hfileCleanerFlag = true, logCleanerFlag = true;
1389
1390    if (hfileCleaner != null) {
1391      hfileCleanerFlag = hfileCleaner.getEnabled();
1392    }
1393
1394    if (logCleaner != null) {
1395      logCleanerFlag = logCleaner.getEnabled();
1396    }
1397
1398    return (hfileCleanerFlag && logCleanerFlag);
1399  }
1400
1401  @Override
1402  public ServerManager getServerManager() {
1403    return this.serverManager;
1404  }
1405
1406  @Override
1407  public MasterFileSystem getMasterFileSystem() {
1408    return this.fileSystemManager;
1409  }
1410
1411  @Override
1412  public MasterWalManager getMasterWalManager() {
1413    return this.walManager;
1414  }
1415
1416  @Override
1417  public SplitWALManager getSplitWALManager() {
1418    return splitWALManager;
1419  }
1420
1421  @Override
1422  public TableStateManager getTableStateManager() {
1423    return tableStateManager;
1424  }
1425
1426  /*
1427   * Start up all services. If any of these threads gets an unhandled exception
1428   * then they just die with a logged message.  This should be fine because
1429   * in general, we do not expect the master to get such unhandled exceptions
1430   *  as OOMEs; it should be lightly loaded. See what HRegionServer does if
1431   *  need to install an unexpected exception handler.
1432   */
1433  private void startServiceThreads() throws IOException {
1434    // Start the executor service pools
1435    this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, conf.getInt(
1436      HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT));
1437    this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt(
1438      HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT));
1439    this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
1440      conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS,
1441        HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT));
1442    this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
1443      conf.getInt(HConstants.MASTER_META_SERVER_OPERATIONS_THREADS,
1444        HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT));
1445    this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt(
1446      HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT));
1447    this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, conf.getInt(
1448      SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT));
1449
1450    // We depend on there being only one instance of this executor running
1451    // at a time. To do concurrency, would need fencing of enable/disable of
1452    // tables.
1453    // Any time changing this maxThreads to > 1, pls see the comment at
1454    // AccessController#postCompletedCreateTableAction
1455    this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1456    startProcedureExecutor();
1457
1458    // Create cleaner thread pool
1459    cleanerPool = new DirScanPool(conf);
1460    // Start log cleaner thread
1461    int cleanerInterval =
1462      conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
1463    this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
1464      getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool);
1465    getChoreService().scheduleChore(logCleaner);
1466
1467    // start the hfile archive cleaner thread
1468    Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
1469    Map<String, Object> params = new HashMap<>();
1470    params.put(MASTER, this);
1471    this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
1472      getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params);
1473    getChoreService().scheduleChore(hfileCleaner);
1474
1475    // Regions Reopen based on very high storeFileRefCount is considered enabled
1476    // only if hbase.regions.recovery.store.file.ref.count has value > 0
1477    final int maxStoreFileRefCount = conf.getInt(
1478      HConstants.STORE_FILE_REF_COUNT_THRESHOLD,
1479      HConstants.DEFAULT_STORE_FILE_REF_COUNT_THRESHOLD);
1480    if (maxStoreFileRefCount > 0) {
1481      this.regionsRecoveryChore = new RegionsRecoveryChore(this, conf, this);
1482      getChoreService().scheduleChore(this.regionsRecoveryChore);
1483    } else {
1484      LOG.info("Reopening regions with very high storeFileRefCount is disabled. " +
1485          "Provide threshold value > 0 for {} to enable it.",
1486        HConstants.STORE_FILE_REF_COUNT_THRESHOLD);
1487    }
1488
1489    this.regionsRecoveryConfigManager = new RegionsRecoveryConfigManager(this);
1490
1491    replicationBarrierCleaner = new ReplicationBarrierCleaner(conf, this, getConnection(),
1492      replicationPeerManager);
1493    getChoreService().scheduleChore(replicationBarrierCleaner);
1494
1495    final boolean isSnapshotChoreEnabled = this.snapshotCleanupTracker
1496        .isSnapshotCleanupEnabled();
1497    this.snapshotCleanerChore = new SnapshotCleanerChore(this, conf, getSnapshotManager());
1498    if (isSnapshotChoreEnabled) {
1499      getChoreService().scheduleChore(this.snapshotCleanerChore);
1500    } else {
1501      if (LOG.isTraceEnabled()) {
1502        LOG.trace("Snapshot Cleaner Chore is disabled. Not starting up the chore..");
1503      }
1504    }
1505    serviceStarted = true;
1506    if (LOG.isTraceEnabled()) {
1507      LOG.trace("Started service threads");
1508    }
1509  }
1510
1511  @Override
1512  protected void stopServiceThreads() {
1513    if (masterJettyServer != null) {
1514      LOG.info("Stopping master jetty server");
1515      try {
1516        masterJettyServer.stop();
1517      } catch (Exception e) {
1518        LOG.error("Failed to stop master jetty server", e);
1519      }
1520    }
1521    stopChores();
1522
1523    super.stopServiceThreads();
1524    if (cleanerPool != null) {
1525      cleanerPool.shutdownNow();
1526      cleanerPool = null;
1527    }
1528
1529    LOG.debug("Stopping service threads");
1530
1531    // stop procedure executor prior to other services such as server manager and assignment
1532    // manager, as these services are important for some running procedures. See HBASE-24117 for
1533    // example.
1534    stopProcedureExecutor();
1535
1536    if (this.quotaManager != null) {
1537      this.quotaManager.stop();
1538    }
1539
1540    if (this.activeMasterManager != null) {
1541      this.activeMasterManager.stop();
1542    }
1543    if (this.serverManager != null) {
1544      this.serverManager.stop();
1545    }
1546    if (this.assignmentManager != null) {
1547      this.assignmentManager.stop();
1548    }
1549
1550    if (masterRegion != null) {
1551      masterRegion.close(isAborted());
1552    }
1553    if (this.walManager != null) {
1554      this.walManager.stop();
1555    }
1556    if (this.fileSystemManager != null) {
1557      this.fileSystemManager.stop();
1558    }
1559    if (this.mpmHost != null) {
1560      this.mpmHost.stop("server shutting down.");
1561    }
1562    if (this.regionServerTracker != null) {
1563      this.regionServerTracker.stop();
1564    }
1565  }
1566
1567  private void createProcedureExecutor() throws IOException {
1568    MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
1569    procedureStore =
1570      new RegionProcedureStore(this, masterRegion, new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
1571    procedureStore.registerListener(new ProcedureStoreListener() {
1572
1573      @Override
1574      public void abortProcess() {
1575        abort("The Procedure Store lost the lease", null);
1576      }
1577    });
1578    MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler();
1579    procedureExecutor = new ProcedureExecutor<>(conf, procEnv, procedureStore, procedureScheduler);
1580    configurationManager.registerObserver(procEnv);
1581
1582    int cpus = Runtime.getRuntime().availableProcessors();
1583    final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, Math.max(
1584      (cpus > 0 ? cpus / 4 : 0), MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
1585    final boolean abortOnCorruption =
1586      conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
1587        MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
1588    procedureStore.start(numThreads);
1589    // Just initialize it but do not start the workers, we will start the workers later by calling
1590    // startProcedureExecutor. See the javadoc for finishActiveMasterInitialization for more
1591    // details.
1592    procedureExecutor.init(numThreads, abortOnCorruption);
1593    if (!procEnv.getRemoteDispatcher().start()) {
1594      throw new HBaseIOException("Failed start of remote dispatcher");
1595    }
1596  }
1597
1598  private void startProcedureExecutor() throws IOException {
1599    procedureExecutor.startWorkers();
1600  }
1601
1602  /**
1603   * Turn on/off Snapshot Cleanup Chore
1604   *
1605   * @param on indicates whether Snapshot Cleanup Chore is to be run
1606   */
1607  void switchSnapshotCleanup(final boolean on, final boolean synchronous) {
1608    if (synchronous) {
1609      synchronized (this.snapshotCleanerChore) {
1610        switchSnapshotCleanup(on);
1611      }
1612    } else {
1613      switchSnapshotCleanup(on);
1614    }
1615  }
1616
1617  private void switchSnapshotCleanup(final boolean on) {
1618    try {
1619      snapshotCleanupTracker.setSnapshotCleanupEnabled(on);
1620      if (on) {
1621        if (!getChoreService().isChoreScheduled(this.snapshotCleanerChore)) {
1622          getChoreService().scheduleChore(this.snapshotCleanerChore);
1623        }
1624      } else {
1625        getChoreService().cancelChore(this.snapshotCleanerChore);
1626      }
1627    } catch (KeeperException e) {
1628      LOG.error("Error updating snapshot cleanup mode to {}", on, e);
1629    }
1630  }
1631
1632
1633  private void stopProcedureExecutor() {
1634    if (procedureExecutor != null) {
1635      configurationManager.deregisterObserver(procedureExecutor.getEnvironment());
1636      procedureExecutor.getEnvironment().getRemoteDispatcher().stop();
1637      procedureExecutor.stop();
1638      procedureExecutor.join();
1639      procedureExecutor = null;
1640    }
1641
1642    if (procedureStore != null) {
1643      procedureStore.stop(isAborted());
1644      procedureStore = null;
1645    }
1646  }
1647
1648  private void stopChores() {
1649    ChoreService choreService = getChoreService();
1650    if (choreService != null) {
1651      choreService.cancelChore(this.mobFileCleanerChore);
1652      choreService.cancelChore(this.mobFileCompactionChore);
1653      choreService.cancelChore(this.balancerChore);
1654      choreService.cancelChore(this.normalizerChore);
1655      choreService.cancelChore(this.clusterStatusChore);
1656      choreService.cancelChore(this.catalogJanitorChore);
1657      choreService.cancelChore(this.clusterStatusPublisherChore);
1658      choreService.cancelChore(this.snapshotQuotaChore);
1659      choreService.cancelChore(this.logCleaner);
1660      choreService.cancelChore(this.hfileCleaner);
1661      choreService.cancelChore(this.replicationBarrierCleaner);
1662      choreService.cancelChore(this.snapshotCleanerChore);
1663      choreService.cancelChore(this.hbckChore);
1664      choreService.cancelChore(this.regionsRecoveryChore);
1665    }
1666  }
1667
1668  /**
1669   * @return Get remote side's InetAddress
1670   */
1671  InetAddress getRemoteInetAddress(final int port,
1672      final long serverStartCode) throws UnknownHostException {
1673    // Do it out here in its own little method so can fake an address when
1674    // mocking up in tests.
1675    InetAddress ia = RpcServer.getRemoteIp();
1676
1677    // The call could be from the local regionserver,
1678    // in which case, there is no remote address.
1679    if (ia == null && serverStartCode == startcode) {
1680      InetSocketAddress isa = rpcServices.getSocketAddress();
1681      if (isa != null && isa.getPort() == port) {
1682        ia = isa.getAddress();
1683      }
1684    }
1685    return ia;
1686  }
1687
1688  /**
1689   * @return Maximum time we should run balancer for
1690   */
1691  private int getMaxBalancingTime() {
1692    // if max balancing time isn't set, defaulting it to period time
1693    int maxBalancingTime = getConfiguration().getInt(HConstants.HBASE_BALANCER_MAX_BALANCING,
1694      getConfiguration()
1695        .getInt(HConstants.HBASE_BALANCER_PERIOD, HConstants.DEFAULT_HBASE_BALANCER_PERIOD));
1696    return maxBalancingTime;
1697  }
1698
1699  /**
1700   * @return Maximum number of regions in transition
1701   */
1702  private int getMaxRegionsInTransition() {
1703    int numRegions = this.assignmentManager.getRegionStates().getRegionAssignments().size();
1704    return Math.max((int) Math.floor(numRegions * this.maxRitPercent), 1);
1705  }
1706
1707  /**
1708   * It first sleep to the next balance plan start time. Meanwhile, throttling by the max
1709   * number regions in transition to protect availability.
1710   * @param nextBalanceStartTime The next balance plan start time
1711   * @param maxRegionsInTransition max number of regions in transition
1712   * @param cutoffTime when to exit balancer
1713   */
1714  private void balanceThrottling(long nextBalanceStartTime, int maxRegionsInTransition,
1715      long cutoffTime) {
1716    boolean interrupted = false;
1717
1718    // Sleep to next balance plan start time
1719    // But if there are zero regions in transition, it can skip sleep to speed up.
1720    while (!interrupted && System.currentTimeMillis() < nextBalanceStartTime
1721        && this.assignmentManager.getRegionStates().hasRegionsInTransition()) {
1722      try {
1723        Thread.sleep(100);
1724      } catch (InterruptedException ie) {
1725        interrupted = true;
1726      }
1727    }
1728
1729    // Throttling by max number regions in transition
1730    while (!interrupted
1731        && maxRegionsInTransition > 0
1732        && this.assignmentManager.getRegionStates().getRegionsInTransitionCount()
1733        >= maxRegionsInTransition && System.currentTimeMillis() <= cutoffTime) {
1734      try {
1735        // sleep if the number of regions in transition exceeds the limit
1736        Thread.sleep(100);
1737      } catch (InterruptedException ie) {
1738        interrupted = true;
1739      }
1740    }
1741
1742    if (interrupted) Thread.currentThread().interrupt();
1743  }
1744
1745  public boolean balance() throws IOException {
1746    return balance(false);
1747  }
1748
1749  /**
1750   * Checks master state before initiating action over region topology.
1751   * @param action the name of the action under consideration, for logging.
1752   * @return {@code true} when the caller should exit early, {@code false} otherwise.
1753   */
1754  private boolean skipRegionManagementAction(final String action) {
1755    if (!isInitialized()) {
1756      LOG.debug("Master has not been initialized, don't run {}.", action);
1757      return true;
1758    }
1759    if (this.getServerManager().isClusterShutdown()) {
1760      LOG.info("Cluster is shutting down, don't run {}.", action);
1761      return true;
1762    }
1763    if (isInMaintenanceMode()) {
1764      LOG.info("Master is in maintenance mode, don't run {}.", action);
1765      return true;
1766    }
1767    return false;
1768  }
1769
1770  public boolean balance(boolean force) throws IOException {
1771    if (loadBalancerTracker == null || !loadBalancerTracker.isBalancerOn()) {
1772      return false;
1773    }
1774    if (skipRegionManagementAction("balancer")) {
1775      return false;
1776    }
1777
1778    synchronized (this.balancer) {
1779        // Only allow one balance run at at time.
1780      if (this.assignmentManager.hasRegionsInTransition()) {
1781        List<RegionStateNode> regionsInTransition = assignmentManager.getRegionsInTransition();
1782        // if hbase:meta region is in transition, result of assignment cannot be recorded
1783        // ignore the force flag in that case
1784        boolean metaInTransition = assignmentManager.isMetaRegionInTransition();
1785        String prefix = force && !metaInTransition ? "R" : "Not r";
1786        List<RegionStateNode> toPrint = regionsInTransition;
1787        int max = 5;
1788        boolean truncated = false;
1789        if (regionsInTransition.size() > max) {
1790          toPrint = regionsInTransition.subList(0, max);
1791          truncated = true;
1792        }
1793        LOG.info(prefix + " not running balancer because " + regionsInTransition.size() +
1794          " region(s) in transition: " + toPrint + (truncated? "(truncated list)": ""));
1795        if (!force || metaInTransition) return false;
1796      }
1797      if (this.serverManager.areDeadServersInProgress()) {
1798        LOG.info("Not running balancer because processing dead regionserver(s): " +
1799          this.serverManager.getDeadServers());
1800        return false;
1801      }
1802
1803      if (this.cpHost != null) {
1804        try {
1805          if (this.cpHost.preBalance()) {
1806            LOG.debug("Coprocessor bypassing balancer request");
1807            return false;
1808          }
1809        } catch (IOException ioe) {
1810          LOG.error("Error invoking master coprocessor preBalance()", ioe);
1811          return false;
1812        }
1813      }
1814
1815      Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
1816        this.assignmentManager.getRegionStates()
1817          .getAssignmentsForBalancer(tableStateManager, this.serverManager.getOnlineServersList());
1818      for (Map<ServerName, List<RegionInfo>> serverMap : assignments.values()) {
1819        serverMap.keySet().removeAll(this.serverManager.getDrainingServersList());
1820      }
1821
1822      //Give the balancer the current cluster state.
1823      this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
1824
1825      List<RegionPlan> plans = this.balancer.balanceCluster(assignments);
1826
1827      if (skipRegionManagementAction("balancer")) {
1828        // make one last check that the cluster isn't shutting down before proceeding.
1829        return false;
1830      }
1831
1832      List<RegionPlan> sucRPs = executeRegionPlansWithThrottling(plans);
1833
1834      if (this.cpHost != null) {
1835        try {
1836          this.cpHost.postBalance(sucRPs);
1837        } catch (IOException ioe) {
1838          // balancing already succeeded so don't change the result
1839          LOG.error("Error invoking master coprocessor postBalance()", ioe);
1840        }
1841      }
1842    }
1843    // If LoadBalancer did not generate any plans, it means the cluster is already balanced.
1844    // Return true indicating a success.
1845    return true;
1846  }
1847
1848  /**
1849   * Execute region plans with throttling
1850   * @param plans to execute
1851   * @return succeeded plans
1852   */
1853  public List<RegionPlan> executeRegionPlansWithThrottling(List<RegionPlan> plans) {
1854    List<RegionPlan> successRegionPlans = new ArrayList<>();
1855    int maxRegionsInTransition = getMaxRegionsInTransition();
1856    long balanceStartTime = System.currentTimeMillis();
1857    long cutoffTime = balanceStartTime + this.maxBalancingTime;
1858    int rpCount = 0;  // number of RegionPlans balanced so far
1859    if (plans != null && !plans.isEmpty()) {
1860      int balanceInterval = this.maxBalancingTime / plans.size();
1861      LOG.info("Balancer plans size is " + plans.size() + ", the balance interval is "
1862          + balanceInterval + " ms, and the max number regions in transition is "
1863          + maxRegionsInTransition);
1864
1865      for (RegionPlan plan: plans) {
1866        LOG.info("balance " + plan);
1867        //TODO: bulk assign
1868        try {
1869          this.assignmentManager.moveAsync(plan);
1870        } catch (HBaseIOException hioe) {
1871          //should ignore failed plans here, avoiding the whole balance plans be aborted
1872          //later calls of balance() can fetch up the failed and skipped plans
1873          LOG.warn("Failed balance plan {}, skipping...", plan, hioe);
1874        }
1875        //rpCount records balance plans processed, does not care if a plan succeeds
1876        rpCount++;
1877        successRegionPlans.add(plan);
1878
1879        if (this.maxBalancingTime > 0) {
1880          balanceThrottling(balanceStartTime + rpCount * balanceInterval, maxRegionsInTransition,
1881            cutoffTime);
1882        }
1883
1884        // if performing next balance exceeds cutoff time, exit the loop
1885        if (this.maxBalancingTime > 0 && rpCount < plans.size()
1886          && System.currentTimeMillis() > cutoffTime) {
1887          // TODO: After balance, there should not be a cutoff time (keeping it as
1888          // a security net for now)
1889          LOG.debug("No more balancing till next balance run; maxBalanceTime="
1890              + this.maxBalancingTime);
1891          break;
1892        }
1893      }
1894    }
1895    return successRegionPlans;
1896  }
1897
1898  @Override
1899  public RegionNormalizer getRegionNormalizer() {
1900    return this.normalizer;
1901  }
1902
1903  public boolean normalizeRegions() throws IOException {
1904    return normalizeRegions(new NormalizeTableFilterParams.Builder().build());
1905  }
1906
1907  /**
1908   * Perform normalization of cluster.
1909   *
1910   * @return true if an existing normalization was already in progress, or if a new normalization
1911   *   was performed successfully; false otherwise (specifically, if HMaster finished initializing
1912   *   or normalization is globally disabled).
1913   */
1914  public boolean normalizeRegions(final NormalizeTableFilterParams ntfp) throws IOException {
1915    final long startTime = EnvironmentEdgeManager.currentTime();
1916    if (regionNormalizerTracker == null || !regionNormalizerTracker.isNormalizerOn()) {
1917      LOG.debug("Region normalization is disabled, don't run region normalizer.");
1918      return false;
1919    }
1920    if (skipRegionManagementAction("region normalizer")) {
1921      return false;
1922    }
1923    if (assignmentManager.hasRegionsInTransition()) {
1924      return false;
1925    }
1926
1927    if (!normalizationInProgressLock.tryLock()) {
1928      // Don't run the normalizer concurrently
1929      LOG.info("Normalization already in progress. Skipping request.");
1930      return true;
1931    }
1932
1933    int affectedTables = 0;
1934    try {
1935      final Set<TableName> matchingTables = getTableDescriptors(new LinkedList<>(),
1936        ntfp.getNamespace(), ntfp.getRegex(), ntfp.getTableNames(), false)
1937        .stream()
1938        .map(TableDescriptor::getTableName)
1939        .collect(Collectors.toSet());
1940      final Set<TableName> allEnabledTables =
1941        tableStateManager.getTablesInStates(TableState.State.ENABLED);
1942      final List<TableName> targetTables =
1943        new ArrayList<>(Sets.intersection(matchingTables, allEnabledTables));
1944      Collections.shuffle(targetTables);
1945
1946      final List<Long> submittedPlanProcIds = new ArrayList<>();
1947      for (TableName table : targetTables) {
1948        if (table.isSystemTable()) {
1949          continue;
1950        }
1951        final TableDescriptor tblDesc = getTableDescriptors().get(table);
1952        if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
1953          LOG.debug(
1954            "Skipping table {} because normalization is disabled in its table properties.", table);
1955          continue;
1956        }
1957
1958        // make one last check that the cluster isn't shutting down before proceeding.
1959        if (skipRegionManagementAction("region normalizer")) {
1960          return false;
1961        }
1962
1963        final List<NormalizationPlan> plans = normalizer.computePlansForTable(table);
1964        if (CollectionUtils.isEmpty(plans)) {
1965          LOG.debug("No normalization required for table {}.", table);
1966          continue;
1967        }
1968
1969        affectedTables++;
1970        // as of this writing, `plan.submit()` is non-blocking and uses Async Admin APIs to
1971        // submit task , so there's no artificial rate-
1972        // limiting of merge/split requests due to this serial loop.
1973        for (NormalizationPlan plan : plans) {
1974          long procId = plan.submit(this);
1975          submittedPlanProcIds.add(procId);
1976          if (plan.getType() == PlanType.SPLIT) {
1977            splitPlanCount++;
1978          } else if (plan.getType() == PlanType.MERGE) {
1979            mergePlanCount++;
1980          }
1981        }
1982      }
1983      final long endTime = EnvironmentEdgeManager.currentTime();
1984      LOG.info("Normalizer ran successfully in {}. Submitted {} plans, affecting {} tables.",
1985        Duration.ofMillis(endTime - startTime), submittedPlanProcIds.size(), affectedTables);
1986      LOG.debug("Normalizer submitted procID list: {}", submittedPlanProcIds);
1987    } finally {
1988      normalizationInProgressLock.unlock();
1989    }
1990    return true;
1991  }
1992
1993  /**
1994   * @return Client info for use as prefix on an audit log string; who did an action
1995   */
1996  @Override
1997  public String getClientIdAuditPrefix() {
1998    return "Client=" + RpcServer.getRequestUserName().orElse(null)
1999        + "/" + RpcServer.getRemoteAddress().orElse(null);
2000  }
2001
2002  /**
2003   * Switch for the background CatalogJanitor thread.
2004   * Used for testing.  The thread will continue to run.  It will just be a noop
2005   * if disabled.
2006   * @param b If false, the catalog janitor won't do anything.
2007   */
2008  public void setCatalogJanitorEnabled(final boolean b) {
2009    this.catalogJanitorChore.setEnabled(b);
2010  }
2011
2012  @Override
2013  public long mergeRegions(
2014      final RegionInfo[] regionsToMerge,
2015      final boolean forcible,
2016      final long ng,
2017      final long nonce) throws IOException {
2018    checkInitialized();
2019
2020    if (!isSplitOrMergeEnabled(MasterSwitchType.MERGE)) {
2021      String regionsStr = Arrays.deepToString(regionsToMerge);
2022      LOG.warn("Merge switch is off! skip merge of " + regionsStr);
2023      throw new DoNotRetryIOException("Merge of " + regionsStr +
2024          " failed because merge switch is off");
2025    }
2026
2027    final String mergeRegionsStr = Arrays.stream(regionsToMerge).map(RegionInfo::getEncodedName)
2028      .collect(Collectors.joining(", "));
2029    return MasterProcedureUtil.submitProcedure(new NonceProcedureRunnable(this, ng, nonce) {
2030      @Override
2031      protected void run() throws IOException {
2032        getMaster().getMasterCoprocessorHost().preMergeRegions(regionsToMerge);
2033        String aid = getClientIdAuditPrefix();
2034        LOG.info("{} merge regions {}", aid, mergeRegionsStr);
2035        submitProcedure(new MergeTableRegionsProcedure(procedureExecutor.getEnvironment(),
2036            regionsToMerge, forcible));
2037        getMaster().getMasterCoprocessorHost().postMergeRegions(regionsToMerge);
2038      }
2039
2040      @Override
2041      protected String getDescription() {
2042        return "MergeTableProcedure";
2043      }
2044    });
2045  }
2046
2047  @Override
2048  public long splitRegion(final RegionInfo regionInfo, final byte[] splitRow,
2049      final long nonceGroup, final long nonce)
2050  throws IOException {
2051    checkInitialized();
2052
2053    if (!isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) {
2054      LOG.warn("Split switch is off! skip split of " + regionInfo);
2055      throw new DoNotRetryIOException("Split region " + regionInfo.getRegionNameAsString() +
2056          " failed due to split switch off");
2057    }
2058
2059    return MasterProcedureUtil.submitProcedure(
2060        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2061      @Override
2062      protected void run() throws IOException {
2063        getMaster().getMasterCoprocessorHost().preSplitRegion(regionInfo.getTable(), splitRow);
2064        LOG.info(getClientIdAuditPrefix() + " split " + regionInfo.getRegionNameAsString());
2065
2066        // Execute the operation asynchronously
2067        submitProcedure(getAssignmentManager().createSplitProcedure(regionInfo, splitRow));
2068      }
2069
2070      @Override
2071      protected String getDescription() {
2072        return "SplitTableProcedure";
2073      }
2074    });
2075  }
2076
2077  private void warmUpRegion(ServerName server, RegionInfo region) {
2078    FutureUtils.addListener(asyncClusterConnection.getRegionServerAdmin(server)
2079      .warmupRegion(RequestConverter.buildWarmupRegionRequest(region)), (r, e) -> {
2080        if (e != null) {
2081          LOG.warn("Failed to warm up region {} on server {}", region, server, e);
2082        }
2083      });
2084  }
2085
2086  // Public so can be accessed by tests. Blocks until move is done.
2087  // Replace with an async implementation from which you can get
2088  // a success/failure result.
2089  @VisibleForTesting
2090  public void move(final byte[] encodedRegionName, byte[] destServerName) throws IOException {
2091    RegionState regionState = assignmentManager.getRegionStates().
2092      getRegionState(Bytes.toString(encodedRegionName));
2093
2094    RegionInfo hri;
2095    if (regionState != null) {
2096      hri = regionState.getRegion();
2097    } else {
2098      throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
2099    }
2100
2101    ServerName dest;
2102    List<ServerName> exclude = hri.getTable().isSystemTable() ? assignmentManager.getExcludedServersForSystemTable()
2103        : new ArrayList<>(1);
2104    if (destServerName != null && exclude.contains(ServerName.valueOf(Bytes.toString(destServerName)))) {
2105      LOG.info(
2106          Bytes.toString(encodedRegionName) + " can not move to " + Bytes.toString(destServerName)
2107              + " because the server is in exclude list");
2108      destServerName = null;
2109    }
2110    if (destServerName == null || destServerName.length == 0) {
2111      LOG.info("Passed destination servername is null/empty so " +
2112        "choosing a server at random");
2113      exclude.add(regionState.getServerName());
2114      final List<ServerName> destServers = this.serverManager.createDestinationServersList(exclude);
2115      dest = balancer.randomAssignment(hri, destServers);
2116      if (dest == null) {
2117        LOG.debug("Unable to determine a plan to assign " + hri);
2118        return;
2119      }
2120    } else {
2121      ServerName candidate = ServerName.valueOf(Bytes.toString(destServerName));
2122      dest = balancer.randomAssignment(hri, Lists.newArrayList(candidate));
2123      if (dest == null) {
2124        LOG.debug("Unable to determine a plan to assign " + hri);
2125        return;
2126      }
2127      // TODO: deal with table on master for rs group.
2128      if (dest.equals(serverName)) {
2129        // To avoid unnecessary region moving later by balancer. Don't put user
2130        // regions on master.
2131        LOG.debug("Skipping move of region " + hri.getRegionNameAsString() +
2132          " to avoid unnecessary region moving later by load balancer," +
2133          " because it should not be on master");
2134        return;
2135      }
2136    }
2137
2138    if (dest.equals(regionState.getServerName())) {
2139      LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
2140        + " because region already assigned to the same server " + dest + ".");
2141      return;
2142    }
2143
2144    // Now we can do the move
2145    RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
2146    assert rp.getDestination() != null: rp.toString() + " " + dest;
2147
2148    try {
2149      checkInitialized();
2150      if (this.cpHost != null) {
2151        this.cpHost.preMove(hri, rp.getSource(), rp.getDestination());
2152      }
2153
2154      TransitRegionStateProcedure proc =
2155        this.assignmentManager.createMoveRegionProcedure(rp.getRegionInfo(), rp.getDestination());
2156      // Warmup the region on the destination before initiating the move.
2157      // A region server could reject the close request because it either does not
2158      // have the specified region or the region is being split.
2159      warmUpRegion(rp.getDestination(), hri);
2160
2161      LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
2162      Future<byte[]> future = ProcedureSyncWait.submitProcedure(this.procedureExecutor, proc);
2163      try {
2164        // Is this going to work? Will we throw exception on error?
2165        // TODO: CompletableFuture rather than this stunted Future.
2166        future.get();
2167      } catch (InterruptedException | ExecutionException e) {
2168        throw new HBaseIOException(e);
2169      }
2170      if (this.cpHost != null) {
2171        this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
2172      }
2173    } catch (IOException ioe) {
2174      if (ioe instanceof HBaseIOException) {
2175        throw (HBaseIOException)ioe;
2176      }
2177      throw new HBaseIOException(ioe);
2178    }
2179  }
2180
2181  @Override
2182  public long createTable(final TableDescriptor tableDescriptor, final byte[][] splitKeys,
2183      final long nonceGroup, final long nonce) throws IOException {
2184    checkInitialized();
2185    TableDescriptor desc = getMasterCoprocessorHost().preCreateTableRegionsInfos(tableDescriptor);
2186    if (desc == null) {
2187      throw new IOException("Creation for " + tableDescriptor + " is canceled by CP");
2188    }
2189    String namespace = desc.getTableName().getNamespaceAsString();
2190    this.clusterSchemaService.getNamespace(namespace);
2191
2192    RegionInfo[] newRegions = ModifyRegionUtils.createRegionInfos(desc, splitKeys);
2193    TableDescriptorChecker.sanityCheck(conf, desc);
2194
2195    return MasterProcedureUtil
2196      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2197        @Override
2198        protected void run() throws IOException {
2199          getMaster().getMasterCoprocessorHost().preCreateTable(desc, newRegions);
2200
2201          LOG.info(getClientIdAuditPrefix() + " create " + desc);
2202
2203          // TODO: We can handle/merge duplicate requests, and differentiate the case of
2204          // TableExistsException by saying if the schema is the same or not.
2205          //
2206          // We need to wait for the procedure to potentially fail due to "prepare" sanity
2207          // checks. This will block only the beginning of the procedure. See HBASE-19953.
2208          ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
2209          submitProcedure(
2210            new CreateTableProcedure(procedureExecutor.getEnvironment(), desc, newRegions, latch));
2211          latch.await();
2212
2213          getMaster().getMasterCoprocessorHost().postCreateTable(desc, newRegions);
2214        }
2215
2216        @Override
2217        protected String getDescription() {
2218          return "CreateTableProcedure";
2219        }
2220      });
2221  }
2222
2223  @Override
2224  public long createSystemTable(final TableDescriptor tableDescriptor) throws IOException {
2225    if (isStopped()) {
2226      throw new MasterNotRunningException();
2227    }
2228
2229    TableName tableName = tableDescriptor.getTableName();
2230    if (!(tableName.isSystemTable())) {
2231      throw new IllegalArgumentException(
2232        "Only system table creation can use this createSystemTable API");
2233    }
2234
2235    RegionInfo[] newRegions = ModifyRegionUtils.createRegionInfos(tableDescriptor, null);
2236
2237    LOG.info(getClientIdAuditPrefix() + " create " + tableDescriptor);
2238
2239    // This special create table is called locally to master.  Therefore, no RPC means no need
2240    // to use nonce to detect duplicated RPC call.
2241    long procId = this.procedureExecutor.submitProcedure(
2242      new CreateTableProcedure(procedureExecutor.getEnvironment(), tableDescriptor, newRegions));
2243
2244    return procId;
2245  }
2246
2247  private void startActiveMasterManager(int infoPort) throws KeeperException {
2248    String backupZNode = ZNodePaths.joinZNode(
2249      zooKeeper.getZNodePaths().backupMasterAddressesZNode, serverName.toString());
2250    /*
2251    * Add a ZNode for ourselves in the backup master directory since we
2252    * may not become the active master. If so, we want the actual active
2253    * master to know we are backup masters, so that it won't assign
2254    * regions to us if so configured.
2255    *
2256    * If we become the active master later, ActiveMasterManager will delete
2257    * this node explicitly.  If we crash before then, ZooKeeper will delete
2258    * this node for us since it is ephemeral.
2259    */
2260    LOG.info("Adding backup master ZNode " + backupZNode);
2261    if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode, serverName, infoPort)) {
2262      LOG.warn("Failed create of " + backupZNode + " by " + serverName);
2263    }
2264    this.activeMasterManager.setInfoPort(infoPort);
2265    int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
2266    // If we're a backup master, stall until a primary to write this address
2267    if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP, HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
2268      LOG.debug("HMaster started in backup mode. Stalling until master znode is written.");
2269      // This will only be a minute or so while the cluster starts up,
2270      // so don't worry about setting watches on the parent znode
2271      while (!activeMasterManager.hasActiveMaster()) {
2272        LOG.debug("Waiting for master address and cluster state znode to be written.");
2273        Threads.sleep(timeout);
2274      }
2275    }
2276    MonitoredTask status = TaskMonitor.get().createStatus("Master startup");
2277    status.setDescription("Master startup");
2278    try {
2279      if (activeMasterManager.blockUntilBecomingActiveMaster(timeout, status)) {
2280        finishActiveMasterInitialization(status);
2281      }
2282    } catch (Throwable t) {
2283      status.setStatus("Failed to become active: " + t.getMessage());
2284      LOG.error(HBaseMarkers.FATAL, "Failed to become active master", t);
2285      // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
2286      if (t instanceof NoClassDefFoundError && t.getMessage().
2287          contains("org/apache/hadoop/hdfs/protocol/HdfsConstants$SafeModeAction")) {
2288        // improved error message for this special case
2289        abort("HBase is having a problem with its Hadoop jars.  You may need to recompile " +
2290          "HBase against Hadoop version " + org.apache.hadoop.util.VersionInfo.getVersion() +
2291          " or change your hadoop jars to start properly", t);
2292      } else {
2293        abort("Unhandled exception. Starting shutdown.", t);
2294      }
2295    } finally {
2296      status.cleanup();
2297    }
2298  }
2299
2300  private static boolean isCatalogTable(final TableName tableName) {
2301    return tableName.equals(TableName.META_TABLE_NAME);
2302  }
2303
2304  @Override
2305  public long deleteTable(
2306      final TableName tableName,
2307      final long nonceGroup,
2308      final long nonce) throws IOException {
2309    checkInitialized();
2310
2311    return MasterProcedureUtil.submitProcedure(
2312        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2313      @Override
2314      protected void run() throws IOException {
2315        getMaster().getMasterCoprocessorHost().preDeleteTable(tableName);
2316
2317        LOG.info(getClientIdAuditPrefix() + " delete " + tableName);
2318
2319        // TODO: We can handle/merge duplicate request
2320        //
2321        // We need to wait for the procedure to potentially fail due to "prepare" sanity
2322        // checks. This will block only the beginning of the procedure. See HBASE-19953.
2323        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
2324        submitProcedure(new DeleteTableProcedure(procedureExecutor.getEnvironment(),
2325            tableName, latch));
2326        latch.await();
2327
2328        getMaster().getMasterCoprocessorHost().postDeleteTable(tableName);
2329      }
2330
2331      @Override
2332      protected String getDescription() {
2333        return "DeleteTableProcedure";
2334      }
2335    });
2336  }
2337
2338  @Override
2339  public long truncateTable(
2340      final TableName tableName,
2341      final boolean preserveSplits,
2342      final long nonceGroup,
2343      final long nonce) throws IOException {
2344    checkInitialized();
2345
2346    return MasterProcedureUtil.submitProcedure(
2347        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2348      @Override
2349      protected void run() throws IOException {
2350        getMaster().getMasterCoprocessorHost().preTruncateTable(tableName);
2351
2352        LOG.info(getClientIdAuditPrefix() + " truncate " + tableName);
2353        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
2354        submitProcedure(new TruncateTableProcedure(procedureExecutor.getEnvironment(),
2355            tableName, preserveSplits, latch));
2356        latch.await();
2357
2358        getMaster().getMasterCoprocessorHost().postTruncateTable(tableName);
2359      }
2360
2361      @Override
2362      protected String getDescription() {
2363        return "TruncateTableProcedure";
2364      }
2365    });
2366  }
2367
2368  @Override
2369  public long addColumn(final TableName tableName, final ColumnFamilyDescriptor column,
2370      final long nonceGroup, final long nonce) throws IOException {
2371    checkInitialized();
2372    checkTableExists(tableName);
2373
2374    return modifyTable(tableName, new TableDescriptorGetter() {
2375
2376      @Override
2377      public TableDescriptor get() throws IOException {
2378        TableDescriptor old = getTableDescriptors().get(tableName);
2379        if (old.hasColumnFamily(column.getName())) {
2380          throw new InvalidFamilyOperationException("Column family '" + column.getNameAsString()
2381              + "' in table '" + tableName + "' already exists so cannot be added");
2382        }
2383
2384        return TableDescriptorBuilder.newBuilder(old).setColumnFamily(column).build();
2385      }
2386    }, nonceGroup, nonce, true);
2387  }
2388
2389  /**
2390   * Implement to return TableDescriptor after pre-checks
2391   */
2392  protected interface TableDescriptorGetter {
2393    TableDescriptor get() throws IOException;
2394  }
2395
2396  @Override
2397  public long modifyColumn(final TableName tableName, final ColumnFamilyDescriptor descriptor,
2398      final long nonceGroup, final long nonce) throws IOException {
2399    checkInitialized();
2400    checkTableExists(tableName);
2401    return modifyTable(tableName, new TableDescriptorGetter() {
2402
2403      @Override
2404      public TableDescriptor get() throws IOException {
2405        TableDescriptor old = getTableDescriptors().get(tableName);
2406        if (!old.hasColumnFamily(descriptor.getName())) {
2407          throw new InvalidFamilyOperationException("Family '" + descriptor.getNameAsString()
2408              + "' does not exist, so it cannot be modified");
2409        }
2410
2411        return TableDescriptorBuilder.newBuilder(old).modifyColumnFamily(descriptor).build();
2412      }
2413    }, nonceGroup, nonce, true);
2414  }
2415
2416  @Override
2417  public long deleteColumn(final TableName tableName, final byte[] columnName,
2418      final long nonceGroup, final long nonce) throws IOException {
2419    checkInitialized();
2420    checkTableExists(tableName);
2421
2422    return modifyTable(tableName, new TableDescriptorGetter() {
2423
2424      @Override
2425      public TableDescriptor get() throws IOException {
2426        TableDescriptor old = getTableDescriptors().get(tableName);
2427
2428        if (!old.hasColumnFamily(columnName)) {
2429          throw new InvalidFamilyOperationException("Family '" + Bytes.toString(columnName)
2430              + "' does not exist, so it cannot be deleted");
2431        }
2432        if (old.getColumnFamilyCount() == 1) {
2433          throw new InvalidFamilyOperationException("Family '" + Bytes.toString(columnName)
2434              + "' is the only column family in the table, so it cannot be deleted");
2435        }
2436        return TableDescriptorBuilder.newBuilder(old).removeColumnFamily(columnName).build();
2437      }
2438    }, nonceGroup, nonce, true);
2439  }
2440
2441  @Override
2442  public long enableTable(final TableName tableName, final long nonceGroup, final long nonce)
2443      throws IOException {
2444    checkInitialized();
2445
2446    return MasterProcedureUtil.submitProcedure(
2447        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2448      @Override
2449      protected void run() throws IOException {
2450        getMaster().getMasterCoprocessorHost().preEnableTable(tableName);
2451
2452        // Normally, it would make sense for this authorization check to exist inside
2453        // AccessController, but because the authorization check is done based on internal state
2454        // (rather than explicit permissions) we'll do the check here instead of in the
2455        // coprocessor.
2456        MasterQuotaManager quotaManager = getMasterQuotaManager();
2457        if (quotaManager != null) {
2458          if (quotaManager.isQuotaInitialized()) {
2459              SpaceQuotaSnapshot currSnapshotOfTable =
2460                  QuotaTableUtil.getCurrentSnapshotFromQuotaTable(getConnection(), tableName);
2461              if (currSnapshotOfTable != null) {
2462                SpaceQuotaStatus quotaStatus = currSnapshotOfTable.getQuotaStatus();
2463                if (quotaStatus.isInViolation()
2464                    && SpaceViolationPolicy.DISABLE == quotaStatus.getPolicy().orElse(null)) {
2465                throw new AccessDeniedException("Enabling the table '" + tableName
2466                    + "' is disallowed due to a violated space quota.");
2467              }
2468            }
2469          } else if (LOG.isTraceEnabled()) {
2470            LOG.trace("Unable to check for space quotas as the MasterQuotaManager is not enabled");
2471          }
2472        }
2473
2474        LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
2475
2476        // Execute the operation asynchronously - client will check the progress of the operation
2477        // In case the request is from a <1.1 client before returning,
2478        // we want to make sure that the table is prepared to be
2479        // enabled (the table is locked and the table state is set).
2480        // Note: if the procedure throws exception, we will catch it and rethrow.
2481        final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
2482        submitProcedure(new EnableTableProcedure(procedureExecutor.getEnvironment(),
2483            tableName, prepareLatch));
2484        prepareLatch.await();
2485
2486        getMaster().getMasterCoprocessorHost().postEnableTable(tableName);
2487      }
2488
2489      @Override
2490      protected String getDescription() {
2491        return "EnableTableProcedure";
2492      }
2493    });
2494  }
2495
2496  @Override
2497  public long disableTable(final TableName tableName, final long nonceGroup, final long nonce)
2498      throws IOException {
2499    checkInitialized();
2500
2501    return MasterProcedureUtil.submitProcedure(
2502        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2503      @Override
2504      protected void run() throws IOException {
2505        getMaster().getMasterCoprocessorHost().preDisableTable(tableName);
2506
2507        LOG.info(getClientIdAuditPrefix() + " disable " + tableName);
2508
2509        // Execute the operation asynchronously - client will check the progress of the operation
2510        // In case the request is from a <1.1 client before returning,
2511        // we want to make sure that the table is prepared to be
2512        // enabled (the table is locked and the table state is set).
2513        // Note: if the procedure throws exception, we will catch it and rethrow.
2514        //
2515        // We need to wait for the procedure to potentially fail due to "prepare" sanity
2516        // checks. This will block only the beginning of the procedure. See HBASE-19953.
2517        final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createBlockingLatch();
2518        submitProcedure(new DisableTableProcedure(procedureExecutor.getEnvironment(),
2519            tableName, false, prepareLatch));
2520        prepareLatch.await();
2521
2522        getMaster().getMasterCoprocessorHost().postDisableTable(tableName);
2523      }
2524
2525      @Override
2526      protected String getDescription() {
2527        return "DisableTableProcedure";
2528      }
2529    });
2530  }
2531
2532  private long modifyTable(final TableName tableName,
2533      final TableDescriptorGetter newDescriptorGetter, final long nonceGroup, final long nonce,
2534      final boolean shouldCheckDescriptor) throws IOException {
2535    return MasterProcedureUtil
2536        .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2537          @Override
2538          protected void run() throws IOException {
2539            TableDescriptor oldDescriptor = getMaster().getTableDescriptors().get(tableName);
2540            TableDescriptor newDescriptor = getMaster().getMasterCoprocessorHost()
2541                .preModifyTable(tableName, oldDescriptor, newDescriptorGetter.get());
2542            TableDescriptorChecker.sanityCheck(conf, newDescriptor);
2543            LOG.info("{} modify table {} from {} to {}", getClientIdAuditPrefix(), tableName,
2544                oldDescriptor, newDescriptor);
2545
2546            // Execute the operation synchronously - wait for the operation completes before
2547            // continuing.
2548            //
2549            // We need to wait for the procedure to potentially fail due to "prepare" sanity
2550            // checks. This will block only the beginning of the procedure. See HBASE-19953.
2551            ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
2552            submitProcedure(new ModifyTableProcedure(procedureExecutor.getEnvironment(),
2553                newDescriptor, latch, oldDescriptor, shouldCheckDescriptor));
2554            latch.await();
2555
2556            getMaster().getMasterCoprocessorHost().postModifyTable(tableName, oldDescriptor,
2557              newDescriptor);
2558          }
2559
2560          @Override
2561          protected String getDescription() {
2562            return "ModifyTableProcedure";
2563          }
2564        });
2565
2566  }
2567
2568  @Override
2569  public long modifyTable(final TableName tableName, final TableDescriptor newDescriptor,
2570      final long nonceGroup, final long nonce) throws IOException {
2571    checkInitialized();
2572    return modifyTable(tableName, new TableDescriptorGetter() {
2573      @Override
2574      public TableDescriptor get() throws IOException {
2575        return newDescriptor;
2576      }
2577    }, nonceGroup, nonce, false);
2578
2579  }
2580
2581  public long restoreSnapshot(final SnapshotDescription snapshotDesc,
2582      final long nonceGroup, final long nonce, final boolean restoreAcl) throws IOException {
2583    checkInitialized();
2584    getSnapshotManager().checkSnapshotSupport();
2585
2586    // Ensure namespace exists. Will throw exception if non-known NS.
2587    final TableName dstTable = TableName.valueOf(snapshotDesc.getTable());
2588    getClusterSchema().getNamespace(dstTable.getNamespaceAsString());
2589
2590    return MasterProcedureUtil.submitProcedure(
2591        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2592      @Override
2593      protected void run() throws IOException {
2594          setProcId(
2595            getSnapshotManager().restoreOrCloneSnapshot(snapshotDesc, getNonceKey(), restoreAcl));
2596      }
2597
2598      @Override
2599      protected String getDescription() {
2600        return "RestoreSnapshotProcedure";
2601      }
2602    });
2603  }
2604
2605  private void checkTableExists(final TableName tableName)
2606    throws IOException, TableNotFoundException {
2607    if (!tableDescriptors.exists(tableName)) {
2608      throw new TableNotFoundException(tableName);
2609    }
2610  }
2611
2612  @Override
2613  public void checkTableModifiable(final TableName tableName)
2614      throws IOException, TableNotFoundException, TableNotDisabledException {
2615    if (isCatalogTable(tableName)) {
2616      throw new IOException("Can't modify catalog tables");
2617    }
2618    checkTableExists(tableName);
2619    TableState ts = getTableStateManager().getTableState(tableName);
2620    if (!ts.isDisabled()) {
2621      throw new TableNotDisabledException("Not DISABLED; " + ts);
2622    }
2623  }
2624
2625  public ClusterMetrics getClusterMetricsWithoutCoprocessor() throws InterruptedIOException {
2626    return getClusterMetricsWithoutCoprocessor(EnumSet.allOf(Option.class));
2627  }
2628
2629  public ClusterMetrics getClusterMetricsWithoutCoprocessor(EnumSet<Option> options)
2630      throws InterruptedIOException {
2631    ClusterMetricsBuilder builder = ClusterMetricsBuilder.newBuilder();
2632    // given that hbase1 can't submit the request with Option,
2633    // we return all information to client if the list of Option is empty.
2634    if (options.isEmpty()) {
2635      options = EnumSet.allOf(Option.class);
2636    }
2637
2638    for (Option opt : options) {
2639      switch (opt) {
2640        case HBASE_VERSION: builder.setHBaseVersion(VersionInfo.getVersion()); break;
2641        case CLUSTER_ID: builder.setClusterId(getClusterId()); break;
2642        case MASTER: builder.setMasterName(getServerName()); break;
2643        case BACKUP_MASTERS: builder.setBackerMasterNames(getBackupMasters()); break;
2644        case LIVE_SERVERS: {
2645          if (serverManager != null) {
2646            builder.setLiveServerMetrics(serverManager.getOnlineServers().entrySet().stream()
2647              .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));
2648          }
2649          break;
2650        }
2651        case DEAD_SERVERS: {
2652          if (serverManager != null) {
2653            builder.setDeadServerNames(new ArrayList<>(
2654              serverManager.getDeadServers().copyServerNames()));
2655          }
2656          break;
2657        }
2658        case MASTER_COPROCESSORS: {
2659          if (cpHost != null) {
2660            builder.setMasterCoprocessorNames(Arrays.asList(getMasterCoprocessors()));
2661          }
2662          break;
2663        }
2664        case REGIONS_IN_TRANSITION: {
2665          if (assignmentManager != null) {
2666            builder.setRegionsInTransition(assignmentManager.getRegionStates()
2667                .getRegionsStateInTransition());
2668          }
2669          break;
2670        }
2671        case BALANCER_ON: {
2672          if (loadBalancerTracker != null) {
2673            builder.setBalancerOn(loadBalancerTracker.isBalancerOn());
2674          }
2675          break;
2676        }
2677        case MASTER_INFO_PORT: {
2678          if (infoServer != null) {
2679            builder.setMasterInfoPort(infoServer.getPort());
2680          }
2681          break;
2682        }
2683        case SERVERS_NAME: {
2684          if (serverManager != null) {
2685            builder.setServerNames(serverManager.getOnlineServersList());
2686          }
2687          break;
2688        }
2689        case TABLE_TO_REGIONS_COUNT: {
2690          if (isActiveMaster() && isInitialized() && assignmentManager != null) {
2691            try {
2692              Map<TableName, RegionStatesCount> tableRegionStatesCountMap = new HashMap<>();
2693              Map<String, TableDescriptor> tableDescriptorMap = getTableDescriptors().getAll();
2694              for (TableDescriptor tableDescriptor : tableDescriptorMap.values()) {
2695                TableName tableName = tableDescriptor.getTableName();
2696                RegionStatesCount regionStatesCount = assignmentManager
2697                  .getRegionStatesCount(tableName);
2698                tableRegionStatesCountMap.put(tableName, regionStatesCount);
2699              }
2700              builder.setTableRegionStatesCount(tableRegionStatesCountMap);
2701            } catch (IOException e) {
2702              LOG.error("Error while populating TABLE_TO_REGIONS_COUNT for Cluster Metrics..", e);
2703            }
2704          }
2705          break;
2706        }
2707      }
2708    }
2709    return builder.build();
2710  }
2711
2712  /**
2713   * @return cluster status
2714   */
2715  public ClusterMetrics getClusterMetrics() throws IOException {
2716    return getClusterMetrics(EnumSet.allOf(Option.class));
2717  }
2718
2719  public ClusterMetrics getClusterMetrics(EnumSet<Option> options) throws IOException {
2720    if (cpHost != null) {
2721      cpHost.preGetClusterMetrics();
2722    }
2723    ClusterMetrics status = getClusterMetricsWithoutCoprocessor(options);
2724    if (cpHost != null) {
2725      cpHost.postGetClusterMetrics(status);
2726    }
2727    return status;
2728  }
2729
2730  List<ServerName> getBackupMasters() {
2731    return activeMasterManager.getBackupMasters();
2732  }
2733
2734  /**
2735   * The set of loaded coprocessors is stored in a static set. Since it's
2736   * statically allocated, it does not require that HMaster's cpHost be
2737   * initialized prior to accessing it.
2738   * @return a String representation of the set of names of the loaded coprocessors.
2739   */
2740  public static String getLoadedCoprocessors() {
2741    return CoprocessorHost.getLoadedCoprocessors().toString();
2742  }
2743
2744  /**
2745   * @return timestamp in millis when HMaster was started.
2746   */
2747  public long getMasterStartTime() {
2748    return startcode;
2749  }
2750
2751  /**
2752   * @return timestamp in millis when HMaster became the active master.
2753   */
2754  public long getMasterActiveTime() {
2755    return masterActiveTime;
2756  }
2757
2758  /**
2759   * @return timestamp in millis when HMaster finished becoming the active master
2760   */
2761  public long getMasterFinishedInitializationTime() {
2762    return masterFinishedInitializationTime;
2763  }
2764
  /**
   * @return the number of WAL files; this implementation always returns 0
   */
  public int getNumWALFiles() {
    return 0;
  }
2768
2769  public ProcedureStore getProcedureStore() {
2770    return procedureStore;
2771  }
2772
2773  public int getRegionServerInfoPort(final ServerName sn) {
2774    int port = this.serverManager.getInfoPort(sn);
2775    return port == 0 ? conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2776      HConstants.DEFAULT_REGIONSERVER_INFOPORT) : port;
2777  }
2778
2779  @Override
2780  public String getRegionServerVersion(ServerName sn) {
2781    // Will return "0.0.0" if the server is not online to prevent move system region to unknown
2782    // version RS.
2783    return this.serverManager.getVersion(sn);
2784  }
2785
2786  @Override
2787  public void checkIfShouldMoveSystemRegionAsync() {
2788    assignmentManager.checkIfShouldMoveSystemRegionAsync();
2789  }
2790
2791  /**
2792   * @return array of coprocessor SimpleNames.
2793   */
2794  public String[] getMasterCoprocessors() {
2795    Set<String> masterCoprocessors = getMasterCoprocessorHost().getCoprocessors();
2796    return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
2797  }
2798
2799  @Override
2800  public void abort(String reason, Throwable cause) {
2801    if (!setAbortRequested() || isStopped()) {
2802      LOG.debug("Abort called but aborted={}, stopped={}", isAborted(), isStopped());
2803      return;
2804    }
2805    if (cpHost != null) {
2806      // HBASE-4014: dump a list of loaded coprocessors.
2807      LOG.error(HBaseMarkers.FATAL, "Master server abort: loaded coprocessors are: " +
2808          getLoadedCoprocessors());
2809    }
2810    String msg = "***** ABORTING master " + this + ": " + reason + " *****";
2811    if (cause != null) {
2812      LOG.error(HBaseMarkers.FATAL, msg, cause);
2813    } else {
2814      LOG.error(HBaseMarkers.FATAL, msg);
2815    }
2816
2817    try {
2818      stopMaster();
2819    } catch (IOException e) {
2820      LOG.error("Exception occurred while stopping master", e);
2821    }
2822  }
2823
2824  @Override
2825  public ZKWatcher getZooKeeper() {
2826    return zooKeeper;
2827  }
2828
2829  @Override
2830  public MasterCoprocessorHost getMasterCoprocessorHost() {
2831    return cpHost;
2832  }
2833
2834  @Override
2835  public MasterQuotaManager getMasterQuotaManager() {
2836    return quotaManager;
2837  }
2838
2839  @Override
2840  public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
2841    return procedureExecutor;
2842  }
2843
2844  @Override
2845  public ServerName getServerName() {
2846    return this.serverName;
2847  }
2848
2849  @Override
2850  public AssignmentManager getAssignmentManager() {
2851    return this.assignmentManager;
2852  }
2853
2854  @Override
2855  public CatalogJanitor getCatalogJanitor() {
2856    return this.catalogJanitorChore;
2857  }
2858
2859  public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
2860    return rsFatals;
2861  }
2862
2863  /**
2864   * Shutdown the cluster.
2865   * Master runs a coordinated stop of all RegionServers and then itself.
2866   */
2867  public void shutdown() throws IOException {
2868    if (cpHost != null) {
2869      cpHost.preShutdown();
2870    }
2871
2872    // Tell the servermanager cluster shutdown has been called. This makes it so when Master is
2873    // last running server, it'll stop itself. Next, we broadcast the cluster shutdown by setting
2874    // the cluster status as down. RegionServers will notice this change in state and will start
2875    // shutting themselves down. When last has exited, Master can go down.
2876    if (this.serverManager != null) {
2877      this.serverManager.shutdownCluster();
2878    }
2879    if (this.clusterStatusTracker != null) {
2880      try {
2881        this.clusterStatusTracker.setClusterDown();
2882      } catch (KeeperException e) {
2883        LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
2884      }
2885    }
2886    // Stop the procedure executor. Will stop any ongoing assign, unassign, server crash etc.,
2887    // processing so we can go down.
2888    if (this.procedureExecutor != null) {
2889      this.procedureExecutor.stop();
2890    }
2891    // Shutdown our cluster connection. This will kill any hosted RPCs that might be going on;
2892    // this is what we want especially if the Master is in startup phase doing call outs to
2893    // hbase:meta, etc. when cluster is down. Without ths connection close, we'd have to wait on
2894    // the rpc to timeout.
2895    if (this.asyncClusterConnection != null) {
2896      this.asyncClusterConnection.close();
2897    }
2898  }
2899
2900  public void stopMaster() throws IOException {
2901    if (cpHost != null) {
2902      cpHost.preStopMaster();
2903    }
2904    stop("Stopped by " + Thread.currentThread().getName());
2905  }
2906
2907  @Override
2908  public void stop(String msg) {
2909    if (!isStopped()) {
2910      super.stop(msg);
2911      if (this.activeMasterManager != null) {
2912        this.activeMasterManager.stop();
2913      }
2914    }
2915  }
2916
2917  @VisibleForTesting
2918  protected void checkServiceStarted() throws ServerNotRunningYetException {
2919    if (!serviceStarted) {
2920      throw new ServerNotRunningYetException("Server is not running yet");
2921    }
2922  }
2923
2924  public static class MasterStoppedException extends DoNotRetryIOException {
2925    MasterStoppedException() {
2926      super();
2927    }
2928  }
2929
2930  void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException,
2931      MasterNotRunningException, MasterStoppedException {
2932    checkServiceStarted();
2933    if (!isInitialized()) {
2934      throw new PleaseHoldException("Master is initializing");
2935    }
2936    if (isStopped()) {
2937      throw new MasterStoppedException();
2938    }
2939  }
2940
2941  /**
2942   * Report whether this master is currently the active master or not.
2943   * If not active master, we are parked on ZK waiting to become active.
2944   *
2945   * This method is used for testing.
2946   *
2947   * @return true if active master, false if not.
2948   */
2949  @Override
2950  public boolean isActiveMaster() {
2951    return activeMaster;
2952  }
2953
2954  /**
2955   * Report whether this master has completed with its initialization and is
2956   * ready.  If ready, the master is also the active master.  A standby master
2957   * is never ready.
2958   *
2959   * This method is used for testing.
2960   *
2961   * @return true if master is ready to go, false if not.
2962   */
2963  @Override
2964  public boolean isInitialized() {
2965    return initialized.isReady();
2966  }
2967
2968  /**
2969   * Report whether this master is in maintenance mode.
2970   *
2971   * @return true if master is in maintenanceMode
2972   */
2973  @Override
2974  public boolean isInMaintenanceMode() {
2975    return maintenanceMode;
2976  }
2977
2978  @VisibleForTesting
2979  public void setInitialized(boolean isInitialized) {
2980    procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
2981  }
2982
2983  @Override
2984  public ProcedureEvent<?> getInitializedEvent() {
2985    return initialized;
2986  }
2987
2988  /**
2989   * Compute the average load across all region servers.
2990   * Currently, this uses a very naive computation - just uses the number of
2991   * regions being served, ignoring stats about number of requests.
2992   * @return the average load
2993   */
2994  public double getAverageLoad() {
2995    if (this.assignmentManager == null) {
2996      return 0;
2997    }
2998
2999    RegionStates regionStates = this.assignmentManager.getRegionStates();
3000    if (regionStates == null) {
3001      return 0;
3002    }
3003    return regionStates.getAverageLoad();
3004  }
3005
3006  /*
3007   * @return the count of region split plans executed
3008   */
3009  public long getSplitPlanCount() {
3010    return splitPlanCount;
3011  }
3012
3013  /*
3014   * @return the count of region merge plans executed
3015   */
3016  public long getMergePlanCount() {
3017    return mergePlanCount;
3018  }
3019
3020  @Override
3021  public boolean registerService(Service instance) {
3022    /*
3023     * No stacking of instances is allowed for a single service name
3024     */
3025    Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
3026    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
3027    if (coprocessorServiceHandlers.containsKey(serviceName)) {
3028      LOG.error("Coprocessor service "+serviceName+
3029          " already registered, rejecting request from "+instance
3030      );
3031      return false;
3032    }
3033
3034    coprocessorServiceHandlers.put(serviceName, instance);
3035    if (LOG.isDebugEnabled()) {
3036      LOG.debug("Registered master coprocessor service: service="+serviceName);
3037    }
3038    return true;
3039  }
3040
3041  /**
3042   * Utility for constructing an instance of the passed HMaster class.
3043   * @param masterClass
3044   * @return HMaster instance.
3045   */
3046  public static HMaster constructMaster(Class<? extends HMaster> masterClass,
3047      final Configuration conf)  {
3048    try {
3049      Constructor<? extends HMaster> c = masterClass.getConstructor(Configuration.class);
3050      return c.newInstance(conf);
3051    } catch(Exception e) {
3052      Throwable error = e;
3053      if (e instanceof InvocationTargetException &&
3054          ((InvocationTargetException)e).getTargetException() != null) {
3055        error = ((InvocationTargetException)e).getTargetException();
3056      }
3057      throw new RuntimeException("Failed construction of Master: " + masterClass.toString() + ". "
3058        , error);
3059    }
3060  }
3061
3062  /**
3063   * @see org.apache.hadoop.hbase.master.HMasterCommandLine
3064   */
3065  public static void main(String [] args) {
3066    LOG.info("STARTING service " + HMaster.class.getSimpleName());
3067    VersionInfo.logVersion();
3068    new HMasterCommandLine(HMaster.class).doMain(args);
3069  }
3070
3071  public HFileCleaner getHFileCleaner() {
3072    return this.hfileCleaner;
3073  }
3074
3075  public LogCleaner getLogCleaner() {
3076    return this.logCleaner;
3077  }
3078
3079  /**
3080   * @return the underlying snapshot manager
3081   */
3082  @Override
3083  public SnapshotManager getSnapshotManager() {
3084    return this.snapshotManager;
3085  }
3086
3087  /**
3088   * @return the underlying MasterProcedureManagerHost
3089   */
3090  @Override
3091  public MasterProcedureManagerHost getMasterProcedureManagerHost() {
3092    return mpmHost;
3093  }
3094
3095  @Override
3096  public ClusterSchema getClusterSchema() {
3097    return this.clusterSchemaService;
3098  }
3099
3100  /**
3101   * Create a new Namespace.
3102   * @param namespaceDescriptor descriptor for new Namespace
3103   * @param nonceGroup Identifier for the source of the request, a client or process.
3104   * @param nonce A unique identifier for this operation from the client or process identified by
3105   * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
3106   * @return procedure id
3107   */
3108  long createNamespace(final NamespaceDescriptor namespaceDescriptor, final long nonceGroup,
3109      final long nonce) throws IOException {
3110    checkInitialized();
3111
3112    TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName()));
3113
3114    return MasterProcedureUtil.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this,
3115          nonceGroup, nonce) {
3116      @Override
3117      protected void run() throws IOException {
3118        getMaster().getMasterCoprocessorHost().preCreateNamespace(namespaceDescriptor);
3119        // We need to wait for the procedure to potentially fail due to "prepare" sanity
3120        // checks. This will block only the beginning of the procedure. See HBASE-19953.
3121        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
3122        LOG.info(getClientIdAuditPrefix() + " creating " + namespaceDescriptor);
3123        // Execute the operation synchronously - wait for the operation to complete before
3124        // continuing.
3125        setProcId(getClusterSchema().createNamespace(namespaceDescriptor, getNonceKey(), latch));
3126        latch.await();
3127        getMaster().getMasterCoprocessorHost().postCreateNamespace(namespaceDescriptor);
3128      }
3129
3130      @Override
3131      protected String getDescription() {
3132        return "CreateNamespaceProcedure";
3133      }
3134    });
3135  }
3136
3137  /**
3138   * Modify an existing Namespace.
3139   * @param nonceGroup Identifier for the source of the request, a client or process.
3140   * @param nonce A unique identifier for this operation from the client or process identified by
3141   * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
3142   * @return procedure id
3143   */
3144  long modifyNamespace(final NamespaceDescriptor newNsDescriptor, final long nonceGroup,
3145      final long nonce) throws IOException {
3146    checkInitialized();
3147
3148    TableName.isLegalNamespaceName(Bytes.toBytes(newNsDescriptor.getName()));
3149
3150    return MasterProcedureUtil.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this,
3151          nonceGroup, nonce) {
3152      @Override
3153      protected void run() throws IOException {
3154        NamespaceDescriptor oldNsDescriptor = getNamespace(newNsDescriptor.getName());
3155        getMaster().getMasterCoprocessorHost().preModifyNamespace(oldNsDescriptor, newNsDescriptor);
3156        // We need to wait for the procedure to potentially fail due to "prepare" sanity
3157        // checks. This will block only the beginning of the procedure. See HBASE-19953.
3158        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
3159        LOG.info(getClientIdAuditPrefix() + " modify " + newNsDescriptor);
3160        // Execute the operation synchronously - wait for the operation to complete before
3161        // continuing.
3162        setProcId(getClusterSchema().modifyNamespace(newNsDescriptor, getNonceKey(), latch));
3163        latch.await();
3164        getMaster().getMasterCoprocessorHost().postModifyNamespace(oldNsDescriptor,
3165          newNsDescriptor);
3166      }
3167
3168      @Override
3169      protected String getDescription() {
3170        return "ModifyNamespaceProcedure";
3171      }
3172    });
3173  }
3174
3175  /**
3176   * Delete an existing Namespace. Only empty Namespaces (no tables) can be removed.
3177   * @param nonceGroup Identifier for the source of the request, a client or process.
3178   * @param nonce A unique identifier for this operation from the client or process identified by
3179   * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
3180   * @return procedure id
3181   */
3182  long deleteNamespace(final String name, final long nonceGroup, final long nonce)
3183      throws IOException {
3184    checkInitialized();
3185
3186    return MasterProcedureUtil.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this,
3187          nonceGroup, nonce) {
3188      @Override
3189      protected void run() throws IOException {
3190        getMaster().getMasterCoprocessorHost().preDeleteNamespace(name);
3191        LOG.info(getClientIdAuditPrefix() + " delete " + name);
3192        // Execute the operation synchronously - wait for the operation to complete before
3193        // continuing.
3194        //
3195        // We need to wait for the procedure to potentially fail due to "prepare" sanity
3196        // checks. This will block only the beginning of the procedure. See HBASE-19953.
3197        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
3198        setProcId(submitProcedure(
3199              new DeleteNamespaceProcedure(procedureExecutor.getEnvironment(), name, latch)));
3200        latch.await();
3201        // Will not be invoked in the face of Exception thrown by the Procedure's execution
3202        getMaster().getMasterCoprocessorHost().postDeleteNamespace(name);
3203      }
3204
3205      @Override
3206      protected String getDescription() {
3207        return "DeleteNamespaceProcedure";
3208      }
3209    });
3210  }
3211
3212  /**
3213   * Get a Namespace
3214   * @param name Name of the Namespace
3215   * @return Namespace descriptor for <code>name</code>
3216   */
3217  NamespaceDescriptor getNamespace(String name) throws IOException {
3218    checkInitialized();
3219    if (this.cpHost != null) this.cpHost.preGetNamespaceDescriptor(name);
3220    NamespaceDescriptor nsd = this.clusterSchemaService.getNamespace(name);
3221    if (this.cpHost != null) this.cpHost.postGetNamespaceDescriptor(nsd);
3222    return nsd;
3223  }
3224
3225  /**
3226   * Get all Namespaces
3227   * @return All Namespace descriptors
3228   */
3229  List<NamespaceDescriptor> getNamespaces() throws IOException {
3230    checkInitialized();
3231    final List<NamespaceDescriptor> nsds = new ArrayList<>();
3232    if (cpHost != null) {
3233      cpHost.preListNamespaceDescriptors(nsds);
3234    }
3235    nsds.addAll(this.clusterSchemaService.getNamespaces());
3236    if (this.cpHost != null) {
3237      this.cpHost.postListNamespaceDescriptors(nsds);
3238    }
3239    return nsds;
3240  }
3241
3242  /**
3243   * List namespace names
3244   * @return All namespace names
3245   */
3246  public List<String> listNamespaces() throws IOException {
3247    checkInitialized();
3248    List<String> namespaces = new ArrayList<>();
3249    if (cpHost != null) {
3250      cpHost.preListNamespaces(namespaces);
3251    }
3252    for (NamespaceDescriptor namespace : clusterSchemaService.getNamespaces()) {
3253      namespaces.add(namespace.getName());
3254    }
3255    if (cpHost != null) {
3256      cpHost.postListNamespaces(namespaces);
3257    }
3258    return namespaces;
3259  }
3260
3261  @Override
3262  public List<TableName> listTableNamesByNamespace(String name) throws IOException {
3263    checkInitialized();
3264    return listTableNames(name, null, true);
3265  }
3266
3267  @Override
3268  public List<TableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
3269    checkInitialized();
3270    return listTableDescriptors(name, null, null, true);
3271  }
3272
3273  @Override
3274  public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
3275      throws IOException {
3276    if (cpHost != null) {
3277      cpHost.preAbortProcedure(this.procedureExecutor, procId);
3278    }
3279
3280    final boolean result = this.procedureExecutor.abort(procId, mayInterruptIfRunning);
3281
3282    if (cpHost != null) {
3283      cpHost.postAbortProcedure();
3284    }
3285
3286    return result;
3287  }
3288
3289  @Override
3290  public List<Procedure<?>> getProcedures() throws IOException {
3291    if (cpHost != null) {
3292      cpHost.preGetProcedures();
3293    }
3294
3295    @SuppressWarnings({ "unchecked", "rawtypes" })
3296    List<Procedure<?>> procList = (List) this.procedureExecutor.getProcedures();
3297
3298    if (cpHost != null) {
3299      cpHost.postGetProcedures(procList);
3300    }
3301
3302    return procList;
3303  }
3304
3305  @Override
3306  public List<LockedResource> getLocks() throws IOException {
3307    if (cpHost != null) {
3308      cpHost.preGetLocks();
3309    }
3310
3311    MasterProcedureScheduler procedureScheduler =
3312      procedureExecutor.getEnvironment().getProcedureScheduler();
3313
3314    final List<LockedResource> lockedResources = procedureScheduler.getLocks();
3315
3316    if (cpHost != null) {
3317      cpHost.postGetLocks(lockedResources);
3318    }
3319
3320    return lockedResources;
3321  }
3322
3323  /**
3324   * Returns the list of table descriptors that match the specified request
3325   * @param namespace the namespace to query, or null if querying for all
3326   * @param regex The regular expression to match against, or null if querying for all
3327   * @param tableNameList the list of table names, or null if querying for all
3328   * @param includeSysTables False to match only against userspace tables
3329   * @return the list of table descriptors
3330   */
3331  public List<TableDescriptor> listTableDescriptors(final String namespace, final String regex,
3332      final List<TableName> tableNameList, final boolean includeSysTables)
3333  throws IOException {
3334    List<TableDescriptor> htds = new ArrayList<>();
3335    if (cpHost != null) {
3336      cpHost.preGetTableDescriptors(tableNameList, htds, regex);
3337    }
3338    htds = getTableDescriptors(htds, namespace, regex, tableNameList, includeSysTables);
3339    if (cpHost != null) {
3340      cpHost.postGetTableDescriptors(tableNameList, htds, regex);
3341    }
3342    return htds;
3343  }
3344
3345  /**
3346   * Returns the list of table names that match the specified request
3347   * @param regex The regular expression to match against, or null if querying for all
3348   * @param namespace the namespace to query, or null if querying for all
3349   * @param includeSysTables False to match only against userspace tables
3350   * @return the list of table names
3351   */
3352  public List<TableName> listTableNames(final String namespace, final String regex,
3353      final boolean includeSysTables) throws IOException {
3354    List<TableDescriptor> htds = new ArrayList<>();
3355    if (cpHost != null) {
3356      cpHost.preGetTableNames(htds, regex);
3357    }
3358    htds = getTableDescriptors(htds, namespace, regex, null, includeSysTables);
3359    if (cpHost != null) {
3360      cpHost.postGetTableNames(htds, regex);
3361    }
3362    List<TableName> result = new ArrayList<>(htds.size());
3363    for (TableDescriptor htd: htds) result.add(htd.getTableName());
3364    return result;
3365  }
3366
3367  /**
3368   * Return a list of table table descriptors after applying any provided filter parameters. Note
3369   * that the user-facing description of this filter logic is presented on the class-level javadoc
3370   * of {@link NormalizeTableFilterParams}.
3371   */
3372  private List<TableDescriptor> getTableDescriptors(final List<TableDescriptor> htds,
3373      final String namespace, final String regex, final List<TableName> tableNameList,
3374      final boolean includeSysTables)
3375  throws IOException {
3376    if (tableNameList == null || tableNameList.isEmpty()) {
3377      // request for all TableDescriptors
3378      Collection<TableDescriptor> allHtds;
3379      if (namespace != null && namespace.length() > 0) {
3380        // Do a check on the namespace existence. Will fail if does not exist.
3381        this.clusterSchemaService.getNamespace(namespace);
3382        allHtds = tableDescriptors.getByNamespace(namespace).values();
3383      } else {
3384        allHtds = tableDescriptors.getAll().values();
3385      }
3386      for (TableDescriptor desc: allHtds) {
3387        if (tableStateManager.isTablePresent(desc.getTableName())
3388            && (includeSysTables || !desc.getTableName().isSystemTable())) {
3389          htds.add(desc);
3390        }
3391      }
3392    } else {
3393      for (TableName s: tableNameList) {
3394        if (tableStateManager.isTablePresent(s)) {
3395          TableDescriptor desc = tableDescriptors.get(s);
3396          if (desc != null) {
3397            htds.add(desc);
3398          }
3399        }
3400      }
3401    }
3402
3403    // Retains only those matched by regular expression.
3404    if (regex != null) filterTablesByRegex(htds, Pattern.compile(regex));
3405    return htds;
3406  }
3407
3408  /**
3409   * Removes the table descriptors that don't match the pattern.
3410   * @param descriptors list of table descriptors to filter
3411   * @param pattern the regex to use
3412   */
3413  private static void filterTablesByRegex(final Collection<TableDescriptor> descriptors,
3414      final Pattern pattern) {
3415    final String defaultNS = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
3416    Iterator<TableDescriptor> itr = descriptors.iterator();
3417    while (itr.hasNext()) {
3418      TableDescriptor htd = itr.next();
3419      String tableName = htd.getTableName().getNameAsString();
3420      boolean matched = pattern.matcher(tableName).matches();
3421      if (!matched && htd.getTableName().getNamespaceAsString().equals(defaultNS)) {
3422        matched = pattern.matcher(defaultNS + TableName.NAMESPACE_DELIM + tableName).matches();
3423      }
3424      if (!matched) {
3425        itr.remove();
3426      }
3427    }
3428  }
3429
3430  @Override
3431  public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
3432    return getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
3433        .getLastMajorCompactionTimestamp(table);
3434  }
3435
3436  @Override
3437  public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException {
3438    return getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
3439        .getLastMajorCompactionTimestamp(regionName);
3440  }
3441
3442  /**
3443   * Gets the mob file compaction state for a specific table.
3444   * Whether all the mob files are selected is known during the compaction execution, but
3445   * the statistic is done just before compaction starts, it is hard to know the compaction
3446   * type at that time, so the rough statistics are chosen for the mob file compaction. Only two
3447   * compaction states are available, CompactionState.MAJOR_AND_MINOR and CompactionState.NONE.
3448   * @param tableName The current table name.
3449   * @return If a given table is in mob file compaction now.
3450   */
3451  public GetRegionInfoResponse.CompactionState getMobCompactionState(TableName tableName) {
3452    AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
3453    if (compactionsCount != null && compactionsCount.get() != 0) {
3454      return GetRegionInfoResponse.CompactionState.MAJOR_AND_MINOR;
3455    }
3456    return GetRegionInfoResponse.CompactionState.NONE;
3457  }
3458
3459  public void reportMobCompactionStart(TableName tableName) throws IOException {
3460    IdLock.Entry lockEntry = null;
3461    try {
3462      lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
3463      AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
3464      if (compactionsCount == null) {
3465        compactionsCount = new AtomicInteger(0);
3466        mobCompactionStates.put(tableName, compactionsCount);
3467      }
3468      compactionsCount.incrementAndGet();
3469    } finally {
3470      if (lockEntry != null) {
3471        mobCompactionLock.releaseLockEntry(lockEntry);
3472      }
3473    }
3474  }
3475
3476  public void reportMobCompactionEnd(TableName tableName) throws IOException {
3477    IdLock.Entry lockEntry = null;
3478    try {
3479      lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
3480      AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
3481      if (compactionsCount != null) {
3482        int count = compactionsCount.decrementAndGet();
3483        // remove the entry if the count is 0.
3484        if (count == 0) {
3485          mobCompactionStates.remove(tableName);
3486        }
3487      }
3488    } finally {
3489      if (lockEntry != null) {
3490        mobCompactionLock.releaseLockEntry(lockEntry);
3491      }
3492    }
3493  }
3494
3495
3496  /**
3497   * Queries the state of the {@link LoadBalancerTracker}. If the balancer is not initialized,
3498   * false is returned.
3499   *
3500   * @return The state of the load balancer, or false if the load balancer isn't defined.
3501   */
3502  public boolean isBalancerOn() {
3503    return !isInMaintenanceMode()
3504        && loadBalancerTracker != null
3505        && loadBalancerTracker.isBalancerOn();
3506  }
3507
3508  /**
3509   * Queries the state of the {@link RegionNormalizerTracker}. If it's not initialized,
3510   * false is returned.
3511   */
3512  public boolean isNormalizerOn() {
3513    return !isInMaintenanceMode()
3514        && regionNormalizerTracker != null
3515        && regionNormalizerTracker.isNormalizerOn();
3516  }
3517
3518  /**
3519   * Queries the state of the {@link SplitOrMergeTracker}. If it is not initialized,
3520   * false is returned. If switchType is illegal, false will return.
3521   * @param switchType see {@link org.apache.hadoop.hbase.client.MasterSwitchType}
3522   * @return The state of the switch
3523   */
3524  @Override
3525  public boolean isSplitOrMergeEnabled(MasterSwitchType switchType) {
3526    return !isInMaintenanceMode()
3527        && splitOrMergeTracker != null
3528        && splitOrMergeTracker.isSplitOrMergeEnabled(switchType);
3529  }
3530
3531  /**
3532   * Fetch the configured {@link LoadBalancer} class name. If none is set, a default is returned.
3533   * <p/>
3534   * Notice that, the base load balancer will always be {@link RSGroupBasedLoadBalancer} now, so
3535   * this method will return the balancer used inside each rs group.
3536   * @return The name of the {@link LoadBalancer} in use.
3537   */
3538  public String getLoadBalancerClassName() {
3539    return conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
3540      LoadBalancerFactory.getDefaultLoadBalancerClass().getName());
3541  }
3542
3543  /**
3544   * @return RegionNormalizerTracker instance
3545   */
3546  public RegionNormalizerTracker getRegionNormalizerTracker() {
3547    return regionNormalizerTracker;
3548  }
3549
3550  public SplitOrMergeTracker getSplitOrMergeTracker() {
3551    return splitOrMergeTracker;
3552  }
3553
3554  @Override
3555  public RSGroupBasedLoadBalancer getLoadBalancer() {
3556    return balancer;
3557  }
3558
3559  @Override
3560  public FavoredNodesManager getFavoredNodesManager() {
3561    return balancer.getFavoredNodesManager();
3562  }
3563
3564  private long executePeerProcedure(AbstractPeerProcedure<?> procedure) throws IOException {
3565    long procId = procedureExecutor.submitProcedure(procedure);
3566    procedure.getLatch().await();
3567    return procId;
3568  }
3569
3570  @Override
3571  public long addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
3572      throws ReplicationException, IOException {
3573    LOG.info(getClientIdAuditPrefix() + " creating replication peer, id=" + peerId + ", config=" +
3574      peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"));
3575    return executePeerProcedure(new AddPeerProcedure(peerId, peerConfig, enabled));
3576  }
3577
3578  @Override
3579  public long removeReplicationPeer(String peerId) throws ReplicationException, IOException {
3580    LOG.info(getClientIdAuditPrefix() + " removing replication peer, id=" + peerId);
3581    return executePeerProcedure(new RemovePeerProcedure(peerId));
3582  }
3583
3584  @Override
3585  public long enableReplicationPeer(String peerId) throws ReplicationException, IOException {
3586    LOG.info(getClientIdAuditPrefix() + " enable replication peer, id=" + peerId);
3587    return executePeerProcedure(new EnablePeerProcedure(peerId));
3588  }
3589
3590  @Override
3591  public long disableReplicationPeer(String peerId) throws ReplicationException, IOException {
3592    LOG.info(getClientIdAuditPrefix() + " disable replication peer, id=" + peerId);
3593    return executePeerProcedure(new DisablePeerProcedure(peerId));
3594  }
3595
3596  @Override
3597  public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
3598      throws ReplicationException, IOException {
3599    if (cpHost != null) {
3600      cpHost.preGetReplicationPeerConfig(peerId);
3601    }
3602    LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + peerId);
3603    ReplicationPeerConfig peerConfig = this.replicationPeerManager.getPeerConfig(peerId)
3604        .orElseThrow(() -> new ReplicationPeerNotFoundException(peerId));
3605    if (cpHost != null) {
3606      cpHost.postGetReplicationPeerConfig(peerId);
3607    }
3608    return peerConfig;
3609  }
3610
3611  @Override
3612  public long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig)
3613      throws ReplicationException, IOException {
3614    LOG.info(getClientIdAuditPrefix() + " update replication peer config, id=" + peerId +
3615      ", config=" + peerConfig);
3616    return executePeerProcedure(new UpdatePeerConfigProcedure(peerId, peerConfig));
3617  }
3618
3619  @Override
3620  public List<ReplicationPeerDescription> listReplicationPeers(String regex)
3621      throws ReplicationException, IOException {
3622    if (cpHost != null) {
3623      cpHost.preListReplicationPeers(regex);
3624    }
3625    LOG.debug("{} list replication peers, regex={}", getClientIdAuditPrefix(), regex);
3626    Pattern pattern = regex == null ? null : Pattern.compile(regex);
3627    List<ReplicationPeerDescription> peers =
3628      this.replicationPeerManager.listPeers(pattern);
3629    if (cpHost != null) {
3630      cpHost.postListReplicationPeers(regex);
3631    }
3632    return peers;
3633  }
3634
3635  @Override
3636  public long transitReplicationPeerSyncReplicationState(String peerId, SyncReplicationState state)
3637    throws ReplicationException, IOException {
3638    LOG.info(
3639      getClientIdAuditPrefix() +
3640        " transit current cluster state to {} in a synchronous replication peer id={}",
3641      state, peerId);
3642    return executePeerProcedure(new TransitPeerSyncReplicationStateProcedure(peerId, state));
3643  }
3644
3645  /**
3646   * Mark region server(s) as decommissioned (previously called 'draining') to prevent additional
3647   * regions from getting assigned to them. Also unload the regions on the servers asynchronously.0
3648   * @param servers Region servers to decommission.
3649   */
3650  public void decommissionRegionServers(final List<ServerName> servers, final boolean offload)
3651      throws IOException {
3652    List<ServerName> serversAdded = new ArrayList<>(servers.size());
3653    // Place the decommission marker first.
3654    String parentZnode = getZooKeeper().getZNodePaths().drainingZNode;
3655    for (ServerName server : servers) {
3656      try {
3657        String node = ZNodePaths.joinZNode(parentZnode, server.getServerName());
3658        ZKUtil.createAndFailSilent(getZooKeeper(), node);
3659      } catch (KeeperException ke) {
3660        throw new HBaseIOException(
3661          this.zooKeeper.prefix("Unable to decommission '" + server.getServerName() + "'."), ke);
3662      }
3663      if (this.serverManager.addServerToDrainList(server)) {
3664        serversAdded.add(server);
3665      }
3666    }
3667    // Move the regions off the decommissioned servers.
3668    if (offload) {
3669      final List<ServerName> destServers = this.serverManager.createDestinationServersList();
3670      for (ServerName server : serversAdded) {
3671        final List<RegionInfo> regionsOnServer = this.assignmentManager.getRegionsOnServer(server);
3672        for (RegionInfo hri : regionsOnServer) {
3673          ServerName dest = balancer.randomAssignment(hri, destServers);
3674          if (dest == null) {
3675            throw new HBaseIOException("Unable to determine a plan to move " + hri);
3676          }
3677          RegionPlan rp = new RegionPlan(hri, server, dest);
3678          this.assignmentManager.moveAsync(rp);
3679        }
3680      }
3681    }
3682  }
3683
3684  /**
3685   * List region servers marked as decommissioned (previously called 'draining') to not get regions
3686   * assigned to them.
3687   * @return List of decommissioned servers.
3688   */
3689  public List<ServerName> listDecommissionedRegionServers() {
3690    return this.serverManager.getDrainingServersList();
3691  }
3692
3693  /**
3694   * Remove decommission marker (previously called 'draining') from a region server to allow regions
3695   * assignments. Load regions onto the server asynchronously if a list of regions is given
3696   * @param server Region server to remove decommission marker from.
3697   */
3698  public void recommissionRegionServer(final ServerName server,
3699      final List<byte[]> encodedRegionNames) throws IOException {
3700    // Remove the server from decommissioned (draining) server list.
3701    String parentZnode = getZooKeeper().getZNodePaths().drainingZNode;
3702    String node = ZNodePaths.joinZNode(parentZnode, server.getServerName());
3703    try {
3704      ZKUtil.deleteNodeFailSilent(getZooKeeper(), node);
3705    } catch (KeeperException ke) {
3706      throw new HBaseIOException(
3707        this.zooKeeper.prefix("Unable to recommission '" + server.getServerName() + "'."), ke);
3708    }
3709    this.serverManager.removeServerFromDrainList(server);
3710
3711    // Load the regions onto the server if we are given a list of regions.
3712    if (encodedRegionNames == null || encodedRegionNames.isEmpty()) {
3713      return;
3714    }
3715    if (!this.serverManager.isServerOnline(server)) {
3716      return;
3717    }
3718    for (byte[] encodedRegionName : encodedRegionNames) {
3719      RegionState regionState =
3720        assignmentManager.getRegionStates().getRegionState(Bytes.toString(encodedRegionName));
3721      if (regionState == null) {
3722        LOG.warn("Unknown region " + Bytes.toStringBinary(encodedRegionName));
3723        continue;
3724      }
3725      RegionInfo hri = regionState.getRegion();
3726      if (server.equals(regionState.getServerName())) {
3727        LOG.info("Skipping move of region " + hri.getRegionNameAsString() +
3728          " because region already assigned to the same server " + server + ".");
3729        continue;
3730      }
3731      RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), server);
3732      this.assignmentManager.moveAsync(rp);
3733    }
3734  }
3735
3736  @Override
3737  public LockManager getLockManager() {
3738    return lockManager;
3739  }
3740
3741  public QuotaObserverChore getQuotaObserverChore() {
3742    return this.quotaObserverChore;
3743  }
3744
3745  public SpaceQuotaSnapshotNotifier getSpaceQuotaSnapshotNotifier() {
3746    return this.spaceQuotaSnapshotNotifier;
3747  }
3748
3749  @SuppressWarnings("unchecked")
3750  private RemoteProcedure<MasterProcedureEnv, ?> getRemoteProcedure(long procId) {
3751    Procedure<?> procedure = procedureExecutor.getProcedure(procId);
3752    if (procedure == null) {
3753      return null;
3754    }
3755    assert procedure instanceof RemoteProcedure;
3756    return (RemoteProcedure<MasterProcedureEnv, ?>) procedure;
3757  }
3758
3759  public void remoteProcedureCompleted(long procId) {
3760    LOG.debug("Remote procedure done, pid={}", procId);
3761    RemoteProcedure<MasterProcedureEnv, ?> procedure = getRemoteProcedure(procId);
3762    if (procedure != null) {
3763      procedure.remoteOperationCompleted(procedureExecutor.getEnvironment());
3764    }
3765  }
3766
3767  public void remoteProcedureFailed(long procId, RemoteProcedureException error) {
3768    LOG.debug("Remote procedure failed, pid={}", procId, error);
3769    RemoteProcedure<MasterProcedureEnv, ?> procedure = getRemoteProcedure(procId);
3770    if (procedure != null) {
3771      procedure.remoteOperationFailed(procedureExecutor.getEnvironment(), error);
3772    }
3773  }
3774
3775  /**
3776   * Reopen regions provided in the argument
3777   *
3778   * @param tableName The current table name
3779   * @param regionNames The region names of the regions to reopen
3780   * @param nonceGroup Identifier for the source of the request, a client or process
3781   * @param nonce A unique identifier for this operation from the client or process identified by
3782   *   <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
3783   * @return procedure Id
3784   * @throws IOException if reopening region fails while running procedure
3785   */
3786  long reopenRegions(final TableName tableName, final List<byte[]> regionNames,
3787      final long nonceGroup, final long nonce)
3788      throws IOException {
3789
3790    return MasterProcedureUtil
3791      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
3792
3793        @Override
3794        protected void run() throws IOException {
3795          submitProcedure(new ReopenTableRegionsProcedure(tableName, regionNames));
3796        }
3797
3798        @Override
3799        protected String getDescription() {
3800          return "ReopenTableRegionsProcedure";
3801        }
3802
3803      });
3804
3805  }
3806
3807  @Override
3808  public ReplicationPeerManager getReplicationPeerManager() {
3809    return replicationPeerManager;
3810  }
3811
3812  public HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>>
3813      getReplicationLoad(ServerName[] serverNames) {
3814    List<ReplicationPeerDescription> peerList = this.getReplicationPeerManager().listPeers(null);
3815    if (peerList == null) {
3816      return null;
3817    }
3818    HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoadSourceMap =
3819        new HashMap<>(peerList.size());
3820    peerList.stream()
3821        .forEach(peer -> replicationLoadSourceMap.put(peer.getPeerId(), new ArrayList<>()));
3822    for (ServerName serverName : serverNames) {
3823      List<ReplicationLoadSource> replicationLoadSources =
3824          getServerManager().getLoad(serverName).getReplicationLoadSourceList();
3825      for (ReplicationLoadSource replicationLoadSource : replicationLoadSources) {
3826        replicationLoadSourceMap.get(replicationLoadSource.getPeerID())
3827            .add(new Pair<>(serverName, replicationLoadSource));
3828      }
3829    }
3830    for (List<Pair<ServerName, ReplicationLoadSource>> loads : replicationLoadSourceMap.values()) {
3831      if (loads.size() > 0) {
3832        loads.sort(Comparator.comparingLong(load -> (-1) * load.getSecond().getReplicationLag()));
3833      }
3834    }
3835    return replicationLoadSourceMap;
3836  }
3837
3838  /**
3839   * This method modifies the master's configuration in order to inject replication-related features
3840   */
3841  @VisibleForTesting
3842  public static void decorateMasterConfiguration(Configuration conf) {
3843    String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
3844    String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
3845    if (plugins == null || !plugins.contains(cleanerClass)) {
3846      conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
3847    }
3848    if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
3849      plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
3850      cleanerClass = ReplicationHFileCleaner.class.getCanonicalName();
3851      if (!plugins.contains(cleanerClass)) {
3852        conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass);
3853      }
3854    }
3855  }
3856
3857  public SnapshotQuotaObserverChore getSnapshotQuotaObserverChore() {
3858    return this.snapshotQuotaChore;
3859  }
3860
3861  @Override
3862  public SyncReplicationReplayWALManager getSyncReplicationReplayWALManager() {
3863    return this.syncReplicationReplayWALManager;
3864  }
3865
3866  @Override
3867  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
3868    if (!this.isOnline() || !LoadBalancer.isMasterCanHostUserRegions(conf)) {
3869      return new HashMap<>();
3870    }
3871    return super.getWalGroupsReplicationStatus();
3872  }
3873
3874  public HbckChore getHbckChore() {
3875    return this.hbckChore;
3876  }
3877
3878  @Override
3879  public String getClusterId() {
3880    if (activeMaster) {
3881      return super.getClusterId();
3882    }
3883    return cachedClusterId.getFromCacheOrFetch();
3884  }
3885
3886  public Optional<ServerName> getActiveMaster() {
3887    return activeMasterManager.getActiveMasterServerName();
3888  }
3889
3890  @Override
3891  public void runReplicationBarrierCleaner() {
3892    ReplicationBarrierCleaner rbc = this.replicationBarrierCleaner;
3893    if (rbc != null) {
3894      rbc.chore();
3895    }
3896  }
3897
3898  public MetaRegionLocationCache getMetaRegionLocationCache() {
3899    return this.metaRegionLocationCache;
3900  }
3901
3902  @Override
3903  public RSGroupInfoManager getRSGroupInfoManager() {
3904    return rsGroupInfoManager;
3905  }
3906
3907  /**
3908   * Get the compaction state of the table
3909   *
3910   * @param tableName The table name
3911   * @return CompactionState Compaction state of the table
3912   */
3913  public CompactionState getCompactionState(final TableName tableName) {
3914    CompactionState compactionState = CompactionState.NONE;
3915    try {
3916      List<RegionInfo> regions =
3917        assignmentManager.getRegionStates().getRegionsOfTable(tableName, false);
3918      for (RegionInfo regionInfo : regions) {
3919        ServerName serverName =
3920          assignmentManager.getRegionStates().getRegionServerOfRegion(regionInfo);
3921        if (serverName == null) {
3922          continue;
3923        }
3924        ServerMetrics sl = serverManager.getLoad(serverName);
3925        if (sl == null) {
3926          continue;
3927        }
3928        RegionMetrics regionMetrics = sl.getRegionMetrics().get(regionInfo.getRegionName());
3929        if (regionMetrics.getCompactionState() == CompactionState.MAJOR) {
3930          if (compactionState == CompactionState.MINOR) {
3931            compactionState = CompactionState.MAJOR_AND_MINOR;
3932          } else {
3933            compactionState = CompactionState.MAJOR;
3934          }
3935        } else if (regionMetrics.getCompactionState() == CompactionState.MINOR) {
3936          if (compactionState == CompactionState.MAJOR) {
3937            compactionState = CompactionState.MAJOR_AND_MINOR;
3938          } else {
3939            compactionState = CompactionState.MINOR;
3940          }
3941        }
3942      }
3943    } catch (Exception e) {
3944      compactionState = null;
3945      LOG.error("Exception when get compaction state for " + tableName.getNameAsString(), e);
3946    }
3947    return compactionState;
3948  }
3949}