1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.lang.reflect.Constructor;
24  import java.lang.reflect.InvocationTargetException;
25  import java.net.InetAddress;
26  import java.net.InetSocketAddress;
27  import java.net.UnknownHostException;
28  import java.util.ArrayList;
29  import java.util.Collection;
30  import java.util.Collections;
31  import java.util.Comparator;
32  import java.util.HashSet;
33  import java.util.Iterator;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.Set;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicReference;
39  import java.util.regex.Pattern;
40  
41  import javax.servlet.ServletException;
42  import javax.servlet.http.HttpServlet;
43  import javax.servlet.http.HttpServletRequest;
44  import javax.servlet.http.HttpServletResponse;
45  
46  import org.apache.commons.logging.Log;
47  import org.apache.commons.logging.LogFactory;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.fs.Path;
50  import org.apache.hadoop.hbase.ClusterStatus;
51  import org.apache.hadoop.hbase.CoordinatedStateException;
52  import org.apache.hadoop.hbase.CoordinatedStateManager;
53  import org.apache.hadoop.hbase.DoNotRetryIOException;
54  import org.apache.hadoop.hbase.HBaseIOException;
55  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
56  import org.apache.hadoop.hbase.HColumnDescriptor;
57  import org.apache.hadoop.hbase.HConstants;
58  import org.apache.hadoop.hbase.HRegionInfo;
59  import org.apache.hadoop.hbase.HTableDescriptor;
60  import org.apache.hadoop.hbase.MasterNotRunningException;
61  import org.apache.hadoop.hbase.MetaMigrationConvertingToPB;
62  import org.apache.hadoop.hbase.MetaTableAccessor;
63  import org.apache.hadoop.hbase.NamespaceDescriptor;
64  import org.apache.hadoop.hbase.NamespaceNotFoundException;
65  import org.apache.hadoop.hbase.PleaseHoldException;
66  import org.apache.hadoop.hbase.ProcedureInfo;
67  import org.apache.hadoop.hbase.RegionStateListener;
68  import org.apache.hadoop.hbase.ScheduledChore;
69  import org.apache.hadoop.hbase.Server;
70  import org.apache.hadoop.hbase.ServerLoad;
71  import org.apache.hadoop.hbase.ServerName;
72  import org.apache.hadoop.hbase.TableDescriptors;
73  import org.apache.hadoop.hbase.TableName;
74  import org.apache.hadoop.hbase.TableNotDisabledException;
75  import org.apache.hadoop.hbase.TableNotFoundException;
76  import org.apache.hadoop.hbase.UnknownRegionException;
77  import org.apache.hadoop.hbase.classification.InterfaceAudience;
78  import org.apache.hadoop.hbase.client.MetaScanner;
79  import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
80  import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
81  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
82  import org.apache.hadoop.hbase.client.Result;
83  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
84  import org.apache.hadoop.hbase.exceptions.DeserializationException;
85  import org.apache.hadoop.hbase.executor.ExecutorType;
86  import org.apache.hadoop.hbase.http.InfoServer;
87  import org.apache.hadoop.hbase.ipc.RpcServer;
88  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
89  import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
90  import org.apache.hadoop.hbase.master.RegionState.State;
91  import org.apache.hadoop.hbase.master.balancer.BalancerChore;
92  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
93  import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
94  import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
95  import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
96  import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
97  import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore;
98  import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
99  import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
100 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
101 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
102 import org.apache.hadoop.hbase.master.procedure.AddColumnFamilyProcedure;
103 import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
104 import org.apache.hadoop.hbase.master.procedure.DeleteColumnFamilyProcedure;
105 import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
106 import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
107 import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
108 import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
109 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
110 import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
111 import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure;
112 import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
113 import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
114 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
115 import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
116 import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
117 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
118 import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
119 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
120 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
121 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
122 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
123 import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
124 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
125 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
126 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
127 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
128 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
129 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
130 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
131 import org.apache.hadoop.hbase.regionserver.HRegionServer;
132 import org.apache.hadoop.hbase.regionserver.HStore;
133 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
134 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
135 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
136 import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
137 import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
138 import org.apache.hadoop.hbase.replication.regionserver.Replication;
139 import org.apache.hadoop.hbase.security.UserProvider;
140 import org.apache.hadoop.hbase.util.Addressing;
141 import org.apache.hadoop.hbase.util.Bytes;
142 import org.apache.hadoop.hbase.util.CompressionTest;
143 import org.apache.hadoop.hbase.util.ConfigUtil;
144 import org.apache.hadoop.hbase.util.EncryptionTest;
145 import org.apache.hadoop.hbase.util.FSUtils;
146 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
147 import org.apache.hadoop.hbase.util.HasThread;
148 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
149 import org.apache.hadoop.hbase.util.Pair;
150 import org.apache.hadoop.hbase.util.Threads;
151 import org.apache.hadoop.hbase.util.VersionInfo;
152 import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
153 import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
154 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
155 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
156 import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
157 import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
158 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
159 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
160 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
161 import org.apache.zookeeper.KeeperException;
162 import org.mortbay.jetty.Connector;
163 import org.mortbay.jetty.nio.SelectChannelConnector;
164 import org.mortbay.jetty.servlet.Context;
165 import org.mortbay.jetty.servlet.ServletHolder;
166 
167 import com.google.common.annotations.VisibleForTesting;
168 import com.google.common.collect.Maps;
169 import com.google.protobuf.Descriptors;
170 import com.google.protobuf.Service;
171 
172 /**
173  * HMaster is the "master server" for HBase. An HBase cluster has one active
174  * master.  If many masters are started, all compete.  Whichever wins goes on to
175  * run the cluster.  All others park themselves in their constructor until
176  * master or cluster shutdown or until the active master loses its lease in
177  * zookeeper.  Thereafter, all running masters jostle to take over the master role.
178  *
179  * <p>The Master can be asked to shut down the cluster. See {@link #shutdown()}.  In
180  * this case it will tell all regionservers to go down and then wait on them
181  * all reporting in that they are down.  This master will then shut itself down.
182  *
183  * <p>You can also shut down just this master.  Call {@link #stopMaster()}.
184  *
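     * <p>A hedged usage sketch (client-side, via the standard Admin API; not part of
     * this class):
     * <pre>
     *   try (Connection conn = ConnectionFactory.createConnection(conf);
     *        Admin admin = conn.getAdmin()) {
     *     admin.stopMaster();  // stop just this master
     *     // admin.shutdown(); // or ask the active master to shut down the cluster
     *   }
     * </pre>
     *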
185  * @see org.apache.zookeeper.Watcher
186  */
187 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
188 @SuppressWarnings("deprecation")
189 public class HMaster extends HRegionServer implements MasterServices, Server {
190   private static final Log LOG = LogFactory.getLog(HMaster.class.getName());
191 
192   /**
193    * Protection against zombie masters. Started once a Master accepts the active role,
194    * it allows a finite time window for initialization before giving up ownership.
195    */
196   private static class InitializationMonitor extends HasThread {
197     /** The amount of time in milliseconds to sleep before checking initialization status. */
198     public static final String TIMEOUT_KEY = "hbase.master.initializationmonitor.timeout";
199     public static final long TIMEOUT_DEFAULT = TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);
200 
201     /**
202      * When the timeout expires and initialization has not completed, call
203      * {@link System#exit(int)} if this is true; do nothing otherwise.
204      */
205     public static final String HALT_KEY = "hbase.master.initializationmonitor.haltontimeout";
206     public static final boolean HALT_DEFAULT = false;
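        // A hedged configuration sketch (the keys are the constants above; the values
        // shown are illustrative only, not recommendations):
        //
        //   Configuration conf = HBaseConfiguration.create();
        //   conf.setLong(TIMEOUT_KEY, TimeUnit.MINUTES.toMillis(30)); // wait 30 minutes
        //   conf.setBoolean(HALT_KEY, true); // halt the JVM on timeout instead of just logging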
207 
208     private final HMaster master;
209     private final long timeout;
210     private final boolean haltOnTimeout;
211 
212     /** Creates a Thread that monitors the {@link #isInitialized()} state. */
213     InitializationMonitor(HMaster master) {
214       super("MasterInitializationMonitor");
215       this.master = master;
216       this.timeout = master.getConfiguration().getLong(TIMEOUT_KEY, TIMEOUT_DEFAULT);
217       this.haltOnTimeout = master.getConfiguration().getBoolean(HALT_KEY, HALT_DEFAULT);
218       this.setDaemon(true);
219     }
220 
221     @Override
222     public void run() {
223       try {
224         while (!master.isStopped() && master.isActiveMaster()) {
225           Thread.sleep(timeout);
226           if (master.isInitialized()) {
227             LOG.debug("Initialization completed within allotted tolerance. Monitor exiting.");
228           } else {
229             LOG.error("Master failed to complete initialization after " + timeout + "ms. Please"
230                 + " consider submitting a bug report including a thread dump of this process.");
231             if (haltOnTimeout) {
232               LOG.error("Zombie Master exiting. Thread dump to stdout");
233               Threads.printThreadInfo(System.out, "Zombie HMaster");
234               System.exit(-1);
235             }
236           }
237         }
238       } catch (InterruptedException ie) {
239         LOG.trace("InitMonitor thread interrupted. Exiting.");
240       }
241     }
242   }
243 
244   // MASTER is the name of the webapp and the attribute name used for stuffing this
245   // instance into the web context.
246   public static final String MASTER = "master";
247 
248   // Manager and zk listener for master election
249   private final ActiveMasterManager activeMasterManager;
250   // Region server tracker
251   RegionServerTracker regionServerTracker;
252   // Draining region server tracker
253   private DrainingServerTracker drainingServerTracker;
254   // Tracker for load balancer state
255   LoadBalancerTracker loadBalancerTracker;
256 
257   // Tracker for region normalizer state
258   private RegionNormalizerTracker regionNormalizerTracker;
259 
260   /** Namespace stuff */
261   private TableNamespaceManager tableNamespaceManager;
262 
263   // Metrics for the HMaster
264   final MetricsMaster metricsMaster;
265   // file system manager for the master FS operations
266   private MasterFileSystem fileSystemManager;
267 
268   // server manager to deal with region server info
269   volatile ServerManager serverManager;
270 
271   // manager of assignment nodes in zookeeper
272   AssignmentManager assignmentManager;
273 
274   // buffer for "fatal error" notices from region servers
275   // in the cluster. This is only used for assisting
276   // operations/debugging.
277   MemoryBoundedLogMessageBuffer rsFatals;
278 
279   // flag set after we become the active master (used for testing)
280   private volatile boolean isActiveMaster = false;
281 
282   // flag set after we complete initialization once active; it is private but
283   // exposed to unit tests via isInitialized()
284   private final ProcedureEvent initialized = new ProcedureEvent("master initialized");
285 
286   // flag set after master services are started,
287   // initialization may have not completed yet.
288   volatile boolean serviceStarted = false;
289 
290   // flag set after we complete assignMeta.
291   private final ProcedureEvent serverCrashProcessingEnabled =
292     new ProcedureEvent("server crash processing");
293 
294   LoadBalancer balancer;
295   private RegionNormalizer normalizer;
296   private BalancerChore balancerChore;
297   private RegionNormalizerChore normalizerChore;
298   private ClusterStatusChore clusterStatusChore;
299   private ClusterStatusPublisher clusterStatusPublisherChore = null;
300   private PeriodicDoMetrics periodicDoMetricsChore = null;
301 
302   CatalogJanitor catalogJanitorChore;
303   private ReplicationZKLockCleanerChore replicationZKLockCleanerChore;
304   private LogCleaner logCleaner;
305   private HFileCleaner hfileCleaner;
306 
307   MasterCoprocessorHost cpHost;
308 
309   private final boolean preLoadTableDescriptors;
310 
311   // Time stamp for when this hmaster became active
312   private long masterActiveTime;
313 
314   //should we check the compression codec type at master side, default true, HBASE-6370
315   private final boolean masterCheckCompression;
316 
317   //should we check encryption settings at master side, default true
318   private final boolean masterCheckEncryption;
319 
320   Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
321 
322   // monitor for snapshot of hbase tables
323   SnapshotManager snapshotManager;
324   // monitor for distributed procedures
325   MasterProcedureManagerHost mpmHost;
326 
327   // it is assigned after the 'initialized' guard is set to true, so it should be volatile
328   private volatile MasterQuotaManager quotaManager;
329 
330   private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
331   private WALProcedureStore procedureStore;
332 
333   /** flag used in test cases in order to simulate RS failures during master initialization */
334   private volatile boolean initializationBeforeMetaAssignment = false;
335 
336   /** jetty server for master to redirect requests to regionserver infoServer */
337   private org.mortbay.jetty.Server masterJettyServer;
338 
339   public static class RedirectServlet extends HttpServlet {
340     private static final long serialVersionUID = 2894774810058302473L;
341     private final int regionServerInfoPort;
342     private final String regionServerHostname;
343 
344     /**
345      * @param infoServer the InfoServer to redirect all requests to
346      * @param hostname may be null. If given, it is used for redirects instead of the host from the client.
347      */
348     public RedirectServlet(InfoServer infoServer, String hostname) {
349        regionServerInfoPort = infoServer.getPort();
350        regionServerHostname = hostname;
351     }
352 
353     @Override
354     public void doGet(HttpServletRequest request,
355         HttpServletResponse response) throws ServletException, IOException {
356       String redirectHost = regionServerHostname;
357       if (redirectHost == null) {
358         redirectHost = request.getServerName();
359         if (!Addressing.isLocalAddress(InetAddress.getByName(redirectHost))) {
360           LOG.warn("Couldn't resolve '" + redirectHost + "' as an address local to this node and '" +
361               MASTER_HOSTNAME_KEY + "' is not set; client will get an HTTP 400 response. If " +
362               "your HBase deployment relies on client accessible names that the region server process " +
363               "can't resolve locally, then you should set the previously mentioned configuration variable " +
364               "to an appropriate hostname.");
365           // No sending of client-provided input back to the client, so the goal host is just in the logs.
366           response.sendError(400, "Request was to a host that I can't resolve for any of the network interfaces on " +
367               "this node. If this is due to an intermediary such as an HTTP load balancer or other proxy, your HBase " +
368               "administrator can set '" + MASTER_HOSTNAME_KEY + "' to point to the correct hostname.");
369           return;
370         }
371       }
372       // TODO this scheme should come from looking at the scheme registered in the infoserver's http server for the
373       // host and port we're using, but it's buried way too deep to do that ATM.
374       String redirectUrl = request.getScheme() + "://"
375         + redirectHost + ":" + regionServerInfoPort
376         + request.getRequestURI();
377       response.sendRedirect(redirectUrl);
378     }
379   }
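
      // A hedged deployment note (sketch; assuming MASTER_HOSTNAME_KEY resolves to
      // "hbase.master.hostname"): if clients reach the master through a name this host
      // cannot resolve locally, set the redirect hostname explicitly in hbase-site.xml
      // to avoid the HTTP 400 path above, e.g.:
      //
      //   <property>
      //     <name>hbase.master.hostname</name>
      //     <value>master.example.com</value>
      //   </property>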
380 
381   private static class PeriodicDoMetrics extends ScheduledChore {
382     private final HMaster server;
383     public PeriodicDoMetrics(int doMetricsInterval, final HMaster server) {
384       super(server.getServerName() + "-DoMetricsChore", server, doMetricsInterval);
385       this.server = server;
386     }
387 
388     @Override
389     protected void chore() {
390       server.doMetrics();
391     }
392   }
393 
394   /**
395    * Initializes the HMaster. The steps are as follows:
396    * <p>
397    * <ol>
398    * <li>Initialize the local HRegionServer
399    * <li>Start the ActiveMasterManager.
400    * </ol>
401    * <p>
402    * Remaining steps of initialization occur in
403    * #finishActiveMasterInitialization(MonitoredTask) after
404    * the master becomes the active one.
405    *
406    * @throws InterruptedException
407    * @throws KeeperException
408    * @throws IOException
409    */
410   public HMaster(final Configuration conf, CoordinatedStateManager csm)
411       throws IOException, KeeperException, InterruptedException {
412     super(conf, csm);
413     this.rsFatals = new MemoryBoundedLogMessageBuffer(
414       conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));
415 
416     LOG.info("hbase.rootdir=" + getRootDir() +
417       ", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));
418 
419     // Disable usage of meta replicas in the master
420     this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
421 
422     Replication.decorateMasterConfiguration(this.conf);
423 
424     // Hack! Maps DFSClient => Master for logs.  HDFS made this
425     // config param for task trackers, but we can piggyback off of it.
426     if (this.conf.get("mapreduce.task.attempt.id") == null) {
427       this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString());
428     }
429 
430     // should we check the compression codec type at master side, default true, HBASE-6370
431     this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);
432 
433     // should we check encryption settings at master side, default true
434     this.masterCheckEncryption = conf.getBoolean("hbase.master.check.encryption", true);
435 
436     this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));
437 
438     // preload table descriptor at startup
439     this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);
440 
441     // Do we publish the status?
442 
443     boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
444         HConstants.STATUS_PUBLISHED_DEFAULT);
445     Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
446         conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
447             ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
448             ClusterStatusPublisher.Publisher.class);
449 
450     if (shouldPublish) {
451       if (publisherClass == null) {
452         LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
453             ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
454             " is not set - not publishing status");
455       } else {
456         clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
457         getChoreService().scheduleChore(clusterStatusPublisherChore);
458       }
459     }
460 
461     // Some unit tests don't need a cluster, so no zookeeper at all
462     if (!conf.getBoolean("hbase.testing.nocluster", false)) {
463       activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
464       int infoPort = putUpJettyServer();
465       startActiveMasterManager(infoPort);
466     } else {
467       activeMasterManager = null;
468     }
469   }
470 
471   // Returns the actual infoPort; -1 means the info server is disabled.
472   private int putUpJettyServer() throws IOException {
473     if (!conf.getBoolean("hbase.master.infoserver.redirect", true)) {
474       return -1;
475     }
476     final int infoPort = conf.getInt("hbase.master.info.port.orig",
477       HConstants.DEFAULT_MASTER_INFOPORT);
478     // -1 is for disabling info server, so no redirecting
479     if (infoPort < 0 || infoServer == null) {
480       return -1;
481     }
482     if (infoPort == infoServer.getPort()) {
483       return infoPort;
484     }
485     final String addr = conf.get("hbase.master.info.bindAddress", "0.0.0.0");
486     if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
487       String msg =
488           "Failed to start redirecting jetty server. Address " + addr
489               + " does not belong to this host. Correct configuration parameter: "
490               + "hbase.master.info.bindAddress";
491       LOG.error(msg);
492       throw new IOException(msg);
493     }
494 
495     // TODO I'm pretty sure we could just add another binding to the InfoServer run by
496     // the RegionServer and have it run the RedirectServlet instead of standing up
497     // a second entire stack here.
498     masterJettyServer = new org.mortbay.jetty.Server();
499     Connector connector = new SelectChannelConnector();
500     connector.setHost(addr);
501     connector.setPort(infoPort);
502     masterJettyServer.addConnector(connector);
503     masterJettyServer.setStopAtShutdown(true);
504 
505     final String redirectHostname = shouldUseThisHostnameInstead() ? useThisHostnameInstead : null;
506 
507     final RedirectServlet redirect = new RedirectServlet(infoServer, redirectHostname);
508     Context context = new Context(masterJettyServer, "/", Context.NO_SESSIONS);
509     context.addServlet(new ServletHolder(redirect), "/*");
510 
511     try {
512       masterJettyServer.start();
513     } catch (Exception e) {
514       throw new IOException("Failed to start redirecting jetty server", e);
515     }
516     return connector.getLocalPort();
517   }
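
      // A hedged configuration sketch for the redirect server above (the keys are the
      // ones read in this method; values are illustrative):
      //
      //   conf.setBoolean("hbase.master.infoserver.redirect", false); // disable redirecting
      //   conf.setInt("hbase.master.info.port.orig", 16010);          // port redirected from
      //   conf.set("hbase.master.info.bindAddress", "0.0.0.0");       // must be local to this host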
518 
519   /**
520    * For compatibility, if logging in with the regionserver credentials fails, try the master ones
521    */
522   @Override
523   protected void login(UserProvider user, String host) throws IOException {
524     try {
525       super.login(user, host);
526     } catch (IOException ie) {
527       user.login("hbase.master.keytab.file",
528         "hbase.master.kerberos.principal", host);
529     }
530   }
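
      // A hedged sketch of the security settings this fallback relies on (the keys are
      // the ones passed to user.login() above; paths and principal are illustrative):
      //
      //   <property><name>hbase.master.keytab.file</name>
      //     <value>/etc/security/keytabs/hbase.service.keytab</value></property>
      //   <property><name>hbase.master.kerberos.principal</name>
      //     <value>hbase/_HOST@EXAMPLE.COM</value></property>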
531 
532   /**
533    * If configured to put regions on the active master,
534    * wait until this master becomes the active one.
535    * Otherwise, loop until the server is stopped or aborted.
536    */
537   @Override
538   protected void waitForMasterActive(){
539     boolean tablesOnMaster = BaseLoadBalancer.tablesOnMaster(conf);
540     while (!(tablesOnMaster && isActiveMaster)
541         && !isStopped() && !isAborted()) {
542       sleeper.sleep();
543     }
544   }
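
      // A hedged note: whether the active master also carries regions is controlled by
      // BaseLoadBalancer.tablesOnMaster(conf). A sketch of enabling it (assuming the
      // branch-1 key name "hbase.balancer.tablesOnMaster"):
      //
      //   conf.setBoolean("hbase.balancer.tablesOnMaster", true);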
545 
546   @VisibleForTesting
547   public MasterRpcServices getMasterRpcServices() {
548     return (MasterRpcServices)rpcServices;
549   }
550 
551   public boolean balanceSwitch(final boolean b) throws IOException {
552     return getMasterRpcServices().switchBalancer(b, BalanceSwitchMode.ASYNC);
553   }
554 
555   @Override
556   protected String getProcessName() {
557     return MASTER;
558   }
559 
560   @Override
561   protected boolean canCreateBaseZNode() {
562     return true;
563   }
564 
565   @Override
566   protected boolean canUpdateTableDescriptor() {
567     return true;
568   }
569 
570   @Override
571   protected RSRpcServices createRpcServices() throws IOException {
572     return new MasterRpcServices(this);
573   }
574 
575   @Override
576   protected void configureInfoServer() {
577     infoServer.addServlet("master-status", "/master-status", MasterStatusServlet.class);
578     infoServer.setAttribute(MASTER, this);
579     if (BaseLoadBalancer.tablesOnMaster(conf)) {
580       super.configureInfoServer();
581     }
582   }
583 
584   @Override
585   protected Class<? extends HttpServlet> getDumpServlet() {
586     return MasterDumpServlet.class;
587   }
588 
589   /**
590    * Emit the HMaster metrics, such as region-in-transition metrics.
591    * Surround in a try block to be sure a metrics failure doesn't abort the HMaster.
592    */
593   private void doMetrics() {
594     try {
595       if (assignmentManager != null) {
596         assignmentManager.updateRegionsInTransitionMetrics();
597       }
598     } catch (Throwable e) {
599       LOG.error("Couldn't update metrics: " + e.getMessage());
600     }
601   }
602 
603   MetricsMaster getMasterMetrics() {
604     return metricsMaster;
605   }
606 
607   /**
608    * Initialize all ZK based system trackers.
609    * @throws IOException
610    * @throws InterruptedException
611    * @throws KeeperException
612    * @throws CoordinatedStateException
613    */
614   void initializeZKBasedSystemTrackers() throws IOException,
615       InterruptedException, KeeperException, CoordinatedStateException {
616     this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
617     this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
618     this.normalizer.setMasterServices(this);
619     this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
620     this.loadBalancerTracker.start();
621     this.regionNormalizerTracker = new RegionNormalizerTracker(zooKeeper, this);
622     this.regionNormalizerTracker.start();
623     this.assignmentManager = new AssignmentManager(this, serverManager,
624       this.balancer, this.service, this.metricsMaster,
625       this.tableLockManager);
626     zooKeeper.registerListenerFirst(assignmentManager);
627 
628     this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
629         this.serverManager);
630     this.regionServerTracker.start();
631 
632     this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this,
633       this.serverManager);
634     this.drainingServerTracker.start();
635 
636     // Set the cluster as up.  If new RSs, they'll be waiting on this before
637     // going ahead with their startup.
638     boolean wasUp = this.clusterStatusTracker.isClusterUp();
639     if (!wasUp) this.clusterStatusTracker.setClusterUp();
640 
641     LOG.info("Server active/primary master=" + this.serverName +
642         ", sessionid=0x" +
643         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
644         ", setting cluster-up flag (Was=" + wasUp + ")");
645 
646     // create/initialize the snapshot manager and other procedure managers
647     this.snapshotManager = new SnapshotManager();
648     this.mpmHost = new MasterProcedureManagerHost();
649     this.mpmHost.register(this.snapshotManager);
650     this.mpmHost.register(new MasterFlushTableProcedureManager());
651     this.mpmHost.loadProcedures(conf);
652     this.mpmHost.initialize(this, this.metricsMaster);
653   }
654 
655   /**
656    * Finish initialization of HMaster after becoming the primary master.
657    *
658    * <ol>
659    * <li>Initialize master components - file system manager, server manager,
660    *     assignment manager, region server tracker, etc</li>
661    * <li>Start necessary service threads - balancer, catalog janitor,
662    *     executor services, etc</li>
663    * <li>Set cluster as UP in ZooKeeper</li>
664    * <li>Wait for RegionServers to check-in</li>
665    * <li>Split logs and perform data recovery, if necessary</li>
666    * <li>Ensure assignment of meta/namespace regions</li>
667    * <li>Handle either fresh cluster start or master failover</li>
668    * </ol>
669    *
670    * @throws IOException
671    * @throws InterruptedException
672    * @throws KeeperException
673    * @throws CoordinatedStateException
674    */
675   private void finishActiveMasterInitialization(MonitoredTask status)
676       throws IOException, InterruptedException, KeeperException, CoordinatedStateException {
677 
678     isActiveMaster = true;
679     Thread zombieDetector = new Thread(new InitializationMonitor(this),
680         "ActiveMasterInitializationMonitor-" + System.currentTimeMillis());
681     zombieDetector.start();
682 
683     /*
684      * We are active master now... go initialize components we need to run.
685      * Note, there may be dross in zk from previous runs; it'll get addressed
686      * below after we determine if cluster startup or failover.
687      */
688 
689     status.setStatus("Initializing Master file system");
690 
691     this.masterActiveTime = System.currentTimeMillis();
692     // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
693     this.fileSystemManager = new MasterFileSystem(this, this);
694 
695     // enable table descriptors cache
696     this.tableDescriptors.setCacheOn();
697     // set the META's descriptor to the correct replication
698     this.tableDescriptors.get(TableName.META_TABLE_NAME).setRegionReplication(
699         conf.getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM));
700     // warm-up HTDs cache on master initialization
701     if (preLoadTableDescriptors) {
702       status.setStatus("Pre-loading table descriptors");
703       this.tableDescriptors.getAll();
704     }
705 
706     // publish cluster ID
707     status.setStatus("Publishing Cluster ID in ZooKeeper");
708     ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
709     this.serverManager = createServerManager(this, this);
710 
711     setupClusterConnection();
712 
713     // Invalidate all write locks held previously
714     this.tableLockManager.reapWriteLocks();
715 
716     status.setStatus("Initializing ZK system trackers");
717     initializeZKBasedSystemTrackers();
718 
719     // initialize master side coprocessors before we start handling requests
720     status.setStatus("Initializing master coprocessors");
721     this.cpHost = new MasterCoprocessorHost(this, this.conf);
722 
723     // start up all service threads.
724     status.setStatus("Initializing master service threads");
725     startServiceThreads();
726 
727     // Wake up this server to check in
728     sleeper.skipSleepCycle();
729 
730     // Wait for region servers to report in
731     this.serverManager.waitForRegionServers(status);
732     // Check zk for region servers that are up but didn't register
733     for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
734       // The isServerOnline check is opportunistic, correctness is handled inside
735       if (!this.serverManager.isServerOnline(sn)
736           && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
737         LOG.info("Registered server found up in zk but which has not yet reported in: " + sn);
738       }
739     }
740 
741     // get a list of previously failed RSs which need log splitting work
742     // we recover hbase:meta region servers inside master initialization and
743     // handle other failed servers in SSH in order to start up master node ASAP
744     Set<ServerName> previouslyFailedServers =
745       this.fileSystemManager.getFailedServersFromLogFolders();
746 
747     // log splitting for hbase:meta server
748     ServerName oldMetaServerLocation = metaTableLocator.getMetaRegionLocation(this.getZooKeeper());
749     if (oldMetaServerLocation != null && previouslyFailedServers.contains(oldMetaServerLocation)) {
750       splitMetaLogBeforeAssignment(oldMetaServerLocation);
751       // Note: we can't remove oldMetaServerLocation from previousFailedServers list because it
752       // may also host user regions
753     }
754     Set<ServerName> previouslyFailedMetaRSs = getPreviouslyFailedMetaServersFromZK();
755     // need to use union of previouslyFailedMetaRSs recorded in ZK and previouslyFailedServers
756     // instead of previouslyFailedMetaRSs alone to address the following two situations:
757     // 1) the chained-failure situation (recovery failed multiple times in a row).
758     // 2) the master gets killed right before it could delete the recovering hbase:meta from ZK while
759     // the same server still has non-meta wals to be replayed, so that
760     // removeStaleRecoveringRegionsFromZK can't delete the stale hbase:meta region.
761     // Passing more servers into splitMetaLog is all right. If a server doesn't have an hbase:meta wal,
762     // it is a no-op for that server.
763     previouslyFailedMetaRSs.addAll(previouslyFailedServers);
764 
765     this.initializationBeforeMetaAssignment = true;
766 
767     // Wait for regionserver to finish initialization.
768     if (BaseLoadBalancer.tablesOnMaster(conf)) {
769       waitForServerOnline();
770     }
771 
772     // Initialize the load balancer
773     this.balancer.setClusterStatus(getClusterStatus());
774     this.balancer.setMasterServices(this);
775     this.balancer.initialize();
776 
777     // Check if master is shutting down because of some issue
778     // in initializing the regionserver or the balancer.
779     if (isStopped()) return;
780 
781     // Make sure meta assigned before proceeding.
782     status.setStatus("Assigning Meta Region");
783     assignMeta(status, previouslyFailedMetaRSs, HRegionInfo.DEFAULT_REPLICA_ID);
784     // check if master is shutting down because the above assignMeta could return even if
785     // hbase:meta isn't assigned while the master is shutting down
786     if (isStopped()) return;
787 
788     status.setStatus("Submitting log splitting work for previously failed region servers");
789     // Master has recovered hbase:meta region server and we put
790     // other failed region servers in a queue to be handled later by SSH
791     for (ServerName tmpServer : previouslyFailedServers) {
792       this.serverManager.processDeadServer(tmpServer, true);
793     }
794 
795     // Update meta with new PB serialization if required, i.e. migrate all HRIs to PB serialization
796     // in meta. This must happen before we assign all user regions or else the assignment will fail.
797     if (this.conf.getBoolean("hbase.MetaMigrationConvertingToPB", true)) {
798       MetaMigrationConvertingToPB.updateMetaIfNecessary(this);
799     }
800 
801     // Fix up assignment manager status
802     status.setStatus("Starting assignment manager");
803     this.assignmentManager.joinCluster();
804 
805     // set cluster status again after user regions are assigned
806     this.balancer.setClusterStatus(getClusterStatus());
807 
808     // Start balancer and meta catalog janitor after meta and regions have been assigned.
809     status.setStatus("Starting balancer and catalog janitor");
810     this.clusterStatusChore = new ClusterStatusChore(this, balancer);
811     getChoreService().scheduleChore(clusterStatusChore);
812     this.balancerChore = new BalancerChore(this);
813     getChoreService().scheduleChore(balancerChore);
814     this.normalizerChore = new RegionNormalizerChore(this);
815     getChoreService().scheduleChore(normalizerChore);
816     this.catalogJanitorChore = createCatalogJanitor();
817     getChoreService().scheduleChore(catalogJanitorChore);
818 
819     // Do Metrics periodically
820     periodicDoMetricsChore = new PeriodicDoMetrics(msgInterval, this);
821     getChoreService().scheduleChore(periodicDoMetricsChore);
822 
823     status.setStatus("Starting namespace manager");
824     initNamespace();
825 
826     if (this.cpHost != null) {
827       try {
828         this.cpHost.preMasterInitialization();
829       } catch (IOException e) {
830         LOG.error("Coprocessor preMasterInitialization() hook failed", e);
831       }
832     }
833 
834     status.markComplete("Initialization successful");
835     LOG.info("Master has completed initialization");
836     configurationManager.registerObserver(this.balancer);
837 
838     // Set master as 'initialized'.
839     setInitialized(true);
840 
841     status.setStatus("Starting quota manager");
842     initQuotaManager();
843 
844     // assign the meta replicas
845     Set<ServerName> EMPTY_SET = new HashSet<ServerName>();
846     int numReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
847            HConstants.DEFAULT_META_REPLICA_NUM);
848     for (int i = 1; i < numReplicas; i++) {
849       assignMeta(status, EMPTY_SET, i);
850     }
851     unassignExcessMetaReplica(zooKeeper, numReplicas);
852 
853     // Clear dead servers that share a host name and port with an online server, because we do
854     // not remove a dead server matching an RS that tries to check in before master
855     // initialization. See HBASE-5916.
856     this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
857 
858     // Check and set the znode ACLs if needed in case we are overtaking a non-secure configuration
859     status.setStatus("Checking ZNode ACLs");
860     zooKeeper.checkAndSetZNodeAcls();
861 
862     status.setStatus("Calling postStartMaster coprocessors");
863     if (this.cpHost != null) {
864       // don't let cp initialization errors kill the master
865       try {
866         this.cpHost.postStartMaster();
867       } catch (IOException ioe) {
868         LOG.error("Coprocessor postStartMaster() hook failed", ioe);
869       }
870     }
871 
872     zombieDetector.interrupt();
873   }
874 
875   @VisibleForTesting
876   protected CatalogJanitor createCatalogJanitor() {
877     return new CatalogJanitor(this, this);
878   }
879 
880   private void initQuotaManager() throws IOException {
881     quotaManager = new MasterQuotaManager(this);
882     this.assignmentManager.setRegionStateListener((RegionStateListener) quotaManager);
883     quotaManager.start();
884   }
885 
886   /**
887    * Create a {@link ServerManager} instance.
888    * @param master
889    * @param services
890    * @return An instance of {@link ServerManager}
891    * @throws org.apache.hadoop.hbase.ZooKeeperConnectionException
892    * @throws IOException
893    */
894   ServerManager createServerManager(final Server master,
895       final MasterServices services)
896   throws IOException {
897     // We put this out here in a method so we can do a Mockito.spy and stub it out
898     // w/ a mocked up ServerManager.
899     return new ServerManager(master, services);
900   }
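
      // A hedged test sketch (hypothetical, following the Mockito hint above): override
      // this factory method from a test so the master runs with a spied ServerManager.
      //
      //   HMaster master = new HMaster(conf, csm) {
      //     @Override
      //     ServerManager createServerManager(Server m, MasterServices s) throws IOException {
      //       return Mockito.spy(new ServerManager(m, s));
      //     }
      //   };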
901 
902   private void unassignExcessMetaReplica(ZooKeeperWatcher zkw, int numMetaReplicasConfigured) {
903     // unassign the unneeded replicas (e.g., if the previous master was configured
904     // with a replication of 3 and now it is 2, we need to unassign the 1 unneeded replica)
905     try {
906       List<String> metaReplicaZnodes = zooKeeper.getMetaReplicaNodes();
907       for (String metaReplicaZnode : metaReplicaZnodes) {
908         int replicaId = zooKeeper.getMetaReplicaIdFromZnode(metaReplicaZnode);
909         if (replicaId >= numMetaReplicasConfigured) {
910           RegionState r = MetaTableLocator.getMetaRegionState(zkw, replicaId);
911           LOG.info("Closing excess replica of meta region " + r.getRegion());
912           // send a close and wait for a max of 30 seconds
913           ServerManager.closeRegionSilentlyAndWait(getConnection(), r.getServerName(),
914               r.getRegion(), 30000);
915           ZKUtil.deleteNode(zkw, zkw.getZNodeForReplica(replicaId));
916         }
917       }
918     } catch (Exception ex) {
919       // ignore the exception since we don't want the master to be wedged due to potential
920       // issues in the cleanup of the extra regions. We can do that cleanup via hbck or manually
921       LOG.warn("Ignoring exception " + ex);
922     }
923   }
924 
925   /**
926    * Check that <code>hbase:meta</code> is assigned. If not, assign it.
927    * @param status MonitoredTask
928    * @param previouslyFailedMetaRSs
929    * @param replicaId
930    * @throws InterruptedException
931    * @throws IOException
932    * @throws KeeperException
933    */
934   void assignMeta(MonitoredTask status, Set<ServerName> previouslyFailedMetaRSs, int replicaId)
935       throws InterruptedException, IOException, KeeperException {
936     // Work on meta region
937     int assigned = 0;
938     long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
939     if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
940       status.setStatus("Assigning hbase:meta region");
941     } else {
942       status.setStatus("Assigning hbase:meta region, replicaId " + replicaId);
943     }
944     // Get current meta state from zk.
945     RegionStates regionStates = assignmentManager.getRegionStates();
946     RegionState metaState = MetaTableLocator.getMetaRegionState(getZooKeeper(), replicaId);
947     HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO,
948         replicaId);
949     ServerName currentMetaServer = metaState.getServerName();
950     if (!ConfigUtil.useZKForAssignment(conf)) {
951       regionStates.createRegionState(hri, metaState.getState(),
952         currentMetaServer, null);
953     } else {
954       regionStates.createRegionState(hri);
955     }
956     boolean rit = this.assignmentManager.
957       processRegionInTransitionAndBlockUntilAssigned(hri);
958     boolean metaRegionLocation = metaTableLocator.verifyMetaRegionLocation(
959       this.getConnection(), this.getZooKeeper(), timeout, replicaId);
960     if (!metaRegionLocation || !metaState.isOpened()) {
961       // Meta location is not verified. It should be in transition, or offline.
962       // We will wait for it to be assigned in enableCrashedServerProcessing below.
963       assigned++;
964       if (!ConfigUtil.useZKForAssignment(conf)) {
965         assignMetaZkLess(regionStates, metaState, timeout, previouslyFailedMetaRSs);
966       } else if (!rit) {
967         // Assign meta since not already in transition
968         if (currentMetaServer != null) {
969           // If the meta server is not known to be dead or online,
970           // just split the meta log, and don't expire it since this
971           // could be a full cluster restart. Otherwise, we will think
972           // this is a failover and lose previous region locations.
973           // If it is really a failover case, AM will find out in rebuilding
974           // user regions. Otherwise, we are good since all logs are split
975           // or known to be replayed before user regions are assigned.
976           if (serverManager.isServerOnline(currentMetaServer)) {
977             LOG.info("Forcing expire of " + currentMetaServer);
978             serverManager.expireServer(currentMetaServer);
979           }
980           if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
981             splitMetaLogBeforeAssignment(currentMetaServer);
982             previouslyFailedMetaRSs.add(currentMetaServer);
983           }
984         }
985         assignmentManager.assignMeta(hri);
986       }
987     } else {
988       // Region already assigned. We didn't assign it. Add to in-memory state.
989       regionStates.updateRegionState(hri, State.OPEN, currentMetaServer);
990       this.assignmentManager.regionOnline(hri, currentMetaServer);
991     }
992 
993     if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) enableMeta(TableName.META_TABLE_NAME);
994 
995     if ((RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode())
996         && (!previouslyFailedMetaRSs.isEmpty())) {
997       // WAL-replay mode needs the new hbase:meta RS to be assigned first
998       status.setStatus("replaying log for Meta Region");
999       this.fileSystemManager.splitMetaLog(previouslyFailedMetaRSs);
1000     }
1001 
1002     // Make sure an hbase:meta location is set. We need to enable SSH here since
1003     // if the meta region server has died at this time, we need it to be re-assigned
1004     // by SSH so that system tables can be assigned.
1005     // There is no need to wait when assigned == 0, i.e. when meta was just verified.
1006     if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) enableCrashedServerProcessing(assigned != 0);
1007     LOG.info("hbase:meta with replicaId " + replicaId + " assigned=" + assigned + ", rit=" + rit +
1008       ", location=" + metaTableLocator.getMetaRegionLocation(this.getZooKeeper(), replicaId));
1009     status.setStatus("META assigned.");
1010   }
1011 
1012   private void assignMetaZkLess(RegionStates regionStates, RegionState regionState, long timeout,
1013       Set<ServerName> previouslyFailedRs) throws IOException, KeeperException {
1014     ServerName currentServer = regionState.getServerName();
1015     if (serverManager.isServerOnline(currentServer)) {
1016       LOG.info("Meta was in transition on " + currentServer);
1017       assignmentManager.processRegionInTransitionZkLess();
1018     } else {
1019       if (currentServer != null) {
1020         if (regionState.getRegion().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
1021           splitMetaLogBeforeAssignment(currentServer);
1022           regionStates.logSplit(HRegionInfo.FIRST_META_REGIONINFO);
1023           previouslyFailedRs.add(currentServer);
1024         }
1025       }
1026       LOG.info("Re-assigning hbase:meta, it was on " + currentServer);
1027       regionStates.updateRegionState(regionState.getRegion(), State.OFFLINE);
1028       assignmentManager.assignMeta(regionState.getRegion());
1029     }
1030   }
1031 
1032   void initNamespace() throws IOException {
1033     // Create the namespace manager
1034     tableNamespaceManager = new TableNamespaceManager(this);
1035     tableNamespaceManager.start();
1036   }
1037 
1038   boolean isCatalogJanitorEnabled() {
1039     return catalogJanitorChore != null
1040       && catalogJanitorChore.getEnabled();
1041   }
1042 
1043   private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException {
1044     if (RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode()) {
1045       // In log replay mode, we mark hbase:meta region as recovering in ZK
1046       Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
1047       regions.add(HRegionInfo.FIRST_META_REGIONINFO);
1048       this.fileSystemManager.prepareLogReplay(currentMetaServer, regions);
1049     } else {
1050       // In recovered.edits mode: create recovered edits file for hbase:meta server
1051       this.fileSystemManager.splitMetaLog(currentMetaServer);
1052     }
1053   }
1054 
1055   private void enableCrashedServerProcessing(final boolean waitForMeta)
1056   throws IOException, InterruptedException {
1057     // If crashed server processing is disabled, we enable it and expire those dead but not expired
1058     // servers. This is required so that if meta is being assigned to a server which dies after
1059     // assignMeta starts the assignment, ServerCrashProcedure can re-assign it. Otherwise, we will be
1060     // stuck here waiting forever if waitForMeta is specified.
1061     if (!isServerCrashProcessingEnabled()) {
1062       setServerCrashProcessingEnabled(true);
1063       this.serverManager.processQueuedDeadServers();
1064     }
1065 
1066     if (waitForMeta) {
1067       metaTableLocator.waitMetaRegionLocation(this.getZooKeeper());
1068       // Above check waits for general meta availability but this does not
1069       // guarantee that the transition has completed
1070       this.assignmentManager.waitForAssignment(HRegionInfo.FIRST_META_REGIONINFO);
1071     }
1072   }
1073 
1074   private void enableMeta(TableName metaTableName) {
1075     if (!this.assignmentManager.getTableStateManager().isTableState(metaTableName,
1076         ZooKeeperProtos.Table.State.ENABLED)) {
1077       this.assignmentManager.setEnabledTable(metaTableName);
1078     }
1079   }
1080 
1081   /**
1082    * Returns the set of region server names recorded under the hbase:meta recovering-region ZK node.
1083    * @return Set of meta server names which were recorded in ZK
1084    * @throws KeeperException
1085    */
1086   private Set<ServerName> getPreviouslyFailedMetaServersFromZK() throws KeeperException {
1087     Set<ServerName> result = new HashSet<ServerName>();
1088     String metaRecoveringZNode = ZKUtil.joinZNode(zooKeeper.recoveringRegionsZNode,
1089       HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1090     List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(zooKeeper, metaRecoveringZNode);
1091     if (regionFailedServers == null) return result;
1092 
1093     for(String failedServer : regionFailedServers) {
1094       ServerName server = ServerName.parseServerName(failedServer);
1095       result.add(server);
1096     }
1097     return result;
1098   }
1099 
1100   @Override
1101   public TableDescriptors getTableDescriptors() {
1102     return this.tableDescriptors;
1103   }
1104 
1105   @Override
1106   public ServerManager getServerManager() {
1107     return this.serverManager;
1108   }
1109 
1110   @Override
1111   public MasterFileSystem getMasterFileSystem() {
1112     return this.fileSystemManager;
1113   }
1114 
1115   /*
1116    * Start up all services. If any of these threads gets an unhandled exception
1117    * then they just die with a logged message.  This should be fine because
1118    * in general, we do not expect the master to get such unhandled exceptions
1119    * as OOMEs; it should be lightly loaded. See what HRegionServer does if you
1120    * need to install an uncaught exception handler.
1121    */
1122   private void startServiceThreads() throws IOException {
1123     // Start the executor service pools
1124     this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
1125       conf.getInt("hbase.master.executor.openregion.threads", 5));
1126     this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
1127       conf.getInt("hbase.master.executor.closeregion.threads", 5));
1128     this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
1129       conf.getInt("hbase.master.executor.serverops.threads", 5));
1130     this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
1131       conf.getInt("hbase.master.executor.serverops.threads", 5));
1132     this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
1133       conf.getInt("hbase.master.executor.logreplayops.threads", 10));
1134 
1135     // We depend on there being only one instance of this executor running
1136     // at a time.  To do concurrency, we would need fencing of enable/disable of
1137     // tables.
1138     // Any time you change this maxThreads to > 1, please see the comment at
1139     // AccessController#postCreateTableHandler
1140     this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1141     startProcedureExecutor();
1142 
1143     // Start the log cleaner thread
1144     int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
1145     this.logCleaner =
1146       new LogCleaner(cleanerInterval,
1147         this, conf, getMasterFileSystem().getFileSystem(),
1148         getMasterFileSystem().getOldLogDir());
1149     getChoreService().scheduleChore(logCleaner);
1150 
1151     // Start the hfile archive cleaner thread
1152     Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
1153     this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
1154         .getFileSystem(), archiveDir);
1155     getChoreService().scheduleChore(hfileCleaner);
1156     serviceStarted = true;
1157     if (LOG.isTraceEnabled()) {
1158       LOG.trace("Started service threads");
1159     }
1160     if (!conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true)) {
1161       try {
1162         replicationZKLockCleanerChore = new ReplicationZKLockCleanerChore(this, this,
1163             cleanerInterval, this.getZooKeeper(), this.conf);
1164         getChoreService().scheduleChore(replicationZKLockCleanerChore);
1165       } catch (Exception e) {
1166         LOG.error("start replicationZKLockCleanerChore failed", e);
1167       }
1168     }
1169   }
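
       // A hedged tuning sketch (the keys are the ones read above; values illustrative):
       // size the executor pools and cleaner interval before master startup.
       //
       //   conf.setInt("hbase.master.executor.openregion.threads", 10);
       //   conf.setInt("hbase.master.executor.serverops.threads", 10);
       //   conf.setInt("hbase.master.cleaner.interval", 10 * 60 * 1000); // 10 minutes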
1170 
1171   @Override
1172   protected void sendShutdownInterrupt() {
1173     super.sendShutdownInterrupt();
1174     stopProcedureExecutor();
1175   }
1176 
1177   @Override
1178   protected void stopServiceThreads() {
1179     if (masterJettyServer != null) {
1180       LOG.info("Stopping master jetty server");
1181       try {
1182         masterJettyServer.stop();
1183       } catch (Exception e) {
1184         LOG.error("Failed to stop master jetty server", e);
1185       }
1186     }
1187     super.stopServiceThreads();
1188     stopChores();
1189 
1190     // Wait for all the remaining region servers to report in IFF we were
1191     // running a cluster shutdown AND we were NOT aborting.
1192     if (!isAborted() && this.serverManager != null &&
1193         this.serverManager.isClusterShutdown()) {
1194       this.serverManager.letRegionServersShutdown();
1195     }
1196     if (LOG.isDebugEnabled()) {
1197       LOG.debug("Stopping service threads");
1198     }
1199     // Clean up and close up shop
1200     if (this.logCleaner != null) this.logCleaner.cancel(true);
1201     if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
1202     if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true);
1203     if (this.quotaManager != null) this.quotaManager.stop();
1204     if (this.activeMasterManager != null) this.activeMasterManager.stop();
1205     if (this.serverManager != null) this.serverManager.stop();
1206     if (this.assignmentManager != null) this.assignmentManager.stop();
1207     if (this.fileSystemManager != null) this.fileSystemManager.stop();
1208     if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
1209   }
1210 
1211   private void startProcedureExecutor() throws IOException {
1212     final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
1213     final Path logDir = new Path(fileSystemManager.getRootDir(),
1214         MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
1215 
1216     procedureStore = new WALProcedureStore(conf, fileSystemManager.getFileSystem(), logDir,
1217         new MasterProcedureEnv.WALStoreLeaseRecovery(this));
1218     procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
1219     procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,
1220         procEnv.getProcedureQueue());
1221 
1222     final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
1223         Math.max(Runtime.getRuntime().availableProcessors(),
1224           MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
1225     final boolean abortOnCorruption = conf.getBoolean(
1226         MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
1227         MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
1228     procedureStore.start(numThreads);
1229     procedureExecutor.start(numThreads, abortOnCorruption);
1230   }
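
  // The procedure store and the executor are started with the same worker
  // count: the MASTER_PROCEDURE_THREADS setting if present, otherwise
  // max(availableProcessors, DEFAULT_MIN_MASTER_PROCEDURE_THREADS). A hedged
  // sketch of pinning the count explicitly, using the key read above:
  //
  //   conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 8);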
1231 
1232   private void stopProcedureExecutor() {
1233     if (procedureExecutor != null) {
1234       procedureExecutor.stop();
1235     }
1236 
1237     if (procedureStore != null) {
1238       procedureStore.stop(isAborted());
1239     }
1240   }
1241 
1242   private void stopChores() {
1243     if (this.balancerChore != null) {
1244       this.balancerChore.cancel(true);
1245     }
1246     if (this.normalizerChore != null) {
1247       this.normalizerChore.cancel(true);
1248     }
1249     if (this.clusterStatusChore != null) {
1250       this.clusterStatusChore.cancel(true);
1251     }
1252     if (this.catalogJanitorChore != null) {
1253       this.catalogJanitorChore.cancel(true);
1254     }
1255     if (this.clusterStatusPublisherChore != null){
1256       clusterStatusPublisherChore.cancel(true);
1257     }
1258     if (this.periodicDoMetricsChore != null) {
1259       periodicDoMetricsChore.cancel();
1260     }
1261   }
1262 
1263   /**
1264    * @return the remote side's InetAddress
1265    * @throws UnknownHostException
1266    */
1267   InetAddress getRemoteInetAddress(final int port,
1268       final long serverStartCode) throws UnknownHostException {
1269    // Do it out here in its own little method so we can fake an address
1270    // when mocking up in tests.
1271     InetAddress ia = RpcServer.getRemoteIp();
1272 
1273     // The call could be from the local regionserver,
1274     // in which case, there is no remote address.
1275     if (ia == null && serverStartCode == startcode) {
1276       InetSocketAddress isa = rpcServices.getSocketAddress();
1277       if (isa != null && isa.getPort() == port) {
1278         ia = isa.getAddress();
1279       }
1280     }
1281     return ia;
1282   }
1283 
1284   /**
1285    * @return Maximum time we should run balancer for
1286    */
1287   private int getBalancerCutoffTime() {
1288     int balancerCutoffTime =
1289       getConfiguration().getInt("hbase.balancer.max.balancing", -1);
1290     if (balancerCutoffTime == -1) {
1291      // No cutoff period set, so default to one balancer period
1292      int balancerPeriod =
1293        getConfiguration().getInt("hbase.balancer.period", 300000);
1294      balancerCutoffTime = balancerPeriod;
1297     }
1298     return balancerCutoffTime;
1299   }
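
  // With the defaults above, the cutoff equals one balancer period (300s).
  // A sketch of capping a single balancer run at 60s instead, using the same
  // key read above:
  //
  //   conf.setInt("hbase.balancer.max.balancing", 60 * 1000);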
1300 
1301   public boolean balance() throws IOException {
1302     // if master not initialized, don't run balancer.
1303     if (!isInitialized()) {
1304       LOG.debug("Master has not been initialized, don't run balancer.");
1305       return false;
1306     }
1307     // Do this call outside of synchronized block.
1308     int maximumBalanceTime = getBalancerCutoffTime();
1309     synchronized (this.balancer) {
1310      // If the balancer switch is off, don't run the balancer.
1311       if (!this.loadBalancerTracker.isBalancerOn()) return false;
1312      // Only allow one balance run at a time.
1313       if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
1314         Map<String, RegionState> regionsInTransition =
1315           this.assignmentManager.getRegionStates().getRegionsInTransition();
1316         LOG.debug("Not running balancer because " + regionsInTransition.size() +
1317           " region(s) in transition: " + org.apache.commons.lang.StringUtils.
1318             abbreviate(regionsInTransition.toString(), 256));
1319         return false;
1320       }
1321       if (this.serverManager.areDeadServersInProgress()) {
1322         LOG.debug("Not running balancer because processing dead regionserver(s): " +
1323           this.serverManager.getDeadServers());
1324         return false;
1325       }
1326 
1327       if (this.cpHost != null) {
1328         try {
1329           if (this.cpHost.preBalance()) {
1330             LOG.debug("Coprocessor bypassing balancer request");
1331             return false;
1332           }
1333         } catch (IOException ioe) {
1334           LOG.error("Error invoking master coprocessor preBalance()", ioe);
1335           return false;
1336         }
1337       }
1338 
1339       Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
1340         this.assignmentManager.getRegionStates().getAssignmentsByTable();
1341 
1342       List<RegionPlan> plans = new ArrayList<RegionPlan>();
1343      // Give the balancer the current cluster state.
1344       this.balancer.setClusterStatus(getClusterStatus());
1345       for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) {
1346         List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
1347         if (partialPlans != null) plans.addAll(partialPlans);
1348       }
1349       long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
1350       int rpCount = 0;  // number of RegionPlans balanced so far
1351       long totalRegPlanExecTime = 0;
1352       if (plans != null && !plans.isEmpty()) {
1353         for (RegionPlan plan: plans) {
1354           LOG.info("balance " + plan);
1355           long balStartTime = System.currentTimeMillis();
1356           //TODO: bulk assign
1357           this.assignmentManager.balance(plan);
1358           totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
1359           rpCount++;
1360           if (rpCount < plans.size() &&
1361              // if performing the next balance would exceed the cutoff time, exit the loop
1362               (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
1363            //TODO: After balance, there should not be a cutoff time (keeping it as a safety net for now)
1364             LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
1365               maximumBalanceTime);
1366             break;
1367           }
1368         }
1369       }
1370       if (this.cpHost != null) {
1371         try {
1372           this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans);
1373         } catch (IOException ioe) {
1374           // balancing already succeeded so don't change the result
1375           LOG.error("Error invoking master coprocessor postBalance()", ioe);
1376         }
1377       }
1378     }
1379     // If LoadBalancer did not generate any plans, it means the cluster is already balanced.
1380     // Return true indicating a success.
1381     return true;
1382   }
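
  // A worked example of the cutoff heuristic in the loop above: with
  // maximumBalanceTime = 300000ms and an observed average plan execution time
  // of 2000ms, the loop exits as soon as
  //   now + (totalRegPlanExecTime / rpCount) > cutoffTime
  // i.e. once fewer than ~2 seconds remain in the balancing window, the
  // remaining plans are deferred to the next balancer run.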
1383 
1384   /**
1385    * Perform normalization of cluster (invoked by {@link RegionNormalizerChore}).
1386    *
1387    * @return true if normalization step was performed successfully, false otherwise
1388    *   (specifically, if HMaster hasn't been initialized properly or normalization
1389    *   is globally disabled)
1390    * @throws IOException
1391    * @throws CoordinatedStateException
1392    */
1393   public boolean normalizeRegions() throws IOException, CoordinatedStateException {
1394     if (!isInitialized()) {
1395       LOG.debug("Master has not been initialized, don't run region normalizer.");
1396       return false;
1397     }
1398 
1399     if (!this.regionNormalizerTracker.isNormalizerOn()) {
1400       LOG.debug("Region normalization is disabled, don't run region normalizer.");
1401       return false;
1402     }
1403 
1404     synchronized (this.normalizer) {
1405       // Don't run the normalizer concurrently
1406       List<TableName> allEnabledTables = new ArrayList<>(
1407         this.assignmentManager.getTableStateManager().getTablesInStates(
1408           ZooKeeperProtos.Table.State.ENABLED));
1409 
1410       Collections.shuffle(allEnabledTables);
1411 
1412       for (TableName table : allEnabledTables) {
1413         if (quotaManager.getNamespaceQuotaManager() != null &&
1414             quotaManager.getNamespaceQuotaManager().getState(table.getNamespaceAsString()) != null){
1415           LOG.debug("Skipping normalizing " + table + " since its namespace has quota");
1416           continue;
1417         }
1418         if (table.isSystemTable() || (getTableDescriptors().get(table) != null &&
1419             !getTableDescriptors().get(table).isNormalizationEnabled())) {
1420           LOG.debug("Skipping normalization for table: " + table + ", as it's either system"
1421               + " table or doesn't have auto normalization turned on");
1422           continue;
1423         }
1424         List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
1425         if (plans != null) {
1426           for (NormalizationPlan plan : plans) {
1427             plan.execute(clusterConnection.getAdmin());
1428           }
1429         }
1430       }
1431     }
1432    // If the normalizer did not generate any plans, the cluster is already normalized.
1433    // Return true indicating a success.
1434     return true;
1435   }
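
  // Normalization is opt-in per table, and is skipped above for system
  // tables and for tables in namespaces with a quota. A sketch of enabling
  // it, assuming the HTableDescriptor setter that pairs with the
  // isNormalizationEnabled() check above:
  //
  //   HTableDescriptor htd = admin.getTableDescriptor(tableName);
  //   htd.setNormalizationEnabled(true);
  //   admin.modifyTable(tableName, htd);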
1436 
1437   /**
1438    * @return Client info for use as prefix on an audit log string; who did an action
1439    */
1440   String getClientIdAuditPrefix() {
1441     return "Client=" + RpcServer.getRequestUserName() + "/" + RpcServer.getRemoteAddress();
1442   }
1443 
1444   /**
1445    * Switch for the background CatalogJanitor thread.
1446    * Used for testing.  The thread will continue to run.  It will just be a noop
1447    * if disabled.
1448    * @param b If false, the catalog janitor won't do anything.
1449    */
1450   public void setCatalogJanitorEnabled(final boolean b) {
1451     this.catalogJanitorChore.setEnabled(b);
1452   }
1453 
1454   @Override
1455   public void dispatchMergingRegions(final HRegionInfo region_a,
1456       final HRegionInfo region_b, final boolean forcible) throws IOException {
1457     checkInitialized();
1458     this.service.submit(new DispatchMergingRegionHandler(this,
1459       this.catalogJanitorChore, region_a, region_b, forcible));
1460   }
1461 
1462   void move(final byte[] encodedRegionName,
1463       final byte[] destServerName) throws HBaseIOException {
1464     RegionState regionState = assignmentManager.getRegionStates().
1465       getRegionState(Bytes.toString(encodedRegionName));
1466     if (regionState == null) {
1467       throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
1468     }
1469 
1470     HRegionInfo hri = regionState.getRegion();
1471     ServerName dest;
1472     if (destServerName == null || destServerName.length == 0) {
1473       LOG.info("Passed destination servername is null/empty so " +
1474         "choosing a server at random");
1475       final List<ServerName> destServers = this.serverManager.createDestinationServersList(
1476         regionState.getServerName());
1477       dest = balancer.randomAssignment(hri, destServers);
1478       if (dest == null) {
1479         LOG.debug("Unable to determine a plan to assign " + hri);
1480         return;
1481       }
1482     } else {
1483       dest = ServerName.valueOf(Bytes.toString(destServerName));
1484       if (dest.equals(serverName) && balancer instanceof BaseLoadBalancer
1485           && !((BaseLoadBalancer)balancer).shouldBeOnMaster(hri)) {
1486        // Don't put user regions on the master, to avoid the balancer
1487        // having to move them off again later. A test may still place
1488        // regions on the master intentionally, however.
1489         LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1490           + " to avoid unnecessary region moving later by load balancer,"
1491           + " because it should not be on master");
1492         return;
1493       }
1494     }
1495 
1496     if (dest.equals(regionState.getServerName())) {
1497       LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1498         + " because region already assigned to the same server " + dest + ".");
1499       return;
1500     }
1501 
1502     // Now we can do the move
1503     RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
1504 
1505     try {
1506       checkInitialized();
1507       if (this.cpHost != null) {
1508         if (this.cpHost.preMove(hri, rp.getSource(), rp.getDestination())) {
1509           return;
1510         }
1511       }
1512      // Warm up the region on the destination before initiating the move.
1513      // This call is synchronous and takes some time, so do it before the
1514      // source region gets closed.
1515       serverManager.sendRegionWarmup(rp.getDestination(), hri);
1516 
1517       LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
1518       this.assignmentManager.balance(rp);
1519       if (this.cpHost != null) {
1520         this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
1521       }
1522     } catch (IOException ioe) {
1523       if (ioe instanceof HBaseIOException) {
1524         throw (HBaseIOException)ioe;
1525       }
1526       throw new HBaseIOException(ioe);
1527     }
1528   }
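
  // Client-side, this path is typically reached via Admin#move. A hedged
  // sketch (1.x API); passing a null/empty destination picks a random
  // server, as handled above:
  //
  //   byte[] encodedName = regionInfo.getEncodedNameAsBytes();
  //   admin.move(encodedName, Bytes.toBytes(destServer.getServerName()));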
1529 
1530   @Override
1531   public long createTable(
1532       final HTableDescriptor hTableDescriptor,
1533       final byte [][] splitKeys,
1534       final long nonceGroup,
1535       final long nonce) throws IOException {
1536     if (isStopped()) {
1537       throw new MasterNotRunningException();
1538     }
1539 
1540     String namespace = hTableDescriptor.getTableName().getNamespaceAsString();
1541     ensureNamespaceExists(namespace);
1542 
1543     final HRegionInfo[] newRegions =
1544         ModifyRegionUtils.createHRegionInfos(hTableDescriptor, splitKeys);
1545     checkInitialized();
1546     sanityCheckTableDescriptor(hTableDescriptor);
1547 
1548     return MasterProcedureUtil.submitProcedure(
1549       new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
1550       @Override
1551       protected void run() throws IOException {
1552         getMaster().getMasterCoprocessorHost().preCreateTable(hTableDescriptor, newRegions);
1553 
1554         LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor);
1555 
1556         // TODO: We can handle/merge duplicate requests, and differentiate the case of
1557         //       TableExistsException by saying if the schema is the same or not.
1558         ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
1559         submitProcedure(new CreateTableProcedure(
1560           procedureExecutor.getEnvironment(), hTableDescriptor, newRegions, latch));
1561         latch.await();
1562 
1563         getMaster().getMasterCoprocessorHost().postCreateTable(hTableDescriptor, newRegions);
1564       }
1565 
1566       @Override
1567       protected String getDescription() {
1568         return "CreateTableProcedure";
1569       }
1570     });
1571   }
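
  // A sketch of the client call that lands here (1.x Admin API); the split
  // keys become region boundaries via ModifyRegionUtils.createHRegionInfos:
  //
  //   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
  //   htd.addFamily(new HColumnDescriptor("f1"));
  //   byte[][] splitKeys = { Bytes.toBytes("m") }; // two regions: (-inf, m) and [m, +inf)
  //   admin.createTable(htd, splitKeys);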
1572 
1573   /**
1574    * Checks whether the table conforms to some sane limits, and configured
1575    * values (compression, etc) work. Throws an exception if something is wrong.
1576    * @throws IOException
1577    */
1578   private void sanityCheckTableDescriptor(final HTableDescriptor htd) throws IOException {
1579     final String CONF_KEY = "hbase.table.sanity.checks";
1580     boolean logWarn = false;
1581     if (!conf.getBoolean(CONF_KEY, true)) {
1582       logWarn = true;
1583     }
1584     String tableVal = htd.getConfigurationValue(CONF_KEY);
1585     if (tableVal != null && !Boolean.valueOf(tableVal)) {
1586       logWarn = true;
1587     }
1588 
1589     // check max file size
1590     long maxFileSizeLowerLimit = 2 * 1024 * 1024L; // 2M is the default lower limit
1591     long maxFileSize = htd.getMaxFileSize();
1592     if (maxFileSize < 0) {
1593       maxFileSize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, maxFileSizeLowerLimit);
1594     }
1595     if (maxFileSize < conf.getLong("hbase.hregion.max.filesize.limit", maxFileSizeLowerLimit)) {
1596       String message = "MAX_FILESIZE for table descriptor or "
1597           + "\"hbase.hregion.max.filesize\" (" + maxFileSize
1598           + ") is too small, which might cause over splitting into unmanageable "
1599           + "number of regions.";
1600       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1601     }
1602 
1603     // check flush size
1604     long flushSizeLowerLimit = 1024 * 1024L; // 1M is the default lower limit
1605     long flushSize = htd.getMemStoreFlushSize();
1606     if (flushSize < 0) {
1607       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeLowerLimit);
1608     }
1609     if (flushSize < conf.getLong("hbase.hregion.memstore.flush.size.limit", flushSizeLowerLimit)) {
1610       String message = "MEMSTORE_FLUSHSIZE for table descriptor or "
1611           + "\"hbase.hregion.memstore.flush.size\" ("+flushSize+") is too small, which might cause"
1612           + " very frequent flushing.";
1613       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1614     }
1615 
1616     // check that coprocessors and other specified plugin classes can be loaded
1617     try {
1618       checkClassLoading(conf, htd);
1619     } catch (Exception ex) {
1620       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, ex.getMessage(), null);
1621     }
1622 
1623     // check compression can be loaded
1624     try {
1625       checkCompression(htd);
1626     } catch (IOException e) {
1627       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
1628     }
1629 
1630     // check encryption can be loaded
1631     try {
1632       checkEncryption(conf, htd);
1633     } catch (IOException e) {
1634       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
1635     }
1636     // Verify compaction policy
1637     try{
1638       checkCompactionPolicy(conf, htd);
1639     } catch(IOException e){
1640       warnOrThrowExceptionForFailure(false, CONF_KEY, e.getMessage(), e);
1641     }
1642     // check that we have at least 1 CF
1643     if (htd.getColumnFamilies().length == 0) {
1644       String message = "Table should have at least one column family.";
1645       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1646     }
1647 
1648     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1649       if (hcd.getTimeToLive() <= 0) {
1650         String message = "TTL for column family " + hcd.getNameAsString() + " must be positive.";
1651         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1652       }
1653 
1654       // check blockSize
1655       if (hcd.getBlocksize() < 1024 || hcd.getBlocksize() > 16 * 1024 * 1024) {
1656         String message = "Block size for column family " + hcd.getNameAsString()
1657             + "  must be between 1K and 16MB.";
1658         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1659       }
1660 
1661       // check versions
1662       if (hcd.getMinVersions() < 0) {
1663         String message = "Min versions for column family " + hcd.getNameAsString()
1664          + " must be non-negative.";
1665         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1666       }
1667       // max versions already being checked
1668 
1669       // HBASE-13776 Setting illegal versions for HColumnDescriptor
1670       //  does not throw IllegalArgumentException
1671      // check minVersions <= maxVersions
1672       if (hcd.getMinVersions() > hcd.getMaxVersions()) {
1673         String message = "Min versions for column family " + hcd.getNameAsString()
1674            + " must be less than or equal to the max versions.";
1675         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1676       }
1677 
1678       // check replication scope
1679       if (hcd.getScope() < 0) {
1680         String message = "Replication scope for column family "
1681          + hcd.getNameAsString() + " must be non-negative.";
1682         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1683       }
1684 
1685      // Check the data replication factor. It can be 0 (the default) when the user has not
1686      // explicitly set the value, in which case the file system's default replication factor is used.
1687       if (hcd.getDFSReplication() < 0) {
1688         String message = "HFile Replication for column family " + hcd.getNameAsString()
1689            + " must not be negative.";
1690         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1691       }
1692 
1693       // TODO: should we check coprocessors and encryption ?
1694     }
1695   }
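
  // When "hbase.table.sanity.checks" is false (globally or on the table),
  // the failures above are logged as warnings instead of thrown; see
  // warnOrThrowExceptionForFailure below. A per-table sketch:
  //
  //   htd.setConfiguration("hbase.table.sanity.checks", "false");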
1696 
1697   private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd)
1698       throws IOException {
1699    // FIFO compaction has some requirements.
1700    // Note that the FIFO compaction policy ignores periodic major compactions.
1701     String className =
1702         htd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
1703     if (className == null) {
1704       className =
1705           conf.get(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
1706             ExploringCompactionPolicy.class.getName());
1707     }
1708 
1709     int blockingFileCount = HStore.DEFAULT_BLOCKING_STOREFILE_COUNT;
1710     String sv = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
1711     if (sv != null) {
1712       blockingFileCount = Integer.parseInt(sv);
1713     } else {
1714       blockingFileCount = conf.getInt(HStore.BLOCKING_STOREFILES_KEY, blockingFileCount);
1715     }
1716 
1717     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1718       String compactionPolicy =
1719           hcd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
1720       if (compactionPolicy == null) {
1721         compactionPolicy = className;
1722       }
1723       if (!compactionPolicy.equals(FIFOCompactionPolicy.class.getName())) {
1724         continue;
1725       }
1726       // FIFOCompaction
1727       String message = null;
1728 
1729       // 1. Check TTL
1730       if (hcd.getTimeToLive() == HColumnDescriptor.DEFAULT_TTL) {
1731         message = "Default TTL is not supported for FIFO compaction";
1732         throw new IOException(message);
1733       }
1734 
1735       // 2. Check min versions
1736       if (hcd.getMinVersions() > 0) {
1737         message = "MIN_VERSION > 0 is not supported for FIFO compaction";
1738         throw new IOException(message);
1739       }
1740 
1741       // 3. blocking file count
1742       String sbfc = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
1743       if (sbfc != null) {
1744         blockingFileCount = Integer.parseInt(sbfc);
1745       }
1746       if (blockingFileCount < 1000) {
1747         message =
1748             "blocking file count '" + HStore.BLOCKING_STOREFILES_KEY + "' " + blockingFileCount
1749                 + " is below recommended minimum of 1000";
1750         throw new IOException(message);
1751       }
1752     }
1753   }
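
  // A column family only passes these FIFO checks with a non-default TTL,
  // MIN_VERSIONS of 0, and a blocking store file count of at least 1000.
  // A sketch of a conforming descriptor, using the same keys read above:
  //
  //   HColumnDescriptor hcd = new HColumnDescriptor("raw");
  //   hcd.setTimeToLive(24 * 60 * 60); // 1 day, not HColumnDescriptor.DEFAULT_TTL
  //   hcd.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
  //       FIFOCompactionPolicy.class.getName());
  //   htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "1000");
  //   htd.addFamily(hcd);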
1754 
1755   // HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled.
1756   private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey,
1757       String message, Exception cause) throws IOException {
1758     if (!logWarn) {
1759      throw new DoNotRetryIOException(message + " Set " + confKey +
1760          " to false in the configuration or in the table descriptor if you want to bypass sanity checks", cause);
1761     }
1762     LOG.warn(message);
1763   }
1764 
1765   private void startActiveMasterManager(int infoPort) throws KeeperException {
1766     String backupZNode = ZKUtil.joinZNode(
1767       zooKeeper.backupMasterAddressesZNode, serverName.toString());
1768    /*
1769    * Add a ZNode for ourselves in the backup master directory since we
1770    * may not become the active master. If we don't, we want the actual
1771    * active master to know we are a backup master, so that it won't assign
1772    * regions to us if so configured.
1773    *
1774    * If we become the active master later, ActiveMasterManager will delete
1775    * this node explicitly.  If we crash before then, ZooKeeper will delete
1776    * this node for us since it is ephemeral.
1777    */
1778     LOG.info("Adding backup master ZNode " + backupZNode);
1779     if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode,
1780         serverName, infoPort)) {
1781       LOG.warn("Failed create of " + backupZNode + " by " + serverName);
1782     }
1783 
1784     activeMasterManager.setInfoPort(infoPort);
1785     // Start a thread to try to become the active master, so we won't block here
1786     Threads.setDaemonThreadRunning(new Thread(new Runnable() {
1787       @Override
1788       public void run() {
1789         int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
1790           HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
1791        // If we're a backup master, stall until a primary writes its address
1792         if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP,
1793           HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
1794           LOG.debug("HMaster started in backup mode. "
1795             + "Stalling until master znode is written.");
1796           // This will only be a minute or so while the cluster starts up,
1797           // so don't worry about setting watches on the parent znode
1798           while (!activeMasterManager.hasActiveMaster()) {
1799             LOG.debug("Waiting for master address ZNode to be written "
1800               + "(Also watching cluster state node)");
1801             Threads.sleep(timeout);
1802           }
1803         }
1804         MonitoredTask status = TaskMonitor.get().createStatus("Master startup");
1805         status.setDescription("Master startup");
1806         try {
1807           if (activeMasterManager.blockUntilBecomingActiveMaster(timeout, status)) {
1808             finishActiveMasterInitialization(status);
1809           }
1810         } catch (Throwable t) {
1811           status.setStatus("Failed to become active: " + t.getMessage());
1812           LOG.fatal("Failed to become active master", t);
1813           // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
1814           if (t instanceof NoClassDefFoundError &&
1815             t.getMessage()
1816               .contains("org/apache/hadoop/hdfs/protocol/HdfsConstants$SafeModeAction")) {
1817             // improved error message for this special case
1818             abort("HBase is having a problem with its Hadoop jars.  You may need to "
1819               + "recompile HBase against Hadoop version "
1820               + org.apache.hadoop.util.VersionInfo.getVersion()
1821               + " or change your hadoop jars to start properly", t);
1822           } else {
1823             abort("Unhandled exception. Starting shutdown.", t);
1824           }
1825         } finally {
1826           status.cleanup();
1827         }
1828       }
1829     }, getServerName().toShortString() + ".activeMasterManager"));
1830   }
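
  // Backup masters take this same code path: they register under the backup
  // znode and park in the wait loop above until the active master znode is
  // written. A sketch of forcing backup mode, using the constant read above:
  //
  //   conf.setBoolean(HConstants.MASTER_TYPE_BACKUP, true);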
1831 
1832   private void checkCompression(final HTableDescriptor htd)
1833   throws IOException {
1834     if (!this.masterCheckCompression) return;
1835     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1836       checkCompression(hcd);
1837     }
1838   }
1839 
1840   private void checkCompression(final HColumnDescriptor hcd)
1841   throws IOException {
1842     if (!this.masterCheckCompression) return;
1843     CompressionTest.testCompression(hcd.getCompression());
1844     CompressionTest.testCompression(hcd.getCompactionCompression());
1845   }
1846 
1847   private void checkEncryption(final Configuration conf, final HTableDescriptor htd)
1848   throws IOException {
1849     if (!this.masterCheckEncryption) return;
1850     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1851       checkEncryption(conf, hcd);
1852     }
1853   }
1854 
1855   private void checkEncryption(final Configuration conf, final HColumnDescriptor hcd)
1856   throws IOException {
1857     if (!this.masterCheckEncryption) return;
1858     EncryptionTest.testEncryption(conf, hcd.getEncryptionType(), hcd.getEncryptionKey());
1859   }
1860 
1861   private void checkClassLoading(final Configuration conf, final HTableDescriptor htd)
1862   throws IOException {
1863     RegionSplitPolicy.getSplitPolicyClass(htd, conf);
1864     RegionCoprocessorHost.testTableCoprocessorAttrs(conf, htd);
1865   }
1866 
1867   private static boolean isCatalogTable(final TableName tableName) {
1868     return tableName.equals(TableName.META_TABLE_NAME);
1869   }
1870 
1871   @Override
1872   public long deleteTable(
1873       final TableName tableName,
1874       final long nonceGroup,
1875       final long nonce) throws IOException {
1876     checkInitialized();
1877 
1878     return MasterProcedureUtil.submitProcedure(
1879       new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
1880       @Override
1881       protected void run() throws IOException {
1882         getMaster().getMasterCoprocessorHost().preDeleteTable(tableName);
1883 
1884         LOG.info(getClientIdAuditPrefix() + " delete " + tableName);
1885 
1886         // TODO: We can handle/merge duplicate request
1887         ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
1888         submitProcedure(new DeleteTableProcedure(procedureExecutor.getEnvironment(),
1889             tableName, latch));
1890         latch.await();
1891 
1892         getMaster().getMasterCoprocessorHost().postDeleteTable(tableName);
1893       }
1894 
1895       @Override
1896       protected String getDescription() {
1897         return "DeleteTableProcedure";
1898       }
1899     });
1900   }
1901 
1902   @Override
1903   public void truncateTable(
1904       final TableName tableName,
1905       final boolean preserveSplits,
1906       final long nonceGroup,
1907       final long nonce) throws IOException {
1908     checkInitialized();
1909 
1910     MasterProcedureUtil.submitProcedure(
1911       new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
1912       @Override
1913       protected void run() throws IOException {
1914         getMaster().getMasterCoprocessorHost().preTruncateTable(tableName);
1915 
1916         LOG.info(getClientIdAuditPrefix() + " truncate " + tableName);
1917 
1918         long procId = submitProcedure(new TruncateTableProcedure(procedureExecutor.getEnvironment(),
1919             tableName, preserveSplits));
1920         ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1921 
1922         getMaster().getMasterCoprocessorHost().postTruncateTable(tableName);
1923       }
1924 
1925       @Override
1926       protected String getDescription() {
1927         return "TruncateTableProcedure";
1928       }
1929     });
1930   }
1931 
1932   @Override
1933   public void addColumn(
1934       final TableName tableName,
1935       final HColumnDescriptor columnDescriptor,
1936       final long nonceGroup,
1937       final long nonce)
1938       throws IOException {
1939     checkInitialized();
1940     checkCompression(columnDescriptor);
1941     checkEncryption(conf, columnDescriptor);
1942 
1943     MasterProcedureUtil.submitProcedure(
1944       new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
1945       @Override
1946       protected void run() throws IOException {
1947         if (getMaster().getMasterCoprocessorHost().preAddColumn(tableName, columnDescriptor)) {
1948           return;
1949         }
1950 
1951        // Execute the operation synchronously - wait for the operation to complete before continuing.
1952         long procId = submitProcedure(new AddColumnFamilyProcedure(
1953           procedureExecutor.getEnvironment(), tableName, columnDescriptor));
1954         ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1955 
1956         getMaster().getMasterCoprocessorHost().postAddColumn(tableName, columnDescriptor);
1957       }
1958 
1959       @Override
1960       protected String getDescription() {
1961         return "AddColumnFamilyProcedure";
1962       }
1963     });
1964   }
1965 
1966   @Override
1967   public void modifyColumn(
1968       final TableName tableName,
1969       final HColumnDescriptor descriptor,
1970       final long nonceGroup,
1971       final long nonce)
1972       throws IOException {
1973     checkInitialized();
1974     checkCompression(descriptor);
1975     checkEncryption(conf, descriptor);
1976 
1977     MasterProcedureUtil.submitProcedure(
1978       new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
1979       @Override
1980       protected void run() throws IOException {
1981         if (getMaster().getMasterCoprocessorHost().preModifyColumn(tableName, descriptor)) {
1982           return;
1983         }
1984 
1985         LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);
1986 
1987         // Execute the operation synchronously - wait for the operation to complete before continuing.
1988         long procId = submitProcedure(new ModifyColumnFamilyProcedure(
1989           procedureExecutor.getEnvironment(), tableName, descriptor));
1990         ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1991 
1992         getMaster().getMasterCoprocessorHost().postModifyColumn(tableName, descriptor);
1993       }
1994 
1995       @Override
1996       protected String getDescription() {
1997         return "ModifyColumnFamilyProcedure";
1998       }
1999     });
2000   }
2001 
2002   @Override
2003   public void deleteColumn(
2004       final TableName tableName,
2005       final byte[] columnName,
2006       final long nonceGroup,
2007       final long nonce)
2008       throws IOException {
2009     checkInitialized();
2010 
2011     MasterProcedureUtil.submitProcedure(
2012       new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2013       @Override
2014       protected void run() throws IOException {
2015         if (getMaster().getMasterCoprocessorHost().preDeleteColumn(tableName, columnName)) {
2016           return;
2017         }
2018 
2019         LOG.info(getClientIdAuditPrefix() + " delete " + Bytes.toString(columnName));
2020 
2021         // Execute the operation synchronously - wait for the operation to complete before
2022         // continuing.
2023         long procId = submitProcedure(new DeleteColumnFamilyProcedure(
2024           procedureExecutor.getEnvironment(), tableName, columnName));
2025         ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
2026 
2027         getMaster().getMasterCoprocessorHost().postDeleteColumn(tableName, columnName);
2028       }
2029 
2030       @Override
2031       protected String getDescription() {
2032         return "DeleteColumnFamilyProcedure";
2033       }
2034     });
2035   }
2036 
2037   @Override
2038   public long enableTable(final TableName tableName, final long nonceGroup, final long nonce)
2039       throws IOException {
2040     checkInitialized();
2041 
2042     return MasterProcedureUtil.submitProcedure(
2043         new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2044       @Override
2045       protected void run() throws IOException {
2046         getMaster().getMasterCoprocessorHost().preEnableTable(tableName);
2047 
2048         LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
2049 
2050        // Execute the operation asynchronously - the client will check the progress of the operation.
2051        // In case the request is from a <1.1 client, before returning we want
2052        // to make sure that the table is prepared to be enabled (the table is
2053        // locked and the table state is set).
2054        // Note: if the procedure throws an exception, we will catch it and rethrow.
2055         final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
2056         submitProcedure(new EnableTableProcedure(procedureExecutor.getEnvironment(),
2057             tableName, false, prepareLatch));
2058         prepareLatch.await();
2059 
2060         getMaster().getMasterCoprocessorHost().postEnableTable(tableName);
2061       }
2062 
2063       @Override
2064       protected String getDescription() {
2065         return "EnableTableProcedure";
2066       }
2067     });
2068   }
2069 
2070   @Override
2071   public long disableTable(final TableName tableName, final long nonceGroup, final long nonce)
2072       throws IOException {
2073     checkInitialized();
2074 
2075     return MasterProcedureUtil.submitProcedure(
2076         new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2077       @Override
2078       protected void run() throws IOException {
2079         getMaster().getMasterCoprocessorHost().preDisableTable(tableName);
2080 
2081         LOG.info(getClientIdAuditPrefix() + " disable " + tableName);
2082 
2083        // Execute the operation asynchronously - the client will check the progress of the operation.
2084        // In case the request is from a <1.1 client, before returning we want
2085        // to make sure that the table is prepared to be disabled (the table is
2086        // locked and the table state is set).
2087        // Note: if the procedure throws an exception, we will catch it and rethrow.
2088         final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
2089         submitProcedure(new DisableTableProcedure(procedureExecutor.getEnvironment(),
2090             tableName, false, prepareLatch));
2091         prepareLatch.await();
2092 
2093         getMaster().getMasterCoprocessorHost().postDisableTable(tableName);
2094       }
2095 
2096       @Override
2097       protected String getDescription() {
2098         return "DisableTableProcedure";
2099       }
2100     });
2101   }
2102 
2103   /**
2104    * Return the region and current deployment for the region containing
2105    * the given row. If the region cannot be found, returns null. If it
2106    * is found, but not currently deployed, the second element of the pair
2107    * may be null.
2108    */
2109   @VisibleForTesting // Used by TestMaster.
2110   Pair<HRegionInfo, ServerName> getTableRegionForRow(
2111       final TableName tableName, final byte [] rowKey)
2112   throws IOException {
2113     final AtomicReference<Pair<HRegionInfo, ServerName>> result =
2114       new AtomicReference<Pair<HRegionInfo, ServerName>>(null);
2115 
2116     MetaScannerVisitor visitor =
2117       new MetaScannerVisitorBase() {
2118         @Override
2119         public boolean processRow(Result data) throws IOException {
2120           if (data == null || data.size() <= 0) {
2121             return true;
2122           }
2123           Pair<HRegionInfo, ServerName> pair = HRegionInfo.getHRegionInfoAndServerName(data);
2124           if (pair == null) {
2125             return false;
2126           }
2127           if (!pair.getFirst().getTable().equals(tableName)) {
2128             return false;
2129           }
2130           result.set(pair);
2131           return true;
2132         }
2133     };
2134 
2135     MetaScanner.metaScan(clusterConnection, visitor, tableName, rowKey, 1);
2136     return result.get();
2137   }
2138 
2139   @Override
2140   public void modifyTable(
2141       final TableName tableName,
2142       final HTableDescriptor descriptor,
2143       final long nonceGroup,
2144       final long nonce)
2145       throws IOException {
2146     checkInitialized();
2147     sanityCheckTableDescriptor(descriptor);
2148 
2149     MasterProcedureUtil.submitProcedure(
2150       new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
2151       @Override
2152       protected void run() throws IOException {
2153         getMaster().getMasterCoprocessorHost().preModifyTable(tableName, descriptor);
2154 
2155         LOG.info(getClientIdAuditPrefix() + " modify " + tableName);
2156 
2157        // Execute the operation synchronously - wait for the operation to complete before continuing.
2158         long procId = submitProcedure(new ModifyTableProcedure(
2159           procedureExecutor.getEnvironment(), descriptor));
2160         ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
2161 
2162         getMaster().getMasterCoprocessorHost().postModifyTable(tableName, descriptor);
2163       }
2164 
2165       @Override
2166       protected String getDescription() {
2167         return "ModifyTableProcedure";
2168       }
2169     });
2170   }
2171 
2172   @Override
2173   public void checkTableModifiable(final TableName tableName)
2174       throws IOException, TableNotFoundException, TableNotDisabledException {
2175     if (isCatalogTable(tableName)) {
2176       throw new IOException("Can't modify catalog tables");
2177     }
2178     if (!MetaTableAccessor.tableExists(getConnection(), tableName)) {
2179       throw new TableNotFoundException(tableName);
2180     }
2181     if (!getAssignmentManager().getTableStateManager().
2182         isTableState(tableName, ZooKeeperProtos.Table.State.DISABLED)) {
2183       throw new TableNotDisabledException(tableName);
2184     }
2185   }
2186 
2187   /**
2188    * @return cluster status
2189    */
2190   public ClusterStatus getClusterStatus() throws InterruptedIOException {
2191    // Build a list of backup masters from ZK nodes
2192     List<String> backupMasterStrings;
2193     try {
2194       backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
2195         this.zooKeeper.backupMasterAddressesZNode);
2196     } catch (KeeperException e) {
2197       LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
2198       backupMasterStrings = null;
2199     }
2200 
2201     List<ServerName> backupMasters = null;
2202     if (backupMasterStrings != null && !backupMasterStrings.isEmpty()) {
2203       backupMasters = new ArrayList<ServerName>(backupMasterStrings.size());
2204       for (String s: backupMasterStrings) {
2205         try {
2206           byte [] bytes;
2207           try {
2208             bytes = ZKUtil.getData(this.zooKeeper, ZKUtil.joinZNode(
2209                 this.zooKeeper.backupMasterAddressesZNode, s));
2210           } catch (InterruptedException e) {
2211             throw new InterruptedIOException();
2212           }
2213           if (bytes != null) {
2214             ServerName sn;
2215             try {
2216               sn = ServerName.parseFrom(bytes);
2217             } catch (DeserializationException e) {
2218               LOG.warn("Failed parse, skipping registering backup server", e);
2219               continue;
2220             }
2221             backupMasters.add(sn);
2222           }
2223         } catch (KeeperException e) {
2224           LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
2225                    "backup servers"), e);
2226         }
2227       }
2228       Collections.sort(backupMasters, new Comparator<ServerName>() {
2229         @Override
2230         public int compare(ServerName s1, ServerName s2) {
2231           return s1.getServerName().compareTo(s2.getServerName());
2232         }});
2233     }
2234 
2235     String clusterId = fileSystemManager != null ?
2236       fileSystemManager.getClusterId().toString() : null;
2237     Map<String, RegionState> regionsInTransition = assignmentManager != null ?
2238       assignmentManager.getRegionStates().getRegionsInTransition() : null;
2239     String[] coprocessors = cpHost != null ? getMasterCoprocessors() : null;
2240     boolean balancerOn = loadBalancerTracker != null ?
2241       loadBalancerTracker.isBalancerOn() : false;
2242     Map<ServerName, ServerLoad> onlineServers = null;
2243     Set<ServerName> deadServers = null;
2244     if (serverManager != null) {
2245       deadServers = serverManager.getDeadServers().copyServerNames();
2246       onlineServers = serverManager.getOnlineServers();
2247     }
2248     return new ClusterStatus(VersionInfo.getVersion(), clusterId,
2249       onlineServers, deadServers, serverName, backupMasters,
2250       regionsInTransition, coprocessors, balancerOn);
2251   }
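
  // This is the same snapshot a client sees via Admin#getClusterStatus. A
  // hedged consumption sketch (1.x API):
  //
  //   ClusterStatus status = admin.getClusterStatus();
  //   LOG.info("servers=" + status.getServersSize()
  //       + ", dead=" + status.getDeadServers()
  //       + ", avgLoad=" + status.getAverageLoad());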
2252 
2253   /**
2254    * The set of loaded coprocessors is stored in a static set. Since it's
2255    * statically allocated, it does not require that HMaster's cpHost be
2256    * initialized prior to accessing it.
2257    * @return a String representation of the set of names of the loaded
2258    * coprocessors.
2259    */
2260   public static String getLoadedCoprocessors() {
2261     return CoprocessorHost.getLoadedCoprocessors().toString();
2262   }
2263 
2264   /**
2265    * @return timestamp in millis when HMaster was started.
2266    */
2267   public long getMasterStartTime() {
2268     return startcode;
2269   }
2270 
2271   /**
2272    * @return timestamp in millis when HMaster became the active master.
2273    */
2274   public long getMasterActiveTime() {
2275     return masterActiveTime;
2276   }
2277 
2278   public int getRegionServerInfoPort(final ServerName sn) {
2279     RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2280     if (info == null || info.getInfoPort() == 0) {
2281       return conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2282         HConstants.DEFAULT_REGIONSERVER_INFOPORT);
2283     }
2284     return info.getInfoPort();
2285   }
2286 
2287   public String getRegionServerVersion(final ServerName sn) {
2288     RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2289     if (info != null && info.hasVersionInfo()) {
2290       return info.getVersionInfo().getVersion();
2291     }
2292     return "Unknown";
2293   }
2294 
2295   /**
2296    * @return array of coprocessor SimpleNames.
2297    */
2298   public String[] getMasterCoprocessors() {
2299     Set<String> masterCoprocessors = getMasterCoprocessorHost().getCoprocessors();
2300     return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
2301   }
2302 
2303   @Override
2304   public void abort(final String msg, final Throwable t) {
2305     if (isAborted() || isStopped()) {
2306       return;
2307     }
2308     if (cpHost != null) {
2309       // HBASE-4014: dump a list of loaded coprocessors.
2310       LOG.fatal("Master server abort: loaded coprocessors are: " +
2311           getLoadedCoprocessors());
2312     }
2313     if (t != null) LOG.fatal(msg, t);
2314 
2315     try {
2316       stopMaster();
2317     } catch (IOException e) {
2318       LOG.error("Exception occurred while stopping master", e);
2319     }
2320   }
2321 
2322   @Override
2323   public ZooKeeperWatcher getZooKeeper() {
2324     return zooKeeper;
2325   }
2326 
2327   @Override
2328   public MasterCoprocessorHost getMasterCoprocessorHost() {
2329     return cpHost;
2330   }
2331 
2332   @Override
2333   public MasterQuotaManager getMasterQuotaManager() {
2334     return quotaManager;
2335   }
2336 
2337   @Override
2338   public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
2339     return procedureExecutor;
2340   }
2341 
2342   @Override
2343   public ServerName getServerName() {
2344     return this.serverName;
2345   }
2346 
2347   @Override
2348   public AssignmentManager getAssignmentManager() {
2349     return this.assignmentManager;
2350   }
2351 
2352   public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
2353     return rsFatals;
2354   }
2355 
2356   public void shutdown() throws IOException {
2357     if (cpHost != null) {
2358       cpHost.preShutdown();
2359     }
2360 
2361     if (this.serverManager != null) {
2362       this.serverManager.shutdownCluster();
2363     }
2364     if (this.clusterStatusTracker != null){
2365       try {
2366         this.clusterStatusTracker.setClusterDown();
2367       } catch (KeeperException e) {
2368         LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
2369       }
2370     }
2371   }
2372 
2373   public void stopMaster() throws IOException {
2374     if (cpHost != null) {
2375       cpHost.preStopMaster();
2376     }
2377     stop("Stopped by " + Thread.currentThread().getName());
2378   }
2379 
2380   void checkServiceStarted() throws ServerNotRunningYetException {
2381     if (!serviceStarted) {
2382       throw new ServerNotRunningYetException("Server is not running yet");
2383     }
2384   }
2385 
2386   void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException {
2387     checkServiceStarted();
2388     if (!isInitialized()) {
2389       throw new PleaseHoldException("Master is initializing");
2390     }
2391   }
2392 
2393   void checkNamespaceManagerReady() throws IOException {
2394     checkInitialized();
2395     if (tableNamespaceManager == null ||
2396         !tableNamespaceManager.isTableAvailableAndInitialized()) {
2397       throw new IOException("Table Namespace Manager not ready yet, try again later");
2398     }
2399   }

2400   /**
2401    * Report whether this master is currently the active master or not.
2402    * If not active master, we are parked on ZK waiting to become active.
2403    *
2404    * This method is used for testing.
2405    *
2406    * @return true if active master, false if not.
2407    */
2408   public boolean isActiveMaster() {
2409     return isActiveMaster;
2410   }
2411 
2412   /**
2413    * Report whether this master has completed with its initialization and is
2414    * ready.  If ready, the master is also the active master.  A standby master
2415    * is never ready.
2416    *
2417    * This method is used for testing.
2418    *
2419    * @return true if master is ready to go, false if not.
2420    */
2421   @Override
2422   public boolean isInitialized() {
2423     return initialized.isReady();
2424   }
2425 
2426   @VisibleForTesting
2427   public void setInitialized(boolean isInitialized) {
2428     procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
2429   }
2430 
2431   public ProcedureEvent getInitializedEvent() {
2432     return initialized;
2433   }
2434 
2435   /**
2436    * ServerCrashProcessingEnabled is set false before completing assignMeta to prevent processing
2437    * of crashed servers.
2438   * @return true if assignMeta has completed.
2439    */
2440   @Override
2441   public boolean isServerCrashProcessingEnabled() {
2442     return serverCrashProcessingEnabled.isReady();
2443   }
2444 
2445   @VisibleForTesting
2446   public void setServerCrashProcessingEnabled(final boolean b) {
2447     procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b);
2448   }
2449 
2450   public ProcedureEvent getServerCrashProcessingEnabledEvent() {
2451     return serverCrashProcessingEnabled;
2452   }
2453 
2454   /**
2455    * Report whether this master has started initialization and is about to do meta region assignment
2456    * @return true if master is in initialization &amp; about to assign hbase:meta regions
2457    */
2458   public boolean isInitializationStartsMetaRegionAssignment() {
2459     return this.initializationBeforeMetaAssignment;
2460   }
2461 
2462   public void assignRegion(HRegionInfo hri) {
2463     assignmentManager.assign(hri, true);
2464   }
2465 
2466   /**
2467    * Compute the average load across all region servers.
2468    * Currently, this uses a very naive computation - just uses the number of
2469    * regions being served, ignoring stats about number of requests.
2470    * @return the average load
2471    */
2472   public double getAverageLoad() {
2473     if (this.assignmentManager == null) {
2474       return 0;
2475     }
2476 
2477     RegionStates regionStates = this.assignmentManager.getRegionStates();
2478     if (regionStates == null) {
2479       return 0;
2480     }
2481     return regionStates.getAverageLoad();
2482   }
2483 
2484   @Override
2485   public boolean registerService(Service instance) {
2486     /*
2487      * No stacking of instances is allowed for a single service name
2488      */
2489     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
2490     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
2491       LOG.error("Coprocessor service "+serviceDesc.getFullName()+
2492           " already registered, rejecting request from "+instance
2493       );
2494       return false;
2495     }
2496 
2497     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
2498     if (LOG.isDebugEnabled()) {
2499       LOG.debug("Registered master coprocessor service: service="+serviceDesc.getFullName());
2500     }
2501     return true;
2502   }
2503 
2504   /**
2505    * Utility for constructing an instance of the passed HMaster class.
2506    * @param masterClass
2507    * @param conf
2508    * @return HMaster instance.
2509    */
2510   public static HMaster constructMaster(Class<? extends HMaster> masterClass,
2511       final Configuration conf, final CoordinatedStateManager cp)  {
2512     try {
2513       Constructor<? extends HMaster> c =
2514         masterClass.getConstructor(Configuration.class, CoordinatedStateManager.class);
2515       return c.newInstance(conf, cp);
2516     } catch(Exception e) {
2517       Throwable error = e;
2518       if (e instanceof InvocationTargetException &&
2519           ((InvocationTargetException)e).getTargetException() != null) {
2520         error = ((InvocationTargetException)e).getTargetException();
2521       }
2522      throw new RuntimeException("Failed construction of Master: "
2523          + masterClass.toString() + ".", error);
2524     }
2525   }
2526 
2527   /**
2528    * @see org.apache.hadoop.hbase.master.HMasterCommandLine
2529    */
2530   public static void main(String [] args) {
2531     VersionInfo.logVersion();
2532     new HMasterCommandLine(HMaster.class).doMain(args);
2533   }
2534 
2535   public HFileCleaner getHFileCleaner() {
2536     return this.hfileCleaner;
2537   }
2538 
2539   /**
2540    * Exposed for TESTING!
2541    * @return the underlying snapshot manager
2542    */
2543   public SnapshotManager getSnapshotManagerForTesting() {
2544     return this.snapshotManager;
2545   }
2546 
2547   @Override
2548   public void createNamespace(NamespaceDescriptor descriptor) throws IOException {
2549     TableName.isLegalNamespaceName(Bytes.toBytes(descriptor.getName()));
2550     checkNamespaceManagerReady();
2551     if (cpHost != null) {
2552       if (cpHost.preCreateNamespace(descriptor)) {
2553         return;
2554       }
2555     }
2556     LOG.info(getClientIdAuditPrefix() + " creating " + descriptor);
2557     tableNamespaceManager.create(descriptor);
2558     if (cpHost != null) {
2559       cpHost.postCreateNamespace(descriptor);
2560     }
2561   }
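
  // A client-side sketch of building the descriptor passed in here (1.x API):
  //
  //   NamespaceDescriptor nsd = NamespaceDescriptor.create("analytics").build();
  //   admin.createNamespace(nsd);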

  @Override
  public void modifyNamespace(NamespaceDescriptor descriptor) throws IOException {
    TableName.isLegalNamespaceName(Bytes.toBytes(descriptor.getName()));
    checkNamespaceManagerReady();
    if (cpHost != null) {
      if (cpHost.preModifyNamespace(descriptor)) {
        // A coprocessor asked to bypass the operation.
        return;
      }
    }
    LOG.info(getClientIdAuditPrefix() + " modifying " + descriptor);
    tableNamespaceManager.update(descriptor);
    if (cpHost != null) {
      cpHost.postModifyNamespace(descriptor);
    }
  }

  @Override
  public void deleteNamespace(String name) throws IOException {
    checkNamespaceManagerReady();
    if (cpHost != null) {
      if (cpHost.preDeleteNamespace(name)) {
        // A coprocessor asked to bypass the operation.
        return;
      }
    }
    LOG.info(getClientIdAuditPrefix() + " deleting " + name);
    tableNamespaceManager.remove(name);
    if (cpHost != null) {
      cpHost.postDeleteNamespace(name);
    }
  }

  /**
   * Ensure that the specified namespace exists, otherwise throw a NamespaceNotFoundException.
   *
   * @param name the namespace to check
   * @throws IOException if the namespace manager is not ready yet
   * @throws NamespaceNotFoundException if the namespace does not exist
   */
  protected void ensureNamespaceExists(final String name)
      throws IOException, NamespaceNotFoundException {
    checkNamespaceManagerReady();
    NamespaceDescriptor nsd = tableNamespaceManager.get(name);
    if (nsd == null) {
      throw new NamespaceNotFoundException(name);
    }
  }

  @Override
  public NamespaceDescriptor getNamespaceDescriptor(String name) throws IOException {
    checkNamespaceManagerReady();

    if (cpHost != null) {
      cpHost.preGetNamespaceDescriptor(name);
    }

    NamespaceDescriptor nsd = tableNamespaceManager.get(name);
    if (nsd == null) {
      throw new NamespaceNotFoundException(name);
    }

    if (cpHost != null) {
      cpHost.postGetNamespaceDescriptor(nsd);
    }

    return nsd;
  }

  @Override
  public List<NamespaceDescriptor> listNamespaceDescriptors() throws IOException {
    checkNamespaceManagerReady();

    final List<NamespaceDescriptor> descriptors = new ArrayList<NamespaceDescriptor>();
    boolean bypass = false;
    if (cpHost != null) {
      bypass = cpHost.preListNamespaceDescriptors(descriptors);
    }

    if (!bypass) {
      descriptors.addAll(tableNamespaceManager.list());

      if (cpHost != null) {
        cpHost.postListNamespaceDescriptors(descriptors);
      }
    }
    return descriptors;
  }
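
  // Illustrative sketch ("master" stands for a reference to this class, e.g.
  // from a test or an embedded tool):
  //
  //   for (NamespaceDescriptor nsd : master.listNamespaceDescriptors()) {
  //     System.out.println(nsd.getName());
  //   }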

  @Override
  public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
      throws IOException {
    if (cpHost != null) {
      cpHost.preAbortProcedure(this.procedureExecutor, procId);
    }

    final boolean result = this.procedureExecutor.abort(procId, mayInterruptIfRunning);

    if (cpHost != null) {
      cpHost.postAbortProcedure();
    }

    return result;
  }
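
  // Illustrative sketch: aborting a previously submitted procedure by id,
  // allowing interruption if it is already running ("master" and procId are
  // assumptions standing in for a master reference and a known procedure id):
  //
  //   boolean aborted = master.abortProcedure(procId, true);
  //   LOG.info("Procedure " + procId + (aborted ? " aborted" : " could not be aborted"));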

  @Override
  public List<ProcedureInfo> listProcedures() throws IOException {
    if (cpHost != null) {
      cpHost.preListProcedures();
    }

    final List<ProcedureInfo> procInfoList = this.procedureExecutor.listProcedures();

    if (cpHost != null) {
      cpHost.postListProcedures(procInfoList);
    }

    return procInfoList;
  }

  @Override
  public List<HTableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
    ensureNamespaceExists(name);
    return listTableDescriptors(name, null, null, true);
  }

  @Override
  public List<TableName> listTableNamesByNamespace(String name) throws IOException {
    ensureNamespaceExists(name);
    return listTableNames(name, null, true);
  }

  /**
   * Returns the list of table descriptors that match the specified request.
   *
   * @param namespace the namespace to query, or null to query all namespaces
   * @param regex the regular expression to match against, or null to match all tables
   * @param tableNameList the list of table names, or null to consider all tables
   * @param includeSysTables false to match only against userspace tables
   * @return the list of table descriptors
   */
  public List<HTableDescriptor> listTableDescriptors(final String namespace, final String regex,
      final List<TableName> tableNameList, final boolean includeSysTables)
      throws IOException {
    final List<HTableDescriptor> descriptors = new ArrayList<HTableDescriptor>();

    boolean bypass = false;
    if (cpHost != null) {
      bypass = cpHost.preGetTableDescriptors(tableNameList, descriptors);
      // method required for AccessController.
      bypass |= cpHost.preGetTableDescriptors(tableNameList, descriptors, regex);
    }

    if (!bypass) {
      if (tableNameList == null || tableNameList.isEmpty()) {
        // request for all TableDescriptors
        Collection<HTableDescriptor> htds;
        if (namespace != null && namespace.length() > 0) {
          htds = tableDescriptors.getByNamespace(namespace).values();
        } else {
          htds = tableDescriptors.getAll().values();
        }

        for (HTableDescriptor desc: htds) {
          if (includeSysTables || !desc.getTableName().isSystemTable()) {
            descriptors.add(desc);
          }
        }
      } else {
        for (TableName s: tableNameList) {
          HTableDescriptor desc = tableDescriptors.get(s);
          if (desc != null) {
            descriptors.add(desc);
          }
        }
      }

      // Retains only those matched by regular expression.
      if (regex != null) {
        filterTablesByRegex(descriptors, Pattern.compile(regex));
      }

      if (cpHost != null) {
        cpHost.postGetTableDescriptors(descriptors);
        // method required for AccessController.
        cpHost.postGetTableDescriptors(tableNameList, descriptors, regex);
      }
    }
    return descriptors;
  }
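
  // Illustrative sketch: fetch the descriptors of user tables in namespace
  // "prod" whose names start with "web" (namespace and pattern are assumptions):
  //
  //   List<HTableDescriptor> htds =
  //       master.listTableDescriptors("prod", "web.*", null, false);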

  /**
   * Returns the list of table names that match the specified request.
   *
   * @param namespace the namespace to query, or null to query all namespaces
   * @param regex the regular expression to match against, or null to match all tables
   * @param includeSysTables false to match only against userspace tables
   * @return the list of table names
   */
  public List<TableName> listTableNames(final String namespace, final String regex,
      final boolean includeSysTables) throws IOException {
    final List<HTableDescriptor> descriptors = new ArrayList<HTableDescriptor>();

    boolean bypass = false;
    if (cpHost != null) {
      bypass = cpHost.preGetTableNames(descriptors, regex);
    }

    if (!bypass) {
      // get all descriptors
      Collection<HTableDescriptor> htds;
      if (namespace != null && namespace.length() > 0) {
        htds = tableDescriptors.getByNamespace(namespace).values();
      } else {
        htds = tableDescriptors.getAll().values();
      }

      for (HTableDescriptor htd: htds) {
        if (includeSysTables || !htd.getTableName().isSystemTable()) {
          descriptors.add(htd);
        }
      }

      // Retains only those matched by regular expression.
      if (regex != null) {
        filterTablesByRegex(descriptors, Pattern.compile(regex));
      }

      if (cpHost != null) {
        cpHost.postGetTableNames(descriptors, regex);
      }
    }

    List<TableName> result = new ArrayList<TableName>(descriptors.size());
    for (HTableDescriptor htd: descriptors) {
      result.add(htd.getTableName());
    }
    return result;
  }

  /**
   * Removes the table descriptors that don't match the pattern.
   *
   * @param descriptors list of table descriptors to filter
   * @param pattern the regex to use
   */
  private static void filterTablesByRegex(final Collection<HTableDescriptor> descriptors,
      final Pattern pattern) {
    final String defaultNS = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
    Iterator<HTableDescriptor> itr = descriptors.iterator();
    while (itr.hasNext()) {
      HTableDescriptor htd = itr.next();
      String tableName = htd.getTableName().getNameAsString();
      boolean matched = pattern.matcher(tableName).matches();
      if (!matched && htd.getTableName().getNamespaceAsString().equals(defaultNS)) {
        // Retry with the default namespace prefix so that patterns written as
        // "default:<table>" also match tables in the default namespace.
        matched = pattern.matcher(defaultNS + TableName.NAMESPACE_DELIM + tableName).matches();
      }
      if (!matched) {
        itr.remove();
      }
    }
  }
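
  // Worked example of the default-namespace retry above (an illustrative note,
  // not executable code): a table "web_logs" in the default namespace is kept by
  // both Pattern.compile("web.*") and Pattern.compile("default:web.*"), since
  // the second attempt matches against "default:web_logs".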

  @Override
  public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
    return getClusterStatus().getLastMajorCompactionTsForTable(table);
  }

  @Override
  public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException {
    return getClusterStatus().getLastMajorCompactionTsForRegion(regionName);
  }

  /**
   * Queries the state of the {@link LoadBalancerTracker}. If the balancer is not initialized,
   * false is returned.
   *
   * @return The state of the load balancer, or false if the load balancer isn't defined.
   */
  public boolean isBalancerOn() {
    if (loadBalancerTracker == null) return false;
    return loadBalancerTracker.isBalancerOn();
  }

  /**
   * Queries the state of the {@link RegionNormalizerTracker}. If it's not initialized,
   * false is returned.
   *
   * @return The state of the region normalizer, or false if it is not initialized.
   */
  public boolean isNormalizerOn() {
    if (regionNormalizerTracker == null) {
      return false;
    }
    return regionNormalizerTracker.isNormalizerOn();
  }

  /**
   * Fetch the configured {@link LoadBalancer} class name. If none is set, a default is returned.
   *
   * @return The name of the {@link LoadBalancer} in use.
   */
  public String getLoadBalancerClassName() {
    return conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
        LoadBalancerFactory.getDefaultLoadBalancerClass().getName());
  }
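
  // Configuration sketch: a custom balancer is plugged in through hbase-site.xml
  // (the implementation class below is hypothetical):
  //
  //   <property>
  //     <name>hbase.master.loadbalancer.class</name>
  //     <value>org.example.MyCustomLoadBalancer</value>
  //   </property>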

  /**
   * @return RegionNormalizerTracker instance
   */
  public RegionNormalizerTracker getRegionNormalizerTracker() {
    return regionNormalizerTracker;
  }
}