/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.MetaMigrationConvertingToPB;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NamespaceNotFoundException;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.RegionStateListener;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.balancer.BalancerChore;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
import org.apache.hadoop.hbase.master.procedure.AddColumnFamilyProcedure;
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
import org.apache.hadoop.hbase.master.procedure.DeleteColumnFamilyProcedure;
import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CompressionTest;
import org.apache.hadoop.hbase.util.ConfigUtil;
import org.apache.hadoop.hbase.util.EncryptionTest;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.nio.SelectChannelConnector;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Service;

/**
 * HMaster is the "master server" for HBase. An HBase cluster has one active
 * master.  If many masters are started, all compete.  Whichever wins goes on to
 * run the cluster.  All others park themselves in their constructor until
 * master or cluster shutdown or until the active master loses its lease in
 * zookeeper.  Thereafter, all running masters jostle to take over the master role.
 *
 * <p>The Master can be asked to shut down the cluster. See {@link #shutdown()}.  In
 * this case it will tell all regionservers to go down and then wait on them
 * all reporting in that they are down.  This master will then shut itself down.
 *
 * <p>You can also shut down just this master.  Call {@link #stopMaster()}.
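 *
 * <p>A minimal client-side sketch of both shutdown paths (using an {@code Admin}
 * obtained from a {@code Connection}; this is the standard client API, not something
 * provided by this class):
 * <pre>{@code
 * try (Connection conn = ConnectionFactory.createConnection(conf);
 *      Admin admin = conn.getAdmin()) {
 *   admin.shutdown();      // ask the active master to shut down the whole cluster
 *   // admin.stopMaster(); // or: stop only the active master
 * }
 * }</pre>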
 *
 * @see org.apache.zookeeper.Watcher
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings("deprecation")
public class HMaster extends HRegionServer implements MasterServices, Server {
  private static final Log LOG = LogFactory.getLog(HMaster.class.getName());

  /**
   * Protection against zombie master. Started once Master accepts active responsibility and
   * starts taking over responsibilities. Allows a finite time window before giving up ownership.
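   *
   * <p>A sketch of tuning this monitor through configuration (the keys are the constants
   * declared below; the values here are illustrative only):
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * conf.setLong("hbase.master.initializationmonitor.timeout",
   *     TimeUnit.MINUTES.toMillis(30));  // default is 15 minutes
   * conf.setBoolean("hbase.master.initializationmonitor.haltontimeout", true);
   * }</pre>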
   */
  private static class InitializationMonitor extends HasThread {
    /** The amount of time in milliseconds to sleep before checking initialization status. */
    public static final String TIMEOUT_KEY = "hbase.master.initializationmonitor.timeout";
    public static final long TIMEOUT_DEFAULT = TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);

    /**
     * When the timeout expires and initialization has not completed, call
     * {@link System#exit(int)} if true; do nothing otherwise.
     */
    public static final String HALT_KEY = "hbase.master.initializationmonitor.haltontimeout";
    public static final boolean HALT_DEFAULT = false;

    private final HMaster master;
    private final long timeout;
    private final boolean haltOnTimeout;

    /** Creates a Thread that monitors the {@link #isInitialized()} state. */
    InitializationMonitor(HMaster master) {
      super("MasterInitializationMonitor");
      this.master = master;
      this.timeout = master.getConfiguration().getLong(TIMEOUT_KEY, TIMEOUT_DEFAULT);
      this.haltOnTimeout = master.getConfiguration().getBoolean(HALT_KEY, HALT_DEFAULT);
      this.setDaemon(true);
    }

    @Override
    public void run() {
      try {
        while (!master.isStopped() && master.isActiveMaster()) {
          Thread.sleep(timeout);
          if (master.isInitialized()) {
            LOG.debug("Initialization completed within allotted tolerance. Monitor exiting.");
          } else {
            LOG.error("Master failed to complete initialization after " + timeout + "ms. Please"
                + " consider submitting a bug report including a thread dump of this process.");
            if (haltOnTimeout) {
              LOG.error("Zombie Master exiting. Thread dump to stdout");
              Threads.printThreadInfo(System.out, "Zombie HMaster");
              System.exit(-1);
            }
          }
        }
      } catch (InterruptedException ie) {
        LOG.trace("InitMonitor thread interrupted. Exiting.");
      }
    }
  }

  // MASTER is the name of the webapp and the attribute name used to stuff this
  // instance into the web context.
  public static final String MASTER = "master";

  // Manager and zk listener for master election
  private final ActiveMasterManager activeMasterManager;
  // Region server tracker
  RegionServerTracker regionServerTracker;
  // Draining region server tracker
  private DrainingServerTracker drainingServerTracker;
  // Tracker for load balancer state
  LoadBalancerTracker loadBalancerTracker;

  // Tracker for region normalizer state
  private RegionNormalizerTracker regionNormalizerTracker;

  /** Namespace stuff */
  private TableNamespaceManager tableNamespaceManager;

  // Metrics for the HMaster
  final MetricsMaster metricsMaster;
  // file system manager for the master FS operations
  private MasterFileSystem fileSystemManager;

  // server manager to deal with region server info
  volatile ServerManager serverManager;

  // manager of assignment nodes in zookeeper
  AssignmentManager assignmentManager;

  // buffer for "fatal error" notices from region servers
  // in the cluster. This is only used for assisting
  // operations/debugging.
  MemoryBoundedLogMessageBuffer rsFatals;

  // flag set after we become the active master (used for testing)
  private volatile boolean isActiveMaster = false;

  // flag set after we complete initialization once active; toggled through
  // setInitialized()/isInitialized() so unit tests can manipulate it
  private final ProcedureEvent initialized = new ProcedureEvent("master initialized");

  // flag set after master services are started,
  // initialization may not have completed yet.
  volatile boolean serviceStarted = false;

  // flag set after we complete assignMeta.
  private final ProcedureEvent serverCrashProcessingEnabled =
    new ProcedureEvent("server crash processing");

  LoadBalancer balancer;
  private RegionNormalizer normalizer;
  private BalancerChore balancerChore;
  private RegionNormalizerChore normalizerChore;
  private ClusterStatusChore clusterStatusChore;
  private ClusterStatusPublisher clusterStatusPublisherChore = null;
  private PeriodicDoMetrics periodicDoMetricsChore = null;

  CatalogJanitor catalogJanitorChore;
  private ReplicationZKLockCleanerChore replicationZKLockCleanerChore;
  private LogCleaner logCleaner;
  private HFileCleaner hfileCleaner;

  MasterCoprocessorHost cpHost;

  private final boolean preLoadTableDescriptors;

  // Time stamp for when this master became active
  private long masterActiveTime;

  // should we check the compression codec type at master side, default true, HBASE-6370
  private final boolean masterCheckCompression;

  // should we check encryption settings at master side, default true
  private final boolean masterCheckEncryption;

  Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  // monitor for snapshot of hbase tables
  SnapshotManager snapshotManager;
  // monitor for distributed procedures
  MasterProcedureManagerHost mpmHost;

  // it is assigned after the 'initialized' guard is set to true, so it should be volatile
  private volatile MasterQuotaManager quotaManager;

  private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
  private WALProcedureStore procedureStore;

  /** flag used in test cases in order to simulate RS failures during master initialization */
  private volatile boolean initializationBeforeMetaAssignment = false;

  /** jetty server for master to redirect requests to regionserver infoServer */
  private org.mortbay.jetty.Server masterJettyServer;

  public static class RedirectServlet extends HttpServlet {
    private static final long serialVersionUID = 2894774810058302473L;
    private final int regionServerInfoPort;
    private final String regionServerHostname;

    /**
     * @param infoServer the InfoServer to redirect all requests to
     * @param hostname may be null; if given, it is used for redirects instead of the host
     *     from the client request
     */
    public RedirectServlet(InfoServer infoServer, String hostname) {
      regionServerInfoPort = infoServer.getPort();
      regionServerHostname = hostname;
    }

    @Override
    public void doGet(HttpServletRequest request,
        HttpServletResponse response) throws ServletException, IOException {
      String redirectHost = regionServerHostname;
      if (redirectHost == null) {
        redirectHost = request.getServerName();
        if (!Addressing.isLocalAddress(InetAddress.getByName(redirectHost))) {
          LOG.warn("Couldn't resolve '" + redirectHost + "' as an address local to this node and '" +
              MASTER_HOSTNAME_KEY + "' is not set; client will get an HTTP 400 response. If " +
              "your HBase deployment relies on client accessible names that the region server process " +
              "can't resolve locally, then you should set the previously mentioned configuration variable " +
              "to an appropriate hostname.");
          // no sending client provided input back to the client, so the goal host is just in the logs.
          response.sendError(400, "Request was to a host that I can't resolve for any of the network interfaces on " +
              "this node. If this is due to an intermediary such as an HTTP load balancer or other proxy, your HBase " +
              "administrator can set '" + MASTER_HOSTNAME_KEY + "' to point to the correct hostname.");
          return;
        }
      }
      // TODO this scheme should come from looking at the scheme registered in the infoserver's http server for the
      // host and port we're using, but it's buried way too deep to do that ATM.
      String redirectUrl = request.getScheme() + "://"
        + redirectHost + ":" + regionServerInfoPort
        + request.getRequestURI();
      response.sendRedirect(redirectUrl);
    }
  }

  private static class PeriodicDoMetrics extends ScheduledChore {
    private final HMaster server;
    public PeriodicDoMetrics(int doMetricsInterval, final HMaster server) {
      super(server.getServerName() + "-DoMetricsChore", server, doMetricsInterval);
      this.server = server;
    }

    @Override
    protected void chore() {
      server.doMetrics();
    }
  }

  /**
   * Initializes the HMaster. The steps are as follows:
   * <p>
   * <ol>
   * <li>Initialize the local HRegionServer
   * <li>Start the ActiveMasterManager.
   * </ol>
   * <p>
   * Remaining steps of initialization occur in
   * {@link #finishActiveMasterInitialization(MonitoredTask)} after
   * the master becomes the active one.
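   *
   * <p>A direct-construction sketch (normally the master is started through the service
   * scripts; {@code CoordinatedStateManagerFactory} is assumed here from the surrounding
   * HBase API and is not referenced elsewhere in this file):
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * CoordinatedStateManager csm =
   *     CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
   * HMaster master = new HMaster(conf, csm);
   * }</pre>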
   *
   * @throws InterruptedException
   * @throws KeeperException
   * @throws IOException
   */
  public HMaster(final Configuration conf, CoordinatedStateManager csm)
      throws IOException, KeeperException, InterruptedException {
    super(conf, csm);
    this.rsFatals = new MemoryBoundedLogMessageBuffer(
      conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));

    LOG.info("hbase.rootdir=" + getRootDir() +
      ", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));

    // Disable usage of meta replicas in the master
    this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);

    Replication.decorateMasterConfiguration(this.conf);

    // Hack! Maps DFSClient => Master for logs.  HDFS made this
    // config param for task trackers, but we can piggyback off of it.
    if (this.conf.get("mapreduce.task.attempt.id") == null) {
      this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString());
    }

    // should we check the compression codec type at master side, default true, HBASE-6370
    this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);

    // should we check encryption settings at master side, default true
    this.masterCheckEncryption = conf.getBoolean("hbase.master.check.encryption", true);

    this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));

    // preload table descriptors at startup
    this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);

    // Do we publish the status?
    boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
        HConstants.STATUS_PUBLISHED_DEFAULT);
    Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
        conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
            ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
            ClusterStatusPublisher.Publisher.class);

    if (shouldPublish) {
      if (publisherClass == null) {
        LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
            ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
            " is not set - not publishing status");
      } else {
        clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
        getChoreService().scheduleChore(clusterStatusPublisherChore);
      }
    }

    // Some unit tests don't need a cluster, so no zookeeper at all
    if (!conf.getBoolean("hbase.testing.nocluster", false)) {
      activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
      int infoPort = putUpJettyServer();
      startActiveMasterManager(infoPort);
    } else {
      activeMasterManager = null;
    }
  }

  // Returns the actual infoPort; -1 means the info server is disabled.
  private int putUpJettyServer() throws IOException {
    if (!conf.getBoolean("hbase.master.infoserver.redirect", true)) {
      return -1;
    }
    final int infoPort = conf.getInt("hbase.master.info.port.orig",
      HConstants.DEFAULT_MASTER_INFOPORT);
    // -1 is for disabling info server, so no redirecting
    if (infoPort < 0 || infoServer == null) {
      return -1;
    }
    if (infoPort == infoServer.getPort()) {
      return infoPort;
    }
    final String addr = conf.get("hbase.master.info.bindAddress", "0.0.0.0");
    if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
      String msg =
          "Failed to start redirecting jetty server. Address " + addr
              + " does not belong to this host. Correct configuration parameter: "
              + "hbase.master.info.bindAddress";
      LOG.error(msg);
      throw new IOException(msg);
    }

    // TODO I'm pretty sure we could just add another binding to the InfoServer run by
    // the RegionServer and have it run the RedirectServlet instead of standing up
    // a second entire stack here.
    masterJettyServer = new org.mortbay.jetty.Server();
    Connector connector = new SelectChannelConnector();
    connector.setHost(addr);
    connector.setPort(infoPort);
    masterJettyServer.addConnector(connector);
    masterJettyServer.setStopAtShutdown(true);

    final String redirectHostname = shouldUseThisHostnameInstead() ? useThisHostnameInstead : null;

    final RedirectServlet redirect = new RedirectServlet(infoServer, redirectHostname);
    Context context = new Context(masterJettyServer, "/", Context.NO_SESSIONS);
    context.addServlet(new ServletHolder(redirect), "/*");

    try {
      masterJettyServer.start();
    } catch (Exception e) {
      throw new IOException("Failed to start redirecting jetty server", e);
    }
    return connector.getLocalPort();
  }

  /**
   * For compatibility, if login with the regionserver credentials fails, try the master ones.
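   *
   * <p>A sketch of the site configuration this fallback reads (the keys appear in the
   * catch block below; the keytab path and principal are illustrative only):
   * <pre>{@code
   * conf.set("hbase.master.keytab.file", "/etc/security/keytabs/hbase.keytab");
   * conf.set("hbase.master.kerberos.principal", "hbase/_HOST@EXAMPLE.COM");
   * }</pre>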
   */
  @Override
  protected void login(UserProvider user, String host) throws IOException {
    try {
      super.login(user, host);
    } catch (IOException ie) {
      user.login("hbase.master.keytab.file",
        "hbase.master.kerberos.principal", host);
    }
  }

  /**
   * If configured to put regions on the active master,
   * wait until this master becomes the active one.
   * Otherwise, loop until the server is stopped or aborted.
   */
  @Override
  protected void waitForMasterActive() {
    boolean tablesOnMaster = BaseLoadBalancer.tablesOnMaster(conf);
    while (!(tablesOnMaster && isActiveMaster)
        && !isStopped() && !isAborted()) {
      sleeper.sleep();
    }
  }

  @VisibleForTesting
  public MasterRpcServices getMasterRpcServices() {
    return (MasterRpcServices)rpcServices;
  }

  public boolean balanceSwitch(final boolean b) throws IOException {
    return getMasterRpcServices().switchBalancer(b, BalanceSwitchMode.ASYNC);
  }

  @Override
  protected String getProcessName() {
    return MASTER;
  }

  @Override
  protected boolean canCreateBaseZNode() {
    return true;
  }

  @Override
  protected boolean canUpdateTableDescriptor() {
    return true;
  }

  @Override
  protected RSRpcServices createRpcServices() throws IOException {
    return new MasterRpcServices(this);
  }

  @Override
  protected void configureInfoServer() {
    infoServer.addServlet("master-status", "/master-status", MasterStatusServlet.class);
    infoServer.setAttribute(MASTER, this);
    if (BaseLoadBalancer.tablesOnMaster(conf)) {
      super.configureInfoServer();
    }
  }

  @Override
  protected Class<? extends HttpServlet> getDumpServlet() {
    return MasterDumpServlet.class;
  }

  /**
   * Emit the HMaster metrics, such as region-in-transition metrics.
   * Wrapped in a try block to be sure a metrics failure doesn't abort the HMaster.
   */
  private void doMetrics() {
    try {
      if (assignmentManager != null) {
        assignmentManager.updateRegionsInTransitionMetrics();
      }
    } catch (Throwable e) {
      LOG.error("Couldn't update metrics: " + e.getMessage());
    }
  }

  MetricsMaster getMasterMetrics() {
    return metricsMaster;
  }

  /**
   * Initialize all ZK based system trackers.
   * @throws IOException
   * @throws InterruptedException
   * @throws KeeperException
   * @throws CoordinatedStateException
   */
  void initializeZKBasedSystemTrackers() throws IOException,
      InterruptedException, KeeperException, CoordinatedStateException {
    this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
    this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
    this.normalizer.setMasterServices(this);
    this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
    this.loadBalancerTracker.start();
    this.regionNormalizerTracker = new RegionNormalizerTracker(zooKeeper, this);
    this.regionNormalizerTracker.start();
    this.assignmentManager = new AssignmentManager(this, serverManager,
      this.balancer, this.service, this.metricsMaster,
      this.tableLockManager);
    zooKeeper.registerListenerFirst(assignmentManager);

    this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
        this.serverManager);
    this.regionServerTracker.start();

    this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this,
      this.serverManager);
    this.drainingServerTracker.start();

    // Set the cluster as up.  If new RSs, they'll be waiting on this before
    // going ahead with their startup.
    boolean wasUp = this.clusterStatusTracker.isClusterUp();
    if (!wasUp) this.clusterStatusTracker.setClusterUp();

    LOG.info("Server active/primary master=" + this.serverName +
        ", sessionid=0x" +
        Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
        ", setting cluster-up flag (Was=" + wasUp + ")");

    // create/initialize the snapshot manager and other procedure managers
    this.snapshotManager = new SnapshotManager();
    this.mpmHost = new MasterProcedureManagerHost();
    this.mpmHost.register(this.snapshotManager);
    this.mpmHost.register(new MasterFlushTableProcedureManager());
    this.mpmHost.loadProcedures(conf);
    this.mpmHost.initialize(this, this.metricsMaster);
  }

  /**
   * Finish initialization of HMaster after becoming the primary master.
   *
   * <ol>
   * <li>Initialize master components - file system manager, server manager,
   *     assignment manager, region server tracker, etc</li>
   * <li>Start necessary service threads - balancer, catalog janitor,
   *     executor services, etc</li>
   * <li>Set cluster as UP in ZooKeeper</li>
   * <li>Wait for RegionServers to check-in</li>
   * <li>Split logs and perform data recovery, if necessary</li>
   * <li>Ensure assignment of meta/namespace regions</li>
   * <li>Handle either fresh cluster start or master failover</li>
   * </ol>
   *
   * @throws IOException
   * @throws InterruptedException
   * @throws KeeperException
   * @throws CoordinatedStateException
   */
  private void finishActiveMasterInitialization(MonitoredTask status)
      throws IOException, InterruptedException, KeeperException, CoordinatedStateException {

    isActiveMaster = true;
    Thread zombieDetector = new Thread(new InitializationMonitor(this),
        "ActiveMasterInitializationMonitor-" + System.currentTimeMillis());
    zombieDetector.start();

    /*
     * We are active master now... go initialize components we need to run.
     * Note, there may be dross in zk from previous runs; it'll get addressed
     * below after we determine whether this is a cluster startup or a failover.
     */

    status.setStatus("Initializing Master file system");

    this.masterActiveTime = System.currentTimeMillis();
    // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
    this.fileSystemManager = new MasterFileSystem(this, this);

    // enable table descriptors cache
    this.tableDescriptors.setCacheOn();
    // set the META's descriptor to the correct replication
    this.tableDescriptors.get(TableName.META_TABLE_NAME).setRegionReplication(
        conf.getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM));
    // warm-up HTDs cache on master initialization
    if (preLoadTableDescriptors) {
      status.setStatus("Pre-loading table descriptors");
      this.tableDescriptors.getAll();
    }

    // publish cluster ID
    status.setStatus("Publishing Cluster ID in ZooKeeper");
    ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
    this.serverManager = createServerManager(this, this);

    setupClusterConnection();

    // Invalidate all write locks held previously
    this.tableLockManager.reapWriteLocks();

    status.setStatus("Initializing ZK system trackers");
    initializeZKBasedSystemTrackers();

    // initialize master side coprocessors before we start handling requests
    status.setStatus("Initializing master coprocessors");
    this.cpHost = new MasterCoprocessorHost(this, this.conf);

    // start up all service threads.
    status.setStatus("Initializing master service threads");
    startServiceThreads();

    // Wake up this server to check in
    sleeper.skipSleepCycle();

    // Wait for region servers to report in
    this.serverManager.waitForRegionServers(status);
    // Check zk for region servers that are up but didn't register
    for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
      // The isServerOnline check is opportunistic, correctness is handled inside
      if (!this.serverManager.isServerOnline(sn)
          && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
        LOG.info("Registered server found up in zk that has not yet reported in: " + sn);
      }
    }

    // Get the list of previously failed RSs that need log splitting work. We recover
    // hbase:meta region servers inside master initialization and handle other failed
    // servers in SSH in order to start up the master node ASAP.
    Set<ServerName> previouslyFailedServers =
      this.fileSystemManager.getFailedServersFromLogFolders();

    // log splitting for hbase:meta server
    ServerName oldMetaServerLocation = metaTableLocator.getMetaRegionLocation(this.getZooKeeper());
    if (oldMetaServerLocation != null && previouslyFailedServers.contains(oldMetaServerLocation)) {
      splitMetaLogBeforeAssignment(oldMetaServerLocation);
      // Note: we can't remove oldMetaServerLocation from the previouslyFailedServers list
      // because it may also host user regions
    }
    Set<ServerName> previouslyFailedMetaRSs = getPreviouselyFailedMetaServersFromZK();
    // We need to use the union of previouslyFailedMetaRSs recorded in ZK and
    // previouslyFailedServers, instead of previouslyFailedMetaRSs alone, to address two situations:
    // 1) chained failures (recovery failed multiple times in a row);
    // 2) the master got killed right before it could delete the recovering hbase:meta znode from
    // ZK while the same server still has non-meta WALs to be replayed, so that
    // removeStaleRecoveringRegionsFromZK can't delete the stale hbase:meta region.
    // Passing more servers into splitMetaLog is all right; if a server doesn't have an
    // hbase:meta WAL, splitting is a no-op for that server.
    previouslyFailedMetaRSs.addAll(previouslyFailedServers);

    this.initializationBeforeMetaAssignment = true;

    // Wait for regionserver to finish initialization.
    if (BaseLoadBalancer.tablesOnMaster(conf)) {
      waitForServerOnline();
    }

    // initialize load balancer
    this.balancer.setClusterStatus(getClusterStatus());
    this.balancer.setMasterServices(this);
    this.balancer.initialize();

    // Check if master is shutting down because of some issue
    // in initializing the regionserver or the balancer.
    if (isStopped()) return;

    // Make sure meta assigned before proceeding.
    status.setStatus("Assigning Meta Region");
    assignMeta(status, previouslyFailedMetaRSs, HRegionInfo.DEFAULT_REPLICA_ID);
    // check if master is shutting down because the above assignMeta could return even though
    // hbase:meta isn't assigned when master is shutting down
    if (isStopped()) return;

    status.setStatus("Submitting log splitting work for previously failed region servers");
    // Master has recovered hbase:meta region server and we put
    // other failed region servers in a queue to be handled later by SSH
    for (ServerName tmpServer : previouslyFailedServers) {
      this.serverManager.processDeadServer(tmpServer, true);
    }

    // Update meta with new PB serialization if required, i.e. migrate all HRI to PB serialization
    // in meta. This must happen before we assign all user regions or else the assignment will fail.
    if (this.conf.getBoolean("hbase.MetaMigrationConvertingToPB", true)) {
      MetaMigrationConvertingToPB.updateMetaIfNecessary(this);
    }

    // Fix up assignment manager status
    status.setStatus("Starting assignment manager");
    this.assignmentManager.joinCluster();

    // set cluster status again after user regions are assigned
    this.balancer.setClusterStatus(getClusterStatus());

    // Start balancer and meta catalog janitor after meta and regions have been assigned.
    status.setStatus("Starting balancer and catalog janitor");
    this.clusterStatusChore = new ClusterStatusChore(this, balancer);
    getChoreService().scheduleChore(clusterStatusChore);
    this.balancerChore = new BalancerChore(this);
    getChoreService().scheduleChore(balancerChore);
    this.normalizerChore = new RegionNormalizerChore(this);
    getChoreService().scheduleChore(normalizerChore);
    this.catalogJanitorChore = new CatalogJanitor(this, this);
    getChoreService().scheduleChore(catalogJanitorChore);

    // Do Metrics periodically
    periodicDoMetricsChore = new PeriodicDoMetrics(msgInterval, this);
    getChoreService().scheduleChore(periodicDoMetricsChore);

    status.setStatus("Starting namespace manager");
    initNamespace();

    if (this.cpHost != null) {
      try {
        this.cpHost.preMasterInitialization();
      } catch (IOException e) {
        LOG.error("Coprocessor preMasterInitialization() hook failed", e);
      }
    }

    status.markComplete("Initialization successful");
    LOG.info("Master has completed initialization");
    configurationManager.registerObserver(this.balancer);

    // Set master as 'initialized'.
    setInitialized(true);

    status.setStatus("Starting quota manager");
    initQuotaManager();

    // assign the meta replicas
    Set<ServerName> EMPTY_SET = new HashSet<ServerName>();
    int numReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
           HConstants.DEFAULT_META_REPLICA_NUM);
    for (int i = 1; i < numReplicas; i++) {
      assignMeta(status, EMPTY_SET, i);
    }
    unassignExcessMetaReplica(zooKeeper, numReplicas);

    // Clear dead servers that have the same host name and port as an online server, because
    // we do not remove a dead server that shares its hostname and port with an RS that tries
    // to check in before master initialization completes. See HBASE-5916.
    this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();

    // Check and set the znode ACLs if needed in case we are overtaking a non-secure configuration
    status.setStatus("Checking ZNode ACLs");
    zooKeeper.checkAndSetZNodeAcls();

    status.setStatus("Calling postStartMaster coprocessors");
    if (this.cpHost != null) {
      // don't let cp initialization errors kill the master
      try {
        this.cpHost.postStartMaster();
      } catch (IOException ioe) {
        LOG.error("Coprocessor postStartMaster() hook failed", ioe);
      }
    }

    zombieDetector.interrupt();
  }

  private void initQuotaManager() throws IOException {
    quotaManager = new MasterQuotaManager(this);
    this.assignmentManager.setRegionStateListener((RegionStateListener) quotaManager);
    quotaManager.start();
  }

  /**
   * Create a {@link ServerManager} instance.
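   *
   * <p>A test-side sketch of the Mockito spy/stub pattern the inline comment below
   * alludes to (the mock object names are illustrative):
   * <pre>{@code
   * HMaster spied = Mockito.spy(master);
   * Mockito.doReturn(mockedServerManager).when(spied)
   *     .createServerManager(Mockito.any(Server.class), Mockito.any(MasterServices.class));
   * }</pre>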
   * @param master
   * @param services
   * @return An instance of {@link ServerManager}
   * @throws org.apache.hadoop.hbase.ZooKeeperConnectionException
   * @throws IOException
   */
  ServerManager createServerManager(final Server master,
      final MasterServices services)
  throws IOException {
    // We put this out here in a method so we can do a Mockito.spy and stub it out
    // w/ a mocked up ServerManager.
    return new ServerManager(master, services);
  }

  private void unassignExcessMetaReplica(ZooKeeperWatcher zkw, int numMetaReplicasConfigured) {
    // unassign the unneeded replicas (e.g., if the previous master was configured
    // with a replication of 3 and now it is 2, we need to unassign the 1 unneeded replica)
    try {
      List<String> metaReplicaZnodes = zooKeeper.getMetaReplicaNodes();
      for (String metaReplicaZnode : metaReplicaZnodes) {
        int replicaId = zooKeeper.getMetaReplicaIdFromZnode(metaReplicaZnode);
        if (replicaId >= numMetaReplicasConfigured) {
          RegionState r = MetaTableLocator.getMetaRegionState(zkw, replicaId);
          LOG.info("Closing excess replica of meta region " + r.getRegion());
          // send a close and wait for a max of 30 seconds
          ServerManager.closeRegionSilentlyAndWait(getConnection(), r.getServerName(),
              r.getRegion(), 30000);
          ZKUtil.deleteNode(zkw, zkw.getZNodeForReplica(replicaId));
        }
      }
    } catch (Exception ex) {
      // ignore the exception since we don't want the master to be wedged due to potential
      // issues in the cleanup of the extra regions. We can do that cleanup via hbck or manually
      LOG.warn("Ignoring exception " + ex);
    }
  }

  /**
   * Check <code>hbase:meta</code> is assigned. If not, assign it.
   * @param status MonitoredTask
   * @param previouslyFailedMetaRSs
   * @param replicaId
   * @throws InterruptedException
   * @throws IOException
   * @throws KeeperException
   */
  void assignMeta(MonitoredTask status, Set<ServerName> previouslyFailedMetaRSs, int replicaId)
      throws InterruptedException, IOException, KeeperException {
    // Work on meta region
    int assigned = 0;
    long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
    if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
      status.setStatus("Assigning hbase:meta region");
    } else {
      status.setStatus("Assigning hbase:meta region, replicaId " + replicaId);
    }
    // Get current meta state from zk.
    RegionStates regionStates = assignmentManager.getRegionStates();
    RegionState metaState = MetaTableLocator.getMetaRegionState(getZooKeeper(), replicaId);
    HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO,
        replicaId);
    ServerName currentMetaServer = metaState.getServerName();
    if (!ConfigUtil.useZKForAssignment(conf)) {
      regionStates.createRegionState(hri, metaState.getState(),
        currentMetaServer, null);
    } else {
      regionStates.createRegionState(hri);
    }
    boolean rit = this.assignmentManager.
      processRegionInTransitionAndBlockUntilAssigned(hri);
    boolean metaRegionLocation = metaTableLocator.verifyMetaRegionLocation(
      this.getConnection(), this.getZooKeeper(), timeout, replicaId);
    if (!metaRegionLocation || !metaState.isOpened()) {
      // Meta location is not verified. It should be in transition, or offline.
      // We will wait for it to be assigned in enableCrashedServerProcessing below.
      assigned++;
      if (!ConfigUtil.useZKForAssignment(conf)) {
        assignMetaZkLess(regionStates, metaState, timeout, previouslyFailedMetaRSs);
      } else if (!rit) {
        // Assign meta since not already in transition
        if (currentMetaServer != null) {
          // If the meta server is not known to be dead or online,
          // just split the meta log, and don't expire it since this
          // could be a full cluster restart. Otherwise, we will think
          // this is a failover and lose previous region locations.
          // If it is really a failover case, AM will find out in rebuilding
          // user regions. Otherwise, we are good since all logs are split
          // or known to be replayed before user regions are assigned.
          if (serverManager.isServerOnline(currentMetaServer)) {
            LOG.info("Forcing expire of " + currentMetaServer);
            serverManager.expireServer(currentMetaServer);
          }
          if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
            splitMetaLogBeforeAssignment(currentMetaServer);
            previouslyFailedMetaRSs.add(currentMetaServer);
          }
        }
        assignmentManager.assignMeta(hri);
      }
    } else {
      // Region already assigned. We didn't assign it. Add to in-memory state.
      regionStates.updateRegionState(hri, State.OPEN, currentMetaServer);
      this.assignmentManager.regionOnline(hri, currentMetaServer);
    }

    if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) enableMeta(TableName.META_TABLE_NAME);

    if ((RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode())
        && (!previouslyFailedMetaRSs.isEmpty())) {
      // log-replay mode needs the new hbase:meta RS to be assigned first
      status.setStatus("replaying log for Meta Region");
      this.fileSystemManager.splitMetaLog(previouslyFailedMetaRSs);
    }

    // Make sure an hbase:meta location is set. We need to enable SSH here since
    // if the meta region server has died at this time, we need it to be re-assigned
    // by SSH so that system tables can be assigned.
    // There is no need to wait for meta when assigned == 0, i.e. meta was just verified in place.
    if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) enableCrashedServerProcessing(assigned != 0);
    LOG.info("hbase:meta with replicaId " + replicaId + " assigned=" + assigned + ", rit=" + rit +
      ", location=" + metaTableLocator.getMetaRegionLocation(this.getZooKeeper(), replicaId));
    status.setStatus("META assigned.");
  }

  private void assignMetaZkLess(RegionStates regionStates, RegionState regionState, long timeout,
      Set<ServerName> previouslyFailedRs) throws IOException, KeeperException {
    ServerName currentServer = regionState.getServerName();
    if (serverManager.isServerOnline(currentServer)) {
      LOG.info("Meta was in transition on " + currentServer);
      assignmentManager.processRegionInTransitionZkLess();
    } else {
      if (currentServer != null) {
        if (regionState.getRegion().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
          splitMetaLogBeforeAssignment(currentServer);
          regionStates.logSplit(HRegionInfo.FIRST_META_REGIONINFO);
          previouslyFailedRs.add(currentServer);
        }
      }
      LOG.info("Re-assigning hbase:meta, it was on " + currentServer);
      regionStates.updateRegionState(regionState.getRegion(), State.OFFLINE);
      assignmentManager.assignMeta(regionState.getRegion());
    }
  }

  void initNamespace() throws IOException {
    // create namespace manager
    tableNamespaceManager = new TableNamespaceManager(this);
    tableNamespaceManager.start();
  }

  boolean isCatalogJanitorEnabled() {
    return catalogJanitorChore != null ?
      catalogJanitorChore.getEnabled() : false;
  }

  private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException {
    if (RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode()) {
      // In log replay mode, we mark hbase:meta region as recovering in ZK
      Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
      regions.add(HRegionInfo.FIRST_META_REGIONINFO);
      this.fileSystemManager.prepareLogReplay(currentMetaServer, regions);
    } else {
      // In recovered.edits mode: create recovered edits file for hbase:meta server
      this.fileSystemManager.splitMetaLog(currentMetaServer);
    }
  }

  private void enableCrashedServerProcessing(final boolean waitForMeta)
  throws IOException, InterruptedException {
    // If crashed server processing is disabled, we enable it and expire those dead but not expired
    // servers. This is required so that if meta is assigning to a server which dies after
    // assignMeta starts assignment, ServerCrashProcedure can re-assign it. Otherwise, we will be
    // stuck here waiting forever if waitForMeta is specified.
    if (!isServerCrashProcessingEnabled()) {
      setServerCrashProcessingEnabled(true);
      this.serverManager.processQueuedDeadServers();
    }

    if (waitForMeta) {
      metaTableLocator.waitMetaRegionLocation(this.getZooKeeper());
      // Above check waits for general meta availability but this does not
      // guarantee that the transition has completed
      this.assignmentManager.waitForAssignment(HRegionInfo.FIRST_META_REGIONINFO);
    }
  }

  private void enableMeta(TableName metaTableName) {
    if (!this.assignmentManager.getTableStateManager().isTableState(metaTableName,
        ZooKeeperProtos.Table.State.ENABLED)) {
      this.assignmentManager.setEnabledTable(metaTableName);
    }
  }

  /**
   * Returns the set of region server names recorded under the hbase:meta
   * recovering-region ZK node.
   * @return Set of meta server names which were recorded in ZK
   * @throws KeeperException
   */
  private Set<ServerName> getPreviouselyFailedMetaServersFromZK() throws KeeperException {
    Set<ServerName> result = new HashSet<ServerName>();
    String metaRecoveringZNode = ZKUtil.joinZNode(zooKeeper.recoveringRegionsZNode,
      HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
    List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(zooKeeper, metaRecoveringZNode);
    if (regionFailedServers == null) return result;

    for (String failedServer : regionFailedServers) {
      ServerName server = ServerName.parseServerName(failedServer);
      result.add(server);
    }
    return result;
  }

  @Override
  public TableDescriptors getTableDescriptors() {
    return this.tableDescriptors;
  }

  @Override
  public ServerManager getServerManager() {
    return this.serverManager;
  }

  @Override
  public MasterFileSystem getMasterFileSystem() {
    return this.fileSystemManager;
  }

  /*
   * Start up all services. If any of these threads gets an unhandled exception
   * then they just die with a logged message.  This should be fine because
   * in general, we do not expect the master to get such unhandled exceptions
   * as OOMEs; it should be lightly loaded. See what HRegionServer does if
   * you need to install an uncaught exception handler.
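   *
   * A sketch of tuning the pool sizes and intervals read below (the keys appear in this
   * method; the values here are illustrative only):
   *   conf.setInt("hbase.master.executor.openregion.threads", 10);
   *   conf.setInt("hbase.master.executor.serverops.threads", 10);
   *   conf.setInt("hbase.master.cleaner.interval", 120 * 1000);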
   */
  private void startServiceThreads() throws IOException {
    // Start the executor service pools
    this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
      conf.getInt("hbase.master.executor.openregion.threads", 5));
    this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
      conf.getInt("hbase.master.executor.closeregion.threads", 5));
    this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
      conf.getInt("hbase.master.executor.serverops.threads", 5));
    this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
      conf.getInt("hbase.master.executor.serverops.threads", 5));
    this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
      conf.getInt("hbase.master.executor.logreplayops.threads", 10));

    // We depend on there being only one instance of this executor running
    // at a time.  To do concurrency, would need fencing of enable/disable of
    // tables.
    // Any time changing this maxThreads to > 1, please see the comment at
    // AccessController#postCreateTableHandler
    this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
    startProcedureExecutor();

    // Start log cleaner thread
    int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
    this.logCleaner =
        new LogCleaner(cleanerInterval,
            this, conf, getMasterFileSystem().getFileSystem(),
            getMasterFileSystem().getOldLogDir());
    getChoreService().scheduleChore(logCleaner);

    // start the hfile archive cleaner thread
    Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
    this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
        .getFileSystem(), archiveDir);
    getChoreService().scheduleChore(hfileCleaner);
    serviceStarted = true;
    if (LOG.isTraceEnabled()) {
      LOG.trace("Started service threads");
    }
    if (!conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
      try {
        replicationZKLockCleanerChore = new ReplicationZKLockCleanerChore(this, this,
            cleanerInterval, this.getZooKeeper(), this.conf);
        getChoreService().scheduleChore(replicationZKLockCleanerChore);
      } catch (Exception e) {
        LOG.error("start replicationZKLockCleanerChore failed", e);
      }
    }
  }

  @Override
  protected void sendShutdownInterrupt() {
    super.sendShutdownInterrupt();
    stopProcedureExecutor();
  }

  @Override
  protected void stopServiceThreads() {
    if (masterJettyServer != null) {
      LOG.info("Stopping master jetty server");
      try {
        masterJettyServer.stop();
      } catch (Exception e) {
        LOG.error("Failed to stop master jetty server", e);
      }
    }
    super.stopServiceThreads();
    stopChores();

    // Wait for all the remaining region servers to report in IFF we were
    // running a cluster shutdown AND we were NOT aborting.
    if (!isAborted() && this.serverManager != null &&
        this.serverManager.isClusterShutdown()) {
      this.serverManager.letRegionServersShutdown();
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Stopping service threads");
    }
    // Clean up and close up shop
    if (this.logCleaner != null) this.logCleaner.cancel(true);
    if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
    if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true);
    if (this.quotaManager != null) this.quotaManager.stop();
    if (this.activeMasterManager != null) this.activeMasterManager.stop();
    if (this.serverManager != null) this.serverManager.stop();
    if (this.assignmentManager != null) this.assignmentManager.stop();
    if (this.fileSystemManager != null) this.fileSystemManager.stop();
    if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
  }
1205 
1206   private void startProcedureExecutor() throws IOException {
1207     final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
1208     final Path logDir = new Path(fileSystemManager.getRootDir(),
1209         MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
1210 
1211     procedureStore = new WALProcedureStore(conf, fileSystemManager.getFileSystem(), logDir,
1212         new MasterProcedureEnv.WALStoreLeaseRecovery(this));
1213     procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
1214     procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,
1215         procEnv.getProcedureQueue());
1216 
1217     final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
1218         Math.max(Runtime.getRuntime().availableProcessors(),
1219           MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
1220     final boolean abortOnCorruption = conf.getBoolean(
1221         MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
1222         MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
1223     procedureStore.start(numThreads);
1224     procedureExecutor.start(numThreads, abortOnCorruption);
1225   }
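
  // The pool sizing above is resolved from configuration. A minimal sketch of
  // overriding it before master startup, assuming the key behind
  // MasterProcedureConstants.MASTER_PROCEDURE_THREADS is
  // "hbase.master.procedure.threads":
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt("hbase.master.procedure.threads", 8);
  //   // otherwise: max(available processors, the default minimum)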
1226 
1227   private void stopProcedureExecutor() {
1228     if (procedureExecutor != null) {
1229       procedureExecutor.stop();
1230     }
1231 
1232     if (procedureStore != null) {
1233       procedureStore.stop(isAborted());
1234     }
1235   }
1236 
1237   private void stopChores() {
1238     if (this.balancerChore != null) {
1239       this.balancerChore.cancel(true);
1240     }
1241     if (this.normalizerChore != null) {
1242       this.normalizerChore.cancel(true);
1243     }
1244     if (this.clusterStatusChore != null) {
1245       this.clusterStatusChore.cancel(true);
1246     }
1247     if (this.catalogJanitorChore != null) {
1248       this.catalogJanitorChore.cancel(true);
1249     }
1250     if (this.clusterStatusPublisherChore != null){
1251       clusterStatusPublisherChore.cancel(true);
1252     }
1253     if (this.periodicDoMetricsChore != null) {
1254       periodicDoMetricsChore.cancel();
1255     }
1256   }
1257 
1258   /**
1259    * @return the remote side's InetAddress
1260    * @throws UnknownHostException
1261    */
1262   InetAddress getRemoteInetAddress(final int port,
1263       final long serverStartCode) throws UnknownHostException {
1264    // Do it out here in its own little method so we can fake an address
1265    // when mocking in tests.
1266     InetAddress ia = RpcServer.getRemoteIp();
1267 
1268     // The call could be from the local regionserver,
1269     // in which case, there is no remote address.
1270     if (ia == null && serverStartCode == startcode) {
1271       InetSocketAddress isa = rpcServices.getSocketAddress();
1272       if (isa != null && isa.getPort() == port) {
1273         ia = isa.getAddress();
1274       }
1275     }
1276     return ia;
1277   }
1278 
1279   /**
1280    * @return Maximum time we should run balancer for
1281    */
1282   private int getBalancerCutoffTime() {
1283     int balancerCutoffTime =
1284       getConfiguration().getInt("hbase.balancer.max.balancing", -1);
1285     if (balancerCutoffTime == -1) {
1286       // No time period set so create one
1287       int balancerPeriod =
1288         getConfiguration().getInt("hbase.balancer.period", 300000);
1289       balancerCutoffTime = balancerPeriod;
1290      // If the configured period is nonsense, fall back to the default period
1291      if (balancerCutoffTime <= 0) balancerCutoffTime = 300000;
1292     }
1293     return balancerCutoffTime;
1294   }
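
  // A sketch of how the two settings read above interact (values are
  // illustrative): if "hbase.balancer.max.balancing" is unset, the cutoff
  // defaults to the balancer period.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt("hbase.balancer.period", 300000);        // run every 5 minutes
  //   conf.setInt("hbase.balancer.max.balancing", 60000);  // move regions for at most 1 minute per run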
1295 
1296   public boolean balance() throws IOException {
1297     // if master not initialized, don't run balancer.
1298     if (!isInitialized()) {
1299       LOG.debug("Master has not been initialized, don't run balancer.");
1300       return false;
1301     }
1302     // Do this call outside of synchronized block.
1303     int maximumBalanceTime = getBalancerCutoffTime();
1304     synchronized (this.balancer) {
1305      // If the balancer switch is off, don't run the balancer.
1306       if (!this.loadBalancerTracker.isBalancerOn()) return false;
1307      // Only allow one balance run at a time.
1308       if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
1309         Map<String, RegionState> regionsInTransition =
1310           this.assignmentManager.getRegionStates().getRegionsInTransition();
1311         LOG.debug("Not running balancer because " + regionsInTransition.size() +
1312           " region(s) in transition: " + org.apache.commons.lang.StringUtils.
1313             abbreviate(regionsInTransition.toString(), 256));
1314         return false;
1315       }
1316       if (this.serverManager.areDeadServersInProgress()) {
1317         LOG.debug("Not running balancer because processing dead regionserver(s): " +
1318           this.serverManager.getDeadServers());
1319         return false;
1320       }
1321 
1322       if (this.cpHost != null) {
1323         try {
1324           if (this.cpHost.preBalance()) {
1325             LOG.debug("Coprocessor bypassing balancer request");
1326             return false;
1327           }
1328         } catch (IOException ioe) {
1329           LOG.error("Error invoking master coprocessor preBalance()", ioe);
1330           return false;
1331         }
1332       }
1333 
1334       Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
1335         this.assignmentManager.getRegionStates().getAssignmentsByTable();
1336 
1337       List<RegionPlan> plans = new ArrayList<RegionPlan>();
1338      // Give the balancer the current cluster state.
1339       this.balancer.setClusterStatus(getClusterStatus());
1340       for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) {
1341         List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
1342         if (partialPlans != null) plans.addAll(partialPlans);
1343       }
1344       long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
1345       int rpCount = 0;  // number of RegionPlans balanced so far
1346       long totalRegPlanExecTime = 0;
1347       if (plans != null && !plans.isEmpty()) {
1348         for (RegionPlan plan: plans) {
1349           LOG.info("balance " + plan);
1350           long balStartTime = System.currentTimeMillis();
1351          // TODO: bulk assign
1352           this.assignmentManager.balance(plan);
1353           totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
1354           rpCount++;
1355           if (rpCount < plans.size() &&
1356               // if performing next balance exceeds cutoff time, exit the loop
1357               (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
1358            // TODO: After balance, there should not be a cutoff time (keeping it as a safety net for now)
1359             LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
1360               maximumBalanceTime);
1361             break;
1362           }
1363         }
1364       }
1365       if (this.cpHost != null) {
1366         try {
1367           this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans);
1368         } catch (IOException ioe) {
1369           // balancing already succeeded so don't change the result
1370           LOG.error("Error invoking master coprocessor postBalance()", ioe);
1371         }
1372       }
1373     }
1374    // If the LoadBalancer did not generate any plans, the cluster is already balanced.
1375    // Return true to indicate success.
1376     return true;
1377   }
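
  // A client-side sketch of triggering this method through the public Admin
  // API (assumes a running cluster reachable via the supplied configuration):
  //
  //   import org.apache.hadoop.hbase.client.Admin;
  //   import org.apache.hadoop.hbase.client.Connection;
  //   import org.apache.hadoop.hbase.client.ConnectionFactory;
  //
  //   try (Connection connection = ConnectionFactory.createConnection(conf);
  //        Admin admin = connection.getAdmin()) {
  //     // Returns false when the run is skipped, e.g. regions in transition.
  //     boolean ran = admin.balancer();
  //   }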
1378 
1379   /**
1380    * Perform normalization of cluster (invoked by {@link RegionNormalizerChore}).
1381    *
1382    * @return true if normalization step was performed successfully, false otherwise
1383    *   (specifically, if HMaster hasn't been initialized properly or normalization
1384    *   is globally disabled)
1385    * @throws IOException
1386    * @throws CoordinatedStateException
1387    */
1388   public boolean normalizeRegions() throws IOException, CoordinatedStateException {
1389     if (!isInitialized()) {
1390       LOG.debug("Master has not been initialized, don't run region normalizer.");
1391       return false;
1392     }
1393 
1394     if (!this.regionNormalizerTracker.isNormalizerOn()) {
1395       LOG.debug("Region normalization is disabled, don't run region normalizer.");
1396       return false;
1397     }
1398 
1399     synchronized (this.normalizer) {
1400       // Don't run the normalizer concurrently
1401       List<TableName> allEnabledTables = new ArrayList<>(
1402         this.assignmentManager.getTableStateManager().getTablesInStates(
1403           ZooKeeperProtos.Table.State.ENABLED));
1404 
1405       Collections.shuffle(allEnabledTables);
1406 
1407       for (TableName table : allEnabledTables) {
1408         if (quotaManager.getNamespaceQuotaManager() != null &&
1409             quotaManager.getNamespaceQuotaManager().getState(table.getNamespaceAsString()) != null){
1410           LOG.debug("Skipping normalizing " + table + " since its namespace has quota");
1411           continue;
1412         }
1413         if (table.isSystemTable() || (getTableDescriptors().get(table) != null &&
1414             !getTableDescriptors().get(table).isNormalizationEnabled())) {
1415          LOG.debug("Skipping normalization for table: " + table + ", as it's either a system"
1416               + " table or doesn't have auto normalization turned on");
1417           continue;
1418         }
1419         List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
1420         if (plans != null) {
1421           for (NormalizationPlan plan : plans) {
1422             plan.execute(clusterConnection.getAdmin());
1423           }
1424         }
1425       }
1426     }
1427    // If the normalizer did not generate any plans, the tables are already normalized.
1428    // Return true to indicate success.
1429     return true;
1430   }
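
  // Normalization has to be switched on globally and per table before the
  // loop above computes any plans. A sketch against the 1.2-era client API
  // (assumes Admin#setNormalizerRunning and
  // HTableDescriptor#setNormalizationEnabled are available in the client in
  // use; the table name is illustrative):
  //
  //   try (Connection connection = ConnectionFactory.createConnection(conf);
  //        Admin admin = connection.getAdmin()) {
  //     admin.setNormalizerRunning(true);   // global switch, tracked in ZK
  //     HTableDescriptor htd = admin.getTableDescriptor(TableName.valueOf("t1"));
  //     htd.setNormalizationEnabled(true);  // per-table opt-in
  //     admin.modifyTable(htd.getTableName(), htd);
  //   }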
1431 
1432   /**
1433    * @return Client info for use as prefix on an audit log string; who did an action
1434    */
1435   String getClientIdAuditPrefix() {
1436     return "Client=" + RpcServer.getRequestUserName() + "/" + RpcServer.getRemoteAddress();
1437   }
1438 
1439   /**
1440    * Switch for the background CatalogJanitor thread.
1441    * Used for testing.  The thread will continue to run.  It will just be a noop
1442    * if disabled.
1443    * @param b If false, the catalog janitor won't do anything.
1444    */
1445   public void setCatalogJanitorEnabled(final boolean b) {
1446     this.catalogJanitorChore.setEnabled(b);
1447   }
1448 
1449   @Override
1450   public void dispatchMergingRegions(final HRegionInfo region_a,
1451       final HRegionInfo region_b, final boolean forcible) throws IOException {
1452     checkInitialized();
1453     this.service.submit(new DispatchMergingRegionHandler(this,
1454       this.catalogJanitorChore, region_a, region_b, forcible));
1455   }
1456 
1457   void move(final byte[] encodedRegionName,
1458       final byte[] destServerName) throws HBaseIOException {
1459     RegionState regionState = assignmentManager.getRegionStates().
1460       getRegionState(Bytes.toString(encodedRegionName));
1461     if (regionState == null) {
1462       throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
1463     }
1464 
1465     HRegionInfo hri = regionState.getRegion();
1466     ServerName dest;
1467     if (destServerName == null || destServerName.length == 0) {
1468       LOG.info("Passed destination servername is null/empty so " +
1469         "choosing a server at random");
1470       final List<ServerName> destServers = this.serverManager.createDestinationServersList(
1471         regionState.getServerName());
1472       dest = balancer.randomAssignment(hri, destServers);
1473       if (dest == null) {
1474         LOG.debug("Unable to determine a plan to assign " + hri);
1475         return;
1476       }
1477     } else {
1478       dest = ServerName.valueOf(Bytes.toString(destServerName));
1479       if (dest.equals(serverName) && balancer instanceof BaseLoadBalancer
1480           && !((BaseLoadBalancer)balancer).shouldBeOnMaster(hri)) {
1481        // Don't put user regions on the master, to avoid the balancer
1482        // moving them off again later. Tests may intentionally place
1483        // regions on the master, however.
1484         LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1485           + " to avoid unnecessary region moving later by load balancer,"
1486           + " because it should not be on master");
1487         return;
1488       }
1489     }
1490 
1491     if (dest.equals(regionState.getServerName())) {
1492       LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1493         + " because region already assigned to the same server " + dest + ".");
1494       return;
1495     }
1496 
1497     // Now we can do the move
1498     RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
1499 
1500     try {
1501       checkInitialized();
1502       if (this.cpHost != null) {
1503         if (this.cpHost.preMove(hri, rp.getSource(), rp.getDestination())) {
1504           return;
1505         }
1506       }
1507      // Warm up the region on the destination before initiating the move.
1508      // This call is synchronous and takes some time, so do it before the
1509      // source region gets closed.
1510       serverManager.sendRegionWarmup(rp.getDestination(), hri);
1511 
1512       LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
1513       this.assignmentManager.balance(rp);
1514       if (this.cpHost != null) {
1515         this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
1516       }
1517     } catch (IOException ioe) {
1518       if (ioe instanceof HBaseIOException) {
1519         throw (HBaseIOException)ioe;
1520       }
1521       throw new HBaseIOException(ioe);
1522     }
1523   }
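
  // A client-side sketch of asking the master to move one region; the encoded
  // region name below is an illustrative placeholder:
  //
  //   try (Connection connection = ConnectionFactory.createConnection(conf);
  //        Admin admin = connection.getAdmin()) {
  //     // A null destination lets the master pick a server at random.
  //     admin.move(Bytes.toBytes("2f9d64b2a6e08e4d66aeb8e292a2f186"), null);
  //   }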
1524 
1525   @Override
1526   public long createTable(
1527       final HTableDescriptor hTableDescriptor,
1528       final byte [][] splitKeys,
1529       final long nonceGroup,
1530       final long nonce) throws IOException {
1531     if (isStopped()) {
1532       throw new MasterNotRunningException();
1533     }
1534 
1535     String namespace = hTableDescriptor.getTableName().getNamespaceAsString();
1536     ensureNamespaceExists(namespace);
1537 
1538     final HRegionInfo[] newRegions =
1539         ModifyRegionUtils.createHRegionInfos(hTableDescriptor, splitKeys);
1540     checkInitialized();
1541     sanityCheckTableDescriptor(hTableDescriptor);
1542 
1543     return MasterProcedureUtil.submitProcedure(
1544       new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
1545       @Override
1546       protected void run() throws IOException {
1547         getMaster().getMasterCoprocessorHost().preCreateTable(hTableDescriptor, newRegions);
1548 
1549         LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor);
1550 
1551        // TODO: We can handle/merge duplicate requests, and differentiate the case of
1552        //       TableExistsException by checking whether the schema is the same or not.
1553         ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
1554         submitProcedure(new CreateTableProcedure(
1555           procedureExecutor.getEnvironment(), hTableDescriptor, newRegions, latch));
1556         latch.await();
1557 
1558         getMaster().getMasterCoprocessorHost().postCreateTable(hTableDescriptor, newRegions);
1559       }
1560 
1561       @Override
1562       protected String getDescription() {
1563         return "CreateTableProcedure";
1564       }
1565     });
1566   }
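
  // What a client submits to reach this method; a minimal sketch with an
  // illustrative table name, column family and a single pre-split key:
  //
  //   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
  //   htd.addFamily(new HColumnDescriptor("cf"));
  //   byte[][] splitKeys = { Bytes.toBytes("m") };  // two regions: [, "m") and ["m", )
  //   try (Connection connection = ConnectionFactory.createConnection(conf);
  //        Admin admin = connection.getAdmin()) {
  //     admin.createTable(htd, splitKeys);
  //   }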
1567 
1568   /**
1569    * Checks whether the table conforms to some sane limits, and that its
1570    * configured values (compression, etc.) work. Throws an exception if something is wrong.
1571    * @throws IOException
1572    */
1573   private void sanityCheckTableDescriptor(final HTableDescriptor htd) throws IOException {
1574     final String CONF_KEY = "hbase.table.sanity.checks";
1575     boolean logWarn = false;
1576     if (!conf.getBoolean(CONF_KEY, true)) {
1577       logWarn = true;
1578     }
1579     String tableVal = htd.getConfigurationValue(CONF_KEY);
1580     if (tableVal != null && !Boolean.valueOf(tableVal)) {
1581       logWarn = true;
1582     }
1583 
1584     // check max file size
1585     long maxFileSizeLowerLimit = 2 * 1024 * 1024L; // 2M is the default lower limit
1586     long maxFileSize = htd.getMaxFileSize();
1587     if (maxFileSize < 0) {
1588       maxFileSize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, maxFileSizeLowerLimit);
1589     }
1590     if (maxFileSize < conf.getLong("hbase.hregion.max.filesize.limit", maxFileSizeLowerLimit)) {
1591       String message = "MAX_FILESIZE for table descriptor or "
1592           + "\"hbase.hregion.max.filesize\" (" + maxFileSize
1593          + ") is too small, which might cause over-splitting into an unmanageable "
1594          + "number of regions.";
1595       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1596     }
1597 
1598     // check flush size
1599     long flushSizeLowerLimit = 1024 * 1024L; // 1M is the default lower limit
1600     long flushSize = htd.getMemStoreFlushSize();
1601     if (flushSize < 0) {
1602       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeLowerLimit);
1603     }
1604     if (flushSize < conf.getLong("hbase.hregion.memstore.flush.size.limit", flushSizeLowerLimit)) {
1605       String message = "MEMSTORE_FLUSHSIZE for table descriptor or "
1606           + "\"hbase.hregion.memstore.flush.size\" ("+flushSize+") is too small, which might cause"
1607           + " very frequent flushing.";
1608       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1609     }
1610 
1611     // check that coprocessors and other specified plugin classes can be loaded
1612     try {
1613       checkClassLoading(conf, htd);
1614     } catch (Exception ex) {
1615       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, ex.getMessage(), null);
1616     }
1617 
1618     // check compression can be loaded
1619     try {
1620       checkCompression(htd);
1621     } catch (IOException e) {
1622       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
1623     }
1624 
1625     // check encryption can be loaded
1626     try {
1627       checkEncryption(conf, htd);
1628     } catch (IOException e) {
1629       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
1630     }
1631     // Verify compaction policy
1632    try {
1633      checkCompactionPolicy(conf, htd);
1634    } catch (IOException e) {
1635       warnOrThrowExceptionForFailure(false, CONF_KEY, e.getMessage(), e);
1636     }
1637     // check that we have at least 1 CF
1638     if (htd.getColumnFamilies().length == 0) {
1639       String message = "Table should have at least one column family.";
1640       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1641     }
1642 
1643     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1644       if (hcd.getTimeToLive() <= 0) {
1645         String message = "TTL for column family " + hcd.getNameAsString() + " must be positive.";
1646         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1647       }
1648 
1649       // check blockSize
1650       if (hcd.getBlocksize() < 1024 || hcd.getBlocksize() > 16 * 1024 * 1024) {
1651         String message = "Block size for column family " + hcd.getNameAsString()
1652             + "  must be between 1K and 16MB.";
1653         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1654       }
1655 
1656       // check versions
1657       if (hcd.getMinVersions() < 0) {
1658         String message = "Min versions for column family " + hcd.getNameAsString()
1659          + " must be non-negative.";
1660         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1661       }
1662       // max versions already being checked
1663 
1664       // HBASE-13776 Setting illegal versions for HColumnDescriptor
1665       //  does not throw IllegalArgumentException
1666      // check minVersions <= maxVersions
1667       if (hcd.getMinVersions() > hcd.getMaxVersions()) {
1668         String message = "Min versions for column family " + hcd.getNameAsString()
1669            + " must be less than or equal to the Max versions.";
1670         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1671       }
1672 
1673       // check replication scope
1674       if (hcd.getScope() < 0) {
1675         String message = "Replication scope for column family "
1676          + hcd.getNameAsString() + " must be non-negative.";
1677         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1678       }
1679 
1680       // check data replication factor, it can be 0(default value) when user has not explicitly
1681       // set the value, in this case we use default replication factor set in the file system.
1682       if (hcd.getDFSReplication() < 0) {
1683         String message = "HFile Replication for column family " + hcd.getNameAsString()
1684            + " must be non-negative.";
1685         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1686       }
1687 
1688      // TODO: should we check coprocessors and encryption?
1689     }
1690   }
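
  // The checks above can be relaxed cluster-wide or per table via the
  // CONF_KEY used in this method. A sketch of the per-table override
  // (illustrative table name):
  //
  //   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
  //   htd.addFamily(new HColumnDescriptor("cf"));
  //   // Sanity-check failures are then logged as warnings instead of
  //   // rejecting the descriptor.
  //   htd.setConfiguration("hbase.table.sanity.checks", "false");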
1691 
1692   private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd)
1693       throws IOException {
1694    // FIFO compaction has some requirements.
1695    // Note that FIFO compaction ignores periodic major compactions.
1696     String className =
1697         htd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
1698     if (className == null) {
1699       className =
1700           conf.get(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
1701             ExploringCompactionPolicy.class.getName());
1702     }
1703 
1704     int blockingFileCount = HStore.DEFAULT_BLOCKING_STOREFILE_COUNT;
1705     String sv = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
1706     if (sv != null) {
1707       blockingFileCount = Integer.parseInt(sv);
1708     } else {
1709       blockingFileCount = conf.getInt(HStore.BLOCKING_STOREFILES_KEY, blockingFileCount);
1710     }
1711 
1712     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1713       String compactionPolicy =
1714           hcd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
1715       if (compactionPolicy == null) {
1716         compactionPolicy = className;
1717       }
1718       if (!compactionPolicy.equals(FIFOCompactionPolicy.class.getName())) {
1719         continue;
1720       }
1721       // FIFOCompaction
1722       String message = null;
1723 
1724       // 1. Check TTL
1725       if (hcd.getTimeToLive() == HColumnDescriptor.DEFAULT_TTL) {
1726         message = "Default TTL is not supported for FIFO compaction";
1727         throw new IOException(message);
1728       }
1729 
1730       // 2. Check min versions
1731       if (hcd.getMinVersions() > 0) {
1732         message = "MIN_VERSION > 0 is not supported for FIFO compaction";
1733         throw new IOException(message);
1734       }
1735 
1736       // 3. blocking file count
1737       String sbfc = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
1738       if (sbfc != null) {
1739         blockingFileCount = Integer.parseInt(sbfc);
1740       }
1741       if (blockingFileCount < 1000) {
1742         message =
1743             "blocking file count '" + HStore.BLOCKING_STOREFILES_KEY + "' " + blockingFileCount
1744                + " is below the recommended minimum of 1000";
1745         throw new IOException(message);
1746       }
1747     }
1748   }
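
  // A sketch of a descriptor that satisfies the FIFO checks above: a finite
  // TTL, MIN_VERSIONS of 0, and a blocking store file count of at least 1000
  // (names and values are illustrative):
  //
  //   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("events"));
  //   htd.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
  //       FIFOCompactionPolicy.class.getName());
  //   htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "1000");
  //   HColumnDescriptor hcd = new HColumnDescriptor("cf");
  //   hcd.setTimeToLive(24 * 60 * 60);  // one day, in seconds
  //   htd.addFamily(hcd);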
1749 
1750   // HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled.
1751   private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey,
1752       String message, Exception cause) throws IOException {
1753     if (!logWarn) {
1754       throw new DoNotRetryIOException(message + " Set " + confKey +
1755          " to false in the configuration or the table descriptor if you want to bypass sanity checks", cause);
1756     }
1757     LOG.warn(message);
1758   }
1759 
1760   private void startActiveMasterManager(int infoPort) throws KeeperException {
1761     String backupZNode = ZKUtil.joinZNode(
1762       zooKeeper.backupMasterAddressesZNode, serverName.toString());
1763    /*
1764     * Add a ZNode for ourselves in the backup master directory since we
1765     * may not become the active master. In that case, we want the actual
1766     * active master to know we are a backup master, so that it won't assign
1767     * regions to us if so configured.
1768     *
1769     * If we become the active master later, ActiveMasterManager will delete
1770     * this node explicitly.  If we crash before then, ZooKeeper will delete
1771     * this node for us since it is ephemeral.
1772     */
1773     LOG.info("Adding backup master ZNode " + backupZNode);
1774     if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode,
1775         serverName, infoPort)) {
1776       LOG.warn("Failed create of " + backupZNode + " by " + serverName);
1777     }
1778 
1779     activeMasterManager.setInfoPort(infoPort);
1780     // Start a thread to try to become the active master, so we won't block here
1781     Threads.setDaemonThreadRunning(new Thread(new Runnable() {
1782       @Override
1783       public void run() {
1784         int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
1785           HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
1786        // If we're a backup master, stall until a primary writes its address
1787         if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP,
1788           HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
1789           LOG.debug("HMaster started in backup mode. "
1790             + "Stalling until master znode is written.");
1791           // This will only be a minute or so while the cluster starts up,
1792           // so don't worry about setting watches on the parent znode
1793           while (!activeMasterManager.hasActiveMaster()) {
1794             LOG.debug("Waiting for master address ZNode to be written "
1795               + "(Also watching cluster state node)");
1796             Threads.sleep(timeout);
1797           }
1798         }
1799         MonitoredTask status = TaskMonitor.get().createStatus("Master startup");
1800         status.setDescription("Master startup");
1801         try {
1802           if (activeMasterManager.blockUntilBecomingActiveMaster(timeout, status)) {
1803             finishActiveMasterInitialization(status);
1804           }
1805         } catch (Throwable t) {
1806           status.setStatus("Failed to become active: " + t.getMessage());
1807           LOG.fatal("Failed to become active master", t);
1808           // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
1809           if (t instanceof NoClassDefFoundError &&
1810             t.getMessage()
1811               .contains("org/apache/hadoop/hdfs/protocol/HdfsConstants$SafeModeAction")) {
1812             // improved error message for this special case
1813             abort("HBase is having a problem with its Hadoop jars.  You may need to "
1814               + "recompile HBase against Hadoop version "
1815               + org.apache.hadoop.util.VersionInfo.getVersion()
1816               + " or change your hadoop jars to start properly", t);
1817           } else {
1818             abort("Unhandled exception. Starting shutdown.", t);
1819           }
1820         } finally {
1821           status.cleanup();
1822         }
1823       }
1824     }, getServerName().toShortString() + ".activeMasterManager"));
1825   }
1826 
1827   private void checkCompression(final HTableDescriptor htd)
1828   throws IOException {
1829     if (!this.masterCheckCompression) return;
1830     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1831       checkCompression(hcd);
1832     }
1833   }
1834 
1835   private void checkCompression(final HColumnDescriptor hcd)
1836   throws IOException {
1837     if (!this.masterCheckCompression) return;
1838     CompressionTest.testCompression(hcd.getCompression());
1839     CompressionTest.testCompression(hcd.getCompactionCompression());
1840   }
1841 
1842   private void checkEncryption(final Configuration conf, final HTableDescriptor htd)
1843   throws IOException {
1844     if (!this.masterCheckEncryption) return;
1845     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1846       checkEncryption(conf, hcd);
1847     }
1848   }
1849 
1850   private void checkEncryption(final Configuration conf, final HColumnDescriptor hcd)
1851   throws IOException {
1852     if (!this.masterCheckEncryption) return;
1853     EncryptionTest.testEncryption(conf, hcd.getEncryptionType(), hcd.getEncryptionKey());
1854   }
1855 
1856   private void checkClassLoading(final Configuration conf, final HTableDescriptor htd)
1857   throws IOException {
1858     RegionSplitPolicy.getSplitPolicyClass(htd, conf);
1859     RegionCoprocessorHost.testTableCoprocessorAttrs(conf, htd);
1860   }
1861 
1862   private static boolean isCatalogTable(final TableName tableName) {
1863     return tableName.equals(TableName.META_TABLE_NAME);
1864   }
1865 
1866   @Override
1867   public long deleteTable(
1868       final TableName tableName,
1869       final long nonceGroup,
1870       final long nonce) throws IOException {
1871     checkInitialized();
1872 
1873     return MasterProcedureUtil.submitProcedure(
1874       new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
1875       @Override
1876       protected void run() throws IOException {
1877         getMaster().getMasterCoprocessorHost().preDeleteTable(tableName);
1878 
1879         LOG.info(getClientIdAuditPrefix() + " delete " + tableName);
1880 
1881         // TODO: We can handle/merge duplicate request
1882         ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
1883         submitProcedure(new DeleteTableProcedure(procedureExecutor.getEnvironment(),
1884             tableName, latch));
1885         latch.await();
1886 
1887         getMaster().getMasterCoprocessorHost().postDeleteTable(tableName);
1888       }
1889 
1890       @Override
1891       protected String getDescription() {
1892         return "DeleteTableProcedure";
1893       }
1894     });
1895   }
1896 
1897   @Override
1898   public void truncateTable(
1899       final TableName tableName,
1900       final boolean preserveSplits,
1901       final long nonceGroup,
1902       final long nonce) throws IOException {
1903     checkInitialized();
1904 
1905     MasterProcedureUtil.submitProcedure(
1906       new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
1907       @Override
1908       protected void run() throws IOException {
1909         getMaster().getMasterCoprocessorHost().preTruncateTable(tableName);
1910 
1911         LOG.info(getClientIdAuditPrefix() + " truncate " + tableName);
1912 
1913         long procId = submitProcedure(new TruncateTableProcedure(procedureExecutor.getEnvironment(),
1914             tableName, preserveSplits));
1915         ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1916 
1917         getMaster().getMasterCoprocessorHost().postTruncateTable(tableName);
1918       }
1919 
1920       @Override
1921       protected String getDescription() {
1922         return "TruncateTableProcedure";
1923       }
1924     });
1925   }
1926 
1927   @Override
1928   public void addColumn(
1929       final TableName tableName,
1930       final HColumnDescriptor columnDescriptor,
1931       final long nonceGroup,
1932       final long nonce)
1933       throws IOException {
1934     checkInitialized();
1935     checkCompression(columnDescriptor);
1936     checkEncryption(conf, columnDescriptor);
1937 
1938     MasterProcedureUtil.submitProcedure(
1939       new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
1940       @Override
1941       protected void run() throws IOException {
1942         if (getMaster().getMasterCoprocessorHost().preAddColumn(tableName, columnDescriptor)) {
1943           return;
1944         }
1945 
1946         // Execute the operation synchronously, wait for the operation to complete before continuing
1947         long procId = submitProcedure(new AddColumnFamilyProcedure(
1948           procedureExecutor.getEnvironment(), tableName, columnDescriptor));
1949         ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1950 
1951         getMaster().getMasterCoprocessorHost().postAddColumn(tableName, columnDescriptor);
1952       }
1953 
1954       @Override
1955       protected String getDescription() {
1956         return "AddColumnFamilyProcedure";
1957       }
1958     });
1959   }
1960 
1961   @Override
1962   public void modifyColumn(
1963       final TableName tableName,
1964       final HColumnDescriptor descriptor,
1965       final long nonceGroup,
1966       final long nonce)
1967       throws IOException {
1968     checkInitialized();
1969     checkCompression(descriptor);
1970     checkEncryption(conf, descriptor);
1971 
1972     MasterProcedureUtil.submitProcedure(
1973       new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
1974       @Override
1975       protected void run() throws IOException {
1976         if (getMaster().getMasterCoprocessorHost().preModifyColumn(tableName, descriptor)) {
1977           return;
1978         }
1979 
1980         LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);
1981 
1982         // Execute the operation synchronously - wait for the operation to complete before continuing.
1983         long procId = submitProcedure(new ModifyColumnFamilyProcedure(
1984           procedureExecutor.getEnvironment(), tableName, descriptor));
1985         ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1986 
1987         getMaster().getMasterCoprocessorHost().postModifyColumn(tableName, descriptor);
1988       }
1989 
1990       @Override
1991       protected String getDescription() {
1992         return "ModifyColumnFamilyProcedure";
1993       }
1994     });
1995   }
1996 
1997   @Override
1998   public void deleteColumn(
1999       final TableName tableName,
2000       final byte[] columnName,
2001       final long nonceGroup,
2002       final long nonce)
2003       throws IOException {
2004     checkInitialized();
2005 
2006     MasterProcedureUtil.submitProcedure(
2007       new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2008       @Override
2009       protected void run() throws IOException {
2010         if (getMaster().getMasterCoprocessorHost().preDeleteColumn(tableName, columnName)) {
2011           return;
2012         }
2013 
2014         LOG.info(getClientIdAuditPrefix() + " delete " + Bytes.toString(columnName));
2015 
2016         // Execute the operation synchronously - wait for the operation to complete before
2017         // continuing.
2018         long procId = submitProcedure(new DeleteColumnFamilyProcedure(
2019           procedureExecutor.getEnvironment(), tableName, columnName));
2020         ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
2021 
2022         getMaster().getMasterCoprocessorHost().postDeleteColumn(tableName, columnName);
2023       }
2024 
2025       @Override
2026       protected String getDescription() {
2027         return "DeleteColumnFamilyProcedure";
2028       }
2029     });
2030   }
2031 
2032   @Override
2033   public long enableTable(final TableName tableName, final long nonceGroup, final long nonce)
2034       throws IOException {
2035     checkInitialized();
2036 
2037     return MasterProcedureUtil.submitProcedure(
2038         new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2039       @Override
2040       protected void run() throws IOException {
2041         getMaster().getMasterCoprocessorHost().preEnableTable(tableName);
2042 
2043         LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
2044 
2045         // Execute the operation asynchronously - client will check the progress of the operation
2046        // In case the request is from a <1.1 client, before returning we
2047        // want to make sure that the table is prepared to be enabled (the
2048        // table is locked and the table state is set).
2049        // Note: if the procedure throws an exception, we will catch it and rethrow.
2050         final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
2051         submitProcedure(new EnableTableProcedure(procedureExecutor.getEnvironment(),
2052             tableName, false, prepareLatch));
2053         prepareLatch.await();
2054 
2055         getMaster().getMasterCoprocessorHost().postEnableTable(tableName);
2056       }
2057 
2058       @Override
2059       protected String getDescription() {
2060         return "EnableTableProcedure";
2061       }
2062     });
2063   }
2064 
2065   @Override
2066   public long disableTable(final TableName tableName, final long nonceGroup, final long nonce)
2067       throws IOException {
2068     checkInitialized();
2069 
2070     return MasterProcedureUtil.submitProcedure(
2071         new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2072       @Override
2073       protected void run() throws IOException {
2074         getMaster().getMasterCoprocessorHost().preDisableTable(tableName);
2075 
2076         LOG.info(getClientIdAuditPrefix() + " disable " + tableName);
2077 
2078         // Execute the operation asynchronously - client will check the progress of the operation
2079        // In case the request is from a <1.1 client, before returning we
2080        // want to make sure that the table is prepared to be disabled (the
2081        // table is locked and the table state is set).
2082        // Note: if the procedure throws an exception, we will catch it and rethrow.
2083         final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
2084         submitProcedure(new DisableTableProcedure(procedureExecutor.getEnvironment(),
2085             tableName, false, prepareLatch));
2086         prepareLatch.await();
2087 
2088         getMaster().getMasterCoprocessorHost().postDisableTable(tableName);
2089       }
2090 
2091       @Override
2092       protected String getDescription() {
2093         return "DisableTableProcedure";
2094       }
2095     });
2096   }
2097 
2098   /**
2099    * Return the region and current deployment for the region containing
2100    * the given row. If the region cannot be found, returns null. If it
2101    * is found, but not currently deployed, the second element of the pair
2102    * may be null.
2103    */
2104   @VisibleForTesting // Used by TestMaster.
2105   Pair<HRegionInfo, ServerName> getTableRegionForRow(
2106       final TableName tableName, final byte [] rowKey)
2107   throws IOException {
2108     final AtomicReference<Pair<HRegionInfo, ServerName>> result =
2109       new AtomicReference<Pair<HRegionInfo, ServerName>>(null);
2110 
2111     MetaScannerVisitor visitor =
2112       new MetaScannerVisitorBase() {
2113         @Override
2114         public boolean processRow(Result data) throws IOException {
2115           if (data == null || data.size() <= 0) {
2116             return true;
2117           }
2118           Pair<HRegionInfo, ServerName> pair = HRegionInfo.getHRegionInfoAndServerName(data);
2119           if (pair == null) {
2120             return false;
2121           }
2122           if (!pair.getFirst().getTable().equals(tableName)) {
2123             return false;
2124           }
2125           result.set(pair);
2126           return true;
2127         }
2128     };
2129 
2130     MetaScanner.metaScan(clusterConnection, visitor, tableName, rowKey, 1);
2131     return result.get();
2132   }
2133 
2134   @Override
2135   public void modifyTable(
2136       final TableName tableName,
2137       final HTableDescriptor descriptor,
2138       final long nonceGroup,
2139       final long nonce)
2140       throws IOException {
2141     checkInitialized();
2142     sanityCheckTableDescriptor(descriptor);
2143 
2144     MasterProcedureUtil.submitProcedure(
2145       new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2146       @Override
2147       protected void run() throws IOException {
2148         getMaster().getMasterCoprocessorHost().preModifyTable(tableName, descriptor);
2149 
2150         LOG.info(getClientIdAuditPrefix() + " modify " + tableName);
2151 
2152        // Execute the operation synchronously - wait until the operation completes before continuing.
2153         long procId = submitProcedure(new ModifyTableProcedure(
2154           procedureExecutor.getEnvironment(), descriptor));
2155         ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
2156 
2157         getMaster().getMasterCoprocessorHost().postModifyTable(tableName, descriptor);
2158       }
2159 
2160       @Override
2161       protected String getDescription() {
2162         return "ModifyTableProcedure";
2163       }
2164     });
2165   }
2166 
2167   @Override
2168   public void checkTableModifiable(final TableName tableName)
2169       throws IOException, TableNotFoundException, TableNotDisabledException {
2170     if (isCatalogTable(tableName)) {
2171       throw new IOException("Can't modify catalog tables");
2172     }
2173     if (!MetaTableAccessor.tableExists(getConnection(), tableName)) {
2174       throw new TableNotFoundException(tableName);
2175     }
2176     if (!getAssignmentManager().getTableStateManager().
2177         isTableState(tableName, ZooKeeperProtos.Table.State.DISABLED)) {
2178       throw new TableNotDisabledException(tableName);
2179     }
2180   }
2181 
2182   /**
2183    * @return cluster status
2184    */
2185   public ClusterStatus getClusterStatus() throws InterruptedIOException {
2186     // Build Set of backup masters from ZK nodes
2187     List<String> backupMasterStrings;
2188     try {
2189       backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
2190         this.zooKeeper.backupMasterAddressesZNode);
2191     } catch (KeeperException e) {
2192       LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
2193       backupMasterStrings = null;
2194     }
2195 
2196     List<ServerName> backupMasters = null;
2197     if (backupMasterStrings != null && !backupMasterStrings.isEmpty()) {
2198       backupMasters = new ArrayList<ServerName>(backupMasterStrings.size());
2199       for (String s: backupMasterStrings) {
2200         try {
2201           byte [] bytes;
2202           try {
2203             bytes = ZKUtil.getData(this.zooKeeper, ZKUtil.joinZNode(
2204                 this.zooKeeper.backupMasterAddressesZNode, s));
2205           } catch (InterruptedException e) {
2206             throw new InterruptedIOException();
2207           }
2208           if (bytes != null) {
2209             ServerName sn;
2210             try {
2211               sn = ServerName.parseFrom(bytes);
2212             } catch (DeserializationException e) {
2213              LOG.warn("Failed to parse, skipping registering backup server", e);
2214               continue;
2215             }
2216             backupMasters.add(sn);
2217           }
2218         } catch (KeeperException e) {
2219           LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
2220                    "backup servers"), e);
2221         }
2222       }
2223       Collections.sort(backupMasters, new Comparator<ServerName>() {
2224         @Override
2225         public int compare(ServerName s1, ServerName s2) {
2226           return s1.getServerName().compareTo(s2.getServerName());
2227         }});
2228     }
2229 
2230     String clusterId = fileSystemManager != null ?
2231       fileSystemManager.getClusterId().toString() : null;
2232     Map<String, RegionState> regionsInTransition = assignmentManager != null ?
2233       assignmentManager.getRegionStates().getRegionsInTransition() : null;
2234     String[] coprocessors = cpHost != null ? getMasterCoprocessors() : null;
2235     boolean balancerOn = loadBalancerTracker != null ?
2236       loadBalancerTracker.isBalancerOn() : false;
2237     Map<ServerName, ServerLoad> onlineServers = null;
2238     Set<ServerName> deadServers = null;
2239     if (serverManager != null) {
2240       deadServers = serverManager.getDeadServers().copyServerNames();
2241       onlineServers = serverManager.getOnlineServers();
2242     }
2243     return new ClusterStatus(VersionInfo.getVersion(), clusterId,
2244       onlineServers, deadServers, serverName, backupMasters,
2245       regionsInTransition, coprocessors, balancerOn);
2246   }
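
  // A client-side sketch of reading the status assembled above:
  //
  //   try (Connection connection = ConnectionFactory.createConnection(conf);
  //        Admin admin = connection.getAdmin()) {
  //     ClusterStatus status = admin.getClusterStatus();
  //     System.out.println("live servers:   " + status.getServersSize());
  //     System.out.println("dead servers:   " + status.getDeadServers());
  //     System.out.println("backup masters: " + status.getBackupMastersSize());
  //   }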
2247 
2248   /**
2249    * The set of loaded coprocessors is stored in a static set. Since it's
2250    * statically allocated, it does not require that HMaster's cpHost be
2251    * initialized prior to accessing it.
2252    * @return a String representation of the set of names of the loaded
2253    * coprocessors.
2254    */
2255   public static String getLoadedCoprocessors() {
2256     return CoprocessorHost.getLoadedCoprocessors().toString();
2257   }
2258 
2259   /**
2260    * @return timestamp in millis when HMaster was started.
2261    */
2262   public long getMasterStartTime() {
2263     return startcode;
2264   }
2265 
2266   /**
2267    * @return timestamp in millis when HMaster became the active master.
2268    */
2269   public long getMasterActiveTime() {
2270     return masterActiveTime;
2271   }
2272 
2273   public int getRegionServerInfoPort(final ServerName sn) {
2274     RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2275     if (info == null || info.getInfoPort() == 0) {
2276       return conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2277         HConstants.DEFAULT_REGIONSERVER_INFOPORT);
2278     }
2279     return info.getInfoPort();
2280   }
2281 
2282   public String getRegionServerVersion(final ServerName sn) {
2283     RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2284     if (info != null && info.hasVersionInfo()) {
2285       return info.getVersionInfo().getVersion();
2286     }
2287     return "Unknown";
2288   }
2289 
2290   /**
2291    * @return array of coprocessor SimpleNames.
2292    */
2293   public String[] getMasterCoprocessors() {
2294     Set<String> masterCoprocessors = getMasterCoprocessorHost().getCoprocessors();
2295     return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
2296   }
2297 
2298   @Override
2299   public void abort(final String msg, final Throwable t) {
2300     if (isAborted() || isStopped()) {
2301       return;
2302     }
2303     if (cpHost != null) {
2304       // HBASE-4014: dump a list of loaded coprocessors.
2305       LOG.fatal("Master server abort: loaded coprocessors are: " +
2306           getLoadedCoprocessors());
2307     }
2308     if (t != null) LOG.fatal(msg, t);
2309     stop(msg);
2310   }
2311 
2312   @Override
2313   public ZooKeeperWatcher getZooKeeper() {
2314     return zooKeeper;
2315   }
2316 
2317   @Override
2318   public MasterCoprocessorHost getMasterCoprocessorHost() {
2319     return cpHost;
2320   }
2321 
2322   @Override
2323   public MasterQuotaManager getMasterQuotaManager() {
2324     return quotaManager;
2325   }
2326 
2327   @Override
2328   public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
2329     return procedureExecutor;
2330   }
2331 
2332   @Override
2333   public ServerName getServerName() {
2334     return this.serverName;
2335   }
2336 
2337   @Override
2338   public AssignmentManager getAssignmentManager() {
2339     return this.assignmentManager;
2340   }
2341 
2342   public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
2343     return rsFatals;
2344   }
2345 
2346   public void shutdown() throws IOException {
2347     if (cpHost != null) {
2348       cpHost.preShutdown();
2349     }
2350 
2351     if (this.serverManager != null) {
2352       this.serverManager.shutdownCluster();
2353     }
2354     if (this.clusterStatusTracker != null){
2355       try {
2356         this.clusterStatusTracker.setClusterDown();
2357       } catch (KeeperException e) {
2358         LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
2359       }
2360     }
2361   }
2362 
2363   public void stopMaster() throws IOException {
2364     if (cpHost != null) {
2365       cpHost.preStopMaster();
2366     }
2367     stop("Stopped by " + Thread.currentThread().getName());
2368   }
2369 
2370   void checkServiceStarted() throws ServerNotRunningYetException {
2371     if (!serviceStarted) {
2372       throw new ServerNotRunningYetException("Server is not running yet");
2373     }
2374   }
2375 
2376   void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException {
2377     checkServiceStarted();
2378     if (!isInitialized()) {
2379       throw new PleaseHoldException("Master is initializing");
2380     }
2381   }
2382 
2383   void checkNamespaceManagerReady() throws IOException {
2384     checkInitialized();
2385     if (tableNamespaceManager == null ||
2386         !tableNamespaceManager.isTableAvailableAndInitialized()) {
2387       throw new IOException("Table Namespace Manager not ready yet, try again later");
2388     }
2389   }
2390   /**
2391    * Report whether this master is currently the active master or not.
2392    * If not active master, we are parked on ZK waiting to become active.
2393    *
2394    * This method is used for testing.
2395    *
2396    * @return true if active master, false if not.
2397    */
2398   public boolean isActiveMaster() {
2399     return isActiveMaster;
2400   }
2401 
2402   /**
2403    * Report whether this master has completed with its initialization and is
2404    * ready.  If ready, the master is also the active master.  A standby master
2405    * is never ready.
2406    *
2407    * This method is used for testing.
2408    *
2409    * @return true if master is ready to go, false if not.
2410    */
2411   @Override
2412   public boolean isInitialized() {
2413     return initialized.isReady();
2414   }
2415 
2416   @VisibleForTesting
2417   public void setInitialized(boolean isInitialized) {
2418     procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
2419   }
2420 
2421   public ProcedureEvent getInitializedEvent() {
2422     return initialized;
2423   }
2424 
2425   /**
2426    * ServerCrashProcessingEnabled is set false before completing assignMeta to prevent processing
2427    * of crashed servers.
2428    * @return true if assignMeta has completed.
2429    */
2430   @Override
2431   public boolean isServerCrashProcessingEnabled() {
2432     return serverCrashProcessingEnabled.isReady();
2433   }
2434 
2435   @VisibleForTesting
2436   public void setServerCrashProcessingEnabled(final boolean b) {
2437     procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b);
2438   }
2439 
2440   public ProcedureEvent getServerCrashProcessingEnabledEvent() {
2441     return serverCrashProcessingEnabled;
2442   }
2443 
2444   /**
2445    * Report whether this master has started initialization and is about to do meta region assignment
2446    * @return true if master is in initialization &amp; about to assign hbase:meta regions
2447    */
2448   public boolean isInitializationStartsMetaRegionAssignment() {
2449     return this.initializationBeforeMetaAssignment;
2450   }
2451 
2452   public void assignRegion(HRegionInfo hri) {
2453     assignmentManager.assign(hri, true);
2454   }
2455 
2456   /**
2457    * Compute the average load across all region servers.
2458    * Currently, this uses a very naive computation - just uses the number of
2459    * regions being served, ignoring stats about number of requests.
2460    * @return the average load
2461    */
2462   public double getAverageLoad() {
2463     if (this.assignmentManager == null) {
2464       return 0;
2465     }
2466 
2467     RegionStates regionStates = this.assignmentManager.getRegionStates();
2468     if (regionStates == null) {
2469       return 0;
2470     }
2471     return regionStates.getAverageLoad();
2472   }
2473 
2474   @Override
2475   public boolean registerService(Service instance) {
2476     /*
2477      * No stacking of instances is allowed for a single service name
2478      */
2479     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
2480     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
2481       LOG.error("Coprocessor service "+serviceDesc.getFullName()+
2482           " already registered, rejecting request from "+instance
2483       );
2484       return false;
2485     }
2486 
2487     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
2488     if (LOG.isDebugEnabled()) {
2489       LOG.debug("Registered master coprocessor service: service="+serviceDesc.getFullName());
2490     }
2491     return true;
2492   }
2493 
2494   /**
2495    * Utility for constructing an instance of the passed HMaster class.
2496    * @param masterClass the HMaster class (or subclass) to construct
2497    * @param conf the configuration to pass to the constructor
2498    * @return HMaster instance.
2499    */
2500   public static HMaster constructMaster(Class<? extends HMaster> masterClass,
2501       final Configuration conf, final CoordinatedStateManager cp)  {
2502     try {
2503       Constructor<? extends HMaster> c =
2504         masterClass.getConstructor(Configuration.class, CoordinatedStateManager.class);
2505       return c.newInstance(conf, cp);
2506     } catch(Exception e) {
2507       Throwable error = e;
2508       if (e instanceof InvocationTargetException &&
2509           ((InvocationTargetException)e).getTargetException() != null) {
2510         error = ((InvocationTargetException)e).getTargetException();
2511       }
2512      throw new RuntimeException("Failed construction of Master: " + masterClass.toString() + ". ",
2513        error);
2514     }
2515   }
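
  // How this factory is typically used; a sketch of roughly what
  // HMasterCommandLine does at startup:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   CoordinatedStateManager csm =
  //       CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
  //   HMaster master = HMaster.constructMaster(HMaster.class, conf, csm);
  //   master.start();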
2516 
2517   /**
2518    * @see org.apache.hadoop.hbase.master.HMasterCommandLine
2519    */
2520   public static void main(String [] args) {
2521     VersionInfo.logVersion();
2522     new HMasterCommandLine(HMaster.class).doMain(args);
2523   }
2524 
2525   public HFileCleaner getHFileCleaner() {
2526     return this.hfileCleaner;
2527   }
2528 
2529   /**
2530    * Exposed for TESTING!
2531    * @return the underlying snapshot manager
2532    */
2533   public SnapshotManager getSnapshotManagerForTesting() {
2534     return this.snapshotManager;
2535   }
2536 
2537   @Override
2538   public void createNamespace(NamespaceDescriptor descriptor) throws IOException {
2539     TableName.isLegalNamespaceName(Bytes.toBytes(descriptor.getName()));
2540     checkNamespaceManagerReady();
2541     if (cpHost != null) {
2542       if (cpHost.preCreateNamespace(descriptor)) {
2543         return;
2544       }
2545     }
2546     LOG.info(getClientIdAuditPrefix() + " creating " + descriptor);
2547     tableNamespaceManager.create(descriptor);
2548     if (cpHost != null) {
2549       cpHost.postCreateNamespace(descriptor);
2550     }
2551   }
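
  // A client-side sketch of creating a namespace (the name is illustrative):
  //
  //   import org.apache.hadoop.hbase.NamespaceDescriptor;
  //
  //   try (Connection connection = ConnectionFactory.createConnection(conf);
  //        Admin admin = connection.getAdmin()) {
  //     admin.createNamespace(NamespaceDescriptor.create("analytics").build());
  //   }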
2552 
2553   @Override
2554   public void modifyNamespace(NamespaceDescriptor descriptor) throws IOException {
2555     TableName.isLegalNamespaceName(Bytes.toBytes(descriptor.getName()));
2556     checkNamespaceManagerReady();
2557     if (cpHost != null) {
2558       if (cpHost.preModifyNamespace(descriptor)) {
2559         return;
2560       }
2561     }
2562     LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);
2563     tableNamespaceManager.update(descriptor);
2564     if (cpHost != null) {
2565       cpHost.postModifyNamespace(descriptor);
2566     }
2567   }
2568 
2569   @Override
2570   public void deleteNamespace(String name) throws IOException {
2571     checkNamespaceManagerReady();
2572     if (cpHost != null) {
2573       if (cpHost.preDeleteNamespace(name)) {
2574         return;
2575       }
2576     }
2577     LOG.info(getClientIdAuditPrefix() + " delete " + name);
2578     tableNamespaceManager.remove(name);
2579     if (cpHost != null) {
2580       cpHost.postDeleteNamespace(name);
2581     }
2582   }
2583 
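  // Illustrative sketch (not part of the original source): the cpHost pre-hooks above
  // give a MasterObserver a chance to veto namespace DDL before it runs. A
  // hypothetical observer that blocks deletion of a "reserved" namespace might look
  // like this (ReservedNsObserver is an assumed name):
  //
  //   public class ReservedNsObserver extends BaseMasterObserver {
  //     @Override
  //     public void preDeleteNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
  //         String namespace) throws IOException {
  //       if (namespace.equals("reserved")) {
  //         throw new IOException("namespace 'reserved' may not be deleted");
  //       }
  //     }
  //   }
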
  /**
   * Ensure that the specified namespace exists, otherwise throw a NamespaceNotFoundException.
   *
   * @param name the namespace to check
   * @throws IOException if the namespace manager is not ready yet.
   * @throws NamespaceNotFoundException if the namespace does not exist
   */
  protected void ensureNamespaceExists(final String name)
      throws IOException, NamespaceNotFoundException {
    checkNamespaceManagerReady();
    NamespaceDescriptor nsd = tableNamespaceManager.get(name);
    if (nsd == null) {
      throw new NamespaceNotFoundException(name);
    }
  }

  @Override
  public NamespaceDescriptor getNamespaceDescriptor(String name) throws IOException {
    checkNamespaceManagerReady();

    if (cpHost != null) {
      cpHost.preGetNamespaceDescriptor(name);
    }

    NamespaceDescriptor nsd = tableNamespaceManager.get(name);
    if (nsd == null) {
      throw new NamespaceNotFoundException(name);
    }

    if (cpHost != null) {
      cpHost.postGetNamespaceDescriptor(nsd);
    }

    return nsd;
  }

  @Override
  public List<NamespaceDescriptor> listNamespaceDescriptors() throws IOException {
    checkNamespaceManagerReady();

    final List<NamespaceDescriptor> descriptors = new ArrayList<NamespaceDescriptor>();
    boolean bypass = false;
    if (cpHost != null) {
      bypass = cpHost.preListNamespaceDescriptors(descriptors);
    }

    if (!bypass) {
      descriptors.addAll(tableNamespaceManager.list());

      if (cpHost != null) {
        cpHost.postListNamespaceDescriptors(descriptors);
      }
    }
    return descriptors;
  }

  @Override
  public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
      throws IOException {
    if (cpHost != null) {
      cpHost.preAbortProcedure(this.procedureExecutor, procId);
    }

    final boolean result = this.procedureExecutor.abort(procId, mayInterruptIfRunning);

    if (cpHost != null) {
      cpHost.postAbortProcedure();
    }

    return result;
  }

  @Override
  public List<ProcedureInfo> listProcedures() throws IOException {
    if (cpHost != null) {
      cpHost.preListProcedures();
    }

    final List<ProcedureInfo> procInfoList = this.procedureExecutor.listProcedures();

    if (cpHost != null) {
      cpHost.postListProcedures(procInfoList);
    }

    return procInfoList;
  }

  @Override
  public List<HTableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
    ensureNamespaceExists(name);
    return listTableDescriptors(name, null, null, true);
  }

  @Override
  public List<TableName> listTableNamesByNamespace(String name) throws IOException {
    ensureNamespaceExists(name);
    return listTableNames(name, null, true);
  }

  /**
   * Returns the list of table descriptors that match the specified request.
   *
   * @param namespace the namespace to query, or null if querying for all
   * @param regex The regular expression to match against, or null if querying for all
   * @param tableNameList the list of table names, or null if querying for all
   * @param includeSysTables False to match only against userspace tables
   * @return the list of table descriptors
   */
  public List<HTableDescriptor> listTableDescriptors(final String namespace, final String regex,
      final List<TableName> tableNameList, final boolean includeSysTables)
      throws IOException {
    final List<HTableDescriptor> descriptors = new ArrayList<HTableDescriptor>();

    boolean bypass = false;
    if (cpHost != null) {
      bypass = cpHost.preGetTableDescriptors(tableNameList, descriptors);
      // method required for AccessController.
      bypass |= cpHost.preGetTableDescriptors(tableNameList, descriptors, regex);
    }

    if (!bypass) {
      if (tableNameList == null || tableNameList.isEmpty()) {
        // request for all TableDescriptors
        Collection<HTableDescriptor> htds;
        if (namespace != null && namespace.length() > 0) {
          htds = tableDescriptors.getByNamespace(namespace).values();
        } else {
          htds = tableDescriptors.getAll().values();
        }

        for (HTableDescriptor desc: htds) {
          if (includeSysTables || !desc.getTableName().isSystemTable()) {
            descriptors.add(desc);
          }
        }
      } else {
        for (TableName s: tableNameList) {
          HTableDescriptor desc = tableDescriptors.get(s);
          if (desc != null) {
            descriptors.add(desc);
          }
        }
      }

      // Retains only those matched by regular expression.
      if (regex != null) {
        filterTablesByRegex(descriptors, Pattern.compile(regex));
      }

      if (cpHost != null) {
        cpHost.postGetTableDescriptors(descriptors);
        // method required for AccessController.
        cpHost.postGetTableDescriptors(tableNameList, descriptors, regex);
      }
    }
    return descriptors;
  }

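  // Illustrative sketch (not part of the original source): typical invocations of the
  // method above. A null namespace, regex and name list mean "match everything".
  //
  //   // All user-space tables in namespace "ns1":
  //   List<HTableDescriptor> nsTables = master.listTableDescriptors("ns1", null, null, false);
  //   // Every table (including system tables such as hbase:meta) matching "t.*":
  //   List<HTableDescriptor> tTables = master.listTableDescriptors(null, "t.*", null, true);
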
  /**
   * Returns the list of table names that match the specified request.
   * @param regex The regular expression to match against, or null if querying for all
   * @param namespace the namespace to query, or null if querying for all
   * @param includeSysTables False to match only against userspace tables
   * @return the list of table names
   */
  public List<TableName> listTableNames(final String namespace, final String regex,
      final boolean includeSysTables) throws IOException {
    final List<HTableDescriptor> descriptors = new ArrayList<HTableDescriptor>();

    boolean bypass = false;
    if (cpHost != null) {
      bypass = cpHost.preGetTableNames(descriptors, regex);
    }

    if (!bypass) {
      // get all descriptors
      Collection<HTableDescriptor> htds;
      if (namespace != null && namespace.length() > 0) {
        htds = tableDescriptors.getByNamespace(namespace).values();
      } else {
        htds = tableDescriptors.getAll().values();
      }

      for (HTableDescriptor htd: htds) {
        if (includeSysTables || !htd.getTableName().isSystemTable()) {
          descriptors.add(htd);
        }
      }

      // Retains only those matched by regular expression.
      if (regex != null) {
        filterTablesByRegex(descriptors, Pattern.compile(regex));
      }

      if (cpHost != null) {
        cpHost.postGetTableNames(descriptors, regex);
      }
    }

    List<TableName> result = new ArrayList<TableName>(descriptors.size());
    for (HTableDescriptor htd: descriptors) {
      result.add(htd.getTableName());
    }
    return result;
  }

  /**
   * Removes the table descriptors that don't match the pattern.
   * @param descriptors list of table descriptors to filter
   * @param pattern the regex to use
   */
  private static void filterTablesByRegex(final Collection<HTableDescriptor> descriptors,
      final Pattern pattern) {
    final String defaultNS = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
    Iterator<HTableDescriptor> itr = descriptors.iterator();
    while (itr.hasNext()) {
      HTableDescriptor htd = itr.next();
      String tableName = htd.getTableName().getNameAsString();
      boolean matched = pattern.matcher(tableName).matches();
      if (!matched && htd.getTableName().getNamespaceAsString().equals(defaultNS)) {
        matched = pattern.matcher(defaultNS + TableName.NAMESPACE_DELIM + tableName).matches();
      }
      if (!matched) {
        itr.remove();
      }
    }
  }

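  // Note on the default-namespace special case above (editorial): a pattern written
  // without a namespace qualifier still matches default-namespace tables, because the
  // name is retried with the "default:" prefix. So for a default-namespace table "t1",
  // both Pattern.compile("t1") and Pattern.compile("default:t1") count as matches,
  // while a table in another namespace must be matched via its qualified name.
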
  @Override
  public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
    return getClusterStatus().getLastMajorCompactionTsForTable(table);
  }

  @Override
  public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException {
    return getClusterStatus().getLastMajorCompactionTsForRegion(regionName);
  }

  /**
   * Queries the state of the {@link LoadBalancerTracker}. If the balancer is not initialized,
   * false is returned.
   *
   * @return The state of the load balancer, or false if the load balancer isn't defined.
   */
  public boolean isBalancerOn() {
    if (loadBalancerTracker == null) {
      return false;
    }
    return loadBalancerTracker.isBalancerOn();
  }

  /**
   * Queries the state of the {@link RegionNormalizerTracker}. If it's not initialized,
   * false is returned.
   */
  public boolean isNormalizerOn() {
    if (regionNormalizerTracker == null) {
      return false;
    }
    return regionNormalizerTracker.isNormalizerOn();
  }

  /**
   * Fetch the configured {@link LoadBalancer} class name. If none is set, a default is returned.
   *
   * @return The name of the {@link LoadBalancer} in use.
   */
  public String getLoadBalancerClassName() {
    return conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
        LoadBalancerFactory.getDefaultLoadBalancerClass().getName());
  }

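  // Configuration sketch (not part of the original source): the balancer class is
  // chosen via the "hbase.master.loadbalancer.class" key that
  // HConstants.HBASE_MASTER_LOADBALANCER_CLASS names; programmatically that would
  // look like the following (the balancer FQCN shown is the stochastic balancer):
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
  //       "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer");
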
  /**
   * @return RegionNormalizerTracker instance
   */
  public RegionNormalizerTracker getRegionNormalizerTracker() {
    return regionNormalizerTracker;
  }
}