001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
021import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
022import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
023
024import com.google.protobuf.Descriptors;
025import com.google.protobuf.Service;
026import java.io.IOException;
027import java.io.InterruptedIOException;
028import java.lang.reflect.Constructor;
029import java.lang.reflect.InvocationTargetException;
030import java.net.InetAddress;
031import java.net.InetSocketAddress;
032import java.net.UnknownHostException;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Collection;
036import java.util.Collections;
037import java.util.Comparator;
038import java.util.EnumSet;
039import java.util.HashMap;
040import java.util.Iterator;
041import java.util.List;
042import java.util.Map;
043import java.util.Map.Entry;
044import java.util.Objects;
045import java.util.Optional;
046import java.util.Set;
047import java.util.concurrent.ExecutionException;
048import java.util.concurrent.Future;
049import java.util.concurrent.TimeUnit;
050import java.util.concurrent.TimeoutException;
051import java.util.concurrent.atomic.AtomicInteger;
052import java.util.function.Function;
053import java.util.regex.Pattern;
054import java.util.stream.Collectors;
055import javax.servlet.ServletException;
056import javax.servlet.http.HttpServlet;
057import javax.servlet.http.HttpServletRequest;
058import javax.servlet.http.HttpServletResponse;
059import org.apache.commons.lang3.StringUtils;
060import org.apache.hadoop.conf.Configuration;
061import org.apache.hadoop.fs.Path;
062import org.apache.hadoop.hbase.ChoreService;
063import org.apache.hadoop.hbase.ClusterId;
064import org.apache.hadoop.hbase.ClusterMetrics;
065import org.apache.hadoop.hbase.ClusterMetrics.Option;
066import org.apache.hadoop.hbase.ClusterMetricsBuilder;
067import org.apache.hadoop.hbase.DoNotRetryIOException;
068import org.apache.hadoop.hbase.HBaseIOException;
069import org.apache.hadoop.hbase.HBaseInterfaceAudience;
070import org.apache.hadoop.hbase.HConstants;
071import org.apache.hadoop.hbase.InvalidFamilyOperationException;
072import org.apache.hadoop.hbase.MasterNotRunningException;
073import org.apache.hadoop.hbase.MetaTableAccessor;
074import org.apache.hadoop.hbase.NamespaceDescriptor;
075import org.apache.hadoop.hbase.PleaseHoldException;
076import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
077import org.apache.hadoop.hbase.ServerName;
078import org.apache.hadoop.hbase.TableName;
079import org.apache.hadoop.hbase.TableNotDisabledException;
080import org.apache.hadoop.hbase.TableNotFoundException;
081import org.apache.hadoop.hbase.UnknownRegionException;
082import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
083import org.apache.hadoop.hbase.client.MasterSwitchType;
084import org.apache.hadoop.hbase.client.RegionInfo;
085import org.apache.hadoop.hbase.client.RegionInfoBuilder;
086import org.apache.hadoop.hbase.client.RegionStatesCount;
087import org.apache.hadoop.hbase.client.TableDescriptor;
088import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
089import org.apache.hadoop.hbase.client.TableState;
090import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
091import org.apache.hadoop.hbase.exceptions.DeserializationException;
092import org.apache.hadoop.hbase.executor.ExecutorType;
093import org.apache.hadoop.hbase.favored.FavoredNodesManager;
094import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
095import org.apache.hadoop.hbase.http.InfoServer;
096import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
097import org.apache.hadoop.hbase.ipc.RpcServer;
098import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
099import org.apache.hadoop.hbase.log.HBaseMarkers;
100import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
101import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
102import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
103import org.apache.hadoop.hbase.master.assignment.MergeTableRegionsProcedure;
104import org.apache.hadoop.hbase.master.assignment.MoveRegionProcedure;
105import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
106import org.apache.hadoop.hbase.master.assignment.RegionStates;
107import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
108import org.apache.hadoop.hbase.master.assignment.UnassignProcedure;
109import org.apache.hadoop.hbase.master.balancer.BalancerChore;
110import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
111import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
112import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
113import org.apache.hadoop.hbase.master.cleaner.DirScanPool;
114import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
115import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
116import org.apache.hadoop.hbase.master.cleaner.ReplicationBarrierCleaner;
117import org.apache.hadoop.hbase.master.locking.LockManager;
118import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
119import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
120import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
121import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
122import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
123import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
124import org.apache.hadoop.hbase.master.procedure.DeleteNamespaceProcedure;
125import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
126import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
127import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
128import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure;
129import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
130import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
131import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
132import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
133import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable;
134import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
135import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
136import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
137import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure;
138import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
139import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
140import org.apache.hadoop.hbase.master.replication.AddPeerProcedure;
141import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
142import org.apache.hadoop.hbase.master.replication.EnablePeerProcedure;
143import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure;
144import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
145import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
146import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
147import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
148import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
149import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
150import org.apache.hadoop.hbase.mob.MobConstants;
151import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
152import org.apache.hadoop.hbase.monitoring.MonitoredTask;
153import org.apache.hadoop.hbase.monitoring.TaskMonitor;
154import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
155import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
156import org.apache.hadoop.hbase.procedure2.LockedResource;
157import org.apache.hadoop.hbase.procedure2.Procedure;
158import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
159import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
160import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
161import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
162import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener;
163import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
164import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
165import org.apache.hadoop.hbase.quotas.MasterQuotasObserver;
166import org.apache.hadoop.hbase.quotas.QuotaObserverChore;
167import org.apache.hadoop.hbase.quotas.QuotaTableUtil;
168import org.apache.hadoop.hbase.quotas.QuotaUtil;
169import org.apache.hadoop.hbase.quotas.SnapshotQuotaObserverChore;
170import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot;
171import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshot.SpaceQuotaStatus;
172import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifier;
173import org.apache.hadoop.hbase.quotas.SpaceQuotaSnapshotNotifierFactory;
174import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy;
175import org.apache.hadoop.hbase.regionserver.HRegionServer;
176import org.apache.hadoop.hbase.regionserver.RSRpcServices;
177import org.apache.hadoop.hbase.replication.ReplicationException;
178import org.apache.hadoop.hbase.replication.ReplicationLoadSource;
179import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
180import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
181import org.apache.hadoop.hbase.replication.ReplicationUtils;
182import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
183import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
184import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader;
185import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
186import org.apache.hadoop.hbase.security.AccessDeniedException;
187import org.apache.hadoop.hbase.security.SecurityConstants;
188import org.apache.hadoop.hbase.security.UserProvider;
189import org.apache.hadoop.hbase.trace.TraceUtil;
190import org.apache.hadoop.hbase.util.Addressing;
191import org.apache.hadoop.hbase.util.Bytes;
192import org.apache.hadoop.hbase.util.HBaseFsck;
193import org.apache.hadoop.hbase.util.HFileArchiveUtil;
194import org.apache.hadoop.hbase.util.HasThread;
195import org.apache.hadoop.hbase.util.IdLock;
196import org.apache.hadoop.hbase.util.ModifyRegionUtils;
197import org.apache.hadoop.hbase.util.Pair;
198import org.apache.hadoop.hbase.util.RetryCounter;
199import org.apache.hadoop.hbase.util.RetryCounterFactory;
200import org.apache.hadoop.hbase.util.TableDescriptorChecker;
201import org.apache.hadoop.hbase.util.Threads;
202import org.apache.hadoop.hbase.util.VersionInfo;
203import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
204import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
205import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
206import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
207import org.apache.hadoop.hbase.zookeeper.ZKUtil;
208import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
209import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
210import org.apache.yetus.audience.InterfaceAudience;
211import org.apache.zookeeper.KeeperException;
212import org.eclipse.jetty.server.Server;
213import org.eclipse.jetty.server.ServerConnector;
214import org.eclipse.jetty.servlet.ServletHolder;
215import org.eclipse.jetty.webapp.WebAppContext;
216import org.slf4j.Logger;
217import org.slf4j.LoggerFactory;
218
219import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
220import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
221import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
222import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
223
224import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
225import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
226import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
227
228/**
229 * HMaster is the "master server" for HBase. An HBase cluster has one active
230 * master.  If many masters are started, all compete.  Whichever wins goes on to
231 * run the cluster.  All others park themselves in their constructor until
232 * master or cluster shutdown or until the active master loses its lease in
 * zookeeper.  Thereafter, all running masters jostle to take over the master role.
234 *
 * <p>The Master can be asked to shut down the cluster. See {@link #shutdown()}.  In
236 * this case it will tell all regionservers to go down and then wait on them
237 * all reporting in that they are down.  This master will then shut itself down.
238 *
 * <p>You can also shut down just this master.  Call {@link #stopMaster()}.
240 *
241 * @see org.apache.zookeeper.Watcher
242 */
243@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
244@SuppressWarnings("deprecation")
245public class HMaster extends HRegionServer implements MasterServices {
246  private static Logger LOG = LoggerFactory.getLogger(HMaster.class);
247
248  /**
249   * Protection against zombie master. Started once Master accepts active responsibility and
250   * starts taking over responsibilities. Allows a finite time window before giving up ownership.
251   */
252  private static class InitializationMonitor extends HasThread {
253    /** The amount of time in milliseconds to sleep before checking initialization status. */
254    public static final String TIMEOUT_KEY = "hbase.master.initializationmonitor.timeout";
255    public static final long TIMEOUT_DEFAULT = TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);
256
257    /**
258     * When timeout expired and initialization has not complete, call {@link System#exit(int)} when
259     * true, do nothing otherwise.
260     */
261    public static final String HALT_KEY = "hbase.master.initializationmonitor.haltontimeout";
262    public static final boolean HALT_DEFAULT = false;
263
264    private final HMaster master;
265    private final long timeout;
266    private final boolean haltOnTimeout;
267
268    /** Creates a Thread that monitors the {@link #isInitialized()} state. */
269    InitializationMonitor(HMaster master) {
270      super("MasterInitializationMonitor");
271      this.master = master;
272      this.timeout = master.getConfiguration().getLong(TIMEOUT_KEY, TIMEOUT_DEFAULT);
273      this.haltOnTimeout = master.getConfiguration().getBoolean(HALT_KEY, HALT_DEFAULT);
274      this.setDaemon(true);
275    }
276
277    @Override
278    public void run() {
279      try {
280        while (!master.isStopped() && master.isActiveMaster()) {
281          Thread.sleep(timeout);
282          if (master.isInitialized()) {
283            LOG.debug("Initialization completed within allotted tolerance. Monitor exiting.");
284          } else {
285            LOG.error("Master failed to complete initialization after " + timeout + "ms. Please"
286                + " consider submitting a bug report including a thread dump of this process.");
287            if (haltOnTimeout) {
288              LOG.error("Zombie Master exiting. Thread dump to stdout");
289              Threads.printThreadInfo(System.out, "Zombie HMaster");
290              System.exit(-1);
291            }
292          }
293        }
294      } catch (InterruptedException ie) {
295        LOG.trace("InitMonitor thread interrupted. Existing.");
296      }
297    }
298  }
299
  // MASTER is the name of the webapp and the attribute name used for stuffing this
  // instance into the web context.
  public static final String MASTER = "master";

  // Manager and zk listener for master election
  private final ActiveMasterManager activeMasterManager;
  // Region server tracker
  private RegionServerTracker regionServerTracker;
  // Draining region server tracker
  private DrainingServerTracker drainingServerTracker;
  // Tracker for load balancer state
  LoadBalancerTracker loadBalancerTracker;
  // Tracker for meta location, if any client ZK quorum specified
  MetaLocationSyncer metaLocationSyncer;
  // Tracker for active master location, if any client ZK quorum specified
  MasterAddressSyncer masterAddressSyncer;

  // Tracker for split and merge state
  private SplitOrMergeTracker splitOrMergeTracker;

  // Tracker for region normalizer state
  private RegionNormalizerTracker regionNormalizerTracker;

  // When non-null, run() stops this service on the way out and waits for it to terminate.
  private ClusterSchemaService clusterSchemaService;

  // Number of seconds run() waits for clusterSchemaService to terminate on shutdown
  // (default: 5 * 60 seconds).
  public static final String HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS =
    "hbase.master.wait.on.service.seconds";
  public static final int DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS = 5 * 60;

  // Metrics for the HMaster
  final MetricsMaster metricsMaster;
  // file system manager for the master FS operations
  private MasterFileSystem fileSystemManager;
  private MasterWalManager walManager;

  // Manager for procedure-based WAL splitting; can be null if the current mode is
  // zk-based WAL splitting. SplitWALManager will replace SplitLogManager and
  // MasterWalManager, which means the zk-based WAL splitting code will be unused
  // after we switch to the procedure-based one. Our eventual goal is to remove
  // all the zk-based WAL splitting code.
  private SplitWALManager splitWALManager;

  // server manager to deal with region server info
  private volatile ServerManager serverManager;

  // manager of assignment nodes in zookeeper
  private AssignmentManager assignmentManager;

  // manager of replication
  private ReplicationPeerManager replicationPeerManager;

  // Buffer for "fatal error" notices from region servers in the cluster. This is only
  // used for assisting operations/debugging. Its size comes from
  // "hbase.master.buffer.for.rs.fatals" (default 1 MiB) in the constructor.
  MemoryBoundedLogMessageBuffer rsFatals;

  // Flag set after we become the active master (used for testing). Also read by
  // waitForMasterActive() and reset to false when run() exits.
  private volatile boolean activeMaster = false;

  // flag set after we complete initialization once active
  private final ProcedureEvent<?> initialized = new ProcedureEvent<>("master initialized");

  // Flag set after master services are started;
  // initialization may not have completed yet.
  volatile boolean serviceStarted = false;

  // Maximum time we should run balancer for (set from getMaxBalancingTime() in the
  // constructor; the "Blancing" spelling is kept because other code refers to this name)
  private final int maxBlancingTime;
  // Maximum percent of regions in transition when balancing
  private final double maxRitPercent;

  private final LockManager lockManager = new LockManager(this);

  private LoadBalancer balancer;
  private RegionNormalizer normalizer;
  private BalancerChore balancerChore;
  private RegionNormalizerChore normalizerChore;
  private ClusterStatusChore clusterStatusChore;
  // Created and scheduled in the constructor only when status publishing is enabled.
  private ClusterStatusPublisher clusterStatusPublisherChore = null;

  private HbckChore hbckChore;
  CatalogJanitor catalogJanitorChore;
  private DirScanPool cleanerPool;
  private LogCleaner logCleaner;
  private HFileCleaner hfileCleaner;
  private ReplicationBarrierCleaner replicationBarrierCleaner;
  private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
  private MobCompactionChore mobCompactChore;
  private MasterMobCompactionThread mobCompactThread;
  // used to synchronize the mobCompactionStates
  private final IdLock mobCompactionLock = new IdLock();
  // Saves the information of mob compactions in tables.
  // The key is the table name; the value is the number of compactions in that table.
  private Map<TableName, AtomicInteger> mobCompactionStates = Maps.newConcurrentMap();

  MasterCoprocessorHost cpHost;

  // Read from "hbase.master.preload.tabledescriptors" (default true) in the constructor.
  private final boolean preLoadTableDescriptors;

  // Time stamp for when this HMaster became active
  private long masterActiveTime;

  // Time stamp for when HMaster finishes becoming Active Master
  private long masterFinishedInitializationTime;

  Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  // monitor for snapshot of hbase tables
  SnapshotManager snapshotManager;
  // monitor for distributed procedures
  private MasterProcedureManagerHost mpmHost;

  // It is assigned after the 'initialized' guard is set to true, so it should be volatile.
  private volatile MasterQuotaManager quotaManager;
  private SpaceQuotaSnapshotNotifier spaceQuotaSnapshotNotifier;
  private QuotaObserverChore quotaObserverChore;
  private SnapshotQuotaObserverChore snapshotQuotaChore;

  private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
  private WALProcedureStore procedureStore;

  // handle table states
  private TableStateManager tableStateManager;

  private long splitPlanCount;
  private long mergePlanCount;

  /* Handle favored nodes information */
  private FavoredNodesManager favoredNodesManager;

  /** jetty server for master to redirect requests to regionserver infoServer */
  private Server masterJettyServer;

  // Determine if we should do normal startup or minimal "single-user" mode with no region
  // servers and no user tables. Useful for repair and recovery of hbase:meta
  private final boolean maintenanceMode;
  // The constructor checks this key both in the Configuration and as a JVM system property.
  static final String MAINTENANCE_MODE = "hbase.master.maintenance_mode";
437
438  public static class RedirectServlet extends HttpServlet {
439    private static final long serialVersionUID = 2894774810058302473L;
440    private final int regionServerInfoPort;
441    private final String regionServerHostname;
442
443    /**
444     * @param infoServer that we're trying to send all requests to
445     * @param hostname may be null. if given, will be used for redirects instead of host from client.
446     */
447    public RedirectServlet(InfoServer infoServer, String hostname) {
448       regionServerInfoPort = infoServer.getPort();
449       regionServerHostname = hostname;
450    }
451
452    @Override
453    public void doGet(HttpServletRequest request,
454        HttpServletResponse response) throws ServletException, IOException {
455      String redirectHost = regionServerHostname;
456      if(redirectHost == null) {
457        redirectHost = request.getServerName();
458        if(!Addressing.isLocalAddress(InetAddress.getByName(redirectHost))) {
459          LOG.warn("Couldn't resolve '" + redirectHost + "' as an address local to this node and '" +
460              MASTER_HOSTNAME_KEY + "' is not set; client will get an HTTP 400 response. If " +
461              "your HBase deployment relies on client accessible names that the region server process " +
462              "can't resolve locally, then you should set the previously mentioned configuration variable " +
463              "to an appropriate hostname.");
464          // no sending client provided input back to the client, so the goal host is just in the logs.
465          response.sendError(400, "Request was to a host that I can't resolve for any of the network interfaces on " +
466              "this node. If this is due to an intermediary such as an HTTP load balancer or other proxy, your HBase " +
467              "administrator can set '" + MASTER_HOSTNAME_KEY + "' to point to the correct hostname.");
468          return;
469        }
470      }
471      // TODO this scheme should come from looking at the scheme registered in the infoserver's http server for the
472      // host and port we're using, but it's buried way too deep to do that ATM.
473      String redirectUrl = request.getScheme() + "://"
474        + redirectHost + ":" + regionServerInfoPort
475        + request.getRequestURI();
476      response.sendRedirect(redirectUrl);
477    }
478  }
479
480  /**
481   * Initializes the HMaster. The steps are as follows:
482   * <p>
483   * <ol>
484   * <li>Initialize the local HRegionServer
485   * <li>Start the ActiveMasterManager.
486   * </ol>
487   * <p>
488   * Remaining steps of initialization occur in
489   * #finishActiveMasterInitialization(MonitoredTask) after
490   * the master becomes the active one.
491   */
492  public HMaster(final Configuration conf)
493      throws IOException, KeeperException {
494    super(conf);
495    TraceUtil.initTracer(conf);
496    try {
497      if (conf.getBoolean(MAINTENANCE_MODE, false)) {
498        LOG.info("Detected {}=true via configuration.", MAINTENANCE_MODE);
499        maintenanceMode = true;
500      } else if (Boolean.getBoolean(MAINTENANCE_MODE)) {
501        LOG.info("Detected {}=true via environment variables.", MAINTENANCE_MODE);
502        maintenanceMode = true;
503      } else {
504        maintenanceMode = false;
505      }
506
507      this.rsFatals = new MemoryBoundedLogMessageBuffer(
508          conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024));
509      LOG.info("hbase.rootdir=" + getRootDir() +
510          ", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));
511
512      // Disable usage of meta replicas in the master
513      this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
514
515      decorateMasterConfiguration(this.conf);
516
517      // Hack! Maps DFSClient => Master for logs.  HDFS made this
518      // config param for task trackers, but we can piggyback off of it.
519      if (this.conf.get("mapreduce.task.attempt.id") == null) {
520        this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString());
521      }
522
523      this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));
524
525      // preload table descriptor at startup
526      this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);
527
528      this.maxBlancingTime = getMaxBalancingTime();
529      this.maxRitPercent = conf.getDouble(HConstants.HBASE_MASTER_BALANCER_MAX_RIT_PERCENT,
530          HConstants.DEFAULT_HBASE_MASTER_BALANCER_MAX_RIT_PERCENT);
531
532      // Do we publish the status?
533
534      boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
535          HConstants.STATUS_PUBLISHED_DEFAULT);
536      Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
537          conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
538              ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
539              ClusterStatusPublisher.Publisher.class);
540
541      if (shouldPublish) {
542        if (publisherClass == null) {
543          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
544              ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
545              " is not set - not publishing status");
546        } else {
547          clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
548          getChoreService().scheduleChore(clusterStatusPublisherChore);
549        }
550      }
551
552      // Some unit tests don't need a cluster, so no zookeeper at all
553      if (!conf.getBoolean("hbase.testing.nocluster", false)) {
554        this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
555      } else {
556        this.activeMasterManager = null;
557      }
558    } catch (Throwable t) {
559      // Make sure we log the exception. HMaster is often started via reflection and the
560      // cause of failed startup is lost.
561      LOG.error("Failed construction of Master", t);
562      throw t;
563    }
564  }
565
566  @Override
567  protected String getUseThisHostnameInstead(Configuration conf) {
568    return conf.get(MASTER_HOSTNAME_KEY);
569  }
570
571  // Main run loop. Calls through to the regionserver run loop AFTER becoming active Master; will
572  // block in here until then.
573  @Override
574  public void run() {
575    try {
576      if (!conf.getBoolean("hbase.testing.nocluster", false)) {
577        Threads.setDaemonThreadRunning(new Thread(() -> {
578          try {
579            int infoPort = putUpJettyServer();
580            startActiveMasterManager(infoPort);
581          } catch (Throwable t) {
582            // Make sure we log the exception.
583            String error = "Failed to become Active Master";
584            LOG.error(error, t);
585            // Abort should have been called already.
586            if (!isAborted()) {
587              abort(error, t);
588            }
589          }
590        }), getName() + ":becomeActiveMaster");
591      }
592      // Fall in here even if we have been aborted. Need to run the shutdown services and
593      // the super run call will do this for us.
594      super.run();
595    } finally {
596      if (this.clusterSchemaService != null) {
597        // If on way out, then we are no longer active master.
598        this.clusterSchemaService.stopAsync();
599        try {
600          this.clusterSchemaService.awaitTerminated(
601              getConfiguration().getInt(HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS,
602              DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS), TimeUnit.SECONDS);
603        } catch (TimeoutException te) {
604          LOG.warn("Failed shutdown of clusterSchemaService", te);
605        }
606      }
607      this.activeMaster = false;
608    }
609  }
610
611  // return the actual infoPort, -1 means disable info server.
612  private int putUpJettyServer() throws IOException {
613    if (!conf.getBoolean("hbase.master.infoserver.redirect", true)) {
614      return -1;
615    }
616    final int infoPort = conf.getInt("hbase.master.info.port.orig",
617      HConstants.DEFAULT_MASTER_INFOPORT);
618    // -1 is for disabling info server, so no redirecting
619    if (infoPort < 0 || infoServer == null) {
620      return -1;
621    }
622    if(infoPort == infoServer.getPort()) {
623      return infoPort;
624    }
625    final String addr = conf.get("hbase.master.info.bindAddress", "0.0.0.0");
626    if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
627      String msg =
628          "Failed to start redirecting jetty server. Address " + addr
629              + " does not belong to this host. Correct configuration parameter: "
630              + "hbase.master.info.bindAddress";
631      LOG.error(msg);
632      throw new IOException(msg);
633    }
634
635    // TODO I'm pretty sure we could just add another binding to the InfoServer run by
636    // the RegionServer and have it run the RedirectServlet instead of standing up
637    // a second entire stack here.
638    masterJettyServer = new Server();
639    final ServerConnector connector = new ServerConnector(masterJettyServer);
640    connector.setHost(addr);
641    connector.setPort(infoPort);
642    masterJettyServer.addConnector(connector);
643    masterJettyServer.setStopAtShutdown(true);
644
645    final String redirectHostname =
646        StringUtils.isBlank(useThisHostnameInstead) ? null : useThisHostnameInstead;
647
648    final RedirectServlet redirect = new RedirectServlet(infoServer, redirectHostname);
649    final WebAppContext context = new WebAppContext(null, "/", null, null, null, null, WebAppContext.NO_SESSIONS);
650    context.addServlet(new ServletHolder(redirect), "/*");
651    context.setServer(masterJettyServer);
652
653    try {
654      masterJettyServer.start();
655    } catch (Exception e) {
656      throw new IOException("Failed to start redirecting jetty server", e);
657    }
658    return connector.getLocalPort();
659  }
660
661  @Override
662  protected Function<TableDescriptorBuilder, TableDescriptorBuilder> getMetaTableObserver() {
663    return builder -> builder.setRegionReplication(conf.getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM));
664  }
665  /**
666   * For compatibility, if failed with regionserver credentials, try the master one
667   */
668  @Override
669  protected void login(UserProvider user, String host) throws IOException {
670    try {
671      super.login(user, host);
672    } catch (IOException ie) {
673      user.login(SecurityConstants.MASTER_KRB_KEYTAB_FILE,
674              SecurityConstants.MASTER_KRB_PRINCIPAL, host);
675    }
676  }
677
678  /**
679   * If configured to put regions on active master,
680   * wait till a backup master becomes active.
681   * Otherwise, loop till the server is stopped or aborted.
682   */
683  @Override
684  protected void waitForMasterActive(){
685    if (maintenanceMode) {
686      return;
687    }
688    boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(conf);
689    while (!(tablesOnMaster && activeMaster) && !isStopped() && !isAborted()) {
690      sleeper.sleep();
691    }
692  }
693
694  @VisibleForTesting
695  public MasterRpcServices getMasterRpcServices() {
696    return (MasterRpcServices)rpcServices;
697  }
698
699  public boolean balanceSwitch(final boolean b) throws IOException {
700    return getMasterRpcServices().switchBalancer(b, BalanceSwitchMode.ASYNC);
701  }
702
703  @Override
704  protected String getProcessName() {
705    return MASTER;
706  }
707
708  @Override
709  protected boolean canCreateBaseZNode() {
710    return true;
711  }
712
713  @Override
714  protected boolean canUpdateTableDescriptor() {
715    return true;
716  }
717
718  @Override
719  protected RSRpcServices createRpcServices() throws IOException {
720    return new MasterRpcServices(this);
721  }
722
723  @Override
724  protected void configureInfoServer() {
725    infoServer.addServlet("master-status", "/master-status", MasterStatusServlet.class);
726    infoServer.setAttribute(MASTER, this);
727    if (LoadBalancer.isTablesOnMaster(conf)) {
728      super.configureInfoServer();
729    }
730  }
731
732  @Override
733  protected Class<? extends HttpServlet> getDumpServlet() {
734    return MasterDumpServlet.class;
735  }
736
737  @Override
738  public MetricsMaster getMasterMetrics() {
739    return metricsMaster;
740  }
741
742  /**
743   * <p>
744   * Initialize all ZK based system trackers. But do not include {@link RegionServerTracker}, it
745   * should have already been initialized along with {@link ServerManager}.
746   * </p>
747   * <p>
748   * Will be overridden in tests.
749   * </p>
750   */
751  @VisibleForTesting
752  protected void initializeZKBasedSystemTrackers()
753      throws IOException, InterruptedException, KeeperException, ReplicationException {
754    this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
755    this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
756    this.normalizer.setMasterServices(this);
757    this.normalizer.setMasterRpcServices((MasterRpcServices)rpcServices);
758    this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
759    this.loadBalancerTracker.start();
760
761    this.regionNormalizerTracker = new RegionNormalizerTracker(zooKeeper, this);
762    this.regionNormalizerTracker.start();
763
764    this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
765    this.splitOrMergeTracker.start();
766
767    this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf);
768
769    this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
770    this.drainingServerTracker.start();
771
772    String clientQuorumServers = conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM);
773    boolean clientZkObserverMode = conf.getBoolean(HConstants.CLIENT_ZOOKEEPER_OBSERVER_MODE,
774      HConstants.DEFAULT_CLIENT_ZOOKEEPER_OBSERVER_MODE);
775    if (clientQuorumServers != null && !clientZkObserverMode) {
776      // we need to take care of the ZK information synchronization
777      // if given client ZK are not observer nodes
778      ZKWatcher clientZkWatcher = new ZKWatcher(conf,
779          getProcessName() + ":" + rpcServices.getSocketAddress().getPort() + "-clientZK", this,
780          false, true);
781      this.metaLocationSyncer = new MetaLocationSyncer(zooKeeper, clientZkWatcher, this);
782      this.metaLocationSyncer.start();
783      this.masterAddressSyncer = new MasterAddressSyncer(zooKeeper, clientZkWatcher, this);
784      this.masterAddressSyncer.start();
785      // set cluster id is a one-go effort
786      ZKClusterId.setClusterId(clientZkWatcher, fileSystemManager.getClusterId());
787    }
788
789    // Set the cluster as up.  If new RSs, they'll be waiting on this before
790    // going ahead with their startup.
791    boolean wasUp = this.clusterStatusTracker.isClusterUp();
792    if (!wasUp) this.clusterStatusTracker.setClusterUp();
793
794    LOG.info("Active/primary master=" + this.serverName +
795        ", sessionid=0x" +
796        Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
797        ", setting cluster-up flag (Was=" + wasUp + ")");
798
799    // create/initialize the snapshot manager and other procedure managers
800    this.snapshotManager = new SnapshotManager();
801    this.mpmHost = new MasterProcedureManagerHost();
802    this.mpmHost.register(this.snapshotManager);
803    this.mpmHost.register(new MasterFlushTableProcedureManager());
804    this.mpmHost.loadProcedures(conf);
805    this.mpmHost.initialize(this, this.metricsMaster);
806  }
807
808  private static final ImmutableSet<Class<? extends Procedure>> UNSUPPORTED_PROCEDURES =
809    ImmutableSet.of(RecoverMetaProcedure.class, AssignProcedure.class, UnassignProcedure.class,
810      MoveRegionProcedure.class);
811
812  /**
813   * In HBASE-20811, we have introduced a new TRSP to assign/unassign/move regions, and it is
814   * incompatible with the old AssignProcedure/UnassignProcedure/MoveRegionProcedure. So we need to
815   * make sure that there are none these procedures when upgrading. If there are, the master will
816   * quit, you need to go back to the old version to finish these procedures first before upgrading.
817   */
818  private void checkUnsupportedProcedure(
819      Map<Class<? extends Procedure>, List<Procedure<MasterProcedureEnv>>> procsByType)
820      throws HBaseIOException {
821    // Confirm that we do not have unfinished assign/unassign related procedures. It is not easy to
822    // support both the old assign/unassign procedures and the new TransitRegionStateProcedure as
823    // there will be conflict in the code for AM. We should finish all these procedures before
824    // upgrading.
825    for (Class<? extends Procedure> clazz : UNSUPPORTED_PROCEDURES) {
826      List<Procedure<MasterProcedureEnv>> procs = procsByType.get(clazz);
827      if (procs != null) {
828        LOG.error(
829          "Unsupported procedure type {} found, please rollback your master to the old" +
830            " version to finish them, and then try to upgrade again. The full procedure list: {}",
831          clazz, procs);
832        throw new HBaseIOException("Unsupported procedure type " + clazz + " found");
833      }
834    }
835    // A special check for SCP, as we do not support RecoverMetaProcedure any more so we need to
836    // make sure that no one will try to schedule it but SCP does have a state which will schedule
837    // it.
838    if (procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream()
839      .map(p -> (ServerCrashProcedure) p).anyMatch(ServerCrashProcedure::isInRecoverMetaState)) {
840      LOG.error("At least one ServerCrashProcedure is going to schedule a RecoverMetaProcedure," +
841        " which is not supported any more. Please rollback your master to the old version to" +
842        " finish them, and then try to upgrade again.");
843      throw new HBaseIOException("Unsupported procedure state found for ServerCrashProcedure");
844    }
845  }
846
847  // Will be overriden in test to inject customized AssignmentManager
848  @VisibleForTesting
849  protected AssignmentManager createAssignmentManager(MasterServices master) {
850    return new AssignmentManager(master);
851  }
852
853  /**
854   * Finish initialization of HMaster after becoming the primary master.
855   * <p/>
856   * The startup order is a bit complicated but very important, do not change it unless you know
857   * what you are doing.
858   * <ol>
859   * <li>Initialize file system based components - file system manager, wal manager, table
860   * descriptors, etc</li>
861   * <li>Publish cluster id</li>
862   * <li>Here comes the most complicated part - initialize server manager, assignment manager and
863   * region server tracker
864   * <ol type='i'>
865   * <li>Create server manager</li>
866   * <li>Create procedure executor, load the procedures, but do not start workers. We will start it
867   * later after we finish scheduling SCPs to avoid scheduling duplicated SCPs for the same
868   * server</li>
869   * <li>Create assignment manager and start it, load the meta region state, but do not load data
870   * from meta region</li>
871   * <li>Start region server tracker, construct the online servers set and find out dead servers and
872   * schedule SCP for them. The online servers will be constructed by scanning zk, and we will also
873   * scan the wal directory to find out possible live region servers, and the differences between
874   * these two sets are the dead servers</li>
875   * </ol>
876   * </li>
877   * <li>If this is a new deploy, schedule a InitMetaProcedure to initialize meta</li>
878   * <li>Start necessary service threads - balancer, catalog janior, executor services, and also the
879   * procedure executor, etc. Notice that the balancer must be created first as assignment manager
880   * may use it when assigning regions.</li>
881   * <li>Wait for meta to be initialized if necesssary, start table state manager.</li>
882   * <li>Wait for enough region servers to check-in</li>
883   * <li>Let assignment manager load data from meta and construct region states</li>
884   * <li>Start all other things such as chore services, etc</li>
885   * </ol>
886   * <p/>
887   * Notice that now we will not schedule a special procedure to make meta online(unless the first
888   * time where meta has not been created yet), we will rely on SCP to bring meta online.
889   */
890  private void finishActiveMasterInitialization(MonitoredTask status)
891      throws IOException, InterruptedException, KeeperException, ReplicationException {
892    /*
893     * We are active master now... go initialize components we need to run.
894     */
895    status.setStatus("Initializing Master file system");
896
897    this.masterActiveTime = System.currentTimeMillis();
898    // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
899
900    // Only initialize the MemStoreLAB when master carry table
901    if (LoadBalancer.isTablesOnMaster(conf)) {
902      initializeMemStoreChunkCreator();
903    }
904    this.fileSystemManager = new MasterFileSystem(conf);
905    this.walManager = new MasterWalManager(this);
906
907    // enable table descriptors cache
908    this.tableDescriptors.setCacheOn();
909
910    // warm-up HTDs cache on master initialization
911    if (preLoadTableDescriptors) {
912      status.setStatus("Pre-loading table descriptors");
913      this.tableDescriptors.getAll();
914    }
915
916    // Publish cluster ID; set it in Master too. The superclass RegionServer does this later but
917    // only after it has checked in with the Master. At least a few tests ask Master for clusterId
918    // before it has called its run method and before RegionServer has done the reportForDuty.
919    ClusterId clusterId = fileSystemManager.getClusterId();
920    status.setStatus("Publishing Cluster ID " + clusterId + " in ZooKeeper");
921    ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
922    this.clusterId = clusterId.toString();
923
924    // Precaution. Put in place the old hbck1 lock file to fence out old hbase1s running their
925    // hbck1s against an hbase2 cluster; it could do damage. To skip this behavior, set
926    // hbase.write.hbck1.lock.file to false.
927    if (this.conf.getBoolean("hbase.write.hbck1.lock.file", true)) {
928      HBaseFsck.checkAndMarkRunningHbck(this.conf,
929          HBaseFsck.createLockRetryCounterFactory(this.conf).create());
930    }
931
932    status.setStatus("Initialize ServerManager and schedule SCP for crash servers");
933    this.serverManager = createServerManager(this);
934    if (!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK,
935      DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) {
936      this.splitWALManager = new SplitWALManager(this);
937    }
938    createProcedureExecutor();
939    @SuppressWarnings("rawtypes")
940    Map<Class<? extends Procedure>, List<Procedure<MasterProcedureEnv>>> procsByType =
941      procedureExecutor.getActiveProceduresNoCopy().stream()
942        .collect(Collectors.groupingBy(p -> p.getClass()));
943
944    checkUnsupportedProcedure(procsByType);
945
946    // Create Assignment Manager
947    this.assignmentManager = createAssignmentManager(this);
948    this.assignmentManager.start();
949    // TODO: TRSP can perform as the sub procedure for other procedures, so even if it is marked as
950    // completed, it could still be in the procedure list. This is a bit strange but is another
951    // story, need to verify the implementation for ProcedureExecutor and ProcedureStore.
952    List<TransitRegionStateProcedure> ritList =
953      procsByType.getOrDefault(TransitRegionStateProcedure.class, Collections.emptyList()).stream()
954        .filter(p -> !p.isFinished()).map(p -> (TransitRegionStateProcedure) p)
955        .collect(Collectors.toList());
956    this.assignmentManager.setupRIT(ritList);
957
958    // Start RegionServerTracker with listing of servers found with exiting SCPs -- these should
959    // be registered in the deadServers set -- and with the list of servernames out on the
960    // filesystem that COULD BE 'alive' (we'll schedule SCPs for each and let SCP figure it out).
961    // We also pass dirs that are already 'splitting'... so we can do some checks down in tracker.
962    // TODO: Generate the splitting and live Set in one pass instead of two as we currently do.
963    this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
964    this.regionServerTracker.start(
965      procsByType.getOrDefault(ServerCrashProcedure.class, Collections.emptyList()).stream()
966        .map(p -> (ServerCrashProcedure) p).map(p -> p.getServerName()).collect(Collectors.toSet()),
967      walManager.getLiveServersFromWALDir(), walManager.getSplittingServersFromWALDir());
968    // This manager will be started AFTER hbase:meta is confirmed on line.
969    // hbase.mirror.table.state.to.zookeeper is so hbase1 clients can connect. They read table
970    // state from zookeeper while hbase2 reads it from hbase:meta. Disable if no hbase1 clients.
971    this.tableStateManager =
972      this.conf.getBoolean(MirroringTableStateManager.MIRROR_TABLE_STATE_TO_ZK_KEY, true)
973        ?
974        new MirroringTableStateManager(this):
975        new TableStateManager(this);
976
977    status.setStatus("Initializing ZK system trackers");
978    initializeZKBasedSystemTrackers();
979    // Set ourselves as active Master now our claim has succeeded up in zk.
980    this.activeMaster = true;
981
982    // Start the Zombie master detector after setting master as active, see HBASE-21535
983    Thread zombieDetector = new Thread(new InitializationMonitor(this),
984        "ActiveMasterInitializationMonitor-" + System.currentTimeMillis());
985    zombieDetector.setDaemon(true);
986    zombieDetector.start();
987
988    // This is for backwards compatibility
989    // See HBASE-11393
990    status.setStatus("Update TableCFs node in ZNode");
991    ReplicationPeerConfigUpgrader tableCFsUpdater =
992        new ReplicationPeerConfigUpgrader(zooKeeper, conf);
993    tableCFsUpdater.copyTableCFs();
994
995    if (!maintenanceMode) {
996      // Add the Observer to delete quotas on table deletion before starting all CPs by
997      // default with quota support, avoiding if user specifically asks to not load this Observer.
998      if (QuotaUtil.isQuotaEnabled(conf)) {
999        updateConfigurationForQuotasObserver(conf);
1000      }
1001      // initialize master side coprocessors before we start handling requests
1002      status.setStatus("Initializing master coprocessors");
1003      this.cpHost = new MasterCoprocessorHost(this, this.conf);
1004    }
1005
1006    // Checking if meta needs initializing.
1007    status.setStatus("Initializing meta table if this is a new deploy");
1008    InitMetaProcedure initMetaProc = null;
1009    // Print out state of hbase:meta on startup; helps debugging.
1010    RegionState rs = this.assignmentManager.getRegionStates().
1011        getRegionState(RegionInfoBuilder.FIRST_META_REGIONINFO);
1012    LOG.info("hbase:meta {}", rs);
1013    if (rs.isOffline()) {
1014      Optional<InitMetaProcedure> optProc = procedureExecutor.getProcedures().stream()
1015        .filter(p -> p instanceof InitMetaProcedure).map(o -> (InitMetaProcedure) o).findAny();
1016      initMetaProc = optProc.orElseGet(() -> {
1017        // schedule an init meta procedure if meta has not been deployed yet
1018        InitMetaProcedure temp = new InitMetaProcedure();
1019        procedureExecutor.submitProcedure(temp);
1020        return temp;
1021      });
1022    }
1023    if (this.balancer instanceof FavoredNodesPromoter) {
1024      favoredNodesManager = new FavoredNodesManager(this);
1025    }
1026
1027    // initialize load balancer
1028    this.balancer.setMasterServices(this);
1029    this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
1030    this.balancer.initialize();
1031
1032    // start up all service threads.
1033    status.setStatus("Initializing master service threads");
1034    startServiceThreads();
1035    // wait meta to be initialized after we start procedure executor
1036    if (initMetaProc != null) {
1037      initMetaProc.await();
1038    }
1039    // Wake up this server to check in
1040    sleeper.skipSleepCycle();
1041
1042    // Wait for region servers to report in.
1043    // With this as part of master initialization, it precludes our being able to start a single
1044    // server that is both Master and RegionServer. Needs more thought. TODO.
1045    String statusStr = "Wait for region servers to report in";
1046    status.setStatus(statusStr);
1047    LOG.info(Objects.toString(status));
1048    waitForRegionServers(status);
1049
1050    // Check if master is shutting down because issue initializing regionservers or balancer.
1051    if (isStopped()) {
1052      return;
1053    }
1054
1055    status.setStatus("Starting assignment manager");
1056    // FIRST HBASE:META READ!!!!
1057    // The below cannot make progress w/o hbase:meta being online.
1058    // This is the FIRST attempt at going to hbase:meta. Meta on-lining is going on in background
1059    // as procedures run -- in particular SCPs for crashed servers... One should put up hbase:meta
1060    // if it is down. It may take a while to come online. So, wait here until meta if for sure
1061    // available. That's what waitForMetaOnline does.
1062    if (!waitForMetaOnline()) {
1063      return;
1064    }
1065    this.assignmentManager.joinCluster();
1066    // The below depends on hbase:meta being online.
1067    this.tableStateManager.start();
1068    // Below has to happen after tablestatemanager has started in the case where this hbase-2.x
1069    // is being started over an hbase-1.x dataset. tablestatemanager runs a migration as part
1070    // of its 'start' moving table state from zookeeper to hbase:meta. This migration needs to
1071    // complete before we do this next step processing offline regions else it fails reading
1072    // table states messing up master launch (namespace table, etc., are not assigned).
1073    this.assignmentManager.processOfflineRegions();
1074    // Initialize after meta is up as below scans meta
1075    if (favoredNodesManager != null && !maintenanceMode) {
1076      SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment =
1077          new SnapshotOfRegionAssignmentFromMeta(getConnection());
1078      snapshotOfRegionAssignment.initialize();
1079      favoredNodesManager.initialize(snapshotOfRegionAssignment);
1080    }
1081
1082    // set cluster status again after user regions are assigned
1083    this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
1084
1085    // Start balancer and meta catalog janitor after meta and regions have been assigned.
1086    status.setStatus("Starting balancer and catalog janitor");
1087    this.clusterStatusChore = new ClusterStatusChore(this, balancer);
1088    getChoreService().scheduleChore(clusterStatusChore);
1089    this.balancerChore = new BalancerChore(this);
1090    getChoreService().scheduleChore(balancerChore);
1091    this.normalizerChore = new RegionNormalizerChore(this);
1092    getChoreService().scheduleChore(normalizerChore);
1093    this.catalogJanitorChore = new CatalogJanitor(this);
1094    getChoreService().scheduleChore(catalogJanitorChore);
1095    this.hbckChore = new HbckChore(this);
1096    getChoreService().scheduleChore(hbckChore);
1097
1098    // NAMESPACE READ!!!!
1099    // Here we expect hbase:namespace to be online. See inside initClusterSchemaService.
1100    // TODO: Fix this. Namespace is a pain being a sort-of system table. Fold it in to hbase:meta.
1101    // isNamespace does like isMeta and waits until namespace is onlined before allowing progress.
1102    if (!waitForNamespaceOnline()) {
1103      return;
1104    }
1105    status.setStatus("Starting cluster schema service");
1106    initClusterSchemaService();
1107
1108    if (this.cpHost != null) {
1109      try {
1110        this.cpHost.preMasterInitialization();
1111      } catch (IOException e) {
1112        LOG.error("Coprocessor preMasterInitialization() hook failed", e);
1113      }
1114    }
1115
1116    status.markComplete("Initialization successful");
1117    LOG.info(String.format("Master has completed initialization %.3fsec",
1118       (System.currentTimeMillis() - masterActiveTime) / 1000.0f));
1119    this.masterFinishedInitializationTime = System.currentTimeMillis();
1120    configurationManager.registerObserver(this.balancer);
1121    configurationManager.registerObserver(this.cleanerPool);
1122    configurationManager.registerObserver(this.hfileCleaner);
1123    configurationManager.registerObserver(this.logCleaner);
1124    // Set master as 'initialized'.
1125    setInitialized(true);
1126
1127    if (maintenanceMode) {
1128      LOG.info("Detected repair mode, skipping final initialization steps.");
1129      return;
1130    }
1131
1132    assignmentManager.checkIfShouldMoveSystemRegionAsync();
1133    status.setStatus("Assign meta replicas");
1134    MasterMetaBootstrap metaBootstrap = createMetaBootstrap();
1135    metaBootstrap.assignMetaReplicas();
1136    status.setStatus("Starting quota manager");
1137    initQuotaManager();
1138    if (QuotaUtil.isQuotaEnabled(conf)) {
1139      // Create the quota snapshot notifier
1140      spaceQuotaSnapshotNotifier = createQuotaSnapshotNotifier();
1141      spaceQuotaSnapshotNotifier.initialize(getClusterConnection());
1142      this.quotaObserverChore = new QuotaObserverChore(this, getMasterMetrics());
1143      // Start the chore to read the region FS space reports and act on them
1144      getChoreService().scheduleChore(quotaObserverChore);
1145
1146      this.snapshotQuotaChore = new SnapshotQuotaObserverChore(this, getMasterMetrics());
1147      // Start the chore to read snapshots and add their usage to table/NS quotas
1148      getChoreService().scheduleChore(snapshotQuotaChore);
1149    }
1150
1151    // clear the dead servers with same host name and port of online server because we are not
1152    // removing dead server with same hostname and port of rs which is trying to check in before
1153    // master initialization. See HBASE-5916.
1154    this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
1155
1156    // Check and set the znode ACLs if needed in case we are overtaking a non-secure configuration
1157    status.setStatus("Checking ZNode ACLs");
1158    zooKeeper.checkAndSetZNodeAcls();
1159
1160    status.setStatus("Initializing MOB Cleaner");
1161    initMobCleaner();
1162
1163    status.setStatus("Calling postStartMaster coprocessors");
1164    if (this.cpHost != null) {
1165      // don't let cp initialization errors kill the master
1166      try {
1167        this.cpHost.postStartMaster();
1168      } catch (IOException ioe) {
1169        LOG.error("Coprocessor postStartMaster() hook failed", ioe);
1170      }
1171    }
1172
1173    zombieDetector.interrupt();
1174
1175    /*
1176     * After master has started up, lets do balancer post startup initialization. Since this runs
1177     * in activeMasterManager thread, it should be fine.
1178     */
1179    long start = System.currentTimeMillis();
1180    this.balancer.postMasterStartupInitialize();
1181    if (LOG.isDebugEnabled()) {
1182      LOG.debug("Balancer post startup initialization complete, took " + (
1183          (System.currentTimeMillis() - start) / 1000) + " seconds");
1184    }
1185  }
1186
1187  /**
1188   * Check hbase:meta is up and ready for reading. For use during Master startup only.
1189   * @return True if meta is UP and online and startup can progress. Otherwise, meta is not online
1190   *   and we will hold here until operator intervention.
1191   */
1192  @VisibleForTesting
1193  public boolean waitForMetaOnline() throws InterruptedException {
1194    return isRegionOnline(RegionInfoBuilder.FIRST_META_REGIONINFO);
1195  }
1196
1197  /**
1198   * @return True if region is online and scannable else false if an error or shutdown (Otherwise
1199   *   we just block in here holding up all forward-progess).
1200   */
1201  private boolean isRegionOnline(RegionInfo ri) throws InterruptedException {
1202    RetryCounter rc = null;
1203    while (!isStopped()) {
1204      RegionState rs = this.assignmentManager.getRegionStates().getRegionState(ri);
1205      if (rs.isOpened()) {
1206        if (this.getServerManager().isServerOnline(rs.getServerName())) {
1207          return true;
1208        }
1209      }
1210      // Region is not OPEN.
1211      Optional<Procedure<MasterProcedureEnv>> optProc = this.procedureExecutor.getProcedures().
1212          stream().filter(p -> p instanceof ServerCrashProcedure).findAny();
1213      // TODO: Add a page to refguide on how to do repair. Have this log message point to it.
1214      // Page will talk about loss of edits, how to schedule at least the meta WAL recovery, and
1215      // then how to assign including how to break region lock if one held.
1216      LOG.warn("{} is NOT online; state={}; ServerCrashProcedures={}. Master startup cannot " +
1217          "progress, in holding-pattern until region onlined.",
1218          ri.getRegionNameAsString(), rs, optProc.isPresent());
1219      // Check once-a-minute.
1220      if (rc == null) {
1221        rc = new RetryCounterFactory(1000).create();
1222      }
1223      Threads.sleep(rc.getBackoffTimeAndIncrementAttempts());
1224    }
1225    return false;
1226  }
1227
1228  /**
1229   * Check hbase:namespace table is assigned. If not, startup will hang looking for the ns table
1230   * (TODO: Fix this! NS should not hold-up startup).
1231   * @return True if namespace table is up/online.
1232   */
1233  @VisibleForTesting
1234  public boolean waitForNamespaceOnline() throws InterruptedException {
1235    List<RegionInfo> ris = this.assignmentManager.getRegionStates().
1236        getRegionsOfTable(TableName.NAMESPACE_TABLE_NAME);
1237    if (ris.isEmpty()) {
1238      // If empty, means we've not assigned the namespace table yet... Just return true so startup
1239      // continues and the namespace table gets created.
1240      return true;
1241    }
1242    // Else there are namespace regions up in meta. Ensure they are assigned before we go on.
1243    for (RegionInfo ri: ris) {
1244      isRegionOnline(ri);
1245    }
1246    return true;
1247  }
1248
1249  /**
1250   * Adds the {@code MasterQuotasObserver} to the list of configured Master observers to
1251   * automatically remove quotas for a table when that table is deleted.
1252   */
1253  @VisibleForTesting
1254  public void updateConfigurationForQuotasObserver(Configuration conf) {
1255    // We're configured to not delete quotas on table deletion, so we don't need to add the obs.
1256    if (!conf.getBoolean(
1257          MasterQuotasObserver.REMOVE_QUOTA_ON_TABLE_DELETE,
1258          MasterQuotasObserver.REMOVE_QUOTA_ON_TABLE_DELETE_DEFAULT)) {
1259      return;
1260    }
1261    String[] masterCoprocs = conf.getStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY);
1262    final int length = null == masterCoprocs ? 0 : masterCoprocs.length;
1263    String[] updatedCoprocs = new String[length + 1];
1264    if (length > 0) {
1265      System.arraycopy(masterCoprocs, 0, updatedCoprocs, 0, masterCoprocs.length);
1266    }
1267    updatedCoprocs[length] = MasterQuotasObserver.class.getName();
1268    conf.setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, updatedCoprocs);
1269  }
1270
1271  private void initMobCleaner() {
1272    this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this);
1273    getChoreService().scheduleChore(expiredMobFileCleanerChore);
1274
1275    int mobCompactionPeriod = conf.getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD,
1276        MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD);
1277    this.mobCompactChore = new MobCompactionChore(this, mobCompactionPeriod);
1278    getChoreService().scheduleChore(mobCompactChore);
1279    this.mobCompactThread = new MasterMobCompactionThread(this);
1280  }
1281
1282  /**
1283   * <p>
1284   * Create a {@link MasterMetaBootstrap} instance.
1285   * </p>
1286   * <p>
1287   * Will be overridden in tests.
1288   * </p>
1289   */
1290  @VisibleForTesting
1291  protected MasterMetaBootstrap createMetaBootstrap() {
1292    // We put this out here in a method so can do a Mockito.spy and stub it out
1293    // w/ a mocked up MasterMetaBootstrap.
1294    return new MasterMetaBootstrap(this);
1295  }
1296
1297  /**
1298   * <p>
1299   * Create a {@link ServerManager} instance.
1300   * </p>
1301   * <p>
1302   * Will be overridden in tests.
1303   * </p>
1304   */
1305  @VisibleForTesting
1306  protected ServerManager createServerManager(final MasterServices master) throws IOException {
1307    // We put this out here in a method so can do a Mockito.spy and stub it out
1308    // w/ a mocked up ServerManager.
1309    setupClusterConnection();
1310    return new ServerManager(master);
1311  }
1312
1313  private void waitForRegionServers(final MonitoredTask status)
1314      throws IOException, InterruptedException {
1315    this.serverManager.waitForRegionServers(status);
1316  }
1317
1318  // Will be overridden in tests
1319  @VisibleForTesting
1320  protected void initClusterSchemaService() throws IOException, InterruptedException {
1321    this.clusterSchemaService = new ClusterSchemaServiceImpl(this);
1322    this.clusterSchemaService.startAsync();
1323    try {
1324      this.clusterSchemaService.awaitRunning(getConfiguration().getInt(
1325        HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS,
1326        DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS), TimeUnit.SECONDS);
1327    } catch (TimeoutException toe) {
1328      throw new IOException("Timedout starting ClusterSchemaService", toe);
1329    }
1330  }
1331
1332  private void initQuotaManager() throws IOException {
1333    MasterQuotaManager quotaManager = new MasterQuotaManager(this);
1334    quotaManager.start();
1335    this.quotaManager = quotaManager;
1336  }
1337
1338  private SpaceQuotaSnapshotNotifier createQuotaSnapshotNotifier() {
1339    SpaceQuotaSnapshotNotifier notifier =
1340        SpaceQuotaSnapshotNotifierFactory.getInstance().create(getConfiguration());
1341    return notifier;
1342  }
1343
1344  boolean isCatalogJanitorEnabled() {
1345    return catalogJanitorChore != null ?
1346      catalogJanitorChore.getEnabled() : false;
1347  }
1348
1349  boolean isCleanerChoreEnabled() {
1350    boolean hfileCleanerFlag = true, logCleanerFlag = true;
1351
1352    if (hfileCleaner != null) {
1353      hfileCleanerFlag = hfileCleaner.getEnabled();
1354    }
1355
1356    if (logCleaner != null) {
1357      logCleanerFlag = logCleaner.getEnabled();
1358    }
1359
1360    return (hfileCleanerFlag && logCleanerFlag);
1361  }
1362
1363  @Override
1364  public ServerManager getServerManager() {
1365    return this.serverManager;
1366  }
1367
1368  @Override
1369  public MasterFileSystem getMasterFileSystem() {
1370    return this.fileSystemManager;
1371  }
1372
1373  @Override
1374  public MasterWalManager getMasterWalManager() {
1375    return this.walManager;
1376  }
1377
1378  @Override
1379  public SplitWALManager getSplitWALManager() {
1380    return splitWALManager;
1381  }
1382
1383  @Override
1384  public TableStateManager getTableStateManager() {
1385    return tableStateManager;
1386  }
1387
1388  /*
1389   * Start up all services. If any of these threads gets an unhandled exception
1390   * then they just die with a logged message.  This should be fine because
1391   * in general, we do not expect the master to get such unhandled exceptions
1392   *  as OOMEs; it should be lightly loaded. See what HRegionServer does if
1393   *  need to install an unexpected exception handler.
1394   */
1395  private void startServiceThreads() throws IOException {
1396    // Start the executor service pools
1397    this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, conf.getInt(
1398      HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT));
1399    this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt(
1400      HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT));
1401    this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
1402      conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS,
1403        HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT));
1404    this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
1405      conf.getInt(HConstants.MASTER_META_SERVER_OPERATIONS_THREADS,
1406        HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT));
1407    this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt(
1408      HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT));
1409    this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, conf.getInt(
1410      SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT));
1411
1412   // We depend on there being only one instance of this executor running
1413   // at a time.  To do concurrency, would need fencing of enable/disable of
1414   // tables.
1415   // Any time changing this maxThreads to > 1, pls see the comment at
1416   // AccessController#postCompletedCreateTableAction
1417   this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1418   startProcedureExecutor();
1419
1420    // Create cleaner thread pool
1421    cleanerPool = new DirScanPool(conf);
1422    // Start log cleaner thread
1423    int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 600 * 1000);
1424    this.logCleaner = new LogCleaner(cleanerInterval, this, conf,
1425      getMasterWalManager().getFileSystem(), getMasterWalManager().getOldLogDir(), cleanerPool);
1426    getChoreService().scheduleChore(logCleaner);
1427
1428    // start the hfile archive cleaner thread
1429    Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
1430    Map<String, Object> params = new HashMap<>();
1431    params.put(MASTER, this);
1432    this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf,
1433      getMasterFileSystem().getFileSystem(), archiveDir, cleanerPool, params);
1434    getChoreService().scheduleChore(hfileCleaner);
1435
1436    replicationBarrierCleaner = new ReplicationBarrierCleaner(conf, this, getConnection(),
1437      replicationPeerManager);
1438    getChoreService().scheduleChore(replicationBarrierCleaner);
1439
1440    serviceStarted = true;
1441    if (LOG.isTraceEnabled()) {
1442      LOG.trace("Started service threads");
1443    }
1444  }
1445
  /**
   * Stops this master's services in the order below.
   * <ol>
   * <li>The master jetty server, the scheduled chores and the MOB compaction thread.</li>
   * <li>The parent class's service threads, then the cleaner pool.</li>
   * <li>The quota manager, active master manager, server manager and assignment manager.</li>
   * <li>The procedure executor and store, the WAL manager and the file system manager.</li>
   * <li>{@code mpmHost} and the region server tracker.</li>
   * </ol>
   * Every component is null-checked before it is stopped, presumably because this can run on a
   * master whose startup did not complete.
   */
  @Override
  protected void stopServiceThreads() {
    if (masterJettyServer != null) {
      LOG.info("Stopping master jetty server");
      try {
        masterJettyServer.stop();
      } catch (Exception e) {
        // Best effort: log and keep stopping the remaining services.
        LOG.error("Failed to stop master jetty server", e);
      }
    }
    // Cancel every scheduled chore (including the cleaners) before their pool is shut down below.
    stopChores();
    if (this.mobCompactThread != null) {
      this.mobCompactThread.close();
    }
    super.stopServiceThreads();
    if (cleanerPool != null) {
      cleanerPool.shutdownNow();
      cleanerPool = null;
    }

    // NOTE(review): logged after several services above have already been stopped.
    LOG.debug("Stopping service threads");

    if (this.quotaManager != null) {
      this.quotaManager.stop();
    }

    if (this.activeMasterManager != null) {
      this.activeMasterManager.stop();
    }
    if (this.serverManager != null) {
      this.serverManager.stop();
    }
    if (this.assignmentManager != null) {
      this.assignmentManager.stop();
    }

    // Stops the executor (and its remote dispatcher) and then the procedure store.
    stopProcedureExecutor();

    if (this.walManager != null) {
      this.walManager.stop();
    }
    if (this.fileSystemManager != null) {
      this.fileSystemManager.stop();
    }
    if (this.mpmHost != null) {
      this.mpmHost.stop("server shutting down.");
    }
    if (this.regionServerTracker != null) {
      this.regionServerTracker.stop();
    }
  }
1497
  /**
   * Creates the WAL-backed procedure store and the {@link ProcedureExecutor}, starts the store
   * and the remote dispatcher, and initializes the executor. The executor's workers are NOT
   * started here; {@link #startProcedureExecutor()} starts them later (see the javadoc of
   * finishActiveMasterInitialization).
   * @throws IOException presumably from starting the store or initializing the executor; TODO
   *   confirm
   */
  private void createProcedureExecutor() throws IOException {
    MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
    procedureStore =
      new WALProcedureStore(conf, new MasterProcedureEnv.WALStoreLeaseRecovery(this));
    // Losing the procedure store's lease aborts this master.
    procedureStore.registerListener(new ProcedureStoreListener() {

      @Override
      public void abortProcess() {
        abort("The Procedure Store lost the lease", null);
      }
    });
    MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler();
    procedureExecutor = new ProcedureExecutor<>(conf, procEnv, procedureStore, procedureScheduler);
    // The environment is deregistered again in stopProcedureExecutor().
    configurationManager.registerObserver(procEnv);

    // Thread count: configured value, else max(cpus / 4, DEFAULT_MIN_MASTER_PROCEDURE_THREADS).
    int cpus = Runtime.getRuntime().availableProcessors();
    final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, Math.max(
      (cpus > 0 ? cpus / 4 : 0), MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
    final boolean abortOnCorruption =
      conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
        MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
    procedureStore.start(numThreads);
    // Just initialize it but do not start the workers, we will start the workers later by calling
    // startProcedureExecutor. See the javadoc for finishActiveMasterInitialization for more
    // details.
    procedureExecutor.init(numThreads, abortOnCorruption);
    procEnv.getRemoteDispatcher().start();
  }
1526
1527  private void startProcedureExecutor() throws IOException {
1528    procedureExecutor.startWorkers();
1529  }
1530
  /**
   * Stops the procedure executor and then the procedure store. Each is null-checked and its
   * field is cleared afterwards, so a second call does nothing.
   */
  private void stopProcedureExecutor() {
    if (procedureExecutor != null) {
      // Undo the registerObserver(procEnv) done in createProcedureExecutor().
      configurationManager.deregisterObserver(procedureExecutor.getEnvironment());
      // The remote dispatcher is stopped before the executor itself.
      procedureExecutor.getEnvironment().getRemoteDispatcher().stop();
      procedureExecutor.stop();
      // Presumably waits for the executor's threads to exit; TODO confirm.
      procedureExecutor.join();
      procedureExecutor = null;
    }

    if (procedureStore != null) {
      // The store is told whether this master is stopping because of an abort.
      procedureStore.stop(isAborted());
      procedureStore = null;
    }
  }
1545
1546  private void stopChores() {
1547    ChoreService choreService = getChoreService();
1548    if (choreService != null) {
1549      choreService.cancelChore(this.expiredMobFileCleanerChore);
1550      choreService.cancelChore(this.mobCompactChore);
1551      choreService.cancelChore(this.balancerChore);
1552      choreService.cancelChore(this.normalizerChore);
1553      choreService.cancelChore(this.clusterStatusChore);
1554      choreService.cancelChore(this.catalogJanitorChore);
1555      choreService.cancelChore(this.clusterStatusPublisherChore);
1556      choreService.cancelChore(this.snapshotQuotaChore);
1557      choreService.cancelChore(this.logCleaner);
1558      choreService.cancelChore(this.hfileCleaner);
1559      choreService.cancelChore(this.replicationBarrierCleaner);
1560      choreService.cancelChore(this.hbckChore);
1561    }
1562  }
1563
1564  /**
1565   * @return Get remote side's InetAddress
1566   */
1567  InetAddress getRemoteInetAddress(final int port,
1568      final long serverStartCode) throws UnknownHostException {
1569    // Do it out here in its own little method so can fake an address when
1570    // mocking up in tests.
1571    InetAddress ia = RpcServer.getRemoteIp();
1572
1573    // The call could be from the local regionserver,
1574    // in which case, there is no remote address.
1575    if (ia == null && serverStartCode == startcode) {
1576      InetSocketAddress isa = rpcServices.getSocketAddress();
1577      if (isa != null && isa.getPort() == port) {
1578        ia = isa.getAddress();
1579      }
1580    }
1581    return ia;
1582  }
1583
1584  /**
1585   * @return Maximum time we should run balancer for
1586   */
1587  private int getMaxBalancingTime() {
1588    // if max balancing time isn't set, defaulting it to period time
1589    int maxBalancingTime = getConfiguration().getInt(HConstants.HBASE_BALANCER_MAX_BALANCING,
1590      getConfiguration()
1591        .getInt(HConstants.HBASE_BALANCER_PERIOD, HConstants.DEFAULT_HBASE_BALANCER_PERIOD));
1592    return maxBalancingTime;
1593  }
1594
1595  /**
1596   * @return Maximum number of regions in transition
1597   */
1598  private int getMaxRegionsInTransition() {
1599    int numRegions = this.assignmentManager.getRegionStates().getRegionAssignments().size();
1600    return Math.max((int) Math.floor(numRegions * this.maxRitPercent), 1);
1601  }
1602
1603  /**
1604   * It first sleep to the next balance plan start time. Meanwhile, throttling by the max
1605   * number regions in transition to protect availability.
1606   * @param nextBalanceStartTime The next balance plan start time
1607   * @param maxRegionsInTransition max number of regions in transition
1608   * @param cutoffTime when to exit balancer
1609   */
1610  private void balanceThrottling(long nextBalanceStartTime, int maxRegionsInTransition,
1611      long cutoffTime) {
1612    boolean interrupted = false;
1613
1614    // Sleep to next balance plan start time
1615    // But if there are zero regions in transition, it can skip sleep to speed up.
1616    while (!interrupted && System.currentTimeMillis() < nextBalanceStartTime
1617        && this.assignmentManager.getRegionStates().hasRegionsInTransition()) {
1618      try {
1619        Thread.sleep(100);
1620      } catch (InterruptedException ie) {
1621        interrupted = true;
1622      }
1623    }
1624
1625    // Throttling by max number regions in transition
1626    while (!interrupted
1627        && maxRegionsInTransition > 0
1628        && this.assignmentManager.getRegionStates().getRegionsInTransitionCount()
1629        >= maxRegionsInTransition && System.currentTimeMillis() <= cutoffTime) {
1630      try {
1631        // sleep if the number of regions in transition exceeds the limit
1632        Thread.sleep(100);
1633      } catch (InterruptedException ie) {
1634        interrupted = true;
1635      }
1636    }
1637
1638    if (interrupted) Thread.currentThread().interrupt();
1639  }
1640
1641  public boolean balance() throws IOException {
1642    return balance(false);
1643  }
1644
  /**
   * Runs one round of the load balancer. It computes region plans for the current assignments
   * (draining servers excluded) and submits them through
   * {@link #executeRegionPlansWithThrottling(List)}.
   * <p>
   * Returns false without balancing in any of these cases:
   * <ul>
   * <li>the master is not initialized, or is in maintenance mode;</li>
   * <li>the balancer switch is off;</li>
   * <li>regions are in transition, unless {@code force} is set and hbase:meta is not among
   * them;</li>
   * <li>dead servers are still being processed;</li>
   * <li>the {@code preBalance} coprocessor hook bypasses the request or throws.</li>
   * </ul>
   * </p>
   * @param force if true, balance even when (non-meta) regions are in transition
   * @return true if the balancer ran, including when it produced no plans; false otherwise
   * @throws IOException propagated from computing the plans. IOExceptions from the
   *   pre/postBalance coprocessor hooks are caught and logged here instead.
   */
  public boolean balance(boolean force) throws IOException {
    // if master not initialized, don't run balancer.
    if (!isInitialized()) {
      LOG.debug("Master has not been initialized, don't run balancer.");
      return false;
    }

    if (isInMaintenanceMode()) {
      LOG.info("Master is in maintenanceMode mode, don't run balancer.");
      return false;
    }

    // Only allow one balance run at a time.
    synchronized (this.balancer) {
      // If balance not true, don't run balancer.
      if (!this.loadBalancerTracker.isBalancerOn()) return false;
      if (this.assignmentManager.hasRegionsInTransition()) {
        List<RegionStateNode> regionsInTransition = assignmentManager.getRegionsInTransition();
        // if hbase:meta region is in transition, result of assignment cannot be recorded
        // ignore the force flag in that case
        boolean metaInTransition = assignmentManager.isMetaRegionInTransition();
        // Log message prefix: "Running balancer..." when forcing, "Not running balancer..." else.
        String prefix = force && !metaInTransition ? "R" : "Not r";
        // Log at most the first 5 regions in transition.
        List<RegionStateNode> toPrint = regionsInTransition;
        int max = 5;
        boolean truncated = false;
        if (regionsInTransition.size() > max) {
          toPrint = regionsInTransition.subList(0, max);
          truncated = true;
        }
        LOG.info(prefix + "unning balancer because " + regionsInTransition.size() +
          " region(s) in transition: " + toPrint + (truncated? "(truncated list)": ""));
        if (!force || metaInTransition) return false;
      }
      if (this.serverManager.areDeadServersInProgress()) {
        LOG.info("Not running balancer because processing dead regionserver(s): " +
          this.serverManager.getDeadServers());
        return false;
      }

      if (this.cpHost != null) {
        try {
          if (this.cpHost.preBalance()) {
            LOG.debug("Coprocessor bypassing balancer request");
            return false;
          }
        } catch (IOException ioe) {
          LOG.error("Error invoking master coprocessor preBalance()", ioe);
          return false;
        }
      }

      // Group assignments per table only when hbase.master.loadbalance.bytable is set.
      boolean isByTable = getConfiguration().getBoolean("hbase.master.loadbalance.bytable", false);
      Map<TableName, Map<ServerName, List<RegionInfo>>> assignments =
        this.assignmentManager.getRegionStates()
          .getAssignmentsForBalancer(tableStateManager, this.serverManager.getOnlineServersList(),
            isByTable);
      // Draining servers are not candidates for balancing.
      for (Map<ServerName, List<RegionInfo>> serverMap : assignments.values()) {
        serverMap.keySet().removeAll(this.serverManager.getDrainingServersList());
      }

      //Give the balancer the current cluster state.
      this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor());
      this.balancer.setClusterLoad(assignments);

      List<RegionPlan> plans = new ArrayList<>();
      for (Entry<TableName, Map<ServerName, List<RegionInfo>>> e : assignments.entrySet()) {
        List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
        if (partialPlans != null) {
          plans.addAll(partialPlans);
        }
      }

      List<RegionPlan> sucRPs = executeRegionPlansWithThrottling(plans);

      if (this.cpHost != null) {
        try {
          this.cpHost.postBalance(sucRPs);
        } catch (IOException ioe) {
          // balancing already succeeded so don't change the result
          LOG.error("Error invoking master coprocessor postBalance()", ioe);
        }
      }
    }
    // If LoadBalancer did not generate any plans, it means the cluster is already balanced.
    // Return true indicating a success.
    return true;
  }
1732
1733  public List<RegionPlan> executeRegionPlansWithThrottling(List<RegionPlan> plans) {
1734    List<RegionPlan> sucRPs = new ArrayList<>();
1735    int maxRegionsInTransition = getMaxRegionsInTransition();
1736    long balanceStartTime = System.currentTimeMillis();
1737    long cutoffTime = balanceStartTime + this.maxBlancingTime;
1738    int rpCount = 0;  // number of RegionPlans balanced so far
1739    if (plans != null && !plans.isEmpty()) {
1740      int balanceInterval = this.maxBlancingTime / plans.size();
1741      LOG.info("Balancer plans size is " + plans.size() + ", the balance interval is "
1742          + balanceInterval + " ms, and the max number regions in transition is "
1743          + maxRegionsInTransition);
1744
1745      for (RegionPlan plan: plans) {
1746        LOG.info("balance " + plan);
1747        //TODO: bulk assign
1748        try {
1749          this.assignmentManager.moveAsync(plan);
1750        } catch (HBaseIOException hioe) {
1751          //should ignore failed plans here, avoiding the whole balance plans be aborted
1752          //later calls of balance() can fetch up the failed and skipped plans
1753          LOG.warn("Failed balance plan: {}, just skip it", plan, hioe);
1754        }
1755        //rpCount records balance plans processed, does not care if a plan succeeds
1756        rpCount++;
1757
1758        if (this.maxBlancingTime > 0) {
1759          balanceThrottling(balanceStartTime + rpCount * balanceInterval, maxRegionsInTransition,
1760            cutoffTime);
1761        }
1762
1763        // if performing next balance exceeds cutoff time, exit the loop
1764        if (this.maxBlancingTime > 0 && rpCount < plans.size()
1765          && System.currentTimeMillis() > cutoffTime) {
1766          // TODO: After balance, there should not be a cutoff time (keeping it as
1767          // a security net for now)
1768          LOG.debug("No more balancing till next balance run; maxBalanceTime="
1769              + this.maxBlancingTime);
1770          break;
1771        }
1772      }
1773    }
1774    return sucRPs;
1775  }
1776
1777  @Override
1778  @VisibleForTesting
1779  public RegionNormalizer getRegionNormalizer() {
1780    return this.normalizer;
1781  }
1782
1783  /**
1784   * Perform normalization of cluster (invoked by {@link RegionNormalizerChore}).
1785   *
1786   * @return true if normalization step was performed successfully, false otherwise
1787   *    (specifically, if HMaster hasn't been initialized properly or normalization
1788   *    is globally disabled)
1789   */
1790  public boolean normalizeRegions() throws IOException {
1791    if (!isInitialized()) {
1792      LOG.debug("Master has not been initialized, don't run region normalizer.");
1793      return false;
1794    }
1795    if (this.getServerManager().isClusterShutdown()) {
1796      LOG.info("Cluster is shutting down, don't run region normalizer.");
1797      return false;
1798    }
1799    if (isInMaintenanceMode()) {
1800      LOG.info("Master is in maintenance mode, don't run region normalizer.");
1801      return false;
1802    }
1803    if (!this.regionNormalizerTracker.isNormalizerOn()) {
1804      LOG.debug("Region normalization is disabled, don't run region normalizer.");
1805      return false;
1806    }
1807
1808    synchronized (this.normalizer) {
1809      // Don't run the normalizer concurrently
1810      List<TableName> allEnabledTables = new ArrayList<>(
1811        this.tableStateManager.getTablesInStates(TableState.State.ENABLED));
1812
1813      Collections.shuffle(allEnabledTables);
1814
1815      for (TableName table : allEnabledTables) {
1816        if (isInMaintenanceMode()) {
1817          LOG.debug("Master is in maintenance mode, stop running region normalizer.");
1818          return false;
1819        }
1820
1821        TableDescriptor tblDesc = getTableDescriptors().get(table);
1822        if (table.isSystemTable() || (tblDesc != null &&
1823            !tblDesc.isNormalizationEnabled())) {
1824          LOG.trace("Skipping normalization for {}, as it's either system"
1825              + " table or doesn't have auto normalization turned on", table);
1826          continue;
1827        }
1828        List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
1829        if (plans != null) {
1830          for (NormalizationPlan plan : plans) {
1831            plan.execute(clusterConnection.getAdmin());
1832            if (plan.getType() == PlanType.SPLIT) {
1833              splitPlanCount++;
1834            } else if (plan.getType() == PlanType.MERGE) {
1835              mergePlanCount++;
1836            }
1837          }
1838        }
1839      }
1840    }
1841    // If Region did not generate any plans, it means the cluster is already balanced.
1842    // Return true indicating a success.
1843    return true;
1844  }
1845
1846  /**
1847   * @return Client info for use as prefix on an audit log string; who did an action
1848   */
1849  @Override
1850  public String getClientIdAuditPrefix() {
1851    return "Client=" + RpcServer.getRequestUserName().orElse(null)
1852        + "/" + RpcServer.getRemoteAddress().orElse(null);
1853  }
1854
1855  /**
1856   * Switch for the background CatalogJanitor thread.
1857   * Used for testing.  The thread will continue to run.  It will just be a noop
1858   * if disabled.
1859   * @param b If false, the catalog janitor won't do anything.
1860   */
1861  public void setCatalogJanitorEnabled(final boolean b) {
1862    this.catalogJanitorChore.setEnabled(b);
1863  }
1864
  /**
   * Submits a {@link MergeTableRegionsProcedure} for the given regions. Submission happens
   * inside a {@code NonceProcedureRunnable} built from {@code ng}/{@code nonce}.
   * <p>
   * The preMergeRegions coprocessor hook runs before submission. postMergeRegions runs right
   * after submitting, without waiting for the procedure to finish.
   * </p>
   * @param regionsToMerge the regions to merge
   * @param forcible passed through to the MergeTableRegionsProcedure
   * @param ng nonce group of the request
   * @param nonce nonce of the request
   * @return the value of MasterProcedureUtil.submitProcedure (presumably the procedure id;
   *   TODO confirm)
   * @throws IOException if the master is not initialized, or from the coprocessor hooks or the
   *   submission
   */
  @Override
  public long mergeRegions(
      final RegionInfo[] regionsToMerge,
      final boolean forcible,
      final long ng,
      final long nonce) throws IOException {
    checkInitialized();

    // Precomputed for the audit log line below.
    final String mergeRegionsStr = Arrays.stream(regionsToMerge).
      map(r -> RegionInfo.getShortNameToLog(r)).collect(Collectors.joining(", "));
    return MasterProcedureUtil.submitProcedure(new NonceProcedureRunnable(this, ng, nonce) {
      @Override
      protected void run() throws IOException {
        getMaster().getMasterCoprocessorHost().preMergeRegions(regionsToMerge);
        String aid = getClientIdAuditPrefix();
        LOG.info("{} merge regions {}", aid, mergeRegionsStr);
        submitProcedure(new MergeTableRegionsProcedure(procedureExecutor.getEnvironment(),
            regionsToMerge, forcible));
        getMaster().getMasterCoprocessorHost().postMergeRegions(regionsToMerge);
      }

      @Override
      protected String getDescription() {
        return "MergeTableProcedure";
      }
    });
  }
1892
  /**
   * Submits the split procedure created by the assignment manager for {@code regionInfo} at
   * {@code splitRow}. Submission happens inside a {@code NonceProcedureRunnable} built from
   * {@code nonceGroup}/{@code nonce}, after the preSplitRegion coprocessor hook and an audit
   * log line.
   * @param regionInfo the region to split
   * @param splitRow the row to split at, passed through to createSplitProcedure
   * @param nonceGroup nonce group of the request
   * @param nonce nonce of the request
   * @return the value of MasterProcedureUtil.submitProcedure (presumably the procedure id;
   *   TODO confirm)
   * @throws IOException if the master is not initialized, or from the coprocessor hook or the
   *   submission
   */
  @Override
  public long splitRegion(final RegionInfo regionInfo, final byte[] splitRow,
      final long nonceGroup, final long nonce)
  throws IOException {
    checkInitialized();
    return MasterProcedureUtil.submitProcedure(
        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
      @Override
      protected void run() throws IOException {
        getMaster().getMasterCoprocessorHost().preSplitRegion(regionInfo.getTable(), splitRow);
        LOG.info(getClientIdAuditPrefix() + " split " + regionInfo.getRegionNameAsString());

        // Execute the operation asynchronously
        submitProcedure(getAssignmentManager().createSplitProcedure(regionInfo, splitRow));
      }

      @Override
      protected String getDescription() {
        return "SplitTableProcedure";
      }
    });
  }
1915
1916  // Public so can be accessed by tests. Blocks until move is done.
1917  // Replace with an async implementation from which you can get
1918  // a success/failure result.
1919  @VisibleForTesting
1920  public void move(final byte[] encodedRegionName, byte[] destServerName) throws HBaseIOException {
1921    RegionState regionState = assignmentManager.getRegionStates().
1922      getRegionState(Bytes.toString(encodedRegionName));
1923
1924    RegionInfo hri;
1925    if (regionState != null) {
1926      hri = regionState.getRegion();
1927    } else {
1928      throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
1929    }
1930
1931    ServerName dest;
1932    List<ServerName> exclude = hri.getTable().isSystemTable() ? assignmentManager.getExcludedServersForSystemTable()
1933        : new ArrayList<>(1);
1934    if (destServerName != null && exclude.contains(ServerName.valueOf(Bytes.toString(destServerName)))) {
1935      LOG.info(
1936          Bytes.toString(encodedRegionName) + " can not move to " + Bytes.toString(destServerName)
1937              + " because the server is in exclude list");
1938      destServerName = null;
1939    }
1940    if (destServerName == null || destServerName.length == 0) {
1941      LOG.info("Passed destination servername is null/empty so " +
1942        "choosing a server at random");
1943      exclude.add(regionState.getServerName());
1944      final List<ServerName> destServers = this.serverManager.createDestinationServersList(exclude);
1945      dest = balancer.randomAssignment(hri, destServers);
1946      if (dest == null) {
1947        LOG.debug("Unable to determine a plan to assign " + hri);
1948        return;
1949      }
1950    } else {
1951      ServerName candidate = ServerName.valueOf(Bytes.toString(destServerName));
1952      dest = balancer.randomAssignment(hri, Lists.newArrayList(candidate));
1953      if (dest == null) {
1954        LOG.debug("Unable to determine a plan to assign " + hri);
1955        return;
1956      }
1957      // TODO: What is this? I don't get it.
1958      if (dest.equals(serverName) && balancer instanceof BaseLoadBalancer
1959          && !((BaseLoadBalancer)balancer).shouldBeOnMaster(hri)) {
1960        // To avoid unnecessary region moving later by balancer. Don't put user
1961        // regions on master.
1962        LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1963          + " to avoid unnecessary region moving later by load balancer,"
1964          + " because it should not be on master");
1965        return;
1966      }
1967    }
1968
1969    if (dest.equals(regionState.getServerName())) {
1970      LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1971        + " because region already assigned to the same server " + dest + ".");
1972      return;
1973    }
1974
1975    // Now we can do the move
1976    RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
1977    assert rp.getDestination() != null: rp.toString() + " " + dest;
1978
1979    try {
1980      checkInitialized();
1981      if (this.cpHost != null) {
1982        this.cpHost.preMove(hri, rp.getSource(), rp.getDestination());
1983      }
1984
1985      TransitRegionStateProcedure proc =
1986          this.assignmentManager.createMoveRegionProcedure(rp.getRegionInfo(), rp.getDestination());
1987      // Warmup the region on the destination before initiating the move. this call
1988      // is synchronous and takes some time. doing it before the source region gets
1989      // closed
1990      serverManager.sendRegionWarmup(rp.getDestination(), hri);
1991      LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
1992      Future<byte[]> future = ProcedureSyncWait.submitProcedure(this.procedureExecutor, proc);
1993      try {
1994        // Is this going to work? Will we throw exception on error?
1995        // TODO: CompletableFuture rather than this stunted Future.
1996        future.get();
1997      } catch (InterruptedException | ExecutionException e) {
1998        throw new HBaseIOException(e);
1999      }
2000      if (this.cpHost != null) {
2001        this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
2002      }
2003    } catch (IOException ioe) {
2004      if (ioe instanceof HBaseIOException) {
2005        throw (HBaseIOException)ioe;
2006      }
2007      throw new HBaseIOException(ioe);
2008    }
2009  }
2010
  /**
   * Creates a user table by submitting a {@link CreateTableProcedure}.
   * <p>
   * Before submission:
   * <ul>
   * <li>the preCreateTableRegionsInfos coprocessor hook may replace the descriptor, or cancel
   * the request by returning null;</li>
   * <li>the target namespace is looked up;</li>
   * <li>the region infos are built from {@code splitKeys};</li>
   * <li>the descriptor is sanity-checked.</li>
   * </ul>
   * Inside the runnable, the call blocks on a prepare latch until the procedure has passed its
   * "prepare" checks (HBASE-19953), and only then runs postCreateTable.
   * </p>
   * @param tableDescriptor descriptor of the table to create
   * @param splitKeys split keys for the initial regions, passed to createRegionInfos
   * @param nonceGroup nonce group of the request
   * @param nonce nonce of the request
   * @return the value of MasterProcedureUtil.submitProcedure (presumably the procedure id;
   *   TODO confirm)
   * @throws IOException if the master is not initialized, a coprocessor cancels the creation,
   *   or a later step (namespace lookup, sanity check, submission) fails
   */
  @Override
  public long createTable(final TableDescriptor tableDescriptor, final byte[][] splitKeys,
      final long nonceGroup, final long nonce) throws IOException {
    checkInitialized();
    TableDescriptor desc = getMasterCoprocessorHost().preCreateTableRegionsInfos(tableDescriptor);
    if (desc == null) {
      throw new IOException("Creation for " + tableDescriptor + " is canceled by CP");
    }
    // Result is unused; presumably called so that a missing namespace fails fast. TODO confirm.
    String namespace = desc.getTableName().getNamespaceAsString();
    this.clusterSchemaService.getNamespace(namespace);

    RegionInfo[] newRegions = ModifyRegionUtils.createRegionInfos(desc, splitKeys);
    TableDescriptorChecker.sanityCheck(conf, desc);

    return MasterProcedureUtil
      .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
        @Override
        protected void run() throws IOException {
          getMaster().getMasterCoprocessorHost().preCreateTable(desc, newRegions);

          LOG.info(getClientIdAuditPrefix() + " create " + desc);

          // TODO: We can handle/merge duplicate requests, and differentiate the case of
          // TableExistsException by saying if the schema is the same or not.
          //
          // We need to wait for the procedure to potentially fail due to "prepare" sanity
          // checks. This will block only the beginning of the procedure. See HBASE-19953.
          ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
          submitProcedure(
            new CreateTableProcedure(procedureExecutor.getEnvironment(), desc, newRegions, latch));
          latch.await();

          getMaster().getMasterCoprocessorHost().postCreateTable(desc, newRegions);
        }

        @Override
        protected String getDescription() {
          return "CreateTableProcedure";
        }
      });
  }
2052
2053  @Override
2054  public long createSystemTable(final TableDescriptor tableDescriptor) throws IOException {
2055    if (isStopped()) {
2056      throw new MasterNotRunningException();
2057    }
2058
2059    TableName tableName = tableDescriptor.getTableName();
2060    if (!(tableName.isSystemTable())) {
2061      throw new IllegalArgumentException(
2062        "Only system table creation can use this createSystemTable API");
2063    }
2064
2065    RegionInfo[] newRegions = ModifyRegionUtils.createRegionInfos(tableDescriptor, null);
2066
2067    LOG.info(getClientIdAuditPrefix() + " create " + tableDescriptor);
2068
2069    // This special create table is called locally to master.  Therefore, no RPC means no need
2070    // to use nonce to detect duplicated RPC call.
2071    long procId = this.procedureExecutor.submitProcedure(
2072      new CreateTableProcedure(procedureExecutor.getEnvironment(), tableDescriptor, newRegions));
2073
2074    return procId;
2075  }
2076
2077  private void startActiveMasterManager(int infoPort) throws KeeperException {
2078    String backupZNode = ZNodePaths.joinZNode(
2079      zooKeeper.getZNodePaths().backupMasterAddressesZNode, serverName.toString());
2080    /*
2081    * Add a ZNode for ourselves in the backup master directory since we
2082    * may not become the active master. If so, we want the actual active
2083    * master to know we are backup masters, so that it won't assign
2084    * regions to us if so configured.
2085    *
2086    * If we become the active master later, ActiveMasterManager will delete
2087    * this node explicitly.  If we crash before then, ZooKeeper will delete
2088    * this node for us since it is ephemeral.
2089    */
2090    LOG.info("Adding backup master ZNode " + backupZNode);
2091    if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode, serverName, infoPort)) {
2092      LOG.warn("Failed create of " + backupZNode + " by " + serverName);
2093    }
2094    this.activeMasterManager.setInfoPort(infoPort);
2095    int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
2096    // If we're a backup master, stall until a primary to write this address
2097    if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP, HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
2098      LOG.debug("HMaster started in backup mode. Stalling until master znode is written.");
2099      // This will only be a minute or so while the cluster starts up,
2100      // so don't worry about setting watches on the parent znode
2101      while (!activeMasterManager.hasActiveMaster()) {
2102        LOG.debug("Waiting for master address and cluster state znode to be written.");
2103        Threads.sleep(timeout);
2104      }
2105    }
2106    MonitoredTask status = TaskMonitor.get().createStatus("Master startup");
2107    status.setDescription("Master startup");
2108    try {
2109      if (activeMasterManager.blockUntilBecomingActiveMaster(timeout, status)) {
2110        finishActiveMasterInitialization(status);
2111      }
2112    } catch (Throwable t) {
2113      status.setStatus("Failed to become active: " + t.getMessage());
2114      LOG.error(HBaseMarkers.FATAL, "Failed to become active master", t);
2115      // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
2116      if (t instanceof NoClassDefFoundError && t.getMessage().
2117          contains("org/apache/hadoop/hdfs/protocol/HdfsConstants$SafeModeAction")) {
2118        // improved error message for this special case
2119        abort("HBase is having a problem with its Hadoop jars.  You may need to recompile " +
2120          "HBase against Hadoop version " + org.apache.hadoop.util.VersionInfo.getVersion() +
2121          " or change your hadoop jars to start properly", t);
2122      } else {
2123        abort("Unhandled exception. Starting shutdown.", t);
2124      }
2125    } finally {
2126      status.cleanup();
2127    }
2128  }
2129
2130  private static boolean isCatalogTable(final TableName tableName) {
2131    return tableName.equals(TableName.META_TABLE_NAME);
2132  }
2133
2134  @Override
2135  public long deleteTable(
2136      final TableName tableName,
2137      final long nonceGroup,
2138      final long nonce) throws IOException {
2139    checkInitialized();
2140
2141    return MasterProcedureUtil.submitProcedure(
2142        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2143      @Override
2144      protected void run() throws IOException {
2145        getMaster().getMasterCoprocessorHost().preDeleteTable(tableName);
2146
2147        LOG.info(getClientIdAuditPrefix() + " delete " + tableName);
2148
2149        // TODO: We can handle/merge duplicate request
2150        //
2151        // We need to wait for the procedure to potentially fail due to "prepare" sanity
2152        // checks. This will block only the beginning of the procedure. See HBASE-19953.
2153        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
2154        submitProcedure(new DeleteTableProcedure(procedureExecutor.getEnvironment(),
2155            tableName, latch));
2156        latch.await();
2157
2158        getMaster().getMasterCoprocessorHost().postDeleteTable(tableName);
2159      }
2160
2161      @Override
2162      protected String getDescription() {
2163        return "DeleteTableProcedure";
2164      }
2165    });
2166  }
2167
2168  @Override
2169  public long truncateTable(
2170      final TableName tableName,
2171      final boolean preserveSplits,
2172      final long nonceGroup,
2173      final long nonce) throws IOException {
2174    checkInitialized();
2175
2176    return MasterProcedureUtil.submitProcedure(
2177        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2178      @Override
2179      protected void run() throws IOException {
2180        getMaster().getMasterCoprocessorHost().preTruncateTable(tableName);
2181
2182        LOG.info(getClientIdAuditPrefix() + " truncate " + tableName);
2183        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
2184        submitProcedure(new TruncateTableProcedure(procedureExecutor.getEnvironment(),
2185            tableName, preserveSplits, latch));
2186        latch.await();
2187
2188        getMaster().getMasterCoprocessorHost().postTruncateTable(tableName);
2189      }
2190
2191      @Override
2192      protected String getDescription() {
2193        return "TruncateTableProcedure";
2194      }
2195    });
2196  }
2197
2198  @Override
2199  public long addColumn(final TableName tableName, final ColumnFamilyDescriptor column,
2200      final long nonceGroup, final long nonce) throws IOException {
2201    checkInitialized();
2202    checkTableExists(tableName);
2203
2204    return modifyTable(tableName, new TableDescriptorGetter() {
2205
2206      @Override
2207      public TableDescriptor get() throws IOException {
2208        TableDescriptor old = getTableDescriptors().get(tableName);
2209        if (old.hasColumnFamily(column.getName())) {
2210          throw new InvalidFamilyOperationException("Column family '" + column.getNameAsString()
2211              + "' in table '" + tableName + "' already exists so cannot be added");
2212        }
2213
2214        return TableDescriptorBuilder.newBuilder(old).setColumnFamily(column).build();
2215      }
2216    }, nonceGroup, nonce, true);
2217  }
2218
2219  /**
2220   * Implement to return TableDescriptor after pre-checks
2221   */
2222  protected interface TableDescriptorGetter {
2223    TableDescriptor get() throws IOException;
2224  }
2225
2226  @Override
2227  public long modifyColumn(final TableName tableName, final ColumnFamilyDescriptor descriptor,
2228      final long nonceGroup, final long nonce) throws IOException {
2229    checkInitialized();
2230    checkTableExists(tableName);
2231    return modifyTable(tableName, new TableDescriptorGetter() {
2232
2233      @Override
2234      public TableDescriptor get() throws IOException {
2235        TableDescriptor old = getTableDescriptors().get(tableName);
2236        if (!old.hasColumnFamily(descriptor.getName())) {
2237          throw new InvalidFamilyOperationException("Family '" + descriptor.getNameAsString()
2238              + "' does not exist, so it cannot be modified");
2239        }
2240
2241        return TableDescriptorBuilder.newBuilder(old).modifyColumnFamily(descriptor).build();
2242      }
2243    }, nonceGroup, nonce, true);
2244  }
2245
2246  @Override
2247  public long deleteColumn(final TableName tableName, final byte[] columnName,
2248      final long nonceGroup, final long nonce) throws IOException {
2249    checkInitialized();
2250    checkTableExists(tableName);
2251
2252    return modifyTable(tableName, new TableDescriptorGetter() {
2253
2254      @Override
2255      public TableDescriptor get() throws IOException {
2256        TableDescriptor old = getTableDescriptors().get(tableName);
2257
2258        if (!old.hasColumnFamily(columnName)) {
2259          throw new InvalidFamilyOperationException("Family '" + Bytes.toString(columnName)
2260              + "' does not exist, so it cannot be deleted");
2261        }
2262        if (old.getColumnFamilyCount() == 1) {
2263          throw new InvalidFamilyOperationException("Family '" + Bytes.toString(columnName)
2264              + "' is the only column family in the table, so it cannot be deleted");
2265        }
2266        return TableDescriptorBuilder.newBuilder(old).removeColumnFamily(columnName).build();
2267      }
2268    }, nonceGroup, nonce, true);
2269  }
2270
2271  @Override
2272  public long enableTable(final TableName tableName, final long nonceGroup, final long nonce)
2273      throws IOException {
2274    checkInitialized();
2275
2276    return MasterProcedureUtil.submitProcedure(
2277        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2278      @Override
2279      protected void run() throws IOException {
2280        getMaster().getMasterCoprocessorHost().preEnableTable(tableName);
2281
2282        // Normally, it would make sense for this authorization check to exist inside
2283        // AccessController, but because the authorization check is done based on internal state
2284        // (rather than explicit permissions) we'll do the check here instead of in the
2285        // coprocessor.
2286        MasterQuotaManager quotaManager = getMasterQuotaManager();
2287        if (quotaManager != null) {
2288          if (quotaManager.isQuotaInitialized()) {
2289              SpaceQuotaSnapshot currSnapshotOfTable =
2290                  QuotaTableUtil.getCurrentSnapshotFromQuotaTable(getConnection(), tableName);
2291              if (currSnapshotOfTable != null) {
2292                SpaceQuotaStatus quotaStatus = currSnapshotOfTable.getQuotaStatus();
2293                if (quotaStatus.isInViolation()
2294                    && SpaceViolationPolicy.DISABLE == quotaStatus.getPolicy().orElse(null)) {
2295                throw new AccessDeniedException("Enabling the table '" + tableName
2296                    + "' is disallowed due to a violated space quota.");
2297              }
2298            }
2299          } else if (LOG.isTraceEnabled()) {
2300            LOG.trace("Unable to check for space quotas as the MasterQuotaManager is not enabled");
2301          }
2302        }
2303
2304        LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
2305
2306        // Execute the operation asynchronously - client will check the progress of the operation
2307        // In case the request is from a <1.1 client before returning,
2308        // we want to make sure that the table is prepared to be
2309        // enabled (the table is locked and the table state is set).
2310        // Note: if the procedure throws exception, we will catch it and rethrow.
2311        final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
2312        submitProcedure(new EnableTableProcedure(procedureExecutor.getEnvironment(),
2313            tableName, prepareLatch));
2314        prepareLatch.await();
2315
2316        getMaster().getMasterCoprocessorHost().postEnableTable(tableName);
2317      }
2318
2319      @Override
2320      protected String getDescription() {
2321        return "EnableTableProcedure";
2322      }
2323    });
2324  }
2325
2326  @Override
2327  public long disableTable(final TableName tableName, final long nonceGroup, final long nonce)
2328      throws IOException {
2329    checkInitialized();
2330
2331    return MasterProcedureUtil.submitProcedure(
2332        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2333      @Override
2334      protected void run() throws IOException {
2335        getMaster().getMasterCoprocessorHost().preDisableTable(tableName);
2336
2337        LOG.info(getClientIdAuditPrefix() + " disable " + tableName);
2338
2339        // Execute the operation asynchronously - client will check the progress of the operation
2340        // In case the request is from a <1.1 client before returning,
2341        // we want to make sure that the table is prepared to be
2342        // enabled (the table is locked and the table state is set).
2343        // Note: if the procedure throws exception, we will catch it and rethrow.
2344        //
2345        // We need to wait for the procedure to potentially fail due to "prepare" sanity
2346        // checks. This will block only the beginning of the procedure. See HBASE-19953.
2347        final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createBlockingLatch();
2348        submitProcedure(new DisableTableProcedure(procedureExecutor.getEnvironment(),
2349            tableName, false, prepareLatch));
2350        prepareLatch.await();
2351
2352        getMaster().getMasterCoprocessorHost().postDisableTable(tableName);
2353      }
2354
2355      @Override
2356      protected String getDescription() {
2357        return "DisableTableProcedure";
2358      }
2359    });
2360  }
2361
2362  private long modifyTable(final TableName tableName,
2363      final TableDescriptorGetter newDescriptorGetter, final long nonceGroup, final long nonce,
2364      final boolean shouldCheckDescriptor) throws IOException {
2365    return MasterProcedureUtil
2366        .submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2367          @Override
2368          protected void run() throws IOException {
2369            TableDescriptor oldDescriptor = getMaster().getTableDescriptors().get(tableName);
2370            TableDescriptor newDescriptor = getMaster().getMasterCoprocessorHost()
2371                .preModifyTable(tableName, oldDescriptor, newDescriptorGetter.get());
2372            TableDescriptorChecker.sanityCheck(conf, newDescriptor);
2373            LOG.info("{} modify table {} from {} to {}", getClientIdAuditPrefix(), tableName,
2374              oldDescriptor, newDescriptor);
2375
2376            // Execute the operation synchronously - wait for the operation completes before
2377            // continuing.
2378            //
2379            // We need to wait for the procedure to potentially fail due to "prepare" sanity
2380            // checks. This will block only the beginning of the procedure. See HBASE-19953.
2381            ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
2382            submitProcedure(new ModifyTableProcedure(procedureExecutor.getEnvironment(),
2383                newDescriptor, latch, oldDescriptor, shouldCheckDescriptor));
2384            latch.await();
2385
2386            getMaster().getMasterCoprocessorHost().postModifyTable(tableName, oldDescriptor,
2387              newDescriptor);
2388          }
2389
2390          @Override
2391          protected String getDescription() {
2392            return "ModifyTableProcedure";
2393          }
2394        });
2395
2396  }
2397
2398  @Override
2399  public long modifyTable(final TableName tableName, final TableDescriptor newDescriptor,
2400      final long nonceGroup, final long nonce) throws IOException {
2401    checkInitialized();
2402    return modifyTable(tableName, new TableDescriptorGetter() {
2403      @Override
2404      public TableDescriptor get() throws IOException {
2405        return newDescriptor;
2406      }
2407    }, nonceGroup, nonce, false);
2408
2409  }
2410
2411  public long restoreSnapshot(final SnapshotDescription snapshotDesc,
2412      final long nonceGroup, final long nonce, final boolean restoreAcl) throws IOException {
2413    checkInitialized();
2414    getSnapshotManager().checkSnapshotSupport();
2415
2416    // Ensure namespace exists. Will throw exception if non-known NS.
2417    final TableName dstTable = TableName.valueOf(snapshotDesc.getTable());
2418    getClusterSchema().getNamespace(dstTable.getNamespaceAsString());
2419
2420    return MasterProcedureUtil.submitProcedure(
2421        new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2422      @Override
2423      protected void run() throws IOException {
2424          setProcId(
2425            getSnapshotManager().restoreOrCloneSnapshot(snapshotDesc, getNonceKey(), restoreAcl));
2426      }
2427
2428      @Override
2429      protected String getDescription() {
2430        return "RestoreSnapshotProcedure";
2431      }
2432    });
2433  }
2434
2435  private void checkTableExists(final TableName tableName)
2436      throws IOException, TableNotFoundException {
2437    if (!MetaTableAccessor.tableExists(getConnection(), tableName)) {
2438      throw new TableNotFoundException(tableName);
2439    }
2440  }
2441
2442  @Override
2443  public void checkTableModifiable(final TableName tableName)
2444      throws IOException, TableNotFoundException, TableNotDisabledException {
2445    if (isCatalogTable(tableName)) {
2446      throw new IOException("Can't modify catalog tables");
2447    }
2448    checkTableExists(tableName);
2449    TableState ts = getTableStateManager().getTableState(tableName);
2450    if (!ts.isDisabled()) {
2451      throw new TableNotDisabledException("Not DISABLED; " + ts);
2452    }
2453  }
2454
2455  public ClusterMetrics getClusterMetricsWithoutCoprocessor() throws InterruptedIOException {
2456    return getClusterMetricsWithoutCoprocessor(EnumSet.allOf(Option.class));
2457  }
2458
2459  public ClusterMetrics getClusterMetricsWithoutCoprocessor(EnumSet<Option> options)
2460      throws InterruptedIOException {
2461    ClusterMetricsBuilder builder = ClusterMetricsBuilder.newBuilder();
2462    // given that hbase1 can't submit the request with Option,
2463    // we return all information to client if the list of Option is empty.
2464    if (options.isEmpty()) {
2465      options = EnumSet.allOf(Option.class);
2466    }
2467
2468    for (Option opt : options) {
2469      switch (opt) {
2470        case HBASE_VERSION: builder.setHBaseVersion(VersionInfo.getVersion()); break;
2471        case CLUSTER_ID: builder.setClusterId(getClusterId()); break;
2472        case MASTER: builder.setMasterName(getServerName()); break;
2473        case BACKUP_MASTERS: builder.setBackerMasterNames(getBackupMasters()); break;
2474        case LIVE_SERVERS: {
2475          if (serverManager != null) {
2476            builder.setLiveServerMetrics(serverManager.getOnlineServers().entrySet().stream()
2477              .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));
2478          }
2479          break;
2480        }
2481        case DEAD_SERVERS: {
2482          if (serverManager != null) {
2483            builder.setDeadServerNames(new ArrayList<>(
2484              serverManager.getDeadServers().copyServerNames()));
2485          }
2486          break;
2487        }
2488        case MASTER_COPROCESSORS: {
2489          if (cpHost != null) {
2490            builder.setMasterCoprocessorNames(Arrays.asList(getMasterCoprocessors()));
2491          }
2492          break;
2493        }
2494        case REGIONS_IN_TRANSITION: {
2495          if (assignmentManager != null) {
2496            builder.setRegionsInTransition(assignmentManager.getRegionStates()
2497                .getRegionsStateInTransition());
2498          }
2499          break;
2500        }
2501        case BALANCER_ON: {
2502          if (loadBalancerTracker != null) {
2503            builder.setBalancerOn(loadBalancerTracker.isBalancerOn());
2504          }
2505          break;
2506        }
2507        case MASTER_INFO_PORT: {
2508          if (infoServer != null) {
2509            builder.setMasterInfoPort(infoServer.getPort());
2510          }
2511          break;
2512        }
2513        case SERVERS_NAME: {
2514          if (serverManager != null) {
2515            builder.setServerNames(serverManager.getOnlineServersList());
2516          }
2517          break;
2518        }
2519        case TABLE_TO_REGIONS_COUNT: {
2520          if (isActiveMaster() && isInitialized() && assignmentManager != null) {
2521            try {
2522              Map<TableName, RegionStatesCount> tableRegionStatesCountMap = new HashMap<>();
2523              Map<String, TableDescriptor> tableDescriptorMap = getTableDescriptors().getAll();
2524              for (TableDescriptor tableDescriptor : tableDescriptorMap.values()) {
2525                TableName tableName = tableDescriptor.getTableName();
2526                RegionStatesCount regionStatesCount = assignmentManager
2527                  .getRegionStatesCount(tableName);
2528                tableRegionStatesCountMap.put(tableName, regionStatesCount);
2529              }
2530              builder.setTableRegionStatesCount(tableRegionStatesCountMap);
2531            } catch (IOException e) {
2532              LOG.error("Error while populating TABLE_TO_REGIONS_COUNT for Cluster Metrics..", e);
2533            }
2534          }
2535          break;
2536        }
2537      }
2538    }
2539    return builder.build();
2540  }
2541
2542  /**
2543   * @return cluster status
2544   */
2545  public ClusterMetrics getClusterMetrics() throws IOException {
2546    return getClusterMetrics(EnumSet.allOf(Option.class));
2547  }
2548
2549  public ClusterMetrics getClusterMetrics(EnumSet<Option> options) throws IOException {
2550    if (cpHost != null) {
2551      cpHost.preGetClusterMetrics();
2552    }
2553    ClusterMetrics status = getClusterMetricsWithoutCoprocessor(options);
2554    if (cpHost != null) {
2555      cpHost.postGetClusterMetrics(status);
2556    }
2557    return status;
2558  }
2559
2560  private List<ServerName> getBackupMasters() throws InterruptedIOException {
2561    // Build Set of backup masters from ZK nodes
2562    List<String> backupMasterStrings;
2563    try {
2564      backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
2565        this.zooKeeper.getZNodePaths().backupMasterAddressesZNode);
2566    } catch (KeeperException e) {
2567      LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
2568      backupMasterStrings = null;
2569    }
2570
2571    List<ServerName> backupMasters = Collections.emptyList();
2572    if (backupMasterStrings != null && !backupMasterStrings.isEmpty()) {
2573      backupMasters = new ArrayList<>(backupMasterStrings.size());
2574      for (String s: backupMasterStrings) {
2575        try {
2576          byte [] bytes;
2577          try {
2578            bytes = ZKUtil.getData(this.zooKeeper, ZNodePaths.joinZNode(
2579                this.zooKeeper.getZNodePaths().backupMasterAddressesZNode, s));
2580          } catch (InterruptedException e) {
2581            throw new InterruptedIOException();
2582          }
2583          if (bytes != null) {
2584            ServerName sn;
2585            try {
2586              sn = ProtobufUtil.parseServerNameFrom(bytes);
2587            } catch (DeserializationException e) {
2588              LOG.warn("Failed parse, skipping registering backup server", e);
2589              continue;
2590            }
2591            backupMasters.add(sn);
2592          }
2593        } catch (KeeperException e) {
2594          LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
2595                   "backup servers"), e);
2596        }
2597      }
2598      Collections.sort(backupMasters, new Comparator<ServerName>() {
2599        @Override
2600        public int compare(ServerName s1, ServerName s2) {
2601          return s1.getServerName().compareTo(s2.getServerName());
2602        }});
2603    }
2604    return backupMasters;
2605  }
2606
2607  /**
2608   * The set of loaded coprocessors is stored in a static set. Since it's
2609   * statically allocated, it does not require that HMaster's cpHost be
2610   * initialized prior to accessing it.
2611   * @return a String representation of the set of names of the loaded coprocessors.
2612   */
2613  public static String getLoadedCoprocessors() {
2614    return CoprocessorHost.getLoadedCoprocessors().toString();
2615  }
2616
2617  /**
2618   * @return timestamp in millis when HMaster was started.
2619   */
2620  public long getMasterStartTime() {
2621    return startcode;
2622  }
2623
2624  /**
2625   * @return timestamp in millis when HMaster became the active master.
2626   */
2627  public long getMasterActiveTime() {
2628    return masterActiveTime;
2629  }
2630
2631  /**
2632   * @return timestamp in millis when HMaster finished becoming the active master
2633   */
2634  public long getMasterFinishedInitializationTime() {
2635    return masterFinishedInitializationTime;
2636  }
2637
2638  public int getNumWALFiles() {
2639    return procedureStore != null ? procedureStore.getActiveLogs().size() : 0;
2640  }
2641
2642  public WALProcedureStore getWalProcedureStore() {
2643    return procedureStore;
2644  }
2645
2646  public int getRegionServerInfoPort(final ServerName sn) {
2647    int port = this.serverManager.getInfoPort(sn);
2648    return port == 0 ? conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2649      HConstants.DEFAULT_REGIONSERVER_INFOPORT) : port;
2650  }
2651
2652  @Override
2653  public String getRegionServerVersion(ServerName sn) {
2654    // Will return "0.0.0" if the server is not online to prevent move system region to unknown
2655    // version RS.
2656    return this.serverManager.getVersion(sn);
2657  }
2658
2659  @Override
2660  public void checkIfShouldMoveSystemRegionAsync() {
2661    assignmentManager.checkIfShouldMoveSystemRegionAsync();
2662  }
2663
2664  /**
2665   * @return array of coprocessor SimpleNames.
2666   */
2667  public String[] getMasterCoprocessors() {
2668    Set<String> masterCoprocessors = getMasterCoprocessorHost().getCoprocessors();
2669    return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
2670  }
2671
2672  @Override
2673  public void abort(String reason, Throwable cause) {
2674    if (isAborted() || isStopped()) {
2675      return;
2676    }
2677    setAbortRequested();
2678    if (cpHost != null) {
2679      // HBASE-4014: dump a list of loaded coprocessors.
2680      LOG.error(HBaseMarkers.FATAL, "Master server abort: loaded coprocessors are: " +
2681          getLoadedCoprocessors());
2682    }
2683    String msg = "***** ABORTING master " + this + ": " + reason + " *****";
2684    if (cause != null) {
2685      LOG.error(HBaseMarkers.FATAL, msg, cause);
2686    } else {
2687      LOG.error(HBaseMarkers.FATAL, msg);
2688    }
2689
2690    try {
2691      stopMaster();
2692    } catch (IOException e) {
2693      LOG.error("Exception occurred while stopping master", e);
2694    }
2695  }
2696
2697  @Override
2698  public ZKWatcher getZooKeeper() {
2699    return zooKeeper;
2700  }
2701
2702  @Override
2703  public MasterCoprocessorHost getMasterCoprocessorHost() {
2704    return cpHost;
2705  }
2706
2707  @Override
2708  public MasterQuotaManager getMasterQuotaManager() {
2709    return quotaManager;
2710  }
2711
2712  @Override
2713  public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
2714    return procedureExecutor;
2715  }
2716
2717  @Override
2718  public ServerName getServerName() {
2719    return this.serverName;
2720  }
2721
2722  @Override
2723  public AssignmentManager getAssignmentManager() {
2724    return this.assignmentManager;
2725  }
2726
2727  @Override
2728  public CatalogJanitor getCatalogJanitor() {
2729    return this.catalogJanitorChore;
2730  }
2731
2732  public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
2733    return rsFatals;
2734  }
2735
2736  /**
2737   * Shutdown the cluster.
2738   * Master runs a coordinated stop of all RegionServers and then itself.
2739   */
2740  public void shutdown() throws IOException {
2741    if (cpHost != null) {
2742      cpHost.preShutdown();
2743    }
2744    // Tell the servermanager cluster shutdown has been called. This makes it so when Master is
2745    // last running server, it'll stop itself. Next, we broadcast the cluster shutdown by setting
2746    // the cluster status as down. RegionServers will notice this change in state and will start
2747    // shutting themselves down. When last has exited, Master can go down.
2748    if (this.serverManager != null) {
2749      this.serverManager.shutdownCluster();
2750    }
2751    if (this.clusterStatusTracker != null) {
2752      try {
2753        this.clusterStatusTracker.setClusterDown();
2754      } catch (KeeperException e) {
2755        LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
2756      }
2757    }
2758    // Stop the procedure executor. Will stop any ongoing assign, unassign, server crash etc.,
2759    // processing so we can go down.
2760    if (this.procedureExecutor != null) {
2761      this.procedureExecutor.stop();
2762    }
2763    // Shutdown our cluster connection. This will kill any hosted RPCs that might be going on;
2764    // this is what we want especially if the Master is in startup phase doing call outs to
2765    // hbase:meta, etc. when cluster is down. Without ths connection close, we'd have to wait on
2766    // the rpc to timeout.
2767    if (this.clusterConnection != null) {
2768      this.clusterConnection.close();
2769    }
2770  }
2771
2772  public void stopMaster() throws IOException {
2773    if (cpHost != null) {
2774      cpHost.preStopMaster();
2775    }
2776    stop("Stopped by " + Thread.currentThread().getName());
2777  }
2778
2779  @Override
2780  public void stop(String msg) {
2781    if (!isStopped()) {
2782      super.stop(msg);
2783      if (this.activeMasterManager != null) {
2784        this.activeMasterManager.stop();
2785      }
2786    }
2787  }
2788
2789  @VisibleForTesting
2790  protected void checkServiceStarted() throws ServerNotRunningYetException {
2791    if (!serviceStarted) {
2792      throw new ServerNotRunningYetException("Server is not running yet");
2793    }
2794  }
2795
2796  public static class MasterStoppedException extends DoNotRetryIOException {
2797    MasterStoppedException() {
2798      super();
2799    }
2800  }
2801
2802  void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException,
2803      MasterNotRunningException, MasterStoppedException {
2804    checkServiceStarted();
2805    if (!isInitialized()) {
2806      throw new PleaseHoldException("Master is initializing");
2807    }
2808    if (isStopped()) {
2809      throw new MasterStoppedException();
2810    }
2811  }
2812
2813  /**
2814   * Report whether this master is currently the active master or not.
2815   * If not active master, we are parked on ZK waiting to become active.
2816   *
2817   * This method is used for testing.
2818   *
2819   * @return true if active master, false if not.
2820   */
2821  @Override
2822  public boolean isActiveMaster() {
2823    return activeMaster;
2824  }
2825
2826  /**
2827   * Report whether this master has completed with its initialization and is
2828   * ready.  If ready, the master is also the active master.  A standby master
2829   * is never ready.
2830   *
2831   * This method is used for testing.
2832   *
2833   * @return true if master is ready to go, false if not.
2834   */
2835  @Override
2836  public boolean isInitialized() {
2837    return initialized.isReady();
2838  }
2839
2840  /**
2841   * Report whether this master is in maintenance mode.
2842   *
2843   * @return true if master is in maintenanceMode
2844   */
2845  @Override
2846  public boolean isInMaintenanceMode() {
2847    return maintenanceMode;
2848  }
2849
2850  @VisibleForTesting
2851  public void setInitialized(boolean isInitialized) {
2852    procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
2853  }
2854
2855  @Override
2856  public ProcedureEvent<?> getInitializedEvent() {
2857    return initialized;
2858  }
2859
2860  /**
2861   * Compute the average load across all region servers.
2862   * Currently, this uses a very naive computation - just uses the number of
2863   * regions being served, ignoring stats about number of requests.
2864   * @return the average load
2865   */
2866  public double getAverageLoad() {
2867    if (this.assignmentManager == null) {
2868      return 0;
2869    }
2870
2871    RegionStates regionStates = this.assignmentManager.getRegionStates();
2872    if (regionStates == null) {
2873      return 0;
2874    }
2875    return regionStates.getAverageLoad();
2876  }
2877
2878  /*
2879   * @return the count of region split plans executed
2880   */
2881  public long getSplitPlanCount() {
2882    return splitPlanCount;
2883  }
2884
2885  /*
2886   * @return the count of region merge plans executed
2887   */
2888  public long getMergePlanCount() {
2889    return mergePlanCount;
2890  }
2891
2892  @Override
2893  public boolean registerService(Service instance) {
2894    /*
2895     * No stacking of instances is allowed for a single service name
2896     */
2897    Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
2898    String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
2899    if (coprocessorServiceHandlers.containsKey(serviceName)) {
2900      LOG.error("Coprocessor service "+serviceName+
2901          " already registered, rejecting request from "+instance
2902      );
2903      return false;
2904    }
2905
2906    coprocessorServiceHandlers.put(serviceName, instance);
2907    if (LOG.isDebugEnabled()) {
2908      LOG.debug("Registered master coprocessor service: service="+serviceName);
2909    }
2910    return true;
2911  }
2912
2913  /**
2914   * Utility for constructing an instance of the passed HMaster class.
2915   * @param masterClass
2916   * @return HMaster instance.
2917   */
2918  public static HMaster constructMaster(Class<? extends HMaster> masterClass,
2919      final Configuration conf)  {
2920    try {
2921      Constructor<? extends HMaster> c = masterClass.getConstructor(Configuration.class);
2922      return c.newInstance(conf);
2923    } catch(Exception e) {
2924      Throwable error = e;
2925      if (e instanceof InvocationTargetException &&
2926          ((InvocationTargetException)e).getTargetException() != null) {
2927        error = ((InvocationTargetException)e).getTargetException();
2928      }
2929      throw new RuntimeException("Failed construction of Master: " + masterClass.toString() + ". "
2930        , error);
2931    }
2932  }
2933
2934  /**
2935   * @see org.apache.hadoop.hbase.master.HMasterCommandLine
2936   */
2937  public static void main(String [] args) {
2938    LOG.info("STARTING service " + HMaster.class.getSimpleName());
2939    VersionInfo.logVersion();
2940    new HMasterCommandLine(HMaster.class).doMain(args);
2941  }
2942
2943  public HFileCleaner getHFileCleaner() {
2944    return this.hfileCleaner;
2945  }
2946
2947  public LogCleaner getLogCleaner() {
2948    return this.logCleaner;
2949  }
2950
2951  /**
2952   * @return the underlying snapshot manager
2953   */
2954  @Override
2955  public SnapshotManager getSnapshotManager() {
2956    return this.snapshotManager;
2957  }
2958
2959  /**
2960   * @return the underlying MasterProcedureManagerHost
2961   */
2962  @Override
2963  public MasterProcedureManagerHost getMasterProcedureManagerHost() {
2964    return mpmHost;
2965  }
2966
2967  @Override
2968  public ClusterSchema getClusterSchema() {
2969    return this.clusterSchemaService;
2970  }
2971
2972  /**
2973   * Create a new Namespace.
2974   * @param namespaceDescriptor descriptor for new Namespace
2975   * @param nonceGroup Identifier for the source of the request, a client or process.
2976   * @param nonce A unique identifier for this operation from the client or process identified by
2977   * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2978   * @return procedure id
2979   */
2980  long createNamespace(final NamespaceDescriptor namespaceDescriptor, final long nonceGroup,
2981      final long nonce) throws IOException {
2982    checkInitialized();
2983
2984    TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName()));
2985
2986    return MasterProcedureUtil.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this,
2987          nonceGroup, nonce) {
2988      @Override
2989      protected void run() throws IOException {
2990        getMaster().getMasterCoprocessorHost().preCreateNamespace(namespaceDescriptor);
2991        // We need to wait for the procedure to potentially fail due to "prepare" sanity
2992        // checks. This will block only the beginning of the procedure. See HBASE-19953.
2993        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
2994        LOG.info(getClientIdAuditPrefix() + " creating " + namespaceDescriptor);
2995        // Execute the operation synchronously - wait for the operation to complete before
2996        // continuing.
2997        setProcId(getClusterSchema().createNamespace(namespaceDescriptor, getNonceKey(), latch));
2998        latch.await();
2999        getMaster().getMasterCoprocessorHost().postCreateNamespace(namespaceDescriptor);
3000      }
3001
3002      @Override
3003      protected String getDescription() {
3004        return "CreateNamespaceProcedure";
3005      }
3006    });
3007  }
3008
3009  /**
3010   * Modify an existing Namespace.
3011   * @param nonceGroup Identifier for the source of the request, a client or process.
3012   * @param nonce A unique identifier for this operation from the client or process identified by
3013   * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
3014   * @return procedure id
3015   */
3016  long modifyNamespace(final NamespaceDescriptor newNsDescriptor, final long nonceGroup,
3017      final long nonce) throws IOException {
3018    checkInitialized();
3019
3020    TableName.isLegalNamespaceName(Bytes.toBytes(newNsDescriptor.getName()));
3021
3022    return MasterProcedureUtil.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this,
3023          nonceGroup, nonce) {
3024      @Override
3025      protected void run() throws IOException {
3026        NamespaceDescriptor oldNsDescriptor = getNamespace(newNsDescriptor.getName());
3027        getMaster().getMasterCoprocessorHost().preModifyNamespace(oldNsDescriptor, newNsDescriptor);
3028        // We need to wait for the procedure to potentially fail due to "prepare" sanity
3029        // checks. This will block only the beginning of the procedure. See HBASE-19953.
3030        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
3031        LOG.info(getClientIdAuditPrefix() + " modify " + newNsDescriptor);
3032        // Execute the operation synchronously - wait for the operation to complete before
3033        // continuing.
3034        setProcId(getClusterSchema().modifyNamespace(newNsDescriptor, getNonceKey(), latch));
3035        latch.await();
3036        getMaster().getMasterCoprocessorHost().postModifyNamespace(oldNsDescriptor,
3037          newNsDescriptor);
3038      }
3039
3040      @Override
3041      protected String getDescription() {
3042        return "ModifyNamespaceProcedure";
3043      }
3044    });
3045  }
3046
3047  /**
3048   * Delete an existing Namespace. Only empty Namespaces (no tables) can be removed.
3049   * @param nonceGroup Identifier for the source of the request, a client or process.
3050   * @param nonce A unique identifier for this operation from the client or process identified by
3051   * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
3052   * @return procedure id
3053   */
3054  long deleteNamespace(final String name, final long nonceGroup, final long nonce)
3055      throws IOException {
3056    checkInitialized();
3057
3058    return MasterProcedureUtil.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this,
3059          nonceGroup, nonce) {
3060      @Override
3061      protected void run() throws IOException {
3062        getMaster().getMasterCoprocessorHost().preDeleteNamespace(name);
3063        LOG.info(getClientIdAuditPrefix() + " delete " + name);
3064        // Execute the operation synchronously - wait for the operation to complete before
3065        // continuing.
3066        //
3067        // We need to wait for the procedure to potentially fail due to "prepare" sanity
3068        // checks. This will block only the beginning of the procedure. See HBASE-19953.
3069        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
3070        setProcId(submitProcedure(
3071              new DeleteNamespaceProcedure(procedureExecutor.getEnvironment(), name, latch)));
3072        latch.await();
3073        // Will not be invoked in the face of Exception thrown by the Procedure's execution
3074        getMaster().getMasterCoprocessorHost().postDeleteNamespace(name);
3075      }
3076
3077      @Override
3078      protected String getDescription() {
3079        return "DeleteNamespaceProcedure";
3080      }
3081    });
3082  }
3083
3084  /**
3085   * Get a Namespace
3086   * @param name Name of the Namespace
3087   * @return Namespace descriptor for <code>name</code>
3088   */
3089  NamespaceDescriptor getNamespace(String name) throws IOException {
3090    checkInitialized();
3091    if (this.cpHost != null) this.cpHost.preGetNamespaceDescriptor(name);
3092    NamespaceDescriptor nsd = this.clusterSchemaService.getNamespace(name);
3093    if (this.cpHost != null) this.cpHost.postGetNamespaceDescriptor(nsd);
3094    return nsd;
3095  }
3096
3097  /**
3098   * Get all Namespaces
3099   * @return All Namespace descriptors
3100   */
3101  List<NamespaceDescriptor> getNamespaces() throws IOException {
3102    checkInitialized();
3103    final List<NamespaceDescriptor> nsds = new ArrayList<>();
3104    if (cpHost != null) {
3105      cpHost.preListNamespaceDescriptors(nsds);
3106    }
3107    nsds.addAll(this.clusterSchemaService.getNamespaces());
3108    if (this.cpHost != null) {
3109      this.cpHost.postListNamespaceDescriptors(nsds);
3110    }
3111    return nsds;
3112  }
3113
3114  @Override
3115  public List<TableName> listTableNamesByNamespace(String name) throws IOException {
3116    checkInitialized();
3117    return listTableNames(name, null, true);
3118  }
3119
3120  @Override
3121  public List<TableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
3122    checkInitialized();
3123    return listTableDescriptors(name, null, null, true);
3124  }
3125
3126  @Override
3127  public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
3128      throws IOException {
3129    if (cpHost != null) {
3130      cpHost.preAbortProcedure(this.procedureExecutor, procId);
3131    }
3132
3133    final boolean result = this.procedureExecutor.abort(procId, mayInterruptIfRunning);
3134
3135    if (cpHost != null) {
3136      cpHost.postAbortProcedure();
3137    }
3138
3139    return result;
3140  }
3141
3142  @Override
3143  public List<Procedure<?>> getProcedures() throws IOException {
3144    if (cpHost != null) {
3145      cpHost.preGetProcedures();
3146    }
3147
3148    @SuppressWarnings({ "unchecked", "rawtypes" })
3149    List<Procedure<?>> procList = (List) this.procedureExecutor.getProcedures();
3150
3151    if (cpHost != null) {
3152      cpHost.postGetProcedures(procList);
3153    }
3154
3155    return procList;
3156  }
3157
3158  @Override
3159  public List<LockedResource> getLocks() throws IOException {
3160    if (cpHost != null) {
3161      cpHost.preGetLocks();
3162    }
3163
3164    MasterProcedureScheduler procedureScheduler =
3165      procedureExecutor.getEnvironment().getProcedureScheduler();
3166
3167    final List<LockedResource> lockedResources = procedureScheduler.getLocks();
3168
3169    if (cpHost != null) {
3170      cpHost.postGetLocks(lockedResources);
3171    }
3172
3173    return lockedResources;
3174  }
3175
3176  /**
3177   * Returns the list of table descriptors that match the specified request
3178   * @param namespace the namespace to query, or null if querying for all
3179   * @param regex The regular expression to match against, or null if querying for all
3180   * @param tableNameList the list of table names, or null if querying for all
3181   * @param includeSysTables False to match only against userspace tables
3182   * @return the list of table descriptors
3183   */
3184  public List<TableDescriptor> listTableDescriptors(final String namespace, final String regex,
3185      final List<TableName> tableNameList, final boolean includeSysTables)
3186  throws IOException {
3187    List<TableDescriptor> htds = new ArrayList<>();
3188    if (cpHost != null) {
3189      cpHost.preGetTableDescriptors(tableNameList, htds, regex);
3190    }
3191    htds = getTableDescriptors(htds, namespace, regex, tableNameList, includeSysTables);
3192    if (cpHost != null) {
3193      cpHost.postGetTableDescriptors(tableNameList, htds, regex);
3194    }
3195    return htds;
3196  }
3197
3198  /**
3199   * Returns the list of table names that match the specified request
3200   * @param regex The regular expression to match against, or null if querying for all
3201   * @param namespace the namespace to query, or null if querying for all
3202   * @param includeSysTables False to match only against userspace tables
3203   * @return the list of table names
3204   */
3205  public List<TableName> listTableNames(final String namespace, final String regex,
3206      final boolean includeSysTables) throws IOException {
3207    List<TableDescriptor> htds = new ArrayList<>();
3208    if (cpHost != null) {
3209      cpHost.preGetTableNames(htds, regex);
3210    }
3211    htds = getTableDescriptors(htds, namespace, regex, null, includeSysTables);
3212    if (cpHost != null) {
3213      cpHost.postGetTableNames(htds, regex);
3214    }
3215    List<TableName> result = new ArrayList<>(htds.size());
3216    for (TableDescriptor htd: htds) result.add(htd.getTableName());
3217    return result;
3218  }
3219
3220  /**
3221   * @return list of table table descriptors after filtering by regex and whether to include system
3222   *    tables, etc.
3223   * @throws IOException
3224   */
3225  private List<TableDescriptor> getTableDescriptors(final List<TableDescriptor> htds,
3226      final String namespace, final String regex, final List<TableName> tableNameList,
3227      final boolean includeSysTables)
3228  throws IOException {
3229    if (tableNameList == null || tableNameList.isEmpty()) {
3230      // request for all TableDescriptors
3231      Collection<TableDescriptor> allHtds;
3232      if (namespace != null && namespace.length() > 0) {
3233        // Do a check on the namespace existence. Will fail if does not exist.
3234        this.clusterSchemaService.getNamespace(namespace);
3235        allHtds = tableDescriptors.getByNamespace(namespace).values();
3236      } else {
3237        allHtds = tableDescriptors.getAll().values();
3238      }
3239      for (TableDescriptor desc: allHtds) {
3240        if (tableStateManager.isTablePresent(desc.getTableName())
3241            && (includeSysTables || !desc.getTableName().isSystemTable())) {
3242          htds.add(desc);
3243        }
3244      }
3245    } else {
3246      for (TableName s: tableNameList) {
3247        if (tableStateManager.isTablePresent(s)) {
3248          TableDescriptor desc = tableDescriptors.get(s);
3249          if (desc != null) {
3250            htds.add(desc);
3251          }
3252        }
3253      }
3254    }
3255
3256    // Retains only those matched by regular expression.
3257    if (regex != null) filterTablesByRegex(htds, Pattern.compile(regex));
3258    return htds;
3259  }
3260
3261  /**
3262   * Removes the table descriptors that don't match the pattern.
3263   * @param descriptors list of table descriptors to filter
3264   * @param pattern the regex to use
3265   */
3266  private static void filterTablesByRegex(final Collection<TableDescriptor> descriptors,
3267      final Pattern pattern) {
3268    final String defaultNS = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
3269    Iterator<TableDescriptor> itr = descriptors.iterator();
3270    while (itr.hasNext()) {
3271      TableDescriptor htd = itr.next();
3272      String tableName = htd.getTableName().getNameAsString();
3273      boolean matched = pattern.matcher(tableName).matches();
3274      if (!matched && htd.getTableName().getNamespaceAsString().equals(defaultNS)) {
3275        matched = pattern.matcher(defaultNS + TableName.NAMESPACE_DELIM + tableName).matches();
3276      }
3277      if (!matched) {
3278        itr.remove();
3279      }
3280    }
3281  }
3282
3283  @Override
3284  public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
3285    return getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
3286        .getLastMajorCompactionTimestamp(table);
3287  }
3288
3289  @Override
3290  public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException {
3291    return getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
3292        .getLastMajorCompactionTimestamp(regionName);
3293  }
3294
3295  /**
3296   * Gets the mob file compaction state for a specific table.
3297   * Whether all the mob files are selected is known during the compaction execution, but
3298   * the statistic is done just before compaction starts, it is hard to know the compaction
3299   * type at that time, so the rough statistics are chosen for the mob file compaction. Only two
3300   * compaction states are available, CompactionState.MAJOR_AND_MINOR and CompactionState.NONE.
3301   * @param tableName The current table name.
3302   * @return If a given table is in mob file compaction now.
3303   */
3304  public CompactionState getMobCompactionState(TableName tableName) {
3305    AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
3306    if (compactionsCount != null && compactionsCount.get() != 0) {
3307      return CompactionState.MAJOR_AND_MINOR;
3308    }
3309    return CompactionState.NONE;
3310  }
3311
3312  public void reportMobCompactionStart(TableName tableName) throws IOException {
3313    IdLock.Entry lockEntry = null;
3314    try {
3315      lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
3316      AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
3317      if (compactionsCount == null) {
3318        compactionsCount = new AtomicInteger(0);
3319        mobCompactionStates.put(tableName, compactionsCount);
3320      }
3321      compactionsCount.incrementAndGet();
3322    } finally {
3323      if (lockEntry != null) {
3324        mobCompactionLock.releaseLockEntry(lockEntry);
3325      }
3326    }
3327  }
3328
3329  public void reportMobCompactionEnd(TableName tableName) throws IOException {
3330    IdLock.Entry lockEntry = null;
3331    try {
3332      lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
3333      AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
3334      if (compactionsCount != null) {
3335        int count = compactionsCount.decrementAndGet();
3336        // remove the entry if the count is 0.
3337        if (count == 0) {
3338          mobCompactionStates.remove(tableName);
3339        }
3340      }
3341    } finally {
3342      if (lockEntry != null) {
3343        mobCompactionLock.releaseLockEntry(lockEntry);
3344      }
3345    }
3346  }
3347
3348  /**
3349   * Requests mob compaction.
3350   * @param tableName The table the compact.
3351   * @param columns The compacted columns.
3352   * @param allFiles Whether add all mob files into the compaction.
3353   */
3354  public void requestMobCompaction(TableName tableName,
3355                                   List<ColumnFamilyDescriptor> columns, boolean allFiles) throws IOException {
3356    mobCompactThread.requestMobCompaction(conf, fs, tableName, columns, allFiles);
3357  }
3358
3359  /**
3360   * Queries the state of the {@link LoadBalancerTracker}. If the balancer is not initialized,
3361   * false is returned.
3362   *
3363   * @return The state of the load balancer, or false if the load balancer isn't defined.
3364   */
3365  public boolean isBalancerOn() {
3366    return !isInMaintenanceMode()
3367        && loadBalancerTracker != null
3368        && loadBalancerTracker.isBalancerOn();
3369  }
3370
3371  /**
3372   * Queries the state of the {@link RegionNormalizerTracker}. If it's not initialized,
3373   * false is returned.
3374   */
3375  public boolean isNormalizerOn() {
3376    return !isInMaintenanceMode()
3377        && regionNormalizerTracker != null
3378        && regionNormalizerTracker.isNormalizerOn();
3379  }
3380
3381  /**
3382   * Queries the state of the {@link SplitOrMergeTracker}. If it is not initialized,
3383   * false is returned. If switchType is illegal, false will return.
3384   * @param switchType see {@link org.apache.hadoop.hbase.client.MasterSwitchType}
3385   * @return The state of the switch
3386   */
3387  @Override
3388  public boolean isSplitOrMergeEnabled(MasterSwitchType switchType) {
3389    return !isInMaintenanceMode()
3390        && splitOrMergeTracker != null
3391        && splitOrMergeTracker.isSplitOrMergeEnabled(switchType);
3392  }
3393
3394  /**
3395   * Fetch the configured {@link LoadBalancer} class name. If none is set, a default is returned.
3396   *
3397   * @return The name of the {@link LoadBalancer} in use.
3398   */
3399  public String getLoadBalancerClassName() {
3400    return conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, LoadBalancerFactory
3401        .getDefaultLoadBalancerClass().getName());
3402  }
3403
3404  /**
3405   * @return RegionNormalizerTracker instance
3406   */
3407  public RegionNormalizerTracker getRegionNormalizerTracker() {
3408    return regionNormalizerTracker;
3409  }
3410
3411  public SplitOrMergeTracker getSplitOrMergeTracker() {
3412    return splitOrMergeTracker;
3413  }
3414
3415  @Override
3416  public LoadBalancer getLoadBalancer() {
3417    return balancer;
3418  }
3419
3420  @Override
3421  public FavoredNodesManager getFavoredNodesManager() {
3422    return favoredNodesManager;
3423  }
3424
3425  private long executePeerProcedure(ModifyPeerProcedure procedure) throws IOException {
3426    long procId = procedureExecutor.submitProcedure(procedure);
3427    procedure.getLatch().await();
3428    return procId;
3429  }
3430
3431  @Override
3432  public long addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
3433      throws ReplicationException, IOException {
3434    LOG.info(getClientIdAuditPrefix() + " creating replication peer, id=" + peerId + ", config=" +
3435      peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"));
3436    return executePeerProcedure(new AddPeerProcedure(peerId, peerConfig, enabled));
3437  }
3438
3439  @Override
3440  public long removeReplicationPeer(String peerId) throws ReplicationException, IOException {
3441    LOG.info(getClientIdAuditPrefix() + " removing replication peer, id=" + peerId);
3442    return executePeerProcedure(new RemovePeerProcedure(peerId));
3443  }
3444
3445  @Override
3446  public long enableReplicationPeer(String peerId) throws ReplicationException, IOException {
3447    LOG.info(getClientIdAuditPrefix() + " enable replication peer, id=" + peerId);
3448    return executePeerProcedure(new EnablePeerProcedure(peerId));
3449  }
3450
3451  @Override
3452  public long disableReplicationPeer(String peerId) throws ReplicationException, IOException {
3453    LOG.info(getClientIdAuditPrefix() + " disable replication peer, id=" + peerId);
3454    return executePeerProcedure(new DisablePeerProcedure(peerId));
3455  }
3456
3457  @Override
3458  public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
3459      throws ReplicationException, IOException {
3460    if (cpHost != null) {
3461      cpHost.preGetReplicationPeerConfig(peerId);
3462    }
3463    LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + peerId);
3464    ReplicationPeerConfig peerConfig = this.replicationPeerManager.getPeerConfig(peerId)
3465        .orElseThrow(() -> new ReplicationPeerNotFoundException(peerId));
3466    if (cpHost != null) {
3467      cpHost.postGetReplicationPeerConfig(peerId);
3468    }
3469    return peerConfig;
3470  }
3471
3472  @Override
3473  public long updateReplicationPeerConfig(String peerId, ReplicationPeerConfig peerConfig)
3474      throws ReplicationException, IOException {
3475    LOG.info(getClientIdAuditPrefix() + " update replication peer config, id=" + peerId +
3476      ", config=" + peerConfig);
3477    return executePeerProcedure(new UpdatePeerConfigProcedure(peerId, peerConfig));
3478  }
3479
3480  @Override
3481  public List<ReplicationPeerDescription> listReplicationPeers(String regex)
3482      throws ReplicationException, IOException {
3483    if (cpHost != null) {
3484      cpHost.preListReplicationPeers(regex);
3485    }
3486    LOG.debug("{} list replication peers, regex={}", getClientIdAuditPrefix(), regex);
3487    Pattern pattern = regex == null ? null : Pattern.compile(regex);
3488    List<ReplicationPeerDescription> peers =
3489      this.replicationPeerManager.listPeers(pattern);
3490    if (cpHost != null) {
3491      cpHost.postListReplicationPeers(regex);
3492    }
3493    return peers;
3494  }
3495
3496  /**
3497   * Mark region server(s) as decommissioned (previously called 'draining') to prevent additional
3498   * regions from getting assigned to them. Also unload the regions on the servers asynchronously.0
3499   * @param servers Region servers to decommission.
3500   */
3501  public void decommissionRegionServers(final List<ServerName> servers, final boolean offload)
3502      throws HBaseIOException {
3503    List<ServerName> serversAdded = new ArrayList<>(servers.size());
3504    // Place the decommission marker first.
3505    String parentZnode = getZooKeeper().getZNodePaths().drainingZNode;
3506    for (ServerName server : servers) {
3507      try {
3508        String node = ZNodePaths.joinZNode(parentZnode, server.getServerName());
3509        ZKUtil.createAndFailSilent(getZooKeeper(), node);
3510      } catch (KeeperException ke) {
3511        throw new HBaseIOException(
3512          this.zooKeeper.prefix("Unable to decommission '" + server.getServerName() + "'."), ke);
3513      }
3514      if (this.serverManager.addServerToDrainList(server)) {
3515        serversAdded.add(server);
3516      };
3517    }
3518    // Move the regions off the decommissioned servers.
3519    if (offload) {
3520      final List<ServerName> destServers = this.serverManager.createDestinationServersList();
3521      for (ServerName server : serversAdded) {
3522        final List<RegionInfo> regionsOnServer = this.assignmentManager.getRegionsOnServer(server);
3523        for (RegionInfo hri : regionsOnServer) {
3524          ServerName dest = balancer.randomAssignment(hri, destServers);
3525          if (dest == null) {
3526            throw new HBaseIOException("Unable to determine a plan to move " + hri);
3527          }
3528          RegionPlan rp = new RegionPlan(hri, server, dest);
3529          this.assignmentManager.moveAsync(rp);
3530        }
3531      }
3532    }
3533  }
3534
3535  /**
3536   * List region servers marked as decommissioned (previously called 'draining') to not get regions
3537   * assigned to them.
3538   * @return List of decommissioned servers.
3539   */
3540  public List<ServerName> listDecommissionedRegionServers() {
3541    return this.serverManager.getDrainingServersList();
3542  }
3543
3544  /**
3545   * Remove decommission marker (previously called 'draining') from a region server to allow regions
3546   * assignments. Load regions onto the server asynchronously if a list of regions is given
3547   * @param server Region server to remove decommission marker from.
3548   */
3549  public void recommissionRegionServer(final ServerName server,
3550      final List<byte[]> encodedRegionNames) throws IOException {
3551    // Remove the server from decommissioned (draining) server list.
3552    String parentZnode = getZooKeeper().getZNodePaths().drainingZNode;
3553    String node = ZNodePaths.joinZNode(parentZnode, server.getServerName());
3554    try {
3555      ZKUtil.deleteNodeFailSilent(getZooKeeper(), node);
3556    } catch (KeeperException ke) {
3557      throw new HBaseIOException(
3558        this.zooKeeper.prefix("Unable to recommission '" + server.getServerName() + "'."), ke);
3559    }
3560    this.serverManager.removeServerFromDrainList(server);
3561
3562    // Load the regions onto the server if we are given a list of regions.
3563    if (encodedRegionNames == null || encodedRegionNames.isEmpty()) {
3564      return;
3565    }
3566    if (!this.serverManager.isServerOnline(server)) {
3567      return;
3568    }
3569    for (byte[] encodedRegionName : encodedRegionNames) {
3570      RegionState regionState =
3571        assignmentManager.getRegionStates().getRegionState(Bytes.toString(encodedRegionName));
3572      if (regionState == null) {
3573        LOG.warn("Unknown region " + Bytes.toStringBinary(encodedRegionName));
3574        continue;
3575      }
3576      RegionInfo hri = regionState.getRegion();
3577      if (server.equals(regionState.getServerName())) {
3578        LOG.info("Skipping move of region " + hri.getRegionNameAsString() +
3579          " because region already assigned to the same server " + server + ".");
3580        continue;
3581      }
3582      RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), server);
3583      this.assignmentManager.moveAsync(rp);
3584    }
3585  }
3586
3587  @Override
3588  public LockManager getLockManager() {
3589    return lockManager;
3590  }
3591
3592  public QuotaObserverChore getQuotaObserverChore() {
3593    return this.quotaObserverChore;
3594  }
3595
3596  public SpaceQuotaSnapshotNotifier getSpaceQuotaSnapshotNotifier() {
3597    return this.spaceQuotaSnapshotNotifier;
3598  }
3599
3600  @SuppressWarnings("unchecked")
3601  private RemoteProcedure<MasterProcedureEnv, ?> getRemoteProcedure(long procId) {
3602    Procedure<?> procedure = procedureExecutor.getProcedure(procId);
3603    if (procedure == null) {
3604      return null;
3605    }
3606    assert procedure instanceof RemoteProcedure;
3607    return (RemoteProcedure<MasterProcedureEnv, ?>) procedure;
3608  }
3609
3610  public void remoteProcedureCompleted(long procId) {
3611    LOG.debug("Remote procedure done, pid={}", procId);
3612    RemoteProcedure<MasterProcedureEnv, ?> procedure = getRemoteProcedure(procId);
3613    if (procedure != null) {
3614      procedure.remoteOperationCompleted(procedureExecutor.getEnvironment());
3615    }
3616  }
3617
3618  public void remoteProcedureFailed(long procId, RemoteProcedureException error) {
3619    LOG.debug("Remote procedure failed, pid={}", procId, error);
3620    RemoteProcedure<MasterProcedureEnv, ?> procedure = getRemoteProcedure(procId);
3621    if (procedure != null) {
3622      procedure.remoteOperationFailed(procedureExecutor.getEnvironment(), error);
3623    }
3624  }
3625
3626  @Override
3627  public ReplicationPeerManager getReplicationPeerManager() {
3628    return replicationPeerManager;
3629  }
3630
3631  public HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>>
3632      getReplicationLoad(ServerName[] serverNames) {
3633    List<ReplicationPeerDescription> peerList = this.getReplicationPeerManager().listPeers(null);
3634    if (peerList == null) {
3635      return null;
3636    }
3637    HashMap<String, List<Pair<ServerName, ReplicationLoadSource>>> replicationLoadSourceMap =
3638        new HashMap<>(peerList.size());
3639    peerList.stream()
3640        .forEach(peer -> replicationLoadSourceMap.put(peer.getPeerId(), new ArrayList<>()));
3641    for (ServerName serverName : serverNames) {
3642      List<ReplicationLoadSource> replicationLoadSources =
3643          getServerManager().getLoad(serverName).getReplicationLoadSourceList();
3644      for (ReplicationLoadSource replicationLoadSource : replicationLoadSources) {
3645        replicationLoadSourceMap.get(replicationLoadSource.getPeerID())
3646            .add(new Pair<>(serverName, replicationLoadSource));
3647      }
3648    }
3649    for (List<Pair<ServerName, ReplicationLoadSource>> loads : replicationLoadSourceMap.values()) {
3650      if (loads.size() > 0) {
3651        loads.sort(Comparator.comparingLong(load -> (-1) * load.getSecond().getReplicationLag()));
3652      }
3653    }
3654    return replicationLoadSourceMap;
3655  }
3656
3657  /**
3658   * This method modifies the master's configuration in order to inject replication-related features
3659   */
3660  @VisibleForTesting
3661  public static void decorateMasterConfiguration(Configuration conf) {
3662    String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
3663    String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
3664    if (!plugins.contains(cleanerClass)) {
3665      conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
3666    }
3667    if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
3668      plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
3669      cleanerClass = ReplicationHFileCleaner.class.getCanonicalName();
3670      if (!plugins.contains(cleanerClass)) {
3671        conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass);
3672      }
3673    }
3674  }
3675
3676  @Override
3677  public Map<String, ReplicationStatus> getWalGroupsReplicationStatus() {
3678    if (!this.isOnline() || !LoadBalancer.isMasterCanHostUserRegions(conf)) {
3679      return new HashMap<>();
3680    }
3681    return super.getWalGroupsReplicationStatus();
3682  }
3683
3684  public HbckChore getHbckChore() {
3685    return this.hbckChore;
3686  }
3687
3688  @Override
3689  public void runReplicationBarrierCleaner() {
3690    ReplicationBarrierCleaner rbc = this.replicationBarrierCleaner;
3691    if (rbc != null) {
3692      rbc.chore();
3693    }
3694  }
3695
3696  public SnapshotQuotaObserverChore getSnapshotQuotaObserverChore() {
3697    return this.snapshotQuotaChore;
3698  }
3699}