/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.util.DNS.MASTER_HOSTNAME_KEY;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Service;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServlet;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.RegionMetrics;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.NormalizeTableFilterParams;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.MasterStoppedException;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.master.balancer.BalancerChore;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
import org.apache.hadoop.hbase.master.cleaner.SnapshotCleanerChore;
import org.apache.hadoop.hbase.master.janitor.CatalogJanitor;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerManager;
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
import org.apache.hadoop.hbase.master.procedure.DeleteNamespaceProcedure;
import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable;
import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure;
import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
import org.apache.hadoop.hbase.procedure2.LockedResource;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
import org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.quotas.MasterQuotasObserver;
import org.apache.hadoop.hbase.quotas.QuotaObserverChore;
import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.quotas.SnapshotQuotaObserverChore;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifier;
import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifierFactory;
import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.util.TableDescriptorChecker;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
import org.apache.hadoop.hbase.zookeeper.SnapshotCleanupTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.eclipse.jetty.server.Server;
import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector;
import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder;
import org.apache.hbase.thirdparty.org.eclipse.jetty.webapp.WebAppContext;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;

/**
 * HMaster is the "master server" for HBase. An HBase cluster has one active master. If many
 * masters are started, all compete. Whichever wins goes on to run the cluster. All others park
 * themselves in their constructor until master or cluster shutdown or until the active master
 * loses its lease in zookeeper. Thereafter, all running masters jostle to take over the master
 * role.
 * <p/>
 * The Master can be asked to shutdown the cluster. See {@link #shutdown()}. In this case it will
 * tell all regionservers to go down and then wait on them all reporting in that they are down.
 * This master will then shut itself down.
 * <p/>
 * You can also shutdown just this master. Call {@link #stopMaster()}.
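 * <p/>
 * A minimal programmatic startup sketch (a simplification for illustration; real deployments
 * normally launch through {@code HMasterCommandLine}):
 *
 * <pre>
 * Configuration conf = HBaseConfiguration.create();
 * HMaster master = new HMaster(conf);
 * master.start(); // the master runs as a thread; see {@link #run()}
 * master.join();
 * </pre>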
 * @see org.apache.zookeeper.Watcher
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings("deprecation")
public class HMaster extends HRegionServer implements MasterServices {

  private static final Logger LOG = LoggerFactory.getLogger(HMaster.class);

  // MASTER is the name of the webapp and the attribute name used when stuffing this
  // instance into the web context.
  public static final String MASTER = "master";
  // Manager and zk listener for master election
  private final ActiveMasterManager activeMasterManager;
  // Region server tracker
  private RegionServerTracker regionServerTracker;
  // Draining region server tracker
  private DrainingServerTracker drainingServerTracker;
  // Tracker for load balancer state
  LoadBalancerTracker loadBalancerTracker;
  // Tracker for meta location, if any client ZK quorum specified
  private MetaLocationSyncer metaLocationSyncer;
  // Tracker for active master location, if any client ZK quorum specified
  @InterfaceAudience.Private
  MasterAddressSyncer masterAddressSyncer;
  // Tracker for auto snapshot cleanup state
  SnapshotCleanupTracker snapshotCleanupTracker;

  // Tracker for split and merge state
  private SplitOrMergeTracker splitOrMergeTracker;

  private ClusterSchemaService clusterSchemaService;

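  // How long we wait for dependent services (e.g. the ClusterSchemaService) to come up or
  // shut down before giving up.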
  public static final String HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS =
    "hbase.master.wait.on.service.seconds";
  public static final int DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS = 5 * 60;

  public static final String HBASE_MASTER_CLEANER_INTERVAL = "hbase.master.cleaner.interval";

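  // The cleaner interval is in milliseconds; 600 * 1000 is 10 minutes.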
  public static final int DEFAULT_HBASE_MASTER_CLEANER_INTERVAL = 600 * 1000;

  // Metrics for the HMaster
  final MetricsMaster metricsMaster;
  // file system manager for the master FS operations
  private MasterFileSystem fileSystemManager;
  private MasterWalManager walManager;

  // Manager for procedure-based WAL splitting; null if we are currently using
  // zk-based WAL splitting. SplitWALManager will replace SplitLogManager
  // and MasterWalManager, which means the zk-based WAL splitting code will be
  // useless after we switch to the procedure-based one. Our eventual goal
  // is to remove all the zk-based WAL splitting code.
  private SplitWALManager splitWALManager;

  // server manager to deal with region server info
  private volatile ServerManager serverManager;

  // manager of assignment nodes in zookeeper
  private AssignmentManager assignmentManager;

  /**
   * Cache for the meta region replica's locations. Also tracks their changes to avoid stale
   * cache entries.
   */
  private final MetaRegionLocationCache metaRegionLocationCache;

  // manager of replication
  private ReplicationPeerManager replicationPeerManager;

  // buffer for "fatal error" notices from region servers
  // in the cluster. This is only used for assisting
  // operations/debugging.
  MemoryBoundedLogMessageBuffer rsFatals;

  // flag set after we become the active master (used for testing)
  private volatile boolean activeMaster = false;

  // flag set after we complete initialization once active
  private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized");

  // flag set after master services are started;
  // initialization may not have completed yet.
  volatile boolean serviceStarted = false;

  // Maximum time we should run balancer for
  private final int maxBalancingTime;
  // Maximum percent of regions in transition when balancing
  private final double maxRitPercent;

  private final LockManager lockManager = new LockManager(this);

  private LoadBalancer balancer;
  private BalancerChore balancerChore;
  private RegionNormalizerManager regionNormalizerManager;
  private ClusterStatusChore clusterStatusChore;
  private ClusterStatusPublisher clusterStatusPublisherChore = null;
  private SnapshotCleanerChore snapshotCleanerChore = null;

  private HbckChore hbckChore;
  CatalogJanitor catalogJanitorChore;
  private DirScanPool cleanerPool;
  private LogCleaner logCleaner;
  private HFileCleaner hfileCleaner;
  private ReplicationBarrierCleaner replicationBarrierCleaner;
  private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
  private MobCompactionChore mobCompactChore;
  private MasterMobCompactionThread mobCompactThread;
  // used to synchronize the mobCompactionStates
  private final IdLock mobCompactionLock = new IdLock();
  // save the information of mob compactions in tables.
  // the key is table name, the value is the number of compactions in that table.
  private Map<TableName, AtomicInteger> mobCompactionStates = Maps.newConcurrentMap();

  MasterCoprocessorHost cpHost;

  private final boolean preLoadTableDescriptors;

  // Time stamp for when this master became active
  private long masterActiveTime;

  // Time stamp for when this master finished initializing as the active master
  private long masterFinishedInitializationTime;

  Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  // monitor for snapshot of hbase tables
  SnapshotManager snapshotManager;
  // monitor for distributed procedures
  private MasterProcedureManagerHost mpmHost;

  private RegionsRecoveryChore regionsRecoveryChore = null;

  private RegionsRecoveryConfigManager regionsRecoveryConfigManager = null;
  // it is assigned after the 'initialized' guard is set to true, so it should be volatile
  private volatile MasterQuotaManager quotaManager;
  private SpaceQuotaSnapshotNotifier spaceQuotaSnapshotNotifier;
  private QuotaObserverChore quotaObserverChore;
  private SnapshotQuotaObserverChore snapshotQuotaChore;

  private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
  private ProcedureStore procedureStore;

  // the master local storage to store procedure data, etc.
  private MasterRegion masterRegion;

  // handle table states
  private TableStateManager tableStateManager;

  /* Handle favored nodes information */
  private FavoredNodesManager favoredNodesManager;

  /** jetty server for master to redirect requests to regionserver infoServer */
  private Server masterJettyServer;

  // Determine if we should do normal startup or minimal "single-user" mode with no region
  // servers and no user tables. Useful for repair and recovery of hbase:meta
  private final boolean maintenanceMode;
  static final String MAINTENANCE_MODE = "hbase.master.maintenance_mode";

  // Cached clusterId on standby masters to serve clusterID requests from clients.
  private final CachedClusterId cachedClusterId;

  /**
   * Initializes the HMaster. The steps are as follows:
   * <p>
   * <ol>
   * <li>Initialize the local HRegionServer
   * <li>Start the ActiveMasterManager.
   * </ol>
   * <p>
   * Remaining steps of initialization occur in
   * {@link #finishActiveMasterInitialization(MonitoredTask)} after the master becomes the
   * active one.
   */
  public HMaster(final Configuration conf) throws IOException {
    super(conf);
    TraceUtil.initTracer(conf);
    try {
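      // Maintenance mode can be requested either in the site configuration or as a JVM
      // system property (e.g. -Dhbase.master.maintenance_mode=true); both are checked below.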
      if (conf.getBoolean(MAINTENANCE_MODE, false)) {
        LOG.info("Detected {}=true via configuration.", MAINTENANCE_MODE);
        maintenanceMode = true;
      } else if (Boolean.getBoolean(MAINTENANCE_MODE)) {
        LOG.info("Detected {}=true via system properties.", MAINTENANCE_MODE);
        maintenanceMode = true;
      } else {
        maintenanceMode = false;
      }
      this.rsFatals = new MemoryBoundedLogMessageBuffer(
          conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024));
      LOG.info("hbase.rootdir={}, hbase.cluster.distributed={}", getDataRootDir(),
          this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));

      // Disable usage of meta replicas in the master
      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);

      decorateMasterConfiguration(this.conf);

      // Hack! Maps DFSClient => Master for logs.  HDFS made this
      // config param for task trackers, but we can piggyback off of it.
      if (this.conf.get("mapreduce.task.attempt.id") == null) {
        this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString());
      }

      this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));

      // preload table descriptor at startup
      this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);

      this.maxBalancingTime = getMaxBalancingTime();
      this.maxRitPercent = conf.getDouble(HConstants.HBASE_MASTER_BALANCER_MAX_RIT_PERCENT,
          HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT);

      // Do we publish the status?

      boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
          HConstants.STATUS_PUBLISHED_DEFAULT);
      Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
          conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
              ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
              ClusterStatusPublisher.Publisher.class);

      if (shouldPublish) {
        if (publisherClass == null) {
          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
              ClusterStatusPublisher.STATUS_PUBLISHER_CLASS +
              " is not set - not publishing status");
        } else {
          clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
          LOG.debug("Created {}", this.clusterStatusPublisherChore);
          getChoreService().scheduleChore(clusterStatusPublisherChore);
        }
      }

      this.metaRegionLocationCache = new MetaRegionLocationCache(this.zooKeeper);
      this.activeMasterManager = createActiveMasterManager(zooKeeper, serverName, this);

      cachedClusterId = new CachedClusterId(this, conf);
    } catch (Throwable t) {
      // Make sure we log the exception. HMaster is often started via reflection and the
      // cause of failed startup is lost.
      LOG.error("Failed construction of Master", t);
      throw t;
    }
  }

  /**
   * Protected to have custom implementations in tests override the default ActiveMaster
   * implementation.
   */
  protected ActiveMasterManager createActiveMasterManager(ZKWatcher zk, ServerName sn,
      org.apache.hadoop.hbase.Server server) throws InterruptedIOException {
    return new ActiveMasterManager(zk, sn, server);
  }

  @Override
  protected String getUseThisHostnameInstead(Configuration conf) {
    return conf.get(MASTER_HOSTNAME_KEY);
  }

  // Main run loop. Calls through to the regionserver run loop AFTER becoming active Master; will
  // block in here until then.
  @Override
  public void run() {
    try {
      Threads.setDaemonThreadRunning(new Thread(() -> {
        try {
          int infoPort = putUpJettyServer();
          startActiveMasterManager(infoPort);
        } catch (Throwable t) {
          // Make sure we log the exception.
          String error = "Failed to become Active Master";
          LOG.error(error, t);
          // Abort should have been called already.
          if (!isAborted()) {
            abort(error, t);
          }
        }
      }), getName() + ":becomeActiveMaster");
      // Fall in here even if we have been aborted. Need to run the shutdown services and
      // the super run call will do this for us.
      super.run();
    } finally {
      if (this.clusterSchemaService != null) {
        // If on way out, then we are no longer active master.
        this.clusterSchemaService.stopAsync();
        try {
          this.clusterSchemaService.awaitTerminated(
              getConfiguration().getInt(HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS,
              DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS), TimeUnit.SECONDS);
        } catch (TimeoutException te) {
          LOG.warn("Failed shutdown of clusterSchemaService", te);
        }
      }
      this.activeMaster = false;
    }
  }

  // Returns the actual infoPort; -1 means the info server is disabled.
  private int putUpJettyServer() throws IOException {
    if (!conf.getBoolean("hbase.master.infoserver.redirect", true)) {
      return -1;
    }
    final int infoPort = conf.getInt("hbase.master.info.port.orig",
      HConstants.DEFAULT_MASTER_INFOPORT);
    // -1 is for disabling info server, so no redirecting
    if (infoPort < 0 || infoServer == null) {
      return -1;
    }
    if (infoPort == infoServer.getPort()) {
      return infoPort;
    }
    final String addr = conf.get("hbase.master.info.bindAddress", "0.0.0.0");
    if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
      String msg =
          "Failed to start redirecting jetty server. Address " + addr
              + " does not belong to this host. Correct configuration parameter: "
              + "hbase.master.info.bindAddress";
      LOG.error(msg);
      throw new IOException(msg);
    }

    // TODO I'm pretty sure we could just add another binding to the InfoServer run by
    // the RegionServer and have it run the RedirectServlet instead of standing up
    // a second entire stack here.
    masterJettyServer = new Server();
    final ServerConnector connector = new ServerConnector(masterJettyServer);
    connector.setHost(addr);
    connector.setPort(infoPort);
    masterJettyServer.addConnector(connector);
    masterJettyServer.setStopAtShutdown(true);

    final String redirectHostname =
        StringUtils.isBlank(useThisHostnameInstead) ? null : useThisHostnameInstead;

    final MasterRedirectServlet redirect = new MasterRedirectServlet(infoServer, redirectHostname);
    final WebAppContext context =
        new WebAppContext(null, "/", null, null, null, null, WebAppContext.NO_SESSIONS);
    context.addServlet(new ServletHolder(redirect), "/*");
    context.setServer(masterJettyServer);

    try {
      masterJettyServer.start();
    } catch (Exception e) {
      throw new IOException("Failed to start redirecting jetty server", e);
    }
    return connector.getLocalPort();
  }

  /**
   * For compatibility, if the login with regionserver credentials fails, try the master ones
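   * (the superclass attempts the regionserver keytab/principal login first; on failure we fall
   * back to the master keytab and principal keys from {@link SecurityConstants}).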
   */
  @Override
  protected void login(UserProvider user, String host) throws IOException {
    try {
      super.login(user, host);
    } catch (IOException ie) {
      user.login(SecurityConstants.MASTER_KRB_KEYTAB_FILE,
              SecurityConstants.MASTER_KRB_PRINCIPAL, host);
    }
  }

  /**
   * If configured to put regions on the active master,
   * wait till a backup master becomes active.
   * Otherwise, loop till the server is stopped or aborted.
   */
  @Override
  protected void waitForMasterActive() {
    if (maintenanceMode) {
      return;
    }
    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(conf);
    while (!(tablesOnMaster && activeMaster) && !isStopped() && !isAborted()) {
      sleeper.sleep();
    }
  }

  @InterfaceAudience.Private
  public MasterRpcServices getMasterRpcServices() {
    return (MasterRpcServices) rpcServices;
  }

  public boolean balanceSwitch(final boolean b) throws IOException {
    return getMasterRpcServices().switchBalancer(b, BalanceSwitchMode.ASYNC);
  }

  @Override
  protected String getProcessName() {
    return MASTER;
  }

  @Override
  protected boolean canCreateBaseZNode() {
    return true;
  }

  @Override
  protected boolean canUpdateTableDescriptor() {
    return true;
  }

  @Override
  protected boolean cacheTableDescriptor() {
    return true;
  }

  @Override
  protected RSRpcServices createRpcServices() throws IOException {
    return new MasterRpcServices(this);
  }

  @Override
  protected void configureInfoServer() {
    infoServer.addUnprivilegedServlet("master-status", "/master-status", MasterStatusServlet.class);
    infoServer.setAttribute(MASTER, this);
    if (LoadBalancer.isTablesOnMaster(conf)) {
      super.configureInfoServer();
    }
  }

  @Override
  protected Class<? extends HttpServlet> getDumpServlet() {
    return MasterDumpServlet.class;
  }

  @Override
  public MetricsMaster getMasterMetrics() {
    return metricsMaster;
  }

  /**
   * Initialize all ZK based system trackers. But do not include {@link RegionServerTracker}, it
   * should have already been initialized along with {@link ServerManager}.
   */
  @InterfaceAudience.Private
  protected void initializeZKBasedSystemTrackers()
      throws IOException, InterruptedException, KeeperException, ReplicationException {
    this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
    this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
    this.loadBalancerTracker.start();

    this.regionNormalizerManager =
      RegionNormalizerFactory.createNormalizerManager(conf, zooKeeper, this);
    this.configurationManager.registerObserver(regionNormalizerManager);
    this.regionNormalizerManager.start();

    this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
    this.splitOrMergeTracker.start();

    this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf, clusterId);

    this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
    this.drainingServerTracker.start();

    this.snapshotCleanupTracker = new SnapshotCleanupTracker(zooKeeper, this);
    this.snapshotCleanupTracker.start();

    String clientQuorumServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
    boolean clientZkObserverMode = conf.getBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE,
      HConstants.DEFAULT_CLIENT_ZOOKEEPER_OBSERVER_MODE);
    if (clientQuorumServers != null && !clientZkObserverMode) {
      // we need to take care of the ZK information synchronization
      // if the given client ZK nodes are not observer nodes
      ZKWatcher clientZkWatcher = new ZKWatcher(conf,
          getProcessName() + ":" + rpcServices.getSocketAddress().getPort() + "-clientZK", this,
          false, true);
      this.metaLocationSyncer = new MetaLocationSyncer(zooKeeper, clientZkWatcher, this);
      this.metaLocationSyncer.start();
      this.masterAddressSyncer = new MasterAddressSyncer(zooKeeper, clientZkWatcher, this);
      this.masterAddressSyncer.start();
      // setting the cluster id is a one-time effort
      ZKClusterId.setClusterId(clientZkWatcher, fileSystemManager.getClusterId());
    }

    // Set the cluster as up.  If new RSs, they'll be waiting on this before
    // going ahead with their startup.
    boolean wasUp = this.clusterStatusTracker.isClusterUp();
    if (!wasUp) {
      this.clusterStatusTracker.setClusterUp();
    }

    LOG.info("Active/primary master=" + this.serverName +
        ", sessionid=0x" +
        Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
        ", setting cluster-up flag (Was=" + wasUp + ")");

    // create/initialize the snapshot manager and other procedure managers
    this.snapshotManager = new SnapshotManager();
    this.mpmHost = new MasterProcedureManagerHost();
    this.mpmHost.register(this.snapshotManager);
    this.mpmHost.register(new MasterFlushTableProcedureManager());
    this.mpmHost.loadProcedures(conf);
    this.mpmHost.initialize(this, this.metricsMaster);
  }

  // Will be overridden in tests to inject a customized AssignmentManager
  @InterfaceAudience.Private
  protected AssignmentManager createAssignmentManager(MasterServices master) {
    return new AssignmentManager(master);
  }

  /**
   * Finish initialization of HMaster after becoming the primary master.
   * <p/>
   * The startup order is a bit complicated but very important, do not change it unless you know
   * what you are doing.
   * <ol>
   * <li>Initialize file system based components - file system manager, wal manager, table
   * descriptors, etc</li>
   * <li>Publish cluster id</li>
   * <li>Here comes the most complicated part - initialize server manager, assignment manager and
   * region server tracker
   * <ol type='i'>
   * <li>Create server manager</li>
   * <li>Create procedure executor, load the procedures, but do not start workers. We will start it
   * later after we finish scheduling SCPs to avoid scheduling duplicated SCPs for the same
   * server</li>
   * <li>Create assignment manager and start it, load the meta region state, but do not load data
   * from meta region</li>
   * <li>Start region server tracker, construct the online servers set and find out dead servers
   * and schedule SCP for them. The online servers will be constructed by scanning zk, and we will
   * also scan the wal directory to find out possible live region servers, and the differences
   * between these two sets are the dead servers</li>
   * </ol>
   * </li>
   * <li>If this is a new deploy, schedule an InitMetaProcedure to initialize meta</li>
   * <li>Start necessary service threads - balancer, catalog janitor, executor services, and also
   * the procedure executor, etc. Notice that the balancer must be created first as assignment
   * manager may use it when assigning regions.</li>
   * <li>Wait for meta to be initialized if necessary, start table state manager.</li>
   * <li>Wait for enough region servers to check-in</li>
   * <li>Let assignment manager load data from meta and construct region states</li>
   * <li>Start all other things such as chore services, etc</li>
   * </ol>
   * <p/>
   * Notice that now we will not schedule a special procedure to make meta online (unless this is
   * the first time and meta has not been created yet); we will rely on SCP to bring meta online.
   */
  private void finishActiveMasterInitialization(MonitoredTask status)
      throws IOException, InterruptedException, KeeperException, ReplicationException {
    /*
     * We are active master now... go initialize components we need to run.
     */
    status.setStatus("Initializing Master file system");

    this.masterActiveTime = System.currentTimeMillis();
    // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.

    // always initialize the MemStoreLAB as we use a region to store data in master now, see
    // masterRegion.
    initializeMemStoreChunkCreator();
    this.fileSystemManager = new MasterFileSystem(conf);
    this.walManager = new MasterWalManager(this);

    // warm-up HTDs cache on master initialization
    if (preLoadTableDescriptors) {
      status.setStatus("Pre-loading table descriptors");
      this.tableDescriptors.getAll();
    }

    // Publish cluster ID; set it in Master too. The superclass RegionServer does this later but
    // only after it has checked in with the Master. At least a few tests ask Master for clusterId
    // before it has called its run method and before RegionServer has done the reportForDuty.
    ClusterId clusterId = fileSystemManager.getClusterId();
    status.setStatus("Publishing Cluster ID " + clusterId + " in ZooKeeper");
    ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
    this.clusterId = clusterId.toString();

    // Precaution. Put in place the old hbck1 lock file to fence out old hbase1s running their
    // hbck1s against an hbase2 cluster; it could do damage. To skip this behavior, set
    // hbase.write.hbck1.lock.file to false.
    if (this.conf.getBoolean("hbase.write.hbck1.lock.file", true)) {
      Pair<Path, FSDataOutputStream> result = null;
      try {
        result = HBaseFsck.checkAndMarkRunningHbck(this.conf,
            HBaseFsck.createLockRetryCounterFactory(this.conf).create());
      } finally {
        if (result != null) {
          IOUtils.closeQuietly(result.getSecond());
        }
      }
    }

    status.setStatus("Initialize ServerManager and schedule SCP for crashed servers");
    this.serverManager = createServerManager(this);
    if (!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
      DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
      this.splitWALManager = new SplitWALManager(this);
    }

    // initialize master local region
    masterRegion = MasterRegionFactory.create(this);
    createProcedureExecutor();
    Map<Class<?>, List<Procedure<MasterProcedureEnv>>> procsByType =
      procedureExecutor.getActiveProceduresNoCopy().stream()
        .collect(Collectors.groupingBy(p -> p.getClass()));
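    // (procsByType is consulted twice below: to seed the AssignmentManager with in-flight
    // TransitRegionStateProcedures and the RegionServerTracker with ServerCrashProcedures,
    // without rescanning the full procedure list each time.)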

    // Create Assignment Manager
    this.assignmentManager = createAssignmentManager(this);
    this.assignmentManager.start();
    // TODO: TRSP can perform as the sub procedure for other procedures, so even if it is marked as
    // completed, it could still be in the procedure list. This is a bit strange but is another
    // story, need to verify the implementation for ProcedureExecutor and ProcedureStore.
    List<TransitRegionStateProcedure> ritList =
      procsByType.getOrDefault(TransitRegionStateProcedure.class, Collections.emptyList()).stream()
        .filter(p -> !p.isFinished()).map(p -> (TransitRegionStateProcedure) p)
        .collect(Collectors.toList());
    this.assignmentManager.setupRIT(ritList);

    // Start RegionServerTracker with listing of servers found with existing SCPs -- these should
    // be registered in the deadServers set -- and with the list of servernames out on the
    // filesystem that COULD BE 'alive' (we'll schedule SCPs for each and let SCP figure it out).
    // We also pass dirs that are already 'splitting'... so we can do some checks down in tracker.
    // TODO: Generate the splitting and live Set in one pass instead of two as we currently do.
    this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
    this.regionServerTracker.start(
      procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream()
        .map(p -> (ServerCrashProcedure) p).map(p -> p.getServerName()).collect(Collectors.toSet()),
      walManager.getLiveServersFromWALDir(), walManager.getSplittingServersFromWALDir());
    // This manager will be started AFTER hbase:meta is confirmed on line.
    // hbase.mirror.table.state.to.zookeeper is so hbase1 clients can connect. They read table
    // state from zookeeper while hbase2 reads it from hbase:meta. Disable if no hbase1 clients.
    this.tableStateManager =
      this.conf.getBoolean(MirroringTableStateManager.MIRROR_TABLE_STATE_TO_ZK_KEY, true)
        ? new MirroringTableStateManager(this)
        : new TableStateManager(this);

    status.setStatus("Initializing ZK system trackers");
    initializeZKBasedSystemTrackers();
    // Set ourselves as active Master now our claim has succeeded up in zk.
    this.activeMaster = true;

    // Start the Zombie master detector after setting master as active, see HBASE-21535
    Thread zombieDetector = new Thread(new MasterInitializationMonitor(this),
        "ActiveMasterInitializationMonitor-" + System.currentTimeMillis());
    zombieDetector.setDaemon(true);
    zombieDetector.start();

    // This is for backwards compatibility
    // See HBASE-11393
    status.setStatus("Update TableCFs node in ZNode");
    ReplicationPeerConfigUpgrader tableCFsUpdater =
        new ReplicationPeerConfigUpgrader(zooKeeper, conf);
    tableCFsUpdater.copyTableCFs();

    if (!maintenanceMode) {
      // Add the Observer to delete quotas on table deletion before starting all CPs by
      // default with quota support, unless the user specifically asks not to load this Observer.
      if (QuotaUtil.isQuotaEnabled(conf)) {
        updateConfigurationForQuotasObserver(conf);
      }
      // initialize master side coprocessors before we start handling requests
      status.setStatus("Initializing master coprocessors");
      this.cpHost = new MasterCoprocessorHost(this, this.conf);
    }

    // Checking if meta needs initializing.
    status.setStatus("Initializing meta table if this is a new deploy");
    InitMetaProcedure initMetaProc = null;
    // Print out state of hbase:meta on startup; helps debugging.
    if (!this.assignmentManager.getRegionStates().hasTableRegionStates(TableName.META_TABLE_NAME)) {
      Optional<InitMetaProcedure> optProc = procedureExecutor.getProcedures().stream()
        .filter(p -> p instanceof InitMetaProcedure).map(o -> (InitMetaProcedure) o).findAny();
      initMetaProc = optProc.orElseGet(() -> {
        // schedule an init meta procedure if meta has not been deployed yet
        InitMetaProcedure temp = new InitMetaProcedure();
        procedureExecutor.submitProcedure(temp);
        return temp;
      });
    }
    if (this.balancer instanceof FavoredNodesPromoter) {
      favoredNodesManager = new FavoredNodesManager(this);
    }

    // initialize load balancer
    this.balancer.setMasterServices(this);
    this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
    this.balancer.initialize();

    // start up all service threads.
    status.setStatus("Initializing master service threads");
    startServiceThreads();
    // wait for meta to be initialized after we start the procedure executor
    if (initMetaProc != null) {
      initMetaProc.await();
    }
    // Wake up this server to check in
    sleeper.skipSleepCycle();

    // Wait for region servers to report in.
    // With this as part of master initialization, it precludes our being able to start a single
    // server that is both Master and RegionServer. Needs more thought. TODO.
    String statusStr = "Wait for region servers to report in";
    status.setStatus(statusStr);
    LOG.info(Objects.toString(status));
    waitForRegionServers(status);

    // Check if master is shutting down because of issues initializing regionservers or balancer.
    if (isStopped()) {
      return;
    }

    status.setStatus("Starting assignment manager");
    // FIRST HBASE:META READ!!!!
    // The below cannot make progress w/o hbase:meta being online.
    // This is the FIRST attempt at going to hbase:meta. Meta on-lining is going on in background
    // as procedures run -- in particular SCPs for crashed servers... One should put up hbase:meta
    // if it is down. It may take a while to come online. So, wait here until meta is for sure
    // available. That's what waitForMetaOnline does.
    if (!waitForMetaOnline()) {
      return;
    }
    this.assignmentManager.joinCluster();
    // The below depends on hbase:meta being online.
    this.tableStateManager.start();
    this.assignmentManager.processOfflineRegions();
    // this must be called after the above processOfflineRegions to prevent race
    this.assignmentManager.wakeMetaLoadedEvent();

    // for migrating from a version without HBASE-25099, and also for honoring the configuration
    // first.
    if (conf.get(HConstants.META_REPLICAS_NUM) != null) {
      int replicasNumInConf =
        conf.getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM);
      TableDescriptor metaDesc = tableDescriptors.get(TableName.META_TABLE_NAME);
      if (metaDesc.getRegionReplication() != replicasNumInConf) {
        // it is possible that we already have some replicas before upgrading, so we must set the
        // region replication number in meta TableDescriptor directly first, without creating a
        // ModifyTableProcedure, otherwise it may cause a double assign for the meta replicas.
        int existingReplicasCount =
          assignmentManager.getRegionStates().getRegionsOfTable(TableName.META_TABLE_NAME).size();
        if (existingReplicasCount > metaDesc.getRegionReplication()) {
          LOG.info("Update replica count of hbase:meta from {}(in TableDescriptor)" +
            " to {}(existing ZNodes)", metaDesc.getRegionReplication(), existingReplicasCount);
          metaDesc = TableDescriptorBuilder.newBuilder(metaDesc)
            .setRegionReplication(existingReplicasCount).build();
          tableDescriptors.update(metaDesc);
        }
        // check again, and issue a ModifyTableProcedure if needed
        if (metaDesc.getRegionReplication() != replicasNumInConf) {
          LOG.info(
            "The {} config is {} while the replica count in TableDescriptor is {}" +
              " for hbase:meta, altering...",
            HConstants.META_REPLICAS_NUM, replicasNumInConf, metaDesc.getRegionReplication());
          procedureExecutor.submitProcedure(new ModifyTableProcedure(
            procedureExecutor.getEnvironment(), TableDescriptorBuilder.newBuilder(metaDesc)
              .setRegionReplication(replicasNumInConf).build(),
            null, metaDesc, false));
        }
      }
    }
    // Initialize after meta is up as below scans meta
    if (favoredNodesManager != null && !maintenanceMode) {
      SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment =
          new SnapshotOfRegionAssignmentFromMeta(getConnection());
      snapshotOfRegionAssignment.initialize();
      favoredNodesManager.initialize(snapshotOfRegionAssignment);
    }

    // set cluster status again after user regions are assigned
    this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());

    // Start balancer and meta catalog janitor after meta and regions have been assigned.
    status.setStatus("Starting balancer and catalog janitor");
    this.clusterStatusChore = new ClusterStatusChore(this, balancer);
    getChoreService().scheduleChore(clusterStatusChore);
    this.balancerChore = new BalancerChore(this);
    getChoreService().scheduleChore(balancerChore);
    if (regionNormalizerManager != null) {
      getChoreService().scheduleChore(regionNormalizerManager.getRegionNormalizerChore());
    }
    this.catalogJanitorChore = new CatalogJanitor(this);
    getChoreService().scheduleChore(catalogJanitorChore);
    this.hbckChore = new HbckChore(this);
    getChoreService().scheduleChore(hbckChore);

    // NAMESPACE READ!!!!
    // Here we expect hbase:namespace to be online. See inside initClusterSchemaService.
    // TODO: Fix this. Namespace is a pain being a sort-of system table. Fold it in to hbase:meta.
    // waitForNamespaceOnline, like waitForMetaOnline, waits until the namespace table is onlined
    // before allowing progress.
    if (!waitForNamespaceOnline()) {
      return;
    }
    status.setStatus("Starting cluster schema service");
    initClusterSchemaService();

    if (this.cpHost != null) {
      try {
        this.cpHost.preMasterInitialization();
      } catch (IOException e) {
        LOG.error("Coprocessor preMasterInitialization() hook failed", e);
      }
    }

    status.markComplete("Initialization successful");
    LOG.info(String.format("Master has completed initialization %.3fsec",
       (System.currentTimeMillis() - masterActiveTime) / 1000.0f));
    this.masterFinishedInitializationTime = System.currentTimeMillis();
    configurationManager.registerObserver(this.balancer);
    configurationManager.registerObserver(this.cleanerPool);
    configurationManager.registerObserver(this.hfileCleaner);
    configurationManager.registerObserver(this.logCleaner);
    configurationManager.registerObserver(this.regionsRecoveryConfigManager);
    // Set master as 'initialized'.
    setInitialized(true);

    if (maintenanceMode) {
      LOG.info("Detected repair mode, skipping final initialization steps.");
      return;
    }

    assignmentManager.checkIfShouldMoveSystemRegionAsync();
    status.setStatus("Starting quota manager");
    initQuotaManager();
    if (QuotaUtil.isQuotaEnabled(conf)) {
      // Create the quota snapshot notifier
      spaceQuotaSnapshotNotifier = createQuotaSnapshotNotifier();
      spaceQuotaSnapshotNotifier.initialize(getClusterConnection());
      this.quotaObserverChore = new QuotaObserverChore(this, getMasterMetrics());
      // Start the chore to read the region FS space reports and act on them
      getChoreService().scheduleChore(quotaObserverChore);

      this.snapshotQuotaChore = new SnapshotQuotaObserverChore(this, getMasterMetrics());
      // Start the chore to read snapshots and add their usage to table/NS quotas
      getChoreService().scheduleChore(snapshotQuotaChore);
    }
    final SlowLogMasterService slowLogMasterService = new SlowLogMasterService(conf, this);
    slowLogMasterService.init();

    // Clear dead servers that have the same host name and port as an online server, because we
    // are not removing a dead server with the same hostname and port as an RS that is trying to
    // check in before master initialization. See HBASE-5916.
    this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();

    // Check and set the znode ACLs if needed in case we are overtaking a non-secure configuration
    status.setStatus("Checking ZNode ACLs");
    zooKeeper.checkAndSetZNodeAcls();

    status.setStatus("Initializing MOB Cleaner");
    initMobCleaner();

    status.setStatus("Calling postStartMaster coprocessors");
    if (this.cpHost != null) {
      // don't let cp initialization errors kill the master
      try {
        this.cpHost.postStartMaster();
      } catch (IOException ioe) {
        LOG.error("Coprocessor postStartMaster() hook failed", ioe);
      }
    }

    zombieDetector.interrupt();

    /*
     * After the master has started up, let's do balancer post-startup initialization. Since this
     * runs in the activeMasterManager thread, it should be fine.
     */
    long start = System.currentTimeMillis();
    this.balancer.postMasterStartupInitialize();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Balancer post startup initialization complete, took " + (
          (System.currentTimeMillis() - start) / 1000) + " seconds");
    }
  }

  /**
   * Check hbase:meta is up and ready for reading. For use during Master startup only.
   * @return True if meta is UP and online and startup can progress. Otherwise, meta is not online
   *   and we will hold here until operator intervention.
   */
  @InterfaceAudience.Private
  public boolean waitForMetaOnline() {
    return isRegionOnline(RegionInfoBuilder.FIRST_META_REGIONINFO);
  }

  /**
   * @return True if region is online and scannable else false if an error or shutdown (Otherwise
   *   we just block in here holding up all forward-progress).
   */
  private boolean isRegionOnline(RegionInfo ri) {
    RetryCounter rc = null;
    while (!isStopped()) {
      RegionState rs = this.assignmentManager.getRegionStates().getRegionState(ri);
      if (rs.isOpened()) {
        if (this.getServerManager().isServerOnline(rs.getServerName())) {
          return true;
        }
      }
      // Region is not OPEN.
      Optional<Procedure<MasterProcedureEnv>> optProc = this.procedureExecutor.getProcedures().
          stream().filter(p -> p instanceof ServerCrashProcedure).findAny();
      // TODO: Add a page to refguide on how to do repair. Have this log message point to it.
      // Page will talk about loss of edits, how to schedule at least the meta WAL recovery, and
      // then how to assign including how to break region lock if one held.
      LOG.warn("{} is NOT online; state={}; ServerCrashProcedures={}. Master startup cannot " +
          "progress, in holding-pattern until region onlined.",
          ri.getRegionNameAsString(), rs, optProc.isPresent());
      // Check once-a-minute.
      if (rc == null) {
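        // Args are (maxAttempts, initial sleep ms, max sleep ms): sleeps back off from 1s up
        // to the once-a-minute cadence noted above, retrying effectively forever.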
        rc = new RetryCounterFactory(Integer.MAX_VALUE, 1000, 60_000).create();
      }
      Threads.sleep(rc.getBackoffTimeAndIncrementAttempts());
    }
    return false;
  }

  /**
   * Check hbase:namespace table is assigned. If not, startup will hang looking for the ns table
   * (TODO: Fix this! NS should not hold-up startup).
   * @return True if namespace table is up/online.
   */
  @InterfaceAudience.Private
  public boolean waitForNamespaceOnline() {
    List<RegionInfo> ris = this.assignmentManager.getRegionStates().
        getRegionsOfTable(TableName.NAMESPACE_TABLE_NAME);
    if (ris.isEmpty()) {
      // If empty, means we've not assigned the namespace table yet... Just return true so startup
      // continues and the namespace table gets created.
      return true;
    }
    // Else there are namespace regions up in meta. Ensure they are assigned before we go on.
    for (RegionInfo ri: ris) {
      if (!isRegionOnline(ri)) {
        return false;
      }
    }
    return true;
  }

  /**
   * Adds the {@code MasterQuotasObserver} to the list of configured Master observers to
   * automatically remove quotas for a table when that table is deleted.
1169   */
1170  @InterfaceAudience.Private
1171  public void updateConfigurationForQuotasObserver(Configuration conf) {
    // We're configured to not delete quotas on table deletion, so we don't need to add the
    // observer.
1173    if (!conf.getBoolean(
1174          MasterQuotasObserver.REMOVE_QUOTA_ON_TABLE_DELETE,
1175          MasterQuotasObserver.REMOVE_QUOTA_ON_TABLE_DELETE_DEFAULT)) {
1176      return;
1177    }
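    // Otherwise, append MasterQuotasObserver to whatever master coprocessors are already
    // configured so that quota entries are removed when a table is deleted.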
1178    String[] masterCoprocs = conf.getStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY);
1179    final int length = null == masterCoprocs ? 0 : masterCoprocs.length;
1180    String[] updatedCoprocs = new String[length + 1];
1181    if (length > 0) {
1182      System.arraycopy(masterCoprocs, 0, updatedCoprocs, 0, masterCoprocs.length);
1183    }
1184    updatedCoprocs[length] = MasterQuotasObserver.class.getName();
1185    conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, updatedCoprocs);
1186  }
1187
1188  private void initMobCleaner() {
1189    this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this);
1190    getChoreService().scheduleChore(expiredMobFileCleanerChore);
1191
1192    int mobCompactionPeriod = conf.getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD,
1193        MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD);
1194    this.mobCompactChore = new MobCompactionChore(this, mobCompactionPeriod);
1195    getChoreService().scheduleChore(mobCompactChore);
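    // Separate worker that serves explicitly requested (admin-triggered) mob compactions,
    // independent of the periodic chore above.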
1196    this.mobCompactThread = new MasterMobCompactionThread(this);
1197  }
1198
1199  /**
1200   * <p>
1201   * Create a {@link ServerManager} instance.
1202   * </p>
1203   * <p>
1204   * Will be overridden in tests.
1205   * </p>
1206   */
1207  @InterfaceAudience.Private
1208  protected ServerManager createServerManager(final MasterServices master) throws IOException {
    // We put this out here in a method so we can do a Mockito.spy and stub it out
    // with a mocked-up ServerManager.
1211    setupClusterConnection();
1212    return new ServerManager(master);
1213  }
1214
1215  private void waitForRegionServers(final MonitoredTask status)
1216      throws IOException, InterruptedException {
1217    this.serverManager.waitForRegionServers(status);
1218  }
1219
1220  // Will be overridden in tests
1221  @InterfaceAudience.Private
1222  protected void initClusterSchemaService() throws IOException, InterruptedException {
1223    this.clusterSchemaService = new ClusterSchemaServiceImpl(this);
1224    this.clusterSchemaService.startAsync();
1225    try {
1226      this.clusterSchemaService.awaitRunning(getConfiguration().getInt(
1227        HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS,
1228        DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS), TimeUnit.SECONDS);
1229    } catch (TimeoutException toe) {
      throw new IOException("Timed out starting ClusterSchemaService", toe);
1231    }
1232  }
1233
1234  private void initQuotaManager() throws IOException {
1235    MasterQuotaManager quotaManager = new MasterQuotaManager(this);
1236    quotaManager.start();
1237    this.quotaManager = quotaManager;
1238  }
1239
1240  private SpaceQuotaSnapshotNotifier createQuotaSnapshotNotifier() {
1241    SpaceQuotaSnapshotNotifier notifier =
1242        SpaceQuotaSnapshotNotifierFactory.getInstance().create(getConfiguration());
1243    return notifier;
1244  }
1245
1246  boolean isCatalogJanitorEnabled() {
    return catalogJanitorChore != null && catalogJanitorChore.getEnabled();
1248  }
1249
1250  boolean isCleanerChoreEnabled() {
1251    boolean hfileCleanerFlag = true, logCleanerFlag = true;
1252
1253    if (hfileCleaner != null) {
1254      hfileCleanerFlag = hfileCleaner.getEnabled();
1255    }
1256
1257    if (logCleaner != null) {
1258      logCleanerFlag = logCleaner.getEnabled();
1259    }
1260
1261    return (hfileCleanerFlag && logCleanerFlag);
1262  }
1263
1264  @Override
1265  public ServerManager getServerManager() {
1266    return this.serverManager;
1267  }
1268
1269  @Override
1270  public MasterFileSystem getMasterFileSystem() {
1271    return this.fileSystemManager;
1272  }
1273
1274  @Override
1275  public MasterWalManager getMasterWalManager() {
1276    return this.walManager;
1277  }
1278
1279  @Override
1280  public SplitWALManager getSplitWALManager() {
1281    return splitWALManager;
1282  }
1283
1284  @Override
1285  public TableStateManager getTableStateManager() {
1286    return tableStateManager;
1287  }
1288
  /*
   * Start up all services. If any of these threads gets an unhandled exception, it just dies
   * with a logged message. This should be fine because, in general, we do not expect the master
   * to get unhandled exceptions such as OOMEs; it should be lightly loaded. See what
   * HRegionServer does if we ever need to install an unexpected-exception handler.
   */
1296  private void startServiceThreads() throws IOException {
1297    // Start the executor service pools
1298    this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, conf.getInt(
1299      HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT));
1300    this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt(
1301      HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT));
1302    this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
1303      conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS,
1304        HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT));
1305    this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
1306      conf.getInt(HConstants.MASTER_META_SERVER_OPERATIONS_THREADS,
1307        HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT));
1308    this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt(
1309      HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT));
1310    this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, conf.getInt(
1311      SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT));
1312
1313    // We depend on there being only one instance of this executor running
1314    // at a time. To do concurrency, would need fencing of enable/disable of
1315    // tables.
    // Any time changing this maxThreads to > 1, please see the comment at
    // AccessController#postCompletedCreateTableAction.
1318    this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1319    startProcedureExecutor();
1320
1321    // Create cleaner thread pool
1322    cleanerPool = new DirScanPool(conf);
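    // The pool is shared by the log cleaner and the hfile cleaner scheduled below.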
1323    // Start log cleaner thread
1324    int cleanerInterval =
1325      conf.getInt(HBASE_MASTER_CLEANER_INTERVAL, DEFAULT_HBASE_MASTER_CLEANER_INTERVAL);
1326    this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
1327      getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool);
1328    getChoreService().scheduleChore(logCleaner);
1329
1330    // start the hfile archive cleaner thread
1331    Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
1332    Map<String, Object> params = new HashMap<>();
1333    params.put(MASTER, this);
1334    this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
1335      getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params);
1336    getChoreService().scheduleChore(hfileCleaner);
1337
    // Reopening regions based on a very high storeFileRefCount is considered enabled
    // only if hbase.regions.recovery.store.file.ref.count has a value > 0
1340    final int maxStoreFileRefCount = conf.getInt(
1341      HConstants.STORE_FILE_REF_COUNT_THRESHOLD,
1342      HConstants.DEFAULT_STORE_FILE_REF_COUNT_THRESHOLD);
1343    if (maxStoreFileRefCount > 0) {
1344      this.regionsRecoveryChore = new RegionsRecoveryChore(this, conf, this);
1345      getChoreService().scheduleChore(this.regionsRecoveryChore);
1346    } else {
1347      LOG.info("Reopening regions with very high storeFileRefCount is disabled. " +
1348          "Provide threshold value > 0 for {} to enable it.",
1349        HConstants.STORE_FILE_REF_COUNT_THRESHOLD);
1350    }
1351
1352    this.regionsRecoveryConfigManager = new RegionsRecoveryConfigManager(this);
1353
1354    replicationBarrierCleaner = new ReplicationBarrierCleaner(conf, this, getConnection(),
1355      replicationPeerManager);
1356    getChoreService().scheduleChore(replicationBarrierCleaner);
1357
1358    final boolean isSnapshotChoreEnabled = this.snapshotCleanupTracker
1359        .isSnapshotCleanupEnabled();
1360    this.snapshotCleanerChore = new SnapshotCleanerChore(this, conf, getSnapshotManager());
1361    if (isSnapshotChoreEnabled) {
1362      getChoreService().scheduleChore(this.snapshotCleanerChore);
1363    } else {
1364      if (LOG.isTraceEnabled()) {
        LOG.trace("Snapshot Cleaner Chore is disabled. Not starting up the chore.");
1366      }
1367    }
1368    serviceStarted = true;
1369    if (LOG.isTraceEnabled()) {
1370      LOG.trace("Started service threads");
1371    }
1372  }
1373
1374  @Override
1375  protected void stopServiceThreads() {
1376    if (masterJettyServer != null) {
1377      LOG.info("Stopping master jetty server");
1378      try {
1379        masterJettyServer.stop();
1380      } catch (Exception e) {
1381        LOG.error("Failed to stop master jetty server", e);
1382      }
1383    }
1384    stopChores();
1385    if (this.mobCompactThread != null) {
1386      this.mobCompactThread.close();
1387    }
1388    super.stopServiceThreads();
1389    if (cleanerPool != null) {
1390      cleanerPool.shutdownNow();
1391      cleanerPool = null;
1392    }
1393
1394    LOG.debug("Stopping service threads");
1395
1396    // stop procedure executor prior to other services such as server manager and assignment
1397    // manager, as these services are important for some running procedures. See HBASE-24117 for
1398    // example.
1399    stopProcedureExecutor();
1400
1401    if (regionNormalizerManager != null) {
1402      regionNormalizerManager.stop();
1403    }
1404    if (this.quotaManager != null) {
1405      this.quotaManager.stop();
1406    }
1407
1408    if (this.activeMasterManager != null) {
1409      this.activeMasterManager.stop();
1410    }
1411    if (this.serverManager != null) {
1412      this.serverManager.stop();
1413    }
1414    if (this.assignmentManager != null) {
1415      this.assignmentManager.stop();
1416    }
1417
1418    if (masterRegion != null) {
1419      masterRegion.close(isAborted());
1420    }
1421    if (this.walManager != null) {
1422      this.walManager.stop();
1423    }
1424    if (this.fileSystemManager != null) {
1425      this.fileSystemManager.stop();
1426    }
1427    if (this.mpmHost != null) {
1428      this.mpmHost.stop("server shutting down.");
1429    }
1430    if (this.regionServerTracker != null) {
1431      this.regionServerTracker.stop();
1432    }
1433  }
1434
1435  private void createProcedureExecutor() throws IOException {
1436    MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
1437    procedureStore =
1438      new RegionProcedureStore(this, masterRegion, new MasterProcedureEnv.FsUtilsLeaseRecovery(this));
1439    procedureStore.registerListener(new ProcedureStoreListener() {
1440
1441      @Override
1442      public void abortProcess() {
1443        abort("The Procedure Store lost the lease", null);
1444      }
1445    });
1446    MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler();
1447    procedureExecutor = new ProcedureExecutor<>(conf, procEnv, procedureStore, procedureScheduler);
1448    configurationManager.registerObserver(procEnv);
1449
1450    int cpus = Runtime.getRuntime().availableProcessors();
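    // Unless overridden, size the worker pool at a quarter of the available cores, but never
    // below the configured minimum.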
1451    final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, Math.max(
1452      (cpus > 0 ? cpus / 4 : 0), MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
1453    final boolean abortOnCorruption =
1454      conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
1455        MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
1456    procedureStore.start(numThreads);
1457    // Just initialize it but do not start the workers, we will start the workers later by calling
1458    // startProcedureExecutor. See the javadoc for finishActiveMasterInitialization for more
1459    // details.
1460    procedureExecutor.init(numThreads, abortOnCorruption);
1461    if (!procEnv.getRemoteDispatcher().start()) {
1462      throw new HBaseIOException("Failed start of remote dispatcher");
1463    }
1464  }
1465
1466  private void startProcedureExecutor() throws IOException {
1467    procedureExecutor.startWorkers();
1468  }
1469
1470  /**
1471   * Turn on/off Snapshot Cleanup Chore
1472   *
1473   * @param on indicates whether Snapshot Cleanup Chore is to be run
1474   */
1475  void switchSnapshotCleanup(final boolean on, final boolean synchronous) {
1476    if (synchronous) {
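      // Serialize concurrent synchronous switch requests on the chore monitor.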
1477      synchronized (this.snapshotCleanerChore) {
1478        switchSnapshotCleanup(on);
1479      }
1480    } else {
1481      switchSnapshotCleanup(on);
1482    }
1483  }
1484
1485  private void switchSnapshotCleanup(final boolean on) {
1486    try {
1487      snapshotCleanupTracker.setSnapshotCleanupEnabled(on);
1488      if (on) {
1489        if (!getChoreService().isChoreScheduled(this.snapshotCleanerChore)) {
1490          getChoreService().scheduleChore(this.snapshotCleanerChore);
1491        }
1492      } else {
1493        getChoreService().cancelChore(this.snapshotCleanerChore);
1494      }
1495    } catch (KeeperException e) {
1496      LOG.error("Error updating snapshot cleanup mode to {}", on, e);
1497    }
1498  }
1499
1501  private void stopProcedureExecutor() {
1502    if (procedureExecutor != null) {
1503      configurationManager.deregisterObserver(procedureExecutor.getEnvironment());
1504      procedureExecutor.getEnvironment().getRemoteDispatcher().stop();
1505      procedureExecutor.stop();
1506      procedureExecutor.join();
1507      procedureExecutor = null;
1508    }
1509
1510    if (procedureStore != null) {
1511      procedureStore.stop(isAborted());
1512      procedureStore = null;
1513    }
1514  }
1515
1516  private void stopChores() {
1517    ChoreService choreService = getChoreService();
1518    if (choreService != null) {
1519      choreService.cancelChore(this.expiredMobFileCleanerChore);
1520      choreService.cancelChore(this.mobCompactChore);
1521      choreService.cancelChore(this.balancerChore);
1522      if (regionNormalizerManager != null) {
1523        choreService.cancelChore(regionNormalizerManager.getRegionNormalizerChore());
1524      }
1525      choreService.cancelChore(this.clusterStatusChore);
1526      choreService.cancelChore(this.catalogJanitorChore);
1527      choreService.cancelChore(this.clusterStatusPublisherChore);
1528      choreService.cancelChore(this.snapshotQuotaChore);
1529      choreService.cancelChore(this.logCleaner);
1530      choreService.cancelChore(this.hfileCleaner);
1531      choreService.cancelChore(this.replicationBarrierCleaner);
1532      choreService.cancelChore(this.snapshotCleanerChore);
1533      choreService.cancelChore(this.hbckChore);
1534      choreService.cancelChore(this.regionsRecoveryChore);
1535    }
1536  }
1537
1538  /**
   * @return The remote side's InetAddress
1540   */
1541  InetAddress getRemoteInetAddress(final int port,
1542      final long serverStartCode) throws UnknownHostException {
    // Do it out here in its own little method so we can fake an address when
    // mocking things up in tests.
1545    InetAddress ia = RpcServer.getRemoteIp();
1546
1547    // The call could be from the local regionserver,
1548    // in which case, there is no remote address.
1549    if (ia == null && serverStartCode == startcode) {
1550      InetSocketAddress isa = rpcServices.getSocketAddress();
1551      if (isa != null && isa.getPort() == port) {
1552        ia = isa.getAddress();
1553      }
1554    }
1555    return ia;
1556  }
1557
1558  /**
1559   * @return Maximum time we should run balancer for
1560   */
1561  private int getMaxBalancingTime() {
    // If max balancing time isn't set, default it to the balancer period.
1563    int maxBalancingTime = getConfiguration().getInt(HConstants.HBASE_BALANCER_MAX_BALANCING,
1564      getConfiguration()
1565        .getInt(HConstants.HBASE_BALANCER_PERIOD, HConstants.DEFAULT_HBASE_BALANCER_PERIOD));
1566    return maxBalancingTime;
1567  }
1568
1569  /**
1570   * @return Maximum number of regions in transition
1571   */
1572  private int getMaxRegionsInTransition() {
1573    int numRegions = this.assignmentManager.getRegionStates().getRegionAssignments().size();
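    // Example: with 10,000 assigned regions and maxRitPercent = 0.01, at most 100 regions may
    // be in transition at once (never fewer than 1).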
1574    return Math.max((int) Math.floor(numRegions * this.maxRitPercent), 1);
1575  }
1576
1577  /**
   * First sleeps until the next balance plan start time, meanwhile throttling by the max
   * number of regions in transition to protect availability.
1580   * @param nextBalanceStartTime The next balance plan start time
1581   * @param maxRegionsInTransition max number of regions in transition
1582   * @param cutoffTime when to exit balancer
1583   */
1584  private void balanceThrottling(long nextBalanceStartTime, int maxRegionsInTransition,
1585      long cutoffTime) {
1586    boolean interrupted = false;
1587
    // Sleep to next balance plan start time
    // But if there are no regions in transition, we can skip the sleep to speed things up.
1590    while (!interrupted && System.currentTimeMillis() < nextBalanceStartTime
1591        && this.assignmentManager.getRegionStates().hasRegionsInTransition()) {
1592      try {
1593        Thread.sleep(100);
1594      } catch (InterruptedException ie) {
1595        interrupted = true;
1596      }
1597    }
1598
    // Throttle by the max number of regions in transition.
1600    while (!interrupted
1601        && maxRegionsInTransition > 0
1602        && this.assignmentManager.getRegionStates().getRegionsInTransitionCount()
1603        >= maxRegionsInTransition && System.currentTimeMillis() <= cutoffTime) {
1604      try {
1605        // sleep if the number of regions in transition exceeds the limit
1606        Thread.sleep(100);
1607      } catch (InterruptedException ie) {
1608        interrupted = true;
1609      }
1610    }
1611
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
1613  }
1614
1615  public boolean balance() throws IOException {
1616    return balance(false);
1617  }
1618
1619  /**
1620   * Checks master state before initiating action over region topology.
1621   * @param action the name of the action under consideration, for logging.
1622   * @return {@code true} when the caller should exit early, {@code false} otherwise.
1623   */
1624  @Override
1625  public boolean skipRegionManagementAction(final String action) {
    // Note: this method could be `default` on MasterServices but for the logging.
1627    if (!isInitialized()) {
1628      LOG.debug("Master has not been initialized, don't run {}.", action);
1629      return true;
1630    }
1631    if (this.getServerManager().isClusterShutdown()) {
1632      LOG.info("Cluster is shutting down, don't run {}.", action);
1633      return true;
1634    }
1635    if (isInMaintenanceMode()) {
1636      LOG.info("Master is in maintenance mode, don't run {}.", action);
1637      return true;
1638    }
1639    return false;
1640  }
1641
1642  public boolean balance(boolean force) throws IOException {
1643    if (loadBalancerTracker == null || !loadBalancerTracker.isBalancerOn()) {
1644      return false;
1645    }
1646    if (skipRegionManagementAction("balancer")) {
1647      return false;
1648    }
1649
    synchronized (this.balancer) {
      // Only allow one balance run at a time.
1652      if (this.assignmentManager.hasRegionsInTransition()) {
1653        List<RegionStateNode> regionsInTransition = assignmentManager.getRegionsInTransition();
1654        // if hbase:meta region is in transition, result of assignment cannot be recorded
1655        // ignore the force flag in that case
1656        boolean metaInTransition = assignmentManager.isMetaRegionInTransition();
1657        String prefix = force && !metaInTransition ? "R" : "Not r";
1658        List<RegionStateNode> toPrint = regionsInTransition;
1659        int max = 5;
1660        boolean truncated = false;
1661        if (regionsInTransition.size() > max) {
1662          toPrint = regionsInTransition.subList(0, max);
1663          truncated = true;
1664        }
        LOG.info(prefix + "unning balancer because " + regionsInTransition.size() +
          " region(s) in transition: " + toPrint + (truncated ? " (truncated list)" : ""));
        if (!force || metaInTransition) {
          return false;
        }
1668      }
1669      if (this.serverManager.areDeadServersInProgress()) {
1670        LOG.info("Not running balancer because processing dead regionserver(s): " +
1671          this.serverManager.getDeadServers());
1672        return false;
1673      }
1674
1675      if (this.cpHost != null) {
1676        try {
1677          if (this.cpHost.preBalance()) {
1678            LOG.debug("Coprocessor bypassing balancer request");
1679            return false;
1680          }
1681        } catch (IOException ioe) {
1682          LOG.error("Error invoking master coprocessor preBalance()", ioe);
1683          return false;
1684        }
1685      }
1686
1687      Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
1688        this.assignmentManager.getRegionStates()
1689          .getAssignmentsForBalancer(tableStateManager, this.serverManager.getOnlineServersList());
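      // Draining servers are being emptied on purpose, so exclude them as balancing targets.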
1690      for (Map<ServerName, List<RegionInfo>> serverMap : assignments.values()) {
1691        serverMap.keySet().removeAll(this.serverManager.getDrainingServersList());
1692      }
1693
      // Give the balancer the current cluster state.
1695      this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
1696
1697      List<RegionPlan> plans = this.balancer.balanceCluster(assignments);
1698
1699      if (skipRegionManagementAction("balancer")) {
1700        // make one last check that the cluster isn't shutting down before proceeding.
1701        return false;
1702      }
1703
1704      List<RegionPlan> sucRPs = executeRegionPlansWithThrottling(plans);
1705
1706      if (this.cpHost != null) {
1707        try {
1708          this.cpHost.postBalance(sucRPs);
1709        } catch (IOException ioe) {
1710          // balancing already succeeded so don't change the result
1711          LOG.error("Error invoking master coprocessor postBalance()", ioe);
1712        }
1713      }
1714    }
1715    // If LoadBalancer did not generate any plans, it means the cluster is already balanced.
1716    // Return true indicating a success.
1717    return true;
1718  }
1719
1720  /**
1721   * Execute region plans with throttling
1722   * @param plans to execute
1723   * @return succeeded plans
1724   */
1725  public List<RegionPlan> executeRegionPlansWithThrottling(List<RegionPlan> plans) {
1726    List<RegionPlan> successRegionPlans = new ArrayList<>();
1727    int maxRegionsInTransition = getMaxRegionsInTransition();
1728    long balanceStartTime = System.currentTimeMillis();
1729    long cutoffTime = balanceStartTime + this.maxBalancingTime;
1730    int rpCount = 0;  // number of RegionPlans balanced so far
1731    if (plans != null && !plans.isEmpty()) {
1732      int balanceInterval = this.maxBalancingTime / plans.size();
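      // Spread plan execution evenly across the balancing window: each plan gets an equal
      // share of the allotted time.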
1733      LOG.info("Balancer plans size is " + plans.size() + ", the balance interval is "
1734          + balanceInterval + " ms, and the max number regions in transition is "
1735          + maxRegionsInTransition);
1736
1737      for (RegionPlan plan: plans) {
1738        LOG.info("balance " + plan);
1739        //TODO: bulk assign
1740        try {
1741          this.assignmentManager.moveAsync(plan);
        } catch (HBaseIOException hioe) {
          // Ignore failed plans here to avoid aborting the whole set of balance plans;
          // later calls of balance() can pick up the failed and skipped plans.
          LOG.warn("Failed balance plan {}, skipping...", plan, hioe);
        }
        // rpCount records balance plans processed; it does not care whether a plan succeeds.
1748        rpCount++;
1749        successRegionPlans.add(plan);
1750
1751        if (this.maxBalancingTime > 0) {
1752          balanceThrottling(balanceStartTime + rpCount * balanceInterval, maxRegionsInTransition,
1753            cutoffTime);
1754        }
1755
        // If performing the next balance would exceed the cutoff time, exit the loop.
1757        if (this.maxBalancingTime > 0 && rpCount < plans.size()
1758          && System.currentTimeMillis() > cutoffTime) {
          // TODO: After balance, there should not be a cutoff time (keeping it as
          // a safety net for now).
1761          LOG.debug("No more balancing till next balance run; maxBalanceTime="
1762              + this.maxBalancingTime);
1763          break;
1764        }
1765      }
1766    }
1767    return successRegionPlans;
1768  }
1769
1770  @Override
1771  public RegionNormalizerManager getRegionNormalizerManager() {
1772    return regionNormalizerManager;
1773  }
1774
1775  @Override
1776  public boolean normalizeRegions(
1777    final NormalizeTableFilterParams ntfp,
1778    final boolean isHighPriority
1779  ) throws IOException {
1780    if (regionNormalizerManager == null || !regionNormalizerManager.isNormalizerOn()) {
1781      LOG.debug("Region normalization is disabled, don't run region normalizer.");
1782      return false;
1783    }
1784    if (skipRegionManagementAction("region normalizer")) {
1785      return false;
1786    }
1787    if (assignmentManager.hasRegionsInTransition()) {
1788      return false;
1789    }
1790
1791    final Set<TableName> matchingTables = getTableDescriptors(new LinkedList<>(),
1792      ntfp.getNamespace(), ntfp.getRegex(), ntfp.getTableNames(), false)
1793      .stream()
1794      .map(TableDescriptor::getTableName)
1795      .collect(Collectors.toSet());
1796    final Set<TableName> allEnabledTables =
1797      tableStateManager.getTablesInStates(TableState.State.ENABLED);
1798    final List<TableName> targetTables =
1799      new ArrayList<>(Sets.intersection(matchingTables, allEnabledTables));
1800    Collections.shuffle(targetTables);
1801    return regionNormalizerManager.normalizeRegions(targetTables, isHighPriority);
1802  }
1803
1804  /**
1805   * @return Client info for use as prefix on an audit log string; who did an action
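   *   Yields something like {@code Client=alice/10.0.0.1}; either part may print as null if
   *   unavailable.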
1806   */
1807  @Override
1808  public String getClientIdAuditPrefix() {
1809    return "Client=" + RpcServer.getRequestUserName().orElse(null)
1810        + "/" + RpcServer.getRemoteAddress().orElse(null);
1811  }
1812
1813  /**
1814   * Switch for the background CatalogJanitor thread.
1815   * Used for testing.  The thread will continue to run.  It will just be a noop
1816   * if disabled.
1817   * @param b If false, the catalog janitor won't do anything.
1818   */
1819  public void setCatalogJanitorEnabled(final boolean b) {
1820    this.catalogJanitorChore.setEnabled(b);
1821  }
1822
1823  @Override
1824  public long mergeRegions(
1825      final RegionInfo[] regionsToMerge,
1826      final boolean forcible,
      final long nonceGroup,
1828      final long nonce) throws IOException {
1829    checkInitialized();
1830
1831    if (!isSplitOrMergeEnabled(MasterSwitchType.MERGE)) {
1832      String regionsStr = Arrays.deepToString(regionsToMerge);
      LOG.warn("Merge switch is off! Skipping merge of " + regionsStr);
1834      throw new DoNotRetryIOException("Merge of " + regionsStr +
1835          " failed because merge switch is off");
1836    }
1837
1838    final String mergeRegionsStr = Arrays.stream(regionsToMerge).map(RegionInfo::getEncodedName)
1839      .collect(Collectors.joining(", "));
    return MasterProcedureUtil.submitProcedure(
      new NonceProcedureRunnable(this, nonceGroup, nonce) {
1841      @Override
1842      protected void run() throws IOException {
1843        getMaster().getMasterCoprocessorHost().preMergeRegions(regionsToMerge);
1844        String aid = getClientIdAuditPrefix();
1845        LOG.info("{} merge regions {}", aid, mergeRegionsStr);
1846        submitProcedure(new MergeTableRegionsProcedure(procedureExecutor.getEnvironment(),
1847            regionsToMerge, forcible));
1848        getMaster().getMasterCoprocessorHost().postMergeRegions(regionsToMerge);
1849      }
1850
1851      @Override
1852      protected String getDescription() {
1853        return "MergeTableProcedure";
1854      }
1855    });
1856  }
1857
1858  @Override
  public long splitRegion(final RegionInfo regionInfo, final byte[] splitRow,
      final long nonceGroup, final long nonce) throws IOException {
1862    checkInitialized();
1863
1864    if (!isSplitOrMergeEnabled(MasterSwitchType.SPLIT)) {
      LOG.warn("Split switch is off! Skipping split of " + regionInfo);
1866      throw new DoNotRetryIOException("Split region " + regionInfo.getRegionNameAsString() +
1867          " failed due to split switch off");
1868    }
1869
1870    return MasterProcedureUtil.submitProcedure(
1871        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
1872      @Override
1873      protected void run() throws IOException {
1874        getMaster().getMasterCoprocessorHost().preSplitRegion(regionInfo.getTable(), splitRow);
1875        LOG.info(getClientIdAuditPrefix() + " split " + regionInfo.getRegionNameAsString());
1876
1877        // Execute the operation asynchronously
1878        submitProcedure(getAssignmentManager().createSplitProcedure(regionInfo, splitRow));
1879      }
1880
1881      @Override
1882      protected String getDescription() {
1883        return "SplitTableProcedure";
1884      }
1885    });
1886  }
1887
  // Public so can be accessed by tests. Blocks until move is done.
  // TODO: Replace with an async implementation from which you can get
  // a success/failure result.
1891  @InterfaceAudience.Private
1892  public void move(final byte[] encodedRegionName, byte[] destServerName) throws HBaseIOException {
1893    RegionState regionState = assignmentManager.getRegionStates().
1894      getRegionState(Bytes.toString(encodedRegionName));
1895
1896    RegionInfo hri;
1897    if (regionState != null) {
1898      hri = regionState.getRegion();
1899    } else {
1900      throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
1901    }
1902
1903    ServerName dest;
    List<ServerName> exclude = hri.getTable().isSystemTable()
      ? assignmentManager.getExcludedServersForSystemTable()
      : new ArrayList<>(1);
    if (destServerName != null
        && exclude.contains(ServerName.valueOf(Bytes.toString(destServerName)))) {
      LOG.info(Bytes.toString(encodedRegionName) + " cannot move to "
        + Bytes.toString(destServerName) + " because the server is in the exclude list");
1910      destServerName = null;
1911    }
1912    if (destServerName == null || destServerName.length == 0) {
1913      LOG.info("Passed destination servername is null/empty so " +
1914        "choosing a server at random");
1915      exclude.add(regionState.getServerName());
1916      final List<ServerName> destServers = this.serverManager.createDestinationServersList(exclude);
1917      dest = balancer.randomAssignment(hri, destServers);
1918      if (dest == null) {
1919        LOG.debug("Unable to determine a plan to assign " + hri);
1920        return;
1921      }
1922    } else {
1923      ServerName candidate = ServerName.valueOf(Bytes.toString(destServerName));
1924      dest = balancer.randomAssignment(hri, Lists.newArrayList(candidate));
1925      if (dest == null) {
1926        LOG.debug("Unable to determine a plan to assign " + hri);
1927        return;
1928      }
1929      // TODO: What is this? I don't get it.
1930      if (dest.equals(serverName) && balancer instanceof BaseLoadBalancer
1931          && !((BaseLoadBalancer)balancer).shouldBeOnMaster(hri)) {
1932        // To avoid unnecessary region moving later by balancer. Don't put user
1933        // regions on master.
1934        LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1935          + " to avoid unnecessary region moving later by load balancer,"
1936          + " because it should not be on master");
1937        return;
1938      }
1939    }
1940
1941    if (dest.equals(regionState.getServerName())) {
1942      LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1943        + " because region already assigned to the same server " + dest + ".");
1944      return;
1945    }
1946
1947    // Now we can do the move
1948    RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
1949    assert rp.getDestination() != null: rp.toString() + " " + dest;
1950
1951    try {
1952      checkInitialized();
1953      if (this.cpHost != null) {
1954        this.cpHost.preMove(hri, rp.getSource(), rp.getDestination());
1955      }
1956
1957      TransitRegionStateProcedure proc =
1958          this.assignmentManager.createMoveRegionProcedure(rp.getRegionInfo(), rp.getDestination());
      // Warm up the region on the destination before initiating the move. This call
      // is synchronous and takes some time, so we do it before the source region gets
      // closed.
1962      serverManager.sendRegionWarmup(rp.getDestination(), hri);
1963      LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
1964      Future<byte[]> future = ProcedureSyncWait.submitProcedure(this.procedureExecutor, proc);
1965      try {
1966        // Is this going to work? Will we throw exception on error?
1967        // TODO: CompletableFuture rather than this stunted Future.
1968        future.get();
1969      } catch (InterruptedException | ExecutionException e) {
1970        throw new HBaseIOException(e);
1971      }
1972      if (this.cpHost != null) {
1973        this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
1974      }
1975    } catch (IOException ioe) {
1976      if (ioe instanceof HBaseIOException) {
1977        throw (HBaseIOException)ioe;
1978      }
1979      throw new HBaseIOException(ioe);
1980    }
1981  }
1982
1983  @Override
1984  public long createTable(final TableDescriptor tableDescriptor, final byte[][] splitKeys,
1985      final long nonceGroup, final long nonce) throws IOException {
1986    checkInitialized();
1987    TableDescriptor desc = getMasterCoprocessorHost().preCreateTableRegionsInfos(tableDescriptor);
1988    if (desc == null) {
      throw new IOException("Creation for " + tableDescriptor + " is canceled by a coprocessor");
1990    }
1991    String namespace = desc.getTableName().getNamespaceAsString();
1992    this.clusterSchemaService.getNamespace(namespace);
1993
1994    RegionInfo[] newRegions = ModifyRegionUtils.createRegionInfos(desc, splitKeys);
1995    TableDescriptorChecker.sanityCheck(conf, desc);
1996
1997    return MasterProcedureUtil
1998      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
1999        @Override
2000        protected void run() throws IOException {
2001          getMaster().getMasterCoprocessorHost().preCreateTable(desc, newRegions);
2002
2003          LOG.info(getClientIdAuditPrefix() + " create " + desc);
2004
2005          // TODO: We can handle/merge duplicate requests, and differentiate the case of
2006          // TableExistsException by saying if the schema is the same or not.
2007          //
2008          // We need to wait for the procedure to potentially fail due to "prepare" sanity
2009          // checks. This will block only the beginning of the procedure. See HBASE-19953.
2010          ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
2011          submitProcedure(
2012            new CreateTableProcedure(procedureExecutor.getEnvironment(), desc, newRegions, latch));
2013          latch.await();
2014
2015          getMaster().getMasterCoprocessorHost().postCreateTable(desc, newRegions);
2016        }
2017
2018        @Override
2019        protected String getDescription() {
2020          return "CreateTableProcedure";
2021        }
2022      });
2023  }
2024
2025  @Override
2026  public long createSystemTable(final TableDescriptor tableDescriptor) throws IOException {
2027    if (isStopped()) {
2028      throw new MasterNotRunningException();
2029    }
2030
2031    TableName tableName = tableDescriptor.getTableName();
2032    if (!(tableName.isSystemTable())) {
2033      throw new IllegalArgumentException(
2034        "Only system table creation can use this createSystemTable API");
2035    }
2036
2037    RegionInfo[] newRegions = ModifyRegionUtils.createRegionInfos(tableDescriptor, null);
2038
2039    LOG.info(getClientIdAuditPrefix() + " create " + tableDescriptor);
2040
    // This special create table is called locally to the master. Therefore, there is no RPC,
    // which means no need to use a nonce to detect duplicated RPC calls.
2043    long procId = this.procedureExecutor.submitProcedure(
2044      new CreateTableProcedure(procedureExecutor.getEnvironment(), tableDescriptor, newRegions));
2045
2046    return procId;
2047  }
2048
2049  private void startActiveMasterManager(int infoPort) throws KeeperException {
2050    String backupZNode = ZNodePaths.joinZNode(
2051      zooKeeper.getZNodePaths().backupMasterAddressesZNode, serverName.toString());
    /*
     * Add a ZNode for ourselves in the backup master directory since we
     * may not become the active master. If so, we want the actual active
     * master to know we are backup masters, so that it won't assign
     * regions to us if so configured.
     *
     * If we become the active master later, ActiveMasterManager will delete
     * this node explicitly. If we crash before then, ZooKeeper will delete
     * this node for us since it is ephemeral.
     */
2062    LOG.info("Adding backup master ZNode " + backupZNode);
2063    if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode, serverName, infoPort)) {
2064      LOG.warn("Failed create of " + backupZNode + " by " + serverName);
2065    }
2066    this.activeMasterManager.setInfoPort(infoPort);
2067    int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
    // If we're a backup master, stall until the primary writes its address
2069    if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP, HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
2070      LOG.debug("HMaster started in backup mode. Stalling until master znode is written.");
2071      // This will only be a minute or so while the cluster starts up,
2072      // so don't worry about setting watches on the parent znode
2073      while (!activeMasterManager.hasActiveMaster()) {
2074        LOG.debug("Waiting for master address and cluster state znode to be written.");
2075        Threads.sleep(timeout);
2076      }
2077    }
2078    MonitoredTask status = TaskMonitor.get().createStatus("Master startup");
2079    status.setDescription("Master startup");
2080    try {
2081      if (activeMasterManager.blockUntilBecomingActiveMaster(timeout, status)) {
2082        finishActiveMasterInitialization(status);
2083      }
2084    } catch (Throwable t) {
2085      status.setStatus("Failed to become active: " + t.getMessage());
2086      LOG.error(HBaseMarkers.FATAL, "Failed to become active master", t);
2087      // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
2088      if (t instanceof NoClassDefFoundError && t.getMessage().
2089          contains("org/apache/hadoop/hdfs/protocol/HdfsConstants$SafeModeAction")) {
2090        // improved error message for this special case
2091        abort("HBase is having a problem with its Hadoop jars.  You may need to recompile " +
2092          "HBase against Hadoop version " + org.apache.hadoop.util.VersionInfo.getVersion() +
2093          " or change your hadoop jars to start properly", t);
2094      } else {
2095        abort("Unhandled exception. Starting shutdown.", t);
2096      }
2097    } finally {
2098      status.cleanup();
2099    }
2100  }
2101
2102  private static boolean isCatalogTable(final TableName tableName) {
2103    return tableName.equals(TableName.META_TABLE_NAME);
2104  }
2105
2106  @Override
2107  public long deleteTable(
2108      final TableName tableName,
2109      final long nonceGroup,
2110      final long nonce) throws IOException {
2111    checkInitialized();
2112
2113    return MasterProcedureUtil.submitProcedure(
2114        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2115      @Override
2116      protected void run() throws IOException {
2117        getMaster().getMasterCoprocessorHost().preDeleteTable(tableName);
2118
2119        LOG.info(getClientIdAuditPrefix() + " delete " + tableName);
2120
2121        // TODO: We can handle/merge duplicate request
2122        //
2123        // We need to wait for the procedure to potentially fail due to "prepare" sanity
2124        // checks. This will block only the beginning of the procedure. See HBASE-19953.
2125        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
2126        submitProcedure(new DeleteTableProcedure(procedureExecutor.getEnvironment(),
2127            tableName, latch));
2128        latch.await();
2129
2130        getMaster().getMasterCoprocessorHost().postDeleteTable(tableName);
2131      }
2132
2133      @Override
2134      protected String getDescription() {
2135        return "DeleteTableProcedure";
2136      }
2137    });
2138  }
2139
2140  @Override
2141  public long truncateTable(
2142      final TableName tableName,
2143      final boolean preserveSplits,
2144      final long nonceGroup,
2145      final long nonce) throws IOException {
2146    checkInitialized();
2147
2148    return MasterProcedureUtil.submitProcedure(
2149        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2150      @Override
2151      protected void run() throws IOException {
2152        getMaster().getMasterCoprocessorHost().preTruncateTable(tableName);
2153
2154        LOG.info(getClientIdAuditPrefix() + " truncate " + tableName);
2155        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
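        // Note: the (2, 0) arguments above are understood to request a blocking latch only for
        // pre-2.0 clients, which cannot track the procedure asynchronously.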
2156        submitProcedure(new TruncateTableProcedure(procedureExecutor.getEnvironment(),
2157            tableName, preserveSplits, latch));
2158        latch.await();
2159
2160        getMaster().getMasterCoprocessorHost().postTruncateTable(tableName);
2161      }
2162
2163      @Override
2164      protected String getDescription() {
2165        return "TruncateTableProcedure";
2166      }
2167    });
2168  }
2169
2170  @Override
2171  public long addColumn(final TableName tableName, final ColumnFamilyDescriptor column,
2172      final long nonceGroup, final long nonce) throws IOException {
2173    checkInitialized();
2174    checkTableExists(tableName);
2175
2176    return modifyTable(tableName, new TableDescriptorGetter() {
2177
2178      @Override
2179      public TableDescriptor get() throws IOException {
2180        TableDescriptor old = getTableDescriptors().get(tableName);
2181        if (old.hasColumnFamily(column.getName())) {
2182          throw new InvalidFamilyOperationException("Column family '" + column.getNameAsString()
2183              + "' in table '" + tableName + "' already exists so cannot be added");
2184        }
2185
2186        return TableDescriptorBuilder.newBuilder(old).setColumnFamily(column).build();
2187      }
2188    }, nonceGroup, nonce, true);
2189  }
2190
2191  /**
2192   * Implement to return TableDescriptor after pre-checks
2193   */
2194  protected interface TableDescriptorGetter {
2195    TableDescriptor get() throws IOException;
2196  }
2197
2198  @Override
2199  public long modifyColumn(final TableName tableName, final ColumnFamilyDescriptor descriptor,
2200      final long nonceGroup, final long nonce) throws IOException {
2201    checkInitialized();
2202    checkTableExists(tableName);
2203    return modifyTable(tableName, new TableDescriptorGetter() {
2204
2205      @Override
2206      public TableDescriptor get() throws IOException {
2207        TableDescriptor old = getTableDescriptors().get(tableName);
2208        if (!old.hasColumnFamily(descriptor.getName())) {
2209          throw new InvalidFamilyOperationException("Family '" + descriptor.getNameAsString()
2210              + "' does not exist, so it cannot be modified");
2211        }
2212
2213        return TableDescriptorBuilder.newBuilder(old).modifyColumnFamily(descriptor).build();
2214      }
2215    }, nonceGroup, nonce, true);
2216  }
2217
2218  @Override
2219  public long deleteColumn(final TableName tableName, final byte[] columnName,
2220      final long nonceGroup, final long nonce) throws IOException {
2221    checkInitialized();
2222    checkTableExists(tableName);
2223
2224    return modifyTable(tableName, new TableDescriptorGetter() {
2225
2226      @Override
2227      public TableDescriptor get() throws IOException {
2228        TableDescriptor old = getTableDescriptors().get(tableName);
2229
2230        if (!old.hasColumnFamily(columnName)) {
2231          throw new InvalidFamilyOperationException("Family '" + Bytes.toString(columnName)
2232              + "' does not exist, so it cannot be deleted");
2233        }
2234        if (old.getColumnFamilyCount() == 1) {
2235          throw new InvalidFamilyOperationException("Family '" + Bytes.toString(columnName)
2236              + "' is the only column family in the table, so it cannot be deleted");
2237        }
2238        return TableDescriptorBuilder.newBuilder(old).removeColumnFamily(columnName).build();
2239      }
2240    }, nonceGroup, nonce, true);
2241  }
2242
2243  @Override
2244  public long enableTable(final TableName tableName, final long nonceGroup, final long nonce)
2245      throws IOException {
2246    checkInitialized();
2247
2248    return MasterProcedureUtil.submitProcedure(
2249        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2250      @Override
2251      protected void run() throws IOException {
2252        getMaster().getMasterCoprocessorHost().preEnableTable(tableName);
2253
2254        // Normally, it would make sense for this authorization check to exist inside
2255        // AccessController, but because the authorization check is done based on internal state
2256        // (rather than explicit permissions) we'll do the check here instead of in the
2257        // coprocessor.
2258        MasterQuotaManager quotaManager = getMasterQuotaManager();
2259        if (quotaManager != null) {
          if (quotaManager.isQuotaInitialized()) {
            SpaceQuotaSnapshot currSnapshotOfTable =
                QuotaTableUtil.getCurrentSnapshotFromQuotaTable(getConnection(), tableName);
            if (currSnapshotOfTable != null) {
              SpaceQuotaStatus quotaStatus = currSnapshotOfTable.getQuotaStatus();
              if (quotaStatus.isInViolation()
                  && SpaceViolationPolicy.DISABLE == quotaStatus.getPolicy().orElse(null)) {
                throw new AccessDeniedException("Enabling the table '" + tableName
                    + "' is disallowed due to a violated space quota.");
              }
            }
2271          } else if (LOG.isTraceEnabled()) {
2272            LOG.trace("Unable to check for space quotas as the MasterQuotaManager is not enabled");
2273          }
2274        }
2275
2276        LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
2277
2278        // Execute the operation asynchronously - client will check the progress of the operation
        // In case the request is from a <1.1 client, before returning we want to make sure
        // that the table is prepared to be enabled (the table is locked and the table state
        // is set).
2282        // Note: if the procedure throws exception, we will catch it and rethrow.
2283        final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
2284        submitProcedure(new EnableTableProcedure(procedureExecutor.getEnvironment(),
2285            tableName, prepareLatch));
2286        prepareLatch.await();
2287
2288        getMaster().getMasterCoprocessorHost().postEnableTable(tableName);
2289      }
2290
2291      @Override
2292      protected String getDescription() {
2293        return "EnableTableProcedure";
2294      }
2295    });
2296  }
2297
2298  @Override
2299  public long disableTable(final TableName tableName, final long nonceGroup, final long nonce)
2300      throws IOException {
2301    checkInitialized();
2302
2303    return MasterProcedureUtil.submitProcedure(
2304        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2305      @Override
2306      protected void run() throws IOException {
2307        getMaster().getMasterCoprocessorHost().preDisableTable(tableName);
2308
2309        LOG.info(getClientIdAuditPrefix() + " disable " + tableName);
2310
2311        // Execute the operation asynchronously - client will check the progress of the operation
        // In case the request is from a <1.1 client, before returning we want to make sure
        // that the table is prepared to be disabled (the table is locked and the table state
        // is set).
2315        // Note: if the procedure throws exception, we will catch it and rethrow.
2316        //
2317        // We need to wait for the procedure to potentially fail due to "prepare" sanity
2318        // checks. This will block only the beginning of the procedure. See HBASE-19953.
2319        final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createBlockingLatch();
2320        submitProcedure(new DisableTableProcedure(procedureExecutor.getEnvironment(),
2321            tableName, false, prepareLatch));
2322        prepareLatch.await();
2323
2324        getMaster().getMasterCoprocessorHost().postDisableTable(tableName);
2325      }
2326
2327      @Override
2328      protected String getDescription() {
2329        return "DisableTableProcedure";
2330      }
2331    });
2332  }
2333
2334  private long modifyTable(final TableName tableName,
2335      final TableDescriptorGetter newDescriptorGetter, final long nonceGroup, final long nonce,
2336      final boolean shouldCheckDescriptor) throws IOException {
2337    return MasterProcedureUtil
2338        .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2339          @Override
2340          protected void run() throws IOException {
2341            TableDescriptor oldDescriptor = getMaster().getTableDescriptors().get(tableName);
2342            TableDescriptor newDescriptor = getMaster().getMasterCoprocessorHost()
2343                .preModifyTable(tableName, oldDescriptor, newDescriptorGetter.get());
2344            TableDescriptorChecker.sanityCheck(conf, newDescriptor);
2345            LOG.info("{} modify table {} from {} to {}", getClientIdAuditPrefix(), tableName,
2346              oldDescriptor, newDescriptor);
2347
2348            // Execute the operation synchronously - wait for the operation completes before
2349            // continuing.
2350            //
2351            // We need to wait for the procedure to potentially fail due to "prepare" sanity
2352            // checks. This will block only the beginning of the procedure. See HBASE-19953.
2353            ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
2354            submitProcedure(new ModifyTableProcedure(procedureExecutor.getEnvironment(),
2355                newDescriptor, latch, oldDescriptor, shouldCheckDescriptor));
2356            latch.await();
2357
2358            getMaster().getMasterCoprocessorHost().postModifyTable(tableName, oldDescriptor,
2359              newDescriptor);
2360          }
2361
2362          @Override
2363          protected String getDescription() {
2364            return "ModifyTableProcedure";
2365          }
2366        });
2367
2368  }
2369
2370  @Override
2371  public long modifyTable(final TableName tableName, final TableDescriptor newDescriptor,
2372      final long nonceGroup, final long nonce) throws IOException {
2373    checkInitialized();
2374    return modifyTable(tableName, new TableDescriptorGetter() {
2375      @Override
2376      public TableDescriptor get() throws IOException {
2377        return newDescriptor;
2378      }
2379    }, nonceGroup, nonce, false);
2380
2381  }
2382
2383  public long restoreSnapshot(final SnapshotDescription snapshotDesc,
2384      final long nonceGroup, final long nonce, final boolean restoreAcl) throws IOException {
2385    checkInitialized();
2386    getSnapshotManager().checkSnapshotSupport();
2387
    // Ensure namespace exists. Throws an exception if the namespace is unknown.
2389    final TableName dstTable = TableName.valueOf(snapshotDesc.getTable());
2390    getClusterSchema().getNamespace(dstTable.getNamespaceAsString());
2391
2392    return MasterProcedureUtil.submitProcedure(
2393        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2394      @Override
2395      protected void run() throws IOException {
        setProcId(
          getSnapshotManager().restoreOrCloneSnapshot(snapshotDesc, getNonceKey(), restoreAcl));
2398      }
2399
2400      @Override
2401      protected String getDescription() {
2402        return "RestoreSnapshotProcedure";
2403      }
2404    });
2405  }
2406
2407  private void checkTableExists(final TableName tableName)
2408    throws IOException, TableNotFoundException {
2409    if (!tableDescriptors.exists(tableName)) {
2410      throw new TableNotFoundException(tableName);
2411    }
2412  }
2413
2414  @Override
2415  public void checkTableModifiable(final TableName tableName)
2416      throws IOException, TableNotFoundException, TableNotDisabledException {
2417    if (isCatalogTable(tableName)) {
2418      throw new IOException("Can't modify catalog tables");
2419    }
2420    checkTableExists(tableName);
2421    TableState ts = getTableStateManager().getTableState(tableName);
2422    if (!ts.isDisabled()) {
2423      throw new TableNotDisabledException("Not DISABLED; " + ts);
2424    }
2425  }
2426
2427  public ClusterMetrics getClusterMetricsWithoutCoprocessor() throws InterruptedIOException {
2428    return getClusterMetricsWithoutCoprocessor(EnumSet.allOf(Option.class));
2429  }
2430
2431  public ClusterMetrics getClusterMetricsWithoutCoprocessor(EnumSet<Option> options)
2432      throws InterruptedIOException {
2433    ClusterMetricsBuilder builder = ClusterMetricsBuilder.newBuilder();
    // Given that an hbase1 client can't submit a request with Options,
    // we return all information to the client if the list of Options is empty.
2436    if (options.isEmpty()) {
2437      options = EnumSet.allOf(Option.class);
2438    }
2439
2440    for (Option opt : options) {
2441      switch (opt) {
2442        case HBASE_VERSION: builder.setHBaseVersion(VersionInfo.getVersion()); break;
2443        case CLUSTER_ID: builder.setClusterId(getClusterId()); break;
2444        case MASTER: builder.setMasterName(getServerName()); break;
2445        case BACKUP_MASTERS: builder.setBackerMasterNames(getBackupMasters()); break;
2446        case LIVE_SERVERS: {
2447          if (serverManager != null) {
2448            builder.setLiveServerMetrics(serverManager.getOnlineServers().entrySet().stream()
2449              .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));
2450          }
2451          break;
2452        }
2453        case DEAD_SERVERS: {
2454          if (serverManager != null) {
2455            builder.setDeadServerNames(new ArrayList<>(
2456              serverManager.getDeadServers().copyServerNames()));
2457          }
2458          break;
2459        }
2460        case MASTER_COPROCESSORS: {
2461          if (cpHost != null) {
2462            builder.setMasterCoprocessorNames(Arrays.asList(getMasterCoprocessors()));
2463          }
2464          break;
2465        }
2466        case REGIONS_IN_TRANSITION: {
2467          if (assignmentManager != null) {
2468            builder.setRegionsInTransition(assignmentManager.getRegionStates()
2469                .getRegionsStateInTransition());
2470          }
2471          break;
2472        }
2473        case BALANCER_ON: {
2474          if (loadBalancerTracker != null) {
2475            builder.setBalancerOn(loadBalancerTracker.isBalancerOn());
2476          }
2477          break;
2478        }
2479        case MASTER_INFO_PORT: {
2480          if (infoServer != null) {
2481            builder.setMasterInfoPort(infoServer.getPort());
2482          }
2483          break;
2484        }
2485        case SERVERS_NAME: {
2486          if (serverManager != null) {
2487            builder.setServerNames(serverManager.getOnlineServersList());
2488          }
2489          break;
2490        }
2491        case TABLE_TO_REGIONS_COUNT: {
2492          if (isActiveMaster() && isInitialized() && assignmentManager != null) {
2493            try {
2494              Map<TableName, RegionStatesCount> tableRegionStatesCountMap = new HashMap<>();
2495              Map<String, TableDescriptor> tableDescriptorMap = getTableDescriptors().getAll();
2496              for (TableDescriptor tableDescriptor : tableDescriptorMap.values()) {
2497                TableName tableName = tableDescriptor.getTableName();
2498                RegionStatesCount regionStatesCount = assignmentManager
2499                  .getRegionStatesCount(tableName);
2500                tableRegionStatesCountMap.put(tableName, regionStatesCount);
2501              }
2502              builder.setTableRegionStatesCount(tableRegionStatesCountMap);
2503            } catch (IOException e) {
2504              LOG.error("Error while populating TABLE_TO_REGIONS_COUNT for Cluster Metrics..", e);
2505            }
2506          }
2507          break;
2508        }
2509      }
2510    }
2511    return builder.build();
2512  }
2513
2514  /**
   * @return cluster metrics
2516   */
2517  public ClusterMetrics getClusterMetrics() throws IOException {
2518    return getClusterMetrics(EnumSet.allOf(Option.class));
2519  }
2520
2521  public ClusterMetrics getClusterMetrics(EnumSet<Option> options) throws IOException {
2522    if (cpHost != null) {
2523      cpHost.preGetClusterMetrics();
2524    }
2525    ClusterMetrics status = getClusterMetricsWithoutCoprocessor(options);
2526    if (cpHost != null) {
2527      cpHost.postGetClusterMetrics(status);
2528    }
2529    return status;
2530  }
2531
2532  List<ServerName> getBackupMasters() {
2533    return activeMasterManager.getBackupMasters();
2534  }
2535
2536  /**
2537   * The set of loaded coprocessors is stored in a static set. Since it's
2538   * statically allocated, it does not require that HMaster's cpHost be
2539   * initialized prior to accessing it.
2540   * @return a String representation of the set of names of the loaded coprocessors.
2541   */
2542  public static String getLoadedCoprocessors() {
2543    return CoprocessorHost.getLoadedCoprocessors().toString();
2544  }
2545
2546  /**
2547   * @return timestamp in millis when HMaster was started.
2548   */
2549  public long getMasterStartTime() {
2550    return startcode;
2551  }
2552
2553  /**
2554   * @return timestamp in millis when HMaster became the active master.
2555   */
2556  public long getMasterActiveTime() {
2557    return masterActiveTime;
2558  }
2559
2560  /**
2561   * @return timestamp in millis when HMaster finished becoming the active master
2562   */
2563  public long getMasterFinishedInitializationTime() {
2564    return masterFinishedInitializationTime;
2565  }
2566
2567  public int getNumWALFiles() {
2568    return 0;
2569  }
2570
2571  public ProcedureStore getProcedureStore() {
2572    return procedureStore;
2573  }
2574
2575  public int getRegionServerInfoPort(final ServerName sn) {
2576    int port = this.serverManager.getInfoPort(sn);
2577    return port == 0 ? conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2578      HConstants.DEFAULT_REGIONSERVER_INFOPORT) : port;
2579  }
2580
2581  @Override
2582  public String getRegionServerVersion(ServerName sn) {
    // Will return "0.0.0" if the server is not online, to prevent moving a system region to a
    // RegionServer of unknown version.
2585    return this.serverManager.getVersion(sn);
2586  }
2587
2588  @Override
2589  public void checkIfShouldMoveSystemRegionAsync() {
2590    assignmentManager.checkIfShouldMoveSystemRegionAsync();
2591  }
2592
2593  /**
   * @return array of coprocessor simple class names.
2595   */
2596  public String[] getMasterCoprocessors() {
2597    Set<String> masterCoprocessors = getMasterCoprocessorHost().getCoprocessors();
2598    return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
2599  }
2600
2601  @Override
2602  public void abort(String reason, Throwable cause) {
2603    if (!setAbortRequested() || isStopped()) {
2604      LOG.debug("Abort called but aborted={}, stopped={}", isAborted(), isStopped());
2605      return;
2606    }
2607    if (cpHost != null) {
2608      // HBASE-4014: dump a list of loaded coprocessors.
2609      LOG.error(HBaseMarkers.FATAL, "Master server abort: loaded coprocessors are: " +
2610          getLoadedCoprocessors());
2611    }
2612    String msg = "***** ABORTING master " + this + ": " + reason + " *****";
2613    if (cause != null) {
2614      LOG.error(HBaseMarkers.FATAL, msg, cause);
2615    } else {
2616      LOG.error(HBaseMarkers.FATAL, msg);
2617    }
2618
2619    try {
2620      stopMaster();
2621    } catch (IOException e) {
2622      LOG.error("Exception occurred while stopping master", e);
2623    }
2624  }
2625
2626  @Override
2627  public ZKWatcher getZooKeeper() {
2628    return zooKeeper;
2629  }
2630
2631  @Override
2632  public MasterCoprocessorHost getMasterCoprocessorHost() {
2633    return cpHost;
2634  }
2635
2636  @Override
2637  public MasterQuotaManager getMasterQuotaManager() {
2638    return quotaManager;
2639  }
2640
2641  @Override
2642  public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
2643    return procedureExecutor;
2644  }
2645
2646  @Override
2647  public ServerName getServerName() {
2648    return this.serverName;
2649  }
2650
2651  @Override
2652  public AssignmentManager getAssignmentManager() {
2653    return this.assignmentManager;
2654  }
2655
2656  @Override
2657  public CatalogJanitor getCatalogJanitor() {
2658    return this.catalogJanitorChore;
2659  }
2660
2661  public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
2662    return rsFatals;
2663  }
2664
2665  /**
2666   * Shutdown the cluster.
2667   * Master runs a coordinated stop of all RegionServers and then itself.
2668   */
2669  public void shutdown() throws IOException {
2670    if (cpHost != null) {
2671      cpHost.preShutdown();
2672    }
2673
    // Tell the ServerManager that cluster shutdown has been called. This makes it so that when
    // the Master is the last running server, it will stop itself. Next, we broadcast the cluster
    // shutdown by setting the cluster status as down. RegionServers will notice this change in
    // state and will start shutting themselves down. When the last one has exited, the Master
    // can go down.
2678    if (this.serverManager != null) {
2679      this.serverManager.shutdownCluster();
2680    }
2681    if (this.clusterStatusTracker != null) {
2682      try {
2683        this.clusterStatusTracker.setClusterDown();
2684      } catch (KeeperException e) {
2685        LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
2686      }
2687    }
2688    // Stop the procedure executor. Will stop any ongoing assign, unassign, server crash etc.,
2689    // processing so we can go down.
2690    if (this.procedureExecutor != null) {
2691      this.procedureExecutor.stop();
2692    }
2693    // Shutdown our cluster connection. This will kill any hosted RPCs that might be going on;
2694    // this is what we want especially if the Master is in startup phase doing call outs to
    // hbase:meta, etc. when cluster is down. Without this connection close, we'd have to wait on
2696    // the rpc to timeout.
2697    if (this.clusterConnection != null) {
2698      this.clusterConnection.close();
2699    }
2700  }
2701
2702  public void stopMaster() throws IOException {
2703    if (cpHost != null) {
2704      cpHost.preStopMaster();
2705    }
2706    stop("Stopped by " + Thread.currentThread().getName());
2707  }
2708
2709  @Override
2710  public void stop(String msg) {
2711    if (!isStopped()) {
2712      super.stop(msg);
2713      if (this.activeMasterManager != null) {
2714        this.activeMasterManager.stop();
2715      }
2716    }
2717  }
2718
2719  @InterfaceAudience.Private
2720  protected void checkServiceStarted() throws ServerNotRunningYetException {
2721    if (!serviceStarted) {
2722      throw new ServerNotRunningYetException("Server is not running yet");
2723    }
2724  }
2725
2726  void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException,
2727      MasterNotRunningException, MasterStoppedException {
2728    checkServiceStarted();
2729    if (!isInitialized()) {
2730      throw new PleaseHoldException("Master is initializing");
2731    }
2732    if (isStopped()) {
2733      throw new MasterStoppedException();
2734    }
2735  }
2736
2737  /**
2738   * Report whether this master is currently the active master or not.
2739   * If not active master, we are parked on ZK waiting to become active.
2740   *
2741   * This method is used for testing.
2742   *
2743   * @return true if active master, false if not.
2744   */
2745  @Override
2746  public boolean isActiveMaster() {
2747    return activeMaster;
2748  }
2749
2750  /**
   * Report whether this master has completed its initialization and is ready.
   * If ready, the master is also the active master. A standby master is never
   * ready.
2754   *
2755   * This method is used for testing.
2756   *
2757   * @return true if master is ready to go, false if not.
2758   */
2759  @Override
2760  public boolean isInitialized() {
2761    return initialized.isReady();
2762  }
2763
2764  /**
2765   * Report whether this master is in maintenance mode.
2766   *
2767   * @return true if master is in maintenanceMode
2768   */
2769  @Override
2770  public boolean isInMaintenanceMode() {
2771    return maintenanceMode;
2772  }
2773
2774  @InterfaceAudience.Private
2775  public void setInitialized(boolean isInitialized) {
2776    procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
2777  }
2778
2779  @Override
2780  public ProcedureEvent<?> getInitializedEvent() {
2781    return initialized;
2782  }
2783
2784  /**
2785   * Compute the average load across all region servers.
   * Currently, this uses a very naive computation: it just counts the number of
   * regions being served, ignoring stats about the number of requests.
2788   * @return the average load
2789   */
2790  public double getAverageLoad() {
2791    if (this.assignmentManager == null) {
2792      return 0;
2793    }
2794
2795    RegionStates regionStates = this.assignmentManager.getRegionStates();
2796    if (regionStates == null) {
2797      return 0;
2798    }
2799    return regionStates.getAverageLoad();
2800  }
2801
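  /**
   * Register a coprocessor-exported {@link Service}. A sketch of typical use from a master
   * coprocessor's start hook (illustrative only; {@code MyMasterService} is a placeholder
   * for a generated protobuf service implementation):
   * <pre>{@code
   * Service svc = new MyMasterService();
   * if (!master.registerService(svc)) {
   *   // a service with the same name was already registered
   * }
   * }</pre>
   */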
2802  @Override
2803  public boolean registerService(Service instance) {
2804    /*
2805     * No stacking of instances is allowed for a single service name
2806     */
2807    Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
2808    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
2809    if (coprocessorServiceHandlers.containsKey(serviceName)) {
2810      LOG.error("Coprocessor service "+serviceName+
2811          " already registered, rejecting request from "+instance
2812      );
2813      return false;
2814    }
2815
2816    coprocessorServiceHandlers.put(serviceName, instance);
    LOG.debug("Registered master coprocessor service: service={}", serviceName);
2820    return true;
2821  }
2822
2823  /**
2824   * Utility for constructing an instance of the passed HMaster class.
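   * <p>For example (illustrative only):
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * HMaster master = HMaster.constructMaster(HMaster.class, conf);
   * }</pre>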
   * @param masterClass the concrete HMaster class to instantiate; it must declare a public
   *   constructor that takes a {@link Configuration}.
   * @return HMaster instance.
2827   */
2828  public static HMaster constructMaster(Class<? extends HMaster> masterClass,
2829      final Configuration conf)  {
2830    try {
2831      Constructor<? extends HMaster> c = masterClass.getConstructor(Configuration.class);
2832      return c.newInstance(conf);
2833    } catch(Exception e) {
2834      Throwable error = e;
2835      if (e instanceof InvocationTargetException &&
2836          ((InvocationTargetException)e).getTargetException() != null) {
2837        error = ((InvocationTargetException)e).getTargetException();
2838      }
2839      throw new RuntimeException("Failed construction of Master: " + masterClass.toString() + ". "
2840        , error);
2841    }
2842  }
2843
2844  /**
2845   * @see org.apache.hadoop.hbase.master.HMasterCommandLine
2846   */
2847  public static void main(String [] args) {
2848    LOG.info("STARTING service " + HMaster.class.getSimpleName());
2849    VersionInfo.logVersion();
2850    new HMasterCommandLine(HMaster.class).doMain(args);
2851  }
2852
2853  public HFileCleaner getHFileCleaner() {
2854    return this.hfileCleaner;
2855  }
2856
2857  public LogCleaner getLogCleaner() {
2858    return this.logCleaner;
2859  }
2860
2861  /**
2862   * @return the underlying snapshot manager
2863   */
2864  @Override
2865  public SnapshotManager getSnapshotManager() {
2866    return this.snapshotManager;
2867  }
2868
2869  /**
2870   * @return the underlying MasterProcedureManagerHost
2871   */
2872  @Override
2873  public MasterProcedureManagerHost getMasterProcedureManagerHost() {
2874    return mpmHost;
2875  }
2876
2877  @Override
2878  public ClusterSchema getClusterSchema() {
2879    return this.clusterSchemaService;
2880  }
2881
2882  /**
2883   * Create a new Namespace.
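   * <p>A usage sketch (illustrative only; in practice the nonce pair comes from the
   * client's nonce generator):
   * <pre>{@code
   * NamespaceDescriptor ns = NamespaceDescriptor.create("analytics").build();
   * long procId = master.createNamespace(ns, nonceGroup, nonce);
   * }</pre>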
2884   * @param namespaceDescriptor descriptor for new Namespace
2885   * @param nonceGroup Identifier for the source of the request, a client or process.
2886   * @param nonce A unique identifier for this operation from the client or process identified by
2887   * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2888   * @return procedure id
2889   */
2890  long createNamespace(final NamespaceDescriptor namespaceDescriptor, final long nonceGroup,
2891      final long nonce) throws IOException {
2892    checkInitialized();
2893
2894    TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName()));
2895
2896    return MasterProcedureUtil.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this,
2897          nonceGroup, nonce) {
2898      @Override
2899      protected void run() throws IOException {
2900        getMaster().getMasterCoprocessorHost().preCreateNamespace(namespaceDescriptor);
2901        // We need to wait for the procedure to potentially fail due to "prepare" sanity
2902        // checks. This will block only the beginning of the procedure. See HBASE-19953.
2903        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
2904        LOG.info(getClientIdAuditPrefix() + " creating " + namespaceDescriptor);
2905        // Execute the operation synchronously - wait for the operation to complete before
2906        // continuing.
2907        setProcId(getClusterSchema().createNamespace(namespaceDescriptor, getNonceKey(), latch));
2908        latch.await();
2909        getMaster().getMasterCoprocessorHost().postCreateNamespace(namespaceDescriptor);
2910      }
2911
2912      @Override
2913      protected String getDescription() {
2914        return "CreateNamespaceProcedure";
2915      }
2916    });
2917  }
2918
2919  /**
2920   * Modify an existing Namespace.
2921   * @param nonceGroup Identifier for the source of the request, a client or process.
2922   * @param nonce A unique identifier for this operation from the client or process identified by
2923   * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2924   * @return procedure id
2925   */
2926  long modifyNamespace(final NamespaceDescriptor newNsDescriptor, final long nonceGroup,
2927      final long nonce) throws IOException {
2928    checkInitialized();
2929
2930    TableName.isLegalNamespaceName(Bytes.toBytes(newNsDescriptor.getName()));
2931
2932    return MasterProcedureUtil.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this,
2933          nonceGroup, nonce) {
2934      @Override
2935      protected void run() throws IOException {
2936        NamespaceDescriptor oldNsDescriptor = getNamespace(newNsDescriptor.getName());
2937        getMaster().getMasterCoprocessorHost().preModifyNamespace(oldNsDescriptor, newNsDescriptor);
2938        // We need to wait for the procedure to potentially fail due to "prepare" sanity
2939        // checks. This will block only the beginning of the procedure. See HBASE-19953.
2940        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
2941        LOG.info(getClientIdAuditPrefix() + " modify " + newNsDescriptor);
2942        // Execute the operation synchronously - wait for the operation to complete before
2943        // continuing.
2944        setProcId(getClusterSchema().modifyNamespace(newNsDescriptor, getNonceKey(), latch));
2945        latch.await();
2946        getMaster().getMasterCoprocessorHost().postModifyNamespace(oldNsDescriptor,
2947          newNsDescriptor);
2948      }
2949
2950      @Override
2951      protected String getDescription() {
2952        return "ModifyNamespaceProcedure";
2953      }
2954    });
2955  }
2956
2957  /**
2958   * Delete an existing Namespace. Only empty Namespaces (no tables) can be removed.
2959   * @param nonceGroup Identifier for the source of the request, a client or process.
2960   * @param nonce A unique identifier for this operation from the client or process identified by
2961   * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2962   * @return procedure id
2963   */
2964  long deleteNamespace(final String name, final long nonceGroup, final long nonce)
2965      throws IOException {
2966    checkInitialized();
2967
2968    return MasterProcedureUtil.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this,
2969          nonceGroup, nonce) {
2970      @Override
2971      protected void run() throws IOException {
2972        getMaster().getMasterCoprocessorHost().preDeleteNamespace(name);
2973        LOG.info(getClientIdAuditPrefix() + " delete " + name);
2974        // Execute the operation synchronously - wait for the operation to complete before
2975        // continuing.
2976        //
2977        // We need to wait for the procedure to potentially fail due to "prepare" sanity
2978        // checks. This will block only the beginning of the procedure. See HBASE-19953.
2979        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
2980        setProcId(submitProcedure(
2981              new DeleteNamespaceProcedure(procedureExecutor.getEnvironment(), name, latch)));
2982        latch.await();
2983        // Will not be invoked in the face of Exception thrown by the Procedure's execution
2984        getMaster().getMasterCoprocessorHost().postDeleteNamespace(name);
2985      }
2986
2987      @Override
2988      protected String getDescription() {
2989        return "DeleteNamespaceProcedure";
2990      }
2991    });
2992  }
2993
2994  /**
2995   * Get a Namespace
2996   * @param name Name of the Namespace
2997   * @return Namespace descriptor for <code>name</code>
2998   */
2999  NamespaceDescriptor getNamespace(String name) throws IOException {
3000    checkInitialized();
    if (this.cpHost != null) {
      this.cpHost.preGetNamespaceDescriptor(name);
    }
    NamespaceDescriptor nsd = this.clusterSchemaService.getNamespace(name);
    if (this.cpHost != null) {
      this.cpHost.postGetNamespaceDescriptor(nsd);
    }
3004    return nsd;
3005  }
3006
3007  /**
3008   * Get all Namespaces
3009   * @return All Namespace descriptors
3010   */
3011  List<NamespaceDescriptor> getNamespaces() throws IOException {
3012    checkInitialized();
3013    final List<NamespaceDescriptor> nsds = new ArrayList<>();
3014    if (cpHost != null) {
3015      cpHost.preListNamespaceDescriptors(nsds);
3016    }
3017    nsds.addAll(this.clusterSchemaService.getNamespaces());
3018    if (this.cpHost != null) {
3019      this.cpHost.postListNamespaceDescriptors(nsds);
3020    }
3021    return nsds;
3022  }
3023
3024  /**
3025   * List namespace names
3026   * @return All namespace names
3027   */
3028  public List<String> listNamespaces() throws IOException {
3029    checkInitialized();
3030    List<String> namespaces = new ArrayList<>();
3031    if (cpHost != null) {
3032      cpHost.preListNamespaces(namespaces);
3033    }
3034    for (NamespaceDescriptor namespace : clusterSchemaService.getNamespaces()) {
3035      namespaces.add(namespace.getName());
3036    }
3037    if (cpHost != null) {
3038      cpHost.postListNamespaces(namespaces);
3039    }
3040    return namespaces;
3041  }
3042
3043  @Override
3044  public List<TableName> listTableNamesByNamespace(String name) throws IOException {
3045    checkInitialized();
3046    return listTableNames(name, null, true);
3047  }
3048
3049  @Override
3050  public List<TableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
3051    checkInitialized();
3052    return listTableDescriptors(name, null, null, true);
3053  }
3054
3055  @Override
3056  public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
3057      throws IOException {
3058    if (cpHost != null) {
3059      cpHost.preAbortProcedure(this.procedureExecutor, procId);
3060    }
3061
3062    final boolean result = this.procedureExecutor.abort(procId, mayInterruptIfRunning);
3063
3064    if (cpHost != null) {
3065      cpHost.postAbortProcedure();
3066    }
3067
3068    return result;
3069  }
3070
3071  @Override
3072  public List<Procedure<?>> getProcedures() throws IOException {
3073    if (cpHost != null) {
3074      cpHost.preGetProcedures();
3075    }
3076
3077    @SuppressWarnings({ "unchecked", "rawtypes" })
3078    List<Procedure<?>> procList = (List) this.procedureExecutor.getProcedures();
3079
3080    if (cpHost != null) {
3081      cpHost.postGetProcedures(procList);
3082    }
3083
3084    return procList;
3085  }
3086
3087  @Override
3088  public List<LockedResource> getLocks() throws IOException {
3089    if (cpHost != null) {
3090      cpHost.preGetLocks();
3091    }
3092
3093    MasterProcedureScheduler procedureScheduler =
3094      procedureExecutor.getEnvironment().getProcedureScheduler();
3095
3096    final List<LockedResource> lockedResources = procedureScheduler.getLocks();
3097
3098    if (cpHost != null) {
3099      cpHost.postGetLocks(lockedResources);
3100    }
3101
3102    return lockedResources;
3103  }
3104
3105  /**
3106   * Returns the list of table descriptors that match the specified request
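   * <p>Sketch (illustrative only): fetch all userspace table descriptors.
   * <pre>{@code
   * List<TableDescriptor> userTables =
   *     master.listTableDescriptors(null, null, null, false);
   * }</pre>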
3107   * @param namespace the namespace to query, or null if querying for all
3108   * @param regex The regular expression to match against, or null if querying for all
3109   * @param tableNameList the list of table names, or null if querying for all
3110   * @param includeSysTables False to match only against userspace tables
3111   * @return the list of table descriptors
3112   */
3113  public List<TableDescriptor> listTableDescriptors(final String namespace, final String regex,
3114      final List<TableName> tableNameList, final boolean includeSysTables)
3115  throws IOException {
3116    List<TableDescriptor> htds = new ArrayList<>();
3117    if (cpHost != null) {
3118      cpHost.preGetTableDescriptors(tableNameList, htds, regex);
3119    }
3120    htds = getTableDescriptors(htds, namespace, regex, tableNameList, includeSysTables);
3121    if (cpHost != null) {
3122      cpHost.postGetTableDescriptors(tableNameList, htds, regex);
3123    }
3124    return htds;
3125  }
3126
3127  /**
3128   * Returns the list of table names that match the specified request
3129   * @param regex The regular expression to match against, or null if querying for all
3130   * @param namespace the namespace to query, or null if querying for all
3131   * @param includeSysTables False to match only against userspace tables
3132   * @return the list of table names
3133   */
3134  public List<TableName> listTableNames(final String namespace, final String regex,
3135      final boolean includeSysTables) throws IOException {
3136    List<TableDescriptor> htds = new ArrayList<>();
3137    if (cpHost != null) {
3138      cpHost.preGetTableNames(htds, regex);
3139    }
3140    htds = getTableDescriptors(htds, namespace, regex, null, includeSysTables);
3141    if (cpHost != null) {
3142      cpHost.postGetTableNames(htds, regex);
3143    }
3144    List<TableName> result = new ArrayList<>(htds.size());
    for (TableDescriptor htd : htds) {
      result.add(htd.getTableName());
    }
3146    return result;
3147  }
3148
3149  /**
   * Return a list of table descriptors after applying any provided filter parameters. Note
3151   * that the user-facing description of this filter logic is presented on the class-level javadoc
3152   * of {@link NormalizeTableFilterParams}.
3153   */
3154  private List<TableDescriptor> getTableDescriptors(final List<TableDescriptor> htds,
3155      final String namespace, final String regex, final List<TableName> tableNameList,
3156      final boolean includeSysTables)
3157  throws IOException {
3158    if (tableNameList == null || tableNameList.isEmpty()) {
3159      // request for all TableDescriptors
3160      Collection<TableDescriptor> allHtds;
3161      if (namespace != null && namespace.length() > 0) {
3162        // Do a check on the namespace existence. Will fail if does not exist.
3163        this.clusterSchemaService.getNamespace(namespace);
3164        allHtds = tableDescriptors.getByNamespace(namespace).values();
3165      } else {
3166        allHtds = tableDescriptors.getAll().values();
3167      }
3168      for (TableDescriptor desc: allHtds) {
3169        if (tableStateManager.isTablePresent(desc.getTableName())
3170            && (includeSysTables || !desc.getTableName().isSystemTable())) {
3171          htds.add(desc);
3172        }
3173      }
3174    } else {
3175      for (TableName s: tableNameList) {
3176        if (tableStateManager.isTablePresent(s)) {
3177          TableDescriptor desc = tableDescriptors.get(s);
3178          if (desc != null) {
3179            htds.add(desc);
3180          }
3181        }
3182      }
3183    }
3184
3185    // Retains only those matched by regular expression.
    if (regex != null) {
      filterTablesByRegex(htds, Pattern.compile(regex));
    }
3187    return htds;
3188  }
3189
3190  /**
3191   * Removes the table descriptors that don't match the pattern.
3192   * @param descriptors list of table descriptors to filter
3193   * @param pattern the regex to use
3194   */
3195  private static void filterTablesByRegex(final Collection<TableDescriptor> descriptors,
3196      final Pattern pattern) {
3197    final String defaultNS = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
3198    Iterator<TableDescriptor> itr = descriptors.iterator();
3199    while (itr.hasNext()) {
3200      TableDescriptor htd = itr.next();
3201      String tableName = htd.getTableName().getNameAsString();
3202      boolean matched = pattern.matcher(tableName).matches();
3203      if (!matched && htd.getTableName().getNamespaceAsString().equals(defaultNS)) {
3204        matched = pattern.matcher(defaultNS + TableName.NAMESPACE_DELIM + tableName).matches();
3205      }
3206      if (!matched) {
3207        itr.remove();
3208      }
3209    }
3210  }
3211
3212  @Override
3213  public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
3214    return getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
3215        .getLastMajorCompactionTimestamp(table);
3216  }
3217
3218  @Override
3219  public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException {
3220    return getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
3221        .getLastMajorCompactionTimestamp(regionName);
3222  }
3223
3224  /**
3225   * Gets the mob file compaction state for a specific table.
   * Whether all the mob files are selected is known only during the compaction execution, but
   * the statistic is gathered just before the compaction starts; since it is hard to know the
   * compaction type at that time, rough statistics are used for the mob file compaction. Only two
   * compaction states are available: CompactionState.MAJOR_AND_MINOR and CompactionState.NONE.
   * @param tableName The current table name.
   * @return Whether the given table is currently under mob file compaction.
3232   */
3233  public GetRegionInfoResponse.CompactionState getMobCompactionState(TableName tableName) {
3234    AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
3235    if (compactionsCount != null && compactionsCount.get() != 0) {
3236      return GetRegionInfoResponse.CompactionState.MAJOR_AND_MINOR;
3237    }
3238    return GetRegionInfoResponse.CompactionState.NONE;
3239  }
3240
3241  public void reportMobCompactionStart(TableName tableName) throws IOException {
3242    IdLock.Entry lockEntry = null;
3243    try {
3244      lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
3245      AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
3246      if (compactionsCount == null) {
3247        compactionsCount = new AtomicInteger(0);
3248        mobCompactionStates.put(tableName, compactionsCount);
3249      }
3250      compactionsCount.incrementAndGet();
3251    } finally {
3252      if (lockEntry != null) {
3253        mobCompactionLock.releaseLockEntry(lockEntry);
3254      }
3255    }
3256  }
3257
3258  public void reportMobCompactionEnd(TableName tableName) throws IOException {
3259    IdLock.Entry lockEntry = null;
3260    try {
3261      lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
3262      AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
3263      if (compactionsCount != null) {
3264        int count = compactionsCount.decrementAndGet();
3265        // remove the entry if the count is 0.
3266        if (count == 0) {
3267          mobCompactionStates.remove(tableName);
3268        }
3269      }
3270    } finally {
3271      if (lockEntry != null) {
3272        mobCompactionLock.releaseLockEntry(lockEntry);
3273      }
3274    }
3275  }
3276
3277  /**
3278   * Requests mob compaction.
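   * <p>A usage sketch (illustrative only; {@code td} is assumed to be the table's
   * descriptor):
   * <pre>{@code
   * master.requestMobCompaction(tableName,
   *     Arrays.asList(td.getColumnFamilies()), true);
   * }</pre>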
   * @param tableName The table to compact.
   * @param columns The column families to compact.
   * @param allFiles Whether to select all mob files for the compaction.
3282   */
  public void requestMobCompaction(TableName tableName, List<ColumnFamilyDescriptor> columns,
      boolean allFiles) throws IOException {
3285    mobCompactThread.requestMobCompaction(conf, getFileSystem(), tableName, columns, allFiles);
3286  }
3287
3288  /**
   * Queries the state of the {@link LoadBalancerTracker}. If the balancer is not initialized,
   * or the master is in maintenance mode, false is returned.
   *
   * @return The state of the load balancer, or false if the load balancer isn't defined or the
   *   master is in maintenance mode.
3293   */
3294  public boolean isBalancerOn() {
3295    return !isInMaintenanceMode()
3296        && loadBalancerTracker != null
3297        && loadBalancerTracker.isBalancerOn();
3298  }
3299
3300  /**
3301   * Queries the state of the {@link RegionNormalizerTracker}. If it's not initialized,
3302   * false is returned.
3303   */
3304  public boolean isNormalizerOn() {
3305    return !isInMaintenanceMode()
3306      && getRegionNormalizerManager().isNormalizerOn();
3307  }
3308
3309  /**
3310   * Queries the state of the {@link SplitOrMergeTracker}. If it is not initialized,
   * false is returned. If the switchType is illegal, false is returned.
3312   * @param switchType see {@link org.apache.hadoop.hbase.client.MasterSwitchType}
3313   * @return The state of the switch
3314   */
3315  @Override
3316  public boolean isSplitOrMergeEnabled(MasterSwitchType switchType) {
3317    return !isInMaintenanceMode()
3318        && splitOrMergeTracker != null
3319        && splitOrMergeTracker.isSplitOrMergeEnabled(switchType);
3320  }
3321
3322  /**
3323   * Fetch the configured {@link LoadBalancer} class name. If none is set, a default is returned.
3324   *
3325   * @return The name of the {@link LoadBalancer} in use.
3326   */
3327  public String getLoadBalancerClassName() {
3328    return conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, LoadBalancerFactory
3329        .getDefaultLoadBalancerClass().getName());
3330  }
3331
3332  public SplitOrMergeTracker getSplitOrMergeTracker() {
3333    return splitOrMergeTracker;
3334  }
3335
3336  @Override
3337  public LoadBalancer getLoadBalancer() {
3338    return balancer;
3339  }
3340
3341  @Override
3342  public FavoredNodesManager getFavoredNodesManager() {
3343    return favoredNodesManager;
3344  }
3345
3346  private long executePeerProcedure(ModifyPeerProcedure procedure) throws IOException {
3347    long procId = procedureExecutor.submitProcedure(procedure);
3348    procedure.getLatch().await();
3349    return procId;
3350  }
3351
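  /**
   * Add a new replication peer. A sketch of the call (illustrative only; the cluster key
   * and peer id are placeholders):
   * <pre>{@code
   * ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
   *     .setClusterKey("zk1.example.com:2181:/hbase").build();
   * master.addReplicationPeer("peer1", rpc, true);
   * }</pre>
   */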
3352  @Override
3353  public long addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
3354      throws ReplicationException, IOException {
3355    LOG.info(getClientIdAuditPrefix() + " creating replication peer, id=" + peerId + ", config=" +
3356      peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"));
3357    return executePeerProcedure(new AddPeerProcedure(peerId, peerConfig, enabled));
3358  }
3359
3360  @Override
3361  public long removeReplicationPeer(String peerId) throws ReplicationException, IOException {
3362    LOG.info(getClientIdAuditPrefix() + " removing replication peer, id=" + peerId);
3363    return executePeerProcedure(new RemovePeerProcedure(peerId));
3364  }
3365
3366  @Override
3367  public long enableReplicationPeer(String peerId) throws ReplicationException, IOException {
3368    LOG.info(getClientIdAuditPrefix() + " enable replication peer, id=" + peerId);
3369    return executePeerProcedure(new EnablePeerProcedure(peerId));
3370  }
3371
3372  @Override
3373  public long disableReplicationPeer(String peerId) throws ReplicationException, IOException {
3374    LOG.info(getClientIdAuditPrefix() + " disable replication peer, id=" + peerId);
3375    return executePeerProcedure(new DisablePeerProcedure(peerId));
3376  }
3377
3378  @Override
3379  public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
3380      throws ReplicationException, IOException {
3381    if (cpHost != null) {
3382      cpHost.preGetReplicationPeerConfig(peerId);
3383    }
3384    LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + peerId);
3385    ReplicationPeerConfig peerConfig = this.replicationPeerManager.getPeerConfig(peerId)
3386        .orElseThrow(() -> new ReplicationPeerNotFoundException(peerId));
3387    if (cpHost != null) {
3388      cpHost.postGetReplicationPeerConfig(peerId);
3389    }
3390    return peerConfig;
3391  }
3392
3393  @Override
3394  public long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig)
3395      throws ReplicationException, IOException {
3396    LOG.info(getClientIdAuditPrefix() + " update replication peer config, id=" + peerId +
3397      ", config=" + peerConfig);
3398    return executePeerProcedure(new UpdatePeerConfigProcedure(peerId, peerConfig));
3399  }
3400
3401  @Override
3402  public List<ReplicationPeerDescription> listReplicationPeers(String regex)
3403      throws ReplicationException, IOException {
3404    if (cpHost != null) {
3405      cpHost.preListReplicationPeers(regex);
3406    }
3407    LOG.debug("{} list replication peers, regex={}", getClientIdAuditPrefix(), regex);
3408    Pattern pattern = regex == null ? null : Pattern.compile(regex);
3409    List<ReplicationPeerDescription> peers =
3410      this.replicationPeerManager.listPeers(pattern);
3411    if (cpHost != null) {
3412      cpHost.postListReplicationPeers(regex);
3413    }
3414    return peers;
3415  }
3416
3417  /**
3418   * Mark region server(s) as decommissioned (previously called 'draining') to prevent additional
   * regions from getting assigned to them. Also unload the regions on the servers asynchronously.
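   * <p>Sketch (illustrative only; host, port and startcode are placeholders):
   * <pre>{@code
   * ServerName sn = ServerName.valueOf("rs1.example.com", 16020, 1600000000000L);
   * master.decommissionRegionServers(Collections.singletonList(sn), true);
   * }</pre>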
   * @param servers Region servers to decommission.
   * @param offload True to also move the regions off the decommissioned servers.
3421   */
3422  public void decommissionRegionServers(final List<ServerName> servers, final boolean offload)
3423      throws HBaseIOException {
3424    List<ServerName> serversAdded = new ArrayList<>(servers.size());
3425    // Place the decommission marker first.
3426    String parentZnode = getZooKeeper().getZNodePaths().drainingZNode;
3427    for (ServerName server : servers) {
3428      try {
3429        String node = ZNodePaths.joinZNode(parentZnode, server.getServerName());
3430        ZKUtil.createAndFailSilent(getZooKeeper(), node);
3431      } catch (KeeperException ke) {
3432        throw new HBaseIOException(
3433          this.zooKeeper.prefix("Unable to decommission '" + server.getServerName() + "'."), ke);
3434      }
3435      if (this.serverManager.addServerToDrainList(server)) {
3436        serversAdded.add(server);
3437      };
3438    }
3439    // Move the regions off the decommissioned servers.
3440    if (offload) {
3441      final List<ServerName> destServers = this.serverManager.createDestinationServersList();
3442      for (ServerName server : serversAdded) {
3443        final List<RegionInfo> regionsOnServer = this.assignmentManager.getRegionsOnServer(server);
3444        for (RegionInfo hri : regionsOnServer) {
3445          ServerName dest = balancer.randomAssignment(hri, destServers);
3446          if (dest == null) {
3447            throw new HBaseIOException("Unable to determine a plan to move " + hri);
3448          }
3449          RegionPlan rp = new RegionPlan(hri, server, dest);
3450          this.assignmentManager.moveAsync(rp);
3451        }
3452      }
3453    }
3454  }
3455
3456  /**
3457   * List region servers marked as decommissioned (previously called 'draining') to not get regions
3458   * assigned to them.
3459   * @return List of decommissioned servers.
3460   */
3461  public List<ServerName> listDecommissionedRegionServers() {
3462    return this.serverManager.getDrainingServersList();
3463  }
3464
3465  /**
   * Remove the decommission marker (previously called 'draining') from a region server to allow
   * region assignments. Load regions onto the server asynchronously if a list of regions is given.
   * @param server Region server to remove the decommission marker from.
   * @param encodedRegionNames Encoded names of regions to load onto the server once it is
   *   recommissioned; may be null or empty to skip region loading.
3469   */
3470  public void recommissionRegionServer(final ServerName server,
3471      final List<byte[]> encodedRegionNames) throws IOException {
3472    // Remove the server from decommissioned (draining) server list.
3473    String parentZnode = getZooKeeper().getZNodePaths().drainingZNode;
3474    String node = ZNodePaths.joinZNode(parentZnode, server.getServerName());
3475    try {
3476      ZKUtil.deleteNodeFailSilent(getZooKeeper(), node);
3477    } catch (KeeperException ke) {
3478      throw new HBaseIOException(
3479        this.zooKeeper.prefix("Unable to recommission '" + server.getServerName() + "'."), ke);
3480    }
3481    this.serverManager.removeServerFromDrainList(server);
3482
3483    // Load the regions onto the server if we are given a list of regions.
3484    if (encodedRegionNames == null || encodedRegionNames.isEmpty()) {
3485      return;
3486    }
3487    if (!this.serverManager.isServerOnline(server)) {
3488      return;
3489    }
3490    for (byte[] encodedRegionName : encodedRegionNames) {
3491      RegionState regionState =
3492        assignmentManager.getRegionStates().getRegionState(Bytes.toString(encodedRegionName));
3493      if (regionState == null) {
3494        LOG.warn("Unknown region " + Bytes.toStringBinary(encodedRegionName));
3495        continue;
3496      }
3497      RegionInfo hri = regionState.getRegion();
3498      if (server.equals(regionState.getServerName())) {
3499        LOG.info("Skipping move of region " + hri.getRegionNameAsString() +
3500          " because region already assigned to the same server " + server + ".");
3501        continue;
3502      }
3503      RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), server);
3504      this.assignmentManager.moveAsync(rp);
3505    }
3506  }
3507
3508  @Override
3509  public LockManager getLockManager() {
3510    return lockManager;
3511  }
3512
3513  public QuotaObserverChore getQuotaObserverChore() {
3514    return this.quotaObserverChore;
3515  }
3516
3517  public SpaceQuotaSnapshotNotifier getSpaceQuotaSnapshotNotifier() {
3518    return this.spaceQuotaSnapshotNotifier;
3519  }
3520
3521  @SuppressWarnings("unchecked")
3522  private RemoteProcedure<MasterProcedureEnv, ?> getRemoteProcedure(long procId) {
3523    Procedure<?> procedure = procedureExecutor.getProcedure(procId);
3524    if (procedure == null) {
3525      return null;
3526    }
3527    assert procedure instanceof RemoteProcedure;
3528    return (RemoteProcedure<MasterProcedureEnv, ?>) procedure;
3529  }
3530
3531  public void remoteProcedureCompleted(long procId) {
3532    LOG.debug("Remote procedure done, pid={}", procId);
3533    RemoteProcedure<MasterProcedureEnv, ?> procedure = getRemoteProcedure(procId);
3534    if (procedure != null) {
3535      procedure.remoteOperationCompleted(procedureExecutor.getEnvironment());
3536    }
3537  }
3538
3539  public void remoteProcedureFailed(long procId, RemoteProcedureException error) {
3540    LOG.debug("Remote procedure failed, pid={}", procId, error);
3541    RemoteProcedure<MasterProcedureEnv, ?> procedure = getRemoteProcedure(procId);
3542    if (procedure != null) {
3543      procedure.remoteOperationFailed(procedureExecutor.getEnvironment(), error);
3544    }
3545  }
3546
3547  /**
3548   * Reopen regions provided in the argument
3549   *
3550   * @param tableName The current table name
3551   * @param regionNames The region names of the regions to reopen
3552   * @param nonceGroup Identifier for the source of the request, a client or process
3553   * @param nonce A unique identifier for this operation from the client or process identified by
3554   *   <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
3555   * @return procedure Id
3556   * @throws IOException if reopening region fails while running procedure
3557   */
3558  long reopenRegions(final TableName tableName, final List<byte[]> regionNames,
3559      final long nonceGroup, final long nonce)
3560      throws IOException {
3561
3562    return MasterProcedureUtil
3563      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
3564
3565        @Override
3566        protected void run() throws IOException {
3567          submitProcedure(new ReopenTableRegionsProcedure(tableName, regionNames));
3568        }
3569
3570        @Override
3571        protected String getDescription() {
3572          return "ReopenTableRegionsProcedure";
3573        }
3574
3575      });
3576
3577  }
3578
3579  @Override
3580  public ReplicationPeerManager getReplicationPeerManager() {
3581    return replicationPeerManager;
3582  }
3583
3584  public HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>>
3585      getReplicationLoad(ServerName[] serverNames) {
3586    List<ReplicationPeerDescription> peerList = this.getReplicationPeerManager().listPeers(null);
3587    if (peerList == null) {
3588      return null;
3589    }
3590    HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoadSourceMap =
3591        new HashMap<>(peerList.size());
3592    peerList.stream()
3593        .forEach(peer -> replicationLoadSourceMap.put(peer.getPeerId(), new ArrayList<>()));
3594    for (ServerName serverName : serverNames) {
3595      List<ReplicationLoadSource> replicationLoadSources =
3596          getServerManager().getLoad(serverName).getReplicationLoadSourceList();
3597      for (ReplicationLoadSource replicationLoadSource : replicationLoadSources) {
3598        List<Pair<ServerName, ReplicationLoadSource>> replicationLoadSourceList =
3599          replicationLoadSourceMap.get(replicationLoadSource.getPeerID());
3600        if (replicationLoadSourceList == null) {
3601          LOG.debug("{} does not exist, but it exists "
3602            + "in znode(/hbase/replication/rs). when the rs restarts, peerId is deleted, so "
3603            + "we just need to ignore it", replicationLoadSource.getPeerID());
3604          continue;
3605        }
3606        replicationLoadSourceList.add(new Pair<>(serverName, replicationLoadSource));
3607      }
3608    }
3609    for (List<Pair<ServerName, ReplicationLoadSource>> loads : replicationLoadSourceMap.values()) {
3610      if (loads.size() > 0) {
3611        loads.sort(Comparator.comparingLong(load -> (-1) * load.getSecond().getReplicationLag()));
3612      }
3613    }
3614    return replicationLoadSourceMap;
3615  }
3616
3617  /**
   * This method modifies the master's configuration in order to inject replication-related
   * features.
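   * <p>Sketch (illustrative only):
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * HMaster.decorateMasterConfiguration(conf);
   * // hbase.master.logcleaner.plugins now includes ReplicationLogCleaner
   * }</pre>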
3619   */
3620  @InterfaceAudience.Private
3621  public static void decorateMasterConfiguration(Configuration conf) {
3622    String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
3623    String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
3624    if (!plugins.contains(cleanerClass)) {
3625      conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
3626    }
3627    if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
3628      plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
3629      cleanerClass = ReplicationHFileCleaner.class.getCanonicalName();
3630      if (!plugins.contains(cleanerClass)) {
3631        conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass);
3632      }
3633    }
3634  }
3635
3636  @Override
3637  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
3638    if (!this.isOnline() || !LoadBalancer.isMasterCanHostUserRegions(conf)) {
3639      return new HashMap<>();
3640    }
3641    return super.getWalGroupsReplicationStatus();
3642  }
3643
3644  public HbckChore getHbckChore() {
3645    return this.hbckChore;
3646  }
3647
3648  public Optional<ServerName> getActiveMaster() {
3649    return activeMasterManager.getActiveMasterServerName();
3650  }
3651
3652  public void runReplicationBarrierCleaner() {
3653    ReplicationBarrierCleaner rbc = this.replicationBarrierCleaner;
3654    if (rbc != null) {
3655      rbc.chore();
3656    }
3657  }
3658
3659  public SnapshotQuotaObserverChore getSnapshotQuotaObserverChore() {
3660    return this.snapshotQuotaChore;
3661  }
3662
3663  public String getClusterId() {
3664    if (activeMaster) {
3665      return super.getClusterId();
3666    }
3667    return cachedClusterId.getFromCacheOrFetch();
3668  }
3669
3670  public MetaRegionLocationCache getMetaRegionLocationCache() {
3671    return this.metaRegionLocationCache;
3672  }
3673
3674  /**
3675   * Get the compaction state of the table
3676   *
3677   * @param tableName The table name
3678   * @return CompactionState Compaction state of the table
3679   */
3680  public CompactionState getCompactionState(final TableName tableName) {
3681    CompactionState compactionState = CompactionState.NONE;
3682    try {
3683      List<RegionInfo> regions =
3684        assignmentManager.getRegionStates().getRegionsOfTable(tableName);
3685      for (RegionInfo regionInfo : regions) {
3686        ServerName serverName =
3687          assignmentManager.getRegionStates().getRegionServerOfRegion(regionInfo);
3688        if (serverName == null) {
3689          continue;
3690        }
3691        ServerMetrics sl = serverManager.getLoad(serverName);
3692        if (sl == null) {
3693          continue;
3694        }
        RegionMetrics regionMetrics = sl.getRegionMetrics().get(regionInfo.getRegionName());
        if (regionMetrics == null) {
          // The server has not reported metrics for this region yet; skip it.
          continue;
        }
        if (regionMetrics.getCompactionState() == CompactionState.MAJOR) {
3697          if (compactionState == CompactionState.MINOR) {
3698            compactionState = CompactionState.MAJOR_AND_MINOR;
3699          } else {
3700            compactionState = CompactionState.MAJOR;
3701          }
3702        } else if (regionMetrics.getCompactionState() == CompactionState.MINOR) {
3703          if (compactionState == CompactionState.MAJOR) {
3704            compactionState = CompactionState.MAJOR_AND_MINOR;
3705          } else {
3706            compactionState = CompactionState.MINOR;
3707          }
3708        }
3709      }
3710    } catch (Exception e) {
3711      compactionState = null;
3712      LOG.error("Exception when get compaction state for " + tableName.getNameAsString(), e);
3713    }
3714    return compactionState;
3715  }
3716
3717  @Override
3718  public MetaLocationSyncer getMetaLocationSyncer() {
3719    return metaLocationSyncer;
3720  }
3721}