1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.lang.reflect.Constructor;
24  import java.lang.reflect.InvocationTargetException;
25  import java.net.InetAddress;
26  import java.net.InetSocketAddress;
27  import java.net.UnknownHostException;
28  import java.util.ArrayList;
29  import java.util.Arrays;
30  import java.util.Collection;
31  import java.util.Collections;
32  import java.util.Comparator;
33  import java.util.HashSet;
34  import java.util.Iterator;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.Map.Entry;
38  import java.util.Set;
39  import java.util.concurrent.TimeUnit;
40  import java.util.concurrent.atomic.AtomicInteger;
41  import java.util.concurrent.atomic.AtomicReference;
42  import java.util.regex.Pattern;
43  
44  import javax.servlet.ServletException;
45  import javax.servlet.http.HttpServlet;
46  import javax.servlet.http.HttpServletRequest;
47  import javax.servlet.http.HttpServletResponse;
48  
49  import org.apache.commons.logging.Log;
50  import org.apache.commons.logging.LogFactory;
51  import org.apache.hadoop.conf.Configuration;
52  import org.apache.hadoop.fs.Path;
53  import org.apache.hadoop.hbase.ClusterStatus;
54  import org.apache.hadoop.hbase.CoordinatedStateException;
55  import org.apache.hadoop.hbase.CoordinatedStateManager;
56  import org.apache.hadoop.hbase.DoNotRetryIOException;
57  import org.apache.hadoop.hbase.HBaseIOException;
58  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
59  import org.apache.hadoop.hbase.HColumnDescriptor;
60  import org.apache.hadoop.hbase.HConstants;
61  import org.apache.hadoop.hbase.HRegionInfo;
62  import org.apache.hadoop.hbase.HTableDescriptor;
63  import org.apache.hadoop.hbase.MasterNotRunningException;
64  import org.apache.hadoop.hbase.MetaTableAccessor;
65  import org.apache.hadoop.hbase.NamespaceDescriptor;
66  import org.apache.hadoop.hbase.PleaseHoldException;
67  import org.apache.hadoop.hbase.ProcedureInfo;
68  import org.apache.hadoop.hbase.RegionStateListener;
69  import org.apache.hadoop.hbase.Server;
70  import org.apache.hadoop.hbase.ServerLoad;
71  import org.apache.hadoop.hbase.ServerName;
72  import org.apache.hadoop.hbase.TableDescriptor;
73  import org.apache.hadoop.hbase.TableDescriptors;
74  import org.apache.hadoop.hbase.TableName;
75  import org.apache.hadoop.hbase.TableNotDisabledException;
76  import org.apache.hadoop.hbase.TableNotFoundException;
77  import org.apache.hadoop.hbase.UnknownRegionException;
78  import org.apache.hadoop.hbase.classification.InterfaceAudience;
79  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
80  import org.apache.hadoop.hbase.client.Result;
81  import org.apache.hadoop.hbase.client.TableState;
82  import org.apache.hadoop.hbase.coprocessor.BypassCoprocessorException;
83  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
84  import org.apache.hadoop.hbase.exceptions.DeserializationException;
85  import org.apache.hadoop.hbase.executor.ExecutorType;
86  import org.apache.hadoop.hbase.ipc.RpcServer;
87  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
88  import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
89  import org.apache.hadoop.hbase.master.balancer.BalancerChore;
90  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
91  import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
92  import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
93  import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
94  import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
95  import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
96  import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
97  import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
98  import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
99  import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
100 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
101 import org.apache.hadoop.hbase.master.procedure.AddColumnFamilyProcedure;
102 import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
103 import org.apache.hadoop.hbase.master.procedure.DeleteColumnFamilyProcedure;
104 import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
105 import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
106 import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
107 import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
108 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
109 import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
110 import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure;
111 import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
112 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
113 import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
114 import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
115 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
116 import org.apache.hadoop.hbase.mob.MobConstants;
117 import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
118 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
119 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
120 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
121 import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
122 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
123 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
124 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
125 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
126 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
127 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
128 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
129 import org.apache.hadoop.hbase.regionserver.HRegionServer;
130 import org.apache.hadoop.hbase.regionserver.HStore;
131 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
132 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
133 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
134 import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
135 import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
136 import org.apache.hadoop.hbase.replication.regionserver.Replication;
137 import org.apache.hadoop.hbase.security.User;
138 import org.apache.hadoop.hbase.security.UserProvider;
139 import org.apache.hadoop.hbase.util.Addressing;
140 import org.apache.hadoop.hbase.util.Bytes;
141 import org.apache.hadoop.hbase.util.CompressionTest;
142 import org.apache.hadoop.hbase.util.EncryptionTest;
143 import org.apache.hadoop.hbase.util.FSUtils;
144 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
145 import org.apache.hadoop.hbase.util.HasThread;
146 import org.apache.hadoop.hbase.util.IdLock;
147 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
148 import org.apache.hadoop.hbase.util.Pair;
149 import org.apache.hadoop.hbase.util.Threads;
150 import org.apache.hadoop.hbase.util.VersionInfo;
151 import org.apache.hadoop.hbase.util.ZKDataMigrator;
152 import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
153 import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
154 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
155 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
156 import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
157 import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
158 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
159 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
160 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
161 import org.apache.zookeeper.KeeperException;
162 import org.mortbay.jetty.Connector;
163 import org.mortbay.jetty.nio.SelectChannelConnector;
164 import org.mortbay.jetty.servlet.Context;
165 
166 import com.google.common.annotations.VisibleForTesting;
167 import com.google.common.collect.Maps;
168 import com.google.protobuf.Descriptors;
169 import com.google.protobuf.Service;
170 
171 /**
172  * HMaster is the "master server" for HBase. An HBase cluster has one active
173  * master.  If many masters are started, all compete.  Whichever wins goes on to
174  * run the cluster.  All others park themselves in their constructor until
175  * master or cluster shutdown or until the active master loses its lease in
176  * zookeeper.  Thereafter, all running masters jostle to take over the master role.
177  *
178  * <p>The Master can be asked to shut down the cluster. See {@link #shutdown()}.  In
179  * this case it will tell all regionservers to go down and then wait on them
180  * all reporting in that they are down.  This master will then shut itself down.
181  *
182  * <p>You can also shut down just this master.  Call {@link #stopMaster()}.
183  *
184  * @see org.apache.zookeeper.Watcher
185  */
186 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
187 @SuppressWarnings("deprecation")
188 public class HMaster extends HRegionServer implements MasterServices {
189   private static final Log LOG = LogFactory.getLog(HMaster.class.getName());
190 
191   /**
192    * Protection against a zombie master. Started once the Master accepts the active role and
193    * begins taking over responsibilities. Allows a finite time window before giving up ownership.
194    */
195   private static class InitializationMonitor extends HasThread {
196     /** The amount of time in milliseconds to sleep before checking initialization status. */
197     public static final String TIMEOUT_KEY = "hbase.master.initializationmonitor.timeout";
198     public static final long TIMEOUT_DEFAULT = TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);
199 
200     /**
201      * If the timeout expires and initialization has not completed, call {@link System#exit(int)}
202      * when true; do nothing otherwise.
203      */
204     public static final String HALT_KEY = "hbase.master.initializationmonitor.haltontimeout";
205     public static final boolean HALT_DEFAULT = false;
206 
207     private final HMaster master;
208     private final long timeout;
209     private final boolean haltOnTimeout;
210 
211     /** Creates a Thread that monitors the {@link #isInitialized()} state. */
212     InitializationMonitor(HMaster master) {
213       super("MasterInitializationMonitor");
214       this.master = master;
215       this.timeout = master.getConfiguration().getLong(TIMEOUT_KEY, TIMEOUT_DEFAULT);
216       this.haltOnTimeout = master.getConfiguration().getBoolean(HALT_KEY, HALT_DEFAULT);
217       this.setDaemon(true);
218     }
219 
220     @Override
221     public void run() {
222       try {
223         while (!master.isStopped() && master.isActiveMaster()) {
224           Thread.sleep(timeout);
225           if (master.isInitialized()) {
226             LOG.debug("Initialization completed within allotted tolerance. Monitor exiting.");
227           } else {
228             LOG.error("Master failed to complete initialization after " + timeout + "ms. Please"
229                 + " consider submitting a bug report including a thread dump of this process.");
230             if (haltOnTimeout) {
231               LOG.error("Zombie Master exiting. Thread dump to stdout");
232               Threads.printThreadInfo(System.out, "Zombie HMaster");
233               System.exit(-1);
234             }
235           }
236         }
237       } catch (InterruptedException ie) {
238       LOG.trace("InitMonitor thread interrupted. Exiting.");
239       }
240     }
241   }
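
  // A hedged, illustrative sketch (not part of the upstream class) of overriding the
  // monitor's defaults programmatically; the keys are the string constants defined above:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setLong("hbase.master.initializationmonitor.timeout",
  //       TimeUnit.MINUTES.toMillis(30));
  //   conf.setBoolean("hbase.master.initializationmonitor.haltontimeout", true);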
242 
243   // MASTER is the name of the webapp and the attribute name used to stuff this
244   // instance into the web context.
245   public static final String MASTER = "master";
246 
247   // Manager and zk listener for master election
248   private final ActiveMasterManager activeMasterManager;
249   // Region server tracker
250   RegionServerTracker regionServerTracker;
251   // Draining region server tracker
252   private DrainingServerTracker drainingServerTracker;
253   // Tracker for load balancer state
254   LoadBalancerTracker loadBalancerTracker;
255 
256   // Tracker for region normalizer state
257   private RegionNormalizerTracker regionNormalizerTracker;
258 
259   private ClusterSchemaService clusterSchemaService;
260 
261   // Metrics for the HMaster
262   final MetricsMaster metricsMaster;
263   // file system manager for the master FS operations
264   private MasterFileSystem fileSystemManager;
265 
266   // server manager to deal with region server info
267   volatile ServerManager serverManager;
268 
269   // manager of assignment nodes in zookeeper
270   AssignmentManager assignmentManager;
271 
272   // buffer for "fatal error" notices from region servers
273   // in the cluster. This is only used for assisting
274   // operations/debugging.
275   MemoryBoundedLogMessageBuffer rsFatals;
276 
277   // flag set after we become the active master (used for testing)
278   private volatile boolean isActiveMaster = false;
279 
280   // flag set after we complete initialization once active;
281   // unit tests observe it through isInitialized()
282   private final ProcedureEvent initialized = new ProcedureEvent("master initialized");
283 
284   // flag set after master services are started,
285   // initialization may not have completed yet.
286   volatile boolean serviceStarted = false;
287 
288   // flag set after we complete assignMeta.
289   private final ProcedureEvent serverCrashProcessingEnabled =
290     new ProcedureEvent("server crash processing");
291 
292   LoadBalancer balancer;
293   private RegionNormalizer normalizer;
294   private BalancerChore balancerChore;
295   private RegionNormalizerChore normalizerChore;
296   private ClusterStatusChore clusterStatusChore;
297   private ClusterStatusPublisher clusterStatusPublisherChore = null;
298 
299   CatalogJanitor catalogJanitorChore;
300   private LogCleaner logCleaner;
301   private HFileCleaner hfileCleaner;
302   private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
303   private MobCompactionChore mobCompactChore;
304   private MasterMobCompactionThread mobCompactThread;
305   // used to synchronize the mobCompactionStates
306   private final IdLock mobCompactionLock = new IdLock();
307   // save the information of mob compactions in tables.
308   // the key is table name, the value is the number of compactions in that table.
309   private Map<TableName, AtomicInteger> mobCompactionStates = Maps.newConcurrentMap();
310 
311   MasterCoprocessorHost cpHost;
312 
313   private final boolean preLoadTableDescriptors;
314 
315   // Timestamp for when this master became active
316   private long masterActiveTime;
317 
318   // should we check the compression codec type at master side, default true, HBASE-6370
319   private final boolean masterCheckCompression;
320 
321   // should we check encryption settings at master side, default true
322   private final boolean masterCheckEncryption;
323 
324   Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
325 
326   // monitor for snapshot of hbase tables
327   SnapshotManager snapshotManager;
328   // monitor for distributed procedures
329   MasterProcedureManagerHost mpmHost;
330 
331   // it is assigned after the 'initialized' guard is set to true, so it should be volatile
332   private volatile MasterQuotaManager quotaManager;
333 
334   private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
335   private WALProcedureStore procedureStore;
336 
337   // handle table states
338   private TableStateManager tableStateManager;
339   
340   private long splitPlanCount;
341   private long mergePlanCount;
342 
343   /** flag used in test cases in order to simulate RS failures during master initialization */
344   private volatile boolean initializationBeforeMetaAssignment = false;
345 
346   /** jetty server for master to redirect requests to regionserver infoServer */
347   private org.mortbay.jetty.Server masterJettyServer;
348 
349   public static class RedirectServlet extends HttpServlet {
350     private static final long serialVersionUID = 2894774810058302472L;
351     private static int regionServerInfoPort;
352 
353     @Override
354     public void doGet(HttpServletRequest request,
355         HttpServletResponse response) throws ServletException, IOException {
356       String redirectUrl = request.getScheme() + "://"
357         + request.getServerName() + ":" + regionServerInfoPort
358         + request.getRequestURI();
359       response.sendRedirect(redirectUrl);
360     }
361   }
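
  // Illustrative effect of the servlet above (ports are examples; actual values depend
  // on configuration): a GET for http://master-host:16010/master-status is answered
  // with a redirect to http://master-host:16030/master-status -- same scheme, host and
  // URI, but the region server's info port.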
362 
363   /**
364    * Initializes the HMaster. The steps are as follows:
365    * <p>
366    * <ol>
367    * <li>Initialize the local HRegionServer.
368    * <li>Start the ActiveMasterManager.
369    * </ol>
370    * <p>
371    * Remaining steps of initialization occur in
372    * #finishActiveMasterInitialization(MonitoredTask) after
373    * the master becomes the active one.
374    */
375   public HMaster(final Configuration conf, CoordinatedStateManager csm)
376       throws IOException, KeeperException {
377     super(conf, csm);
378     this.rsFatals = new MemoryBoundedLogMessageBuffer(
379       conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));
380 
381     LOG.info("hbase.rootdir=" + FSUtils.getRootDir(this.conf) +
382       ", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));
383 
384     // Disable usage of meta replicas in the master
385     this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
386 
387     Replication.decorateMasterConfiguration(this.conf);
388 
389     // Hack! Maps DFSClient => Master for logs.  HDFS made this
390     // config param for task trackers, but we can piggyback off of it.
391     if (this.conf.get("mapreduce.task.attempt.id") == null) {
392       this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString());
393     }
394 
395     // should we check the compression codec type at master side, default true, HBASE-6370
396     this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);
397 
398     // should we check encryption settings at master side, default true
399     this.masterCheckEncryption = conf.getBoolean("hbase.master.check.encryption", true);
400 
401     this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));
402 
403     // preload table descriptor at startup
404     this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);
405 
406     // Do we publish the status?
407 
408     boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
409         HConstants.STATUS_PUBLISHED_DEFAULT);
410     Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
411         conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
412             ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
413             ClusterStatusPublisher.Publisher.class);
414 
415     if (shouldPublish) {
416       if (publisherClass == null) {
417         LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
418             ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
419             " is not set - not publishing status");
420       } else {
421         clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
422         getChoreService().scheduleChore(clusterStatusPublisherChore);
423       }
424     }
425 
426     // Some unit tests don't need a cluster, so no zookeeper at all
427     if (!conf.getBoolean("hbase.testing.nocluster", false)) {
428       activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
429       int infoPort = putUpJettyServer();
430       startActiveMasterManager(infoPort);
431     } else {
432       activeMasterManager = null;
433     }
434   }
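
  // Hedged usage sketch for the status-publishing block above: publishing is opt-in,
  // and the publisher class falls back to ClusterStatusPublisher's default when unset:
  //
  //   conf.setBoolean(HConstants.STATUS_PUBLISHED, true);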
435 
436   // Return the actual infoPort; -1 means the info server is disabled.
437   private int putUpJettyServer() throws IOException {
438     if (!conf.getBoolean("hbase.master.infoserver.redirect", true)) {
439       return -1;
440     }
441     int infoPort = conf.getInt("hbase.master.info.port.orig",
442       HConstants.DEFAULT_MASTER_INFOPORT);
443     // -1 is for disabling info server, so no redirecting
444     if (infoPort < 0 || infoServer == null) {
445       return -1;
446     }
447     String addr = conf.get("hbase.master.info.bindAddress", "0.0.0.0");
448     if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
449       String msg =
450           "Failed to start redirecting jetty server. Address " + addr
451               + " does not belong to this host. Correct configuration parameter: "
452               + "hbase.master.info.bindAddress";
453       LOG.error(msg);
454       throw new IOException(msg);
455     }
456 
457     RedirectServlet.regionServerInfoPort = infoServer.getPort();
458     if (RedirectServlet.regionServerInfoPort == infoPort) {
459       return infoPort;
460     }
461     masterJettyServer = new org.mortbay.jetty.Server();
462     Connector connector = new SelectChannelConnector();
463     connector.setHost(addr);
464     connector.setPort(infoPort);
465     masterJettyServer.addConnector(connector);
466     masterJettyServer.setStopAtShutdown(true);
467     Context context = new Context(masterJettyServer, "/", Context.NO_SESSIONS);
468     context.addServlet(RedirectServlet.class, "/*");
469     try {
470       masterJettyServer.start();
471     } catch (Exception e) {
472       throw new IOException("Failed to start redirecting jetty server", e);
473     }
474     return connector.getLocalPort();
475   }
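
  // Sketch of the configuration putUpJettyServer consumes (keys grounded in the code
  // above; the example values are illustrative, not authoritative):
  //
  //   hbase.master.infoserver.redirect = true      // set false to skip the redirect server
  //   hbase.master.info.port.orig      = 16010     // defaults to HConstants.DEFAULT_MASTER_INFOPORT
  //   hbase.master.info.bindAddress    = 0.0.0.0   // must resolve to a local address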
476 
477   @Override
478   protected TableDescriptors getFsTableDescriptors() throws IOException {
479     return super.getFsTableDescriptors();
480   }
481 
482   /**
483    * For compatibility, if login with the regionserver credentials fails, try the master ones
484    */
485   @Override
486   protected void login(UserProvider user, String host) throws IOException {
487     try {
488       super.login(user, host);
489     } catch (IOException ie) {
490       user.login("hbase.master.keytab.file",
491         "hbase.master.kerberos.principal", host);
492     }
493   }
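
  // The fallback reads the master-specific credentials; an illustrative configuration
  // (the keytab path and principal below are examples only):
  //
  //   hbase.master.keytab.file        = /etc/hbase/conf/hbase.keytab
  //   hbase.master.kerberos.principal = hbase/_HOST@EXAMPLE.COM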
494 
495   /**
496    * If configured to put regions on the active master,
497    * wait until this master becomes the active one.
498    * Otherwise, loop until the server is stopped or aborted.
499    */
500   @Override
501   protected void waitForMasterActive(){
502     boolean tablesOnMaster = BaseLoadBalancer.tablesOnMaster(conf);
503     while (!(tablesOnMaster && isActiveMaster)
504         && !isStopped() && !isAborted()) {
505       sleeper.sleep();
506     }
507   }
508 
509   @VisibleForTesting
510   public MasterRpcServices getMasterRpcServices() {
511     return (MasterRpcServices)rpcServices;
512   }
513 
514   public boolean balanceSwitch(final boolean b) throws IOException {
515     return getMasterRpcServices().switchBalancer(b, BalanceSwitchMode.ASYNC);
516   }
517 
518   @Override
519   protected String getProcessName() {
520     return MASTER;
521   }
522 
523   @Override
524   protected boolean canCreateBaseZNode() {
525     return true;
526   }
527 
528   @Override
529   protected boolean canUpdateTableDescriptor() {
530     return true;
531   }
532 
533   @Override
534   protected RSRpcServices createRpcServices() throws IOException {
535     return new MasterRpcServices(this);
536   }
537 
538   @Override
539   protected void configureInfoServer() {
540     infoServer.addServlet("master-status", "/master-status", MasterStatusServlet.class);
541     infoServer.setAttribute(MASTER, this);
542     if (BaseLoadBalancer.tablesOnMaster(conf)) {
543       super.configureInfoServer();
544     }
545   }
546 
547   @Override
548   protected Class<? extends HttpServlet> getDumpServlet() {
549     return MasterDumpServlet.class;
550   }
551 
552   /**
553    * Emit the HMaster metrics, such as region in transition metrics.
554    * Wrapped in a try block just to be sure a metrics failure doesn't abort the HMaster.
555    */
556   @Override
557   protected void doMetrics() {
558     try {
559       if (assignmentManager != null) {
560         assignmentManager.updateRegionsInTransitionMetrics();
561       }
562     } catch (Throwable e) {
563       LOG.error("Couldn't update metrics: " + e.getMessage());
564     }
565   }
566 
567   MetricsMaster getMasterMetrics() {
568     return metricsMaster;
569   }
570 
571   /**
572    * Initialize all ZK based system trackers.
573    */
574   void initializeZKBasedSystemTrackers() throws IOException,
575       InterruptedException, KeeperException, CoordinatedStateException {
576     this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
577     this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
578     this.normalizer.setMasterServices(this);
579     this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
580     this.loadBalancerTracker.start();
581     this.regionNormalizerTracker = new RegionNormalizerTracker(zooKeeper, this);
582     this.regionNormalizerTracker.start();
583     this.assignmentManager = new AssignmentManager(this, serverManager,
584       this.balancer, this.service, this.metricsMaster,
585       this.tableLockManager, tableStateManager);
586 
587     this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
588     this.regionServerTracker.start();
589 
590     this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
591     this.drainingServerTracker.start();
592 
593     // Set the cluster as up.  If new RSs, they'll be waiting on this before
594     // going ahead with their startup.
595     boolean wasUp = this.clusterStatusTracker.isClusterUp();
596     if (!wasUp) this.clusterStatusTracker.setClusterUp();
597 
598     LOG.info("Server active/primary master=" + this.serverName +
599         ", sessionid=0x" +
600         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
601         ", setting cluster-up flag (Was=" + wasUp + ")");
602 
603     // create/initialize the snapshot manager and other procedure managers
604     this.snapshotManager = new SnapshotManager();
605     this.mpmHost = new MasterProcedureManagerHost();
606     this.mpmHost.register(this.snapshotManager);
607     this.mpmHost.register(new MasterFlushTableProcedureManager());
608     this.mpmHost.loadProcedures(conf);
609     this.mpmHost.initialize(this, this.metricsMaster);
610 
611   }
612 
613   /**
614    * Finish initialization of HMaster after becoming the primary master.
615    *
616    * <ol>
617    * <li>Initialize master components - file system manager, server manager,
618    *     assignment manager, region server tracker, etc</li>
619    * <li>Start necessary service threads - balancer, catalog janitor,
620    *     executor services, etc</li>
621    * <li>Set cluster as UP in ZooKeeper</li>
622    * <li>Wait for RegionServers to check in</li>
623    * <li>Split logs and perform data recovery, if necessary</li>
624    * <li>Ensure assignment of meta/namespace regions</li>
625    * <li>Handle either fresh cluster start or master failover</li>
626    * </ol>
627    */
628   private void finishActiveMasterInitialization(MonitoredTask status)
629       throws IOException, InterruptedException, KeeperException, CoordinatedStateException {
630 
631     isActiveMaster = true;
632     Thread zombieDetector = new Thread(new InitializationMonitor(this));
633     zombieDetector.start();
634 
635     /*
636      * We are active master now... go initialize components we need to run.
637      * Note, there may be dross in zk from previous runs; it'll get addressed
638      * below after we determine if cluster startup or failover.
639      */
640 
641     status.setStatus("Initializing Master file system");
642 
643     this.masterActiveTime = System.currentTimeMillis();
644     // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
645     this.fileSystemManager = new MasterFileSystem(this, this);
646 
647     // enable table descriptors cache
648     this.tableDescriptors.setCacheOn();
649     // set the META's descriptor to the correct replication
650     this.tableDescriptors.get(TableName.META_TABLE_NAME).setRegionReplication(
651         conf.getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM));
652     // warm-up HTDs cache on master initialization
653     if (preLoadTableDescriptors) {
654       status.setStatus("Pre-loading table descriptors");
655       this.tableDescriptors.getAll();
656     }
657 
658     // publish cluster ID
659     status.setStatus("Publishing Cluster ID in ZooKeeper");
660     ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
661     this.serverManager = createServerManager(this, this);
662 
663     // Invalidate all write locks held previously
664     this.tableLockManager.reapWriteLocks();
665     this.tableStateManager = new TableStateManager(this);
666 
667     status.setStatus("Initializing ZK system trackers");
668     initializeZKBasedSystemTrackers();
669 
670     // initialize master side coprocessors before we start handling requests
671     status.setStatus("Initializing master coprocessors");
672     this.cpHost = new MasterCoprocessorHost(this, this.conf);
673 
674     // start up all service threads.
675     status.setStatus("Initializing master service threads");
676     startServiceThreads();
677 
678     // Wake up this server to check in
679     sleeper.skipSleepCycle();
680 
681     // Wait for region servers to report in
682     this.serverManager.waitForRegionServers(status);
683     // Check zk for region servers that are up but didn't register
684     for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
685       // The isServerOnline check is opportunistic, correctness is handled inside
686       if (!this.serverManager.isServerOnline(sn)
687           && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
688         LOG.info("Registered server found up in zk but which has not yet reported in: " + sn);
689       }
690     }
691 
692     // get a list of previously failed RSs which need log splitting work
693     // we recover hbase:meta region servers inside master initialization and
694     // handle other failed servers in SSH in order to start up master node ASAP
695     Set<ServerName> previouslyFailedServers =
696       this.fileSystemManager.getFailedServersFromLogFolders();
697 
698     // log splitting for hbase:meta server
699     ServerName oldMetaServerLocation = metaTableLocator.getMetaRegionLocation(this.getZooKeeper());
700     if (oldMetaServerLocation != null && previouslyFailedServers.contains(oldMetaServerLocation)) {
701       splitMetaLogBeforeAssignment(oldMetaServerLocation);
702       // Note: we can't remove oldMetaServerLocation from previousFailedServers list because it
703       // may also host user regions
704     }
705     Set<ServerName> previouslyFailedMetaRSs = getPreviouselyFailedMetaServersFromZK();
706     // need to use union of previouslyFailedMetaRSs recorded in ZK and previouslyFailedServers
707     // instead of previouslyFailedMetaRSs alone to address the following two situations:
708     // 1) the chained failure situation (recovery failed multiple times in a row).
709     // 2) master gets killed right before it could delete the recovering hbase:meta from ZK while the
710     // same server still has non-meta wals to be replayed so that
711     // removeStaleRecoveringRegionsFromZK can't delete the stale hbase:meta region.
712     // Passing more servers into splitMetaLog is all right. If a server doesn't have a hbase:meta wal,
713     // it is a no-op for that server.
714     previouslyFailedMetaRSs.addAll(previouslyFailedServers);
715 
716     this.initializationBeforeMetaAssignment = true;
717 
718     // Wait for regionserver to finish initialization.
719     if (BaseLoadBalancer.tablesOnMaster(conf)) {
720       waitForServerOnline();
721     }
722 
723     //initialize load balancer
724     this.balancer.setClusterStatus(getClusterStatus());
725     this.balancer.setMasterServices(this);
726     this.balancer.initialize();
727 
728     // Check if master is shutting down because of some issue
729     // in initializing the regionserver or the balancer.
730     if (isStopped()) return;
731 
732     // Make sure meta assigned before proceeding.
733     status.setStatus("Assigning Meta Region");
734     assignMeta(status, previouslyFailedMetaRSs, HRegionInfo.DEFAULT_REPLICA_ID);
735     // check if master is shutting down because the above assignMeta could return even if
736     // hbase:meta isn't assigned when master is shutting down
737     if (isStopped()) return;
738 
739     // Migrate existing table state from zk so that splitters
740     // and the recovery process treat states properly.
741     for (Map.Entry<TableName, TableState.State> entry : ZKDataMigrator
742         .queryForTableStates(getZooKeeper()).entrySet()) {
743       LOG.info("Converting state from zk to new states: " + entry);
744       tableStateManager.setTableState(entry.getKey(), entry.getValue());
745     }
746     ZKUtil.deleteChildrenRecursively(getZooKeeper(), getZooKeeper().tableZNode);
747 
748     status.setStatus("Submitting log splitting work for previously failed region servers");
749     // Master has recovered hbase:meta region server and we put
750     // other failed region servers in a queue to be handled later by SSH
751     for (ServerName tmpServer : previouslyFailedServers) {
752       this.serverManager.processDeadServer(tmpServer, true);
753     }
754 
755     // Fix up assignment manager status
756     status.setStatus("Starting assignment manager");
757     this.assignmentManager.joinCluster();
758 
759     // set cluster status again after user regions are assigned
760     this.balancer.setClusterStatus(getClusterStatus());
761 
762     // Start balancer and meta catalog janitor after meta and regions have been assigned.
763     status.setStatus("Starting balancer and catalog janitor");
764     this.clusterStatusChore = new ClusterStatusChore(this, balancer);
765     getChoreService().scheduleChore(clusterStatusChore);
766     this.balancerChore = new BalancerChore(this);
767     getChoreService().scheduleChore(balancerChore);
768     this.normalizerChore = new RegionNormalizerChore(this);
769     getChoreService().scheduleChore(normalizerChore);
770     this.catalogJanitorChore = new CatalogJanitor(this, this);
771     getChoreService().scheduleChore(catalogJanitorChore);
772 
773     status.setStatus("Starting cluster schema service");
774     initClusterSchemaService();
775 
776     if (this.cpHost != null) {
777       try {
778         this.cpHost.preMasterInitialization();
779       } catch (IOException e) {
780         LOG.error("Coprocessor preMasterInitialization() hook failed", e);
781       }
782     }
783 
784     status.markComplete("Initialization successful");
785     LOG.info("Master has completed initialization");
786     configurationManager.registerObserver(this.balancer);
787 
788     // Set master as 'initialized'.
789     setInitialized(true);
790 
791     // assign the meta replicas
792     Set<ServerName> EMPTY_SET = new HashSet<ServerName>();
793     int numReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
794            HConstants.DEFAULT_META_REPLICA_NUM);
795     for (int i = 1; i < numReplicas; i++) {
796       assignMeta(status, EMPTY_SET, i);
797     }
798     unassignExcessMetaReplica(zooKeeper, numReplicas);
799 
800     status.setStatus("Starting quota manager");
801     initQuotaManager();
802 
803     // Clear the dead servers that have the same host name and port as an online server, because
804     // we do not remove a dead server matching an RS that is trying to check in before
805     // master initialization. See HBASE-5916.
806     this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
807 
808     // Check and set the znode ACLs if needed in case we are overtaking a non-secure configuration
809     status.setStatus("Checking ZNode ACLs");
810     zooKeeper.checkAndSetZNodeAcls();
811 
812     status.setStatus("Calling postStartMaster coprocessors");
813 
814     this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this);
815     getChoreService().scheduleChore(expiredMobFileCleanerChore);
816 
817     int mobCompactionPeriod = conf.getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD,
818       MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD);
819     if (mobCompactionPeriod > 0) {
820       this.mobCompactChore = new MobCompactionChore(this, mobCompactionPeriod);
821       getChoreService().scheduleChore(mobCompactChore);
822     } else {
823       LOG.info("The period is " + mobCompactionPeriod +
824         " seconds, MobCompactionChore is disabled");
825     }
826     this.mobCompactThread = new MasterMobCompactionThread(this);
827 
828     if (this.cpHost != null) {
829       // don't let cp initialization errors kill the master
830       try {
831         this.cpHost.postStartMaster();
832       } catch (IOException ioe) {
833         LOG.error("Coprocessor postStartMaster() hook failed", ioe);
834       }
835     }
836 
837     zombieDetector.interrupt();
838   }
839 
840   /**
841    * Create a {@link ServerManager} instance.
842    */
843   ServerManager createServerManager(final Server master,
844       final MasterServices services)
845   throws IOException {
846     // We put this out here in a method so we can do a Mockito.spy and stub it out
847     // w/ a mocked up ServerManager.
848     setupClusterConnection();
849     return new ServerManager(master, services);
850   }
851 
852   private void unassignExcessMetaReplica(ZooKeeperWatcher zkw, int numMetaReplicasConfigured) {
853     // unassign the unneeded replicas (e.g., if the previous master was configured
854     // with a replication of 3 and now it is 2, we need to unassign the one unneeded replica)
855     try {
856       List<String> metaReplicaZnodes = zooKeeper.getMetaReplicaNodes();
857       for (String metaReplicaZnode : metaReplicaZnodes) {
858         int replicaId = zooKeeper.getMetaReplicaIdFromZnode(metaReplicaZnode);
859         if (replicaId >= numMetaReplicasConfigured) {
860           RegionState r = MetaTableLocator.getMetaRegionState(zkw, replicaId);
861           LOG.info("Closing excess replica of meta region " + r.getRegion());
862           // send a close and wait for a max of 30 seconds
863           ServerManager.closeRegionSilentlyAndWait(getClusterConnection(), r.getServerName(),
864               r.getRegion(), 30000);
865           ZKUtil.deleteNode(zkw, zkw.getZNodeForReplica(replicaId));
866         }
867       }
868     } catch (Exception ex) {
869       // ignore the exception since we don't want the master to be wedged due to potential
870       // issues in the cleanup of the extra regions. We can do that cleanup via hbck or manually
871       LOG.warn("Ignoring exception " + ex);
872     }
873   }
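
  // Worked example of the trimming above: if the previous master ran with three meta
  // replicas (ids 0, 1, 2) and this master configures HConstants.META_REPLICAS_NUM to 2,
  // replica id 2 satisfies replicaId >= numMetaReplicasConfigured, so it is closed and
  // its znode deleted; ids 0 and 1 are left untouched.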
874 
875   /**
876    * Check that <code>hbase:meta</code> is assigned. If not, assign it.
877    */
878   void assignMeta(MonitoredTask status, Set<ServerName> previouslyFailedMetaRSs, int replicaId)
879       throws InterruptedException, IOException, KeeperException {
880     // Work on meta region
881     int assigned = 0;
882     long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
883     if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
884       status.setStatus("Assigning hbase:meta region");
885     } else {
886       status.setStatus("Assigning hbase:meta region, replicaId " + replicaId);
887     }
888 
889     // Get current meta state from zk.
890     RegionState metaState = MetaTableLocator.getMetaRegionState(getZooKeeper(), replicaId);
891     HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO,
892         replicaId);
893     RegionStates regionStates = assignmentManager.getRegionStates();
894     regionStates.createRegionState(hri, metaState.getState(),
895         metaState.getServerName(), null);
896 
897     if (!metaState.isOpened() || !metaTableLocator.verifyMetaRegionLocation(
898         this.getClusterConnection(), this.getZooKeeper(), timeout, replicaId)) {
899       ServerName currentMetaServer = metaState.getServerName();
900       if (serverManager.isServerOnline(currentMetaServer)) {
901         if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
902           LOG.info("Meta was in transition on " + currentMetaServer);
903         } else {
904           LOG.info("Meta with replicaId " + replicaId + " was in transition on " +
905                     currentMetaServer);
906         }
907         assignmentManager.processRegionsInTransition(Arrays.asList(metaState));
908       } else {
909         if (currentMetaServer != null) {
910           if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
911             splitMetaLogBeforeAssignment(currentMetaServer);
912             regionStates.logSplit(HRegionInfo.FIRST_META_REGIONINFO);
913             previouslyFailedMetaRSs.add(currentMetaServer);
914           }
915         }
916         LOG.info("Re-assigning hbase:meta with replicaId " + replicaId +
917             ", it was on " + currentMetaServer);
918         assignmentManager.assignMeta(hri);
919       }
920       assigned++;
921     }
922 
923     if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID)
924       getTableStateManager().setTableState(TableName.META_TABLE_NAME, TableState.State.ENABLED);
925     // TODO: should we prevent from using state manager before meta was initialized?
926     // tableStateManager.start();
927 
928     if ((RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode())
929         && (!previouslyFailedMetaRSs.isEmpty())) {
930       // WAL-replay mode requires that the new hbase:meta RS be assigned first
931       status.setStatus("replaying log for Meta Region");
932       this.fileSystemManager.splitMetaLog(previouslyFailedMetaRSs);
933     }
934 
935     this.assignmentManager.setEnabledTable(TableName.META_TABLE_NAME);
936     tableStateManager.start();
937 
938     // Make sure a hbase:meta location is set. We need to enable SSH here since
939     // if the meta region server dies at this time, we need it to be re-assigned
940     // by SSH so that system tables can be assigned.
941     // No need to wait for meta when assigned == 0, i.e. when meta was just verified.
942     if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) enableCrashedServerProcessing(assigned != 0);
943     LOG.info("hbase:meta with replicaId " + replicaId + " assigned=" + assigned + ", location="
944       + metaTableLocator.getMetaRegionLocation(this.getZooKeeper(), replicaId));
945     status.setStatus("META assigned.");
946   }
947 
948   void initClusterSchemaService() throws IOException, InterruptedException {
949     this.clusterSchemaService = new ClusterSchemaServiceImpl(this);
950     this.clusterSchemaService.startAndWait();
951     if (!this.clusterSchemaService.isRunning()) throw new HBaseIOException("Failed start");
952   }
953 
954   void initQuotaManager() throws IOException {
955     MasterQuotaManager quotaManager = new MasterQuotaManager(this);
956     this.assignmentManager.setRegionStateListener((RegionStateListener)quotaManager);
957     quotaManager.start();
958     this.quotaManager = quotaManager;
959   }
960 
961   boolean isCatalogJanitorEnabled() {
962     return catalogJanitorChore != null ?
963       catalogJanitorChore.getEnabled() : false;
964   }
965 
966   private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException {
967     if (RecoveryMode.LOG_REPLAY == this.getMasterFileSystem().getLogRecoveryMode()) {
968       // In log replay mode, we mark hbase:meta region as recovering in ZK
969       Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
970       regions.add(HRegionInfo.FIRST_META_REGIONINFO);
971       this.fileSystemManager.prepareLogReplay(currentMetaServer, regions);
972     } else {
973       // In recovered.edits mode: create recovered edits file for hbase:meta server
974       this.fileSystemManager.splitMetaLog(currentMetaServer);
975     }
976   }
977 
978   private void enableCrashedServerProcessing(final boolean waitForMeta)
979   throws IOException, InterruptedException {
980     // If crashed server processing is disabled, we enable it and expire those dead but not expired
981     // servers. This is required so that if meta is assigning to a server which dies after
982     // assignMeta starts assignment, ServerCrashProcedure can re-assign it. Otherwise, we will be
983     // stuck here waiting forever if waitForMeta is specified.
984     if (!isServerCrashProcessingEnabled()) {
985       setServerCrashProcessingEnabled(true);
986       this.serverManager.processQueuedDeadServers();
987     }
988 
989     if (waitForMeta) {
990       metaTableLocator.waitMetaRegionLocation(this.getZooKeeper());
991     }
992   }
993 
994   /**
995    * This function returns the set of region server names recorded under the hbase:meta recovering-region ZK node
996    * @return Set of meta server names which were recorded in ZK
997    */
998   private Set<ServerName> getPreviouselyFailedMetaServersFromZK() throws KeeperException {
999     Set<ServerName> result = new HashSet<ServerName>();
1000     String metaRecoveringZNode = ZKUtil.joinZNode(zooKeeper.recoveringRegionsZNode,
1001       HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1002     List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(zooKeeper, metaRecoveringZNode);
1003     if (regionFailedServers == null) return result;
1004 
1005     for (String failedServer : regionFailedServers) {
1006       ServerName server = ServerName.parseServerName(failedServer);
1007       result.add(server);
1008     }
1009     return result;
1010   }
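
  // Illustrative znode layout read by the method above (paths assume default znode
  // names; the server-name child is an example):
  //
  //   /hbase/recovering-regions/1588230740          <- encoded name of hbase:meta
  //   /hbase/recovering-regions/1588230740/host1.example.com,16020,1449000000000
  //
  // Each child name is parsed with ServerName.parseServerName(...) into a failed server.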
1011 
1012   @Override
1013   public TableDescriptors getTableDescriptors() {
1014     return this.tableDescriptors;
1015   }
1016 
1017   @Override
1018   public ServerManager getServerManager() {
1019     return this.serverManager;
1020   }
1021 
1022   @Override
1023   public MasterFileSystem getMasterFileSystem() {
1024     return this.fileSystemManager;
1025   }
1026 
1027   @Override
1028   public TableStateManager getTableStateManager() {
1029     return tableStateManager;
1030   }
1031 
1032   /*
1033    * Start up all services. If any of these threads gets an unhandled exception
1034    * then they just die with a logged message.  This should be fine because
1035    * in general, we do not expect the master to get such unhandled exceptions
1036    * as OOMEs; it should be lightly loaded. See what HRegionServer does if you
1037    * need to install an unhandled exception handler.
1038    */
1039   private void startServiceThreads() throws IOException {
1040     // Start the executor service pools
1041     this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
1042       conf.getInt("hbase.master.executor.openregion.threads", 5));
1043     this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
1044       conf.getInt("hbase.master.executor.closeregion.threads", 5));
1045     this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
1046       conf.getInt("hbase.master.executor.serverops.threads", 5));
1047     this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
1048       conf.getInt("hbase.master.executor.meta.serverops.threads", 5));
1049     this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
1050       conf.getInt("hbase.master.executor.logreplayops.threads", 10));
1051 
1052     // We depend on there being only one instance of this executor running
1053     // at a time.  To do concurrency, would need fencing of enable/disable of
1054     // tables.
1055     // Any time changing this maxThreads to > 1, pls see the comment at
1056     // AccessController#postCreateTableHandler
1057     this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1058     startProcedureExecutor();
1059 
1060     // Start log cleaner thread
1061     int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
1062     this.logCleaner =
1063       new LogCleaner(cleanerInterval,
1064         this, conf, getMasterFileSystem().getFileSystem(),
1065         getMasterFileSystem().getOldLogDir());
1066     getChoreService().scheduleChore(logCleaner);
1067 
1068     // Start the hfile archive cleaner thread
1069     Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
1070     this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
1071         .getFileSystem(), archiveDir);
1072     getChoreService().scheduleChore(hfileCleaner);
1073     serviceStarted = true;
1074     if (LOG.isTraceEnabled()) {
1075       LOG.trace("Started service threads");
1076     }
1077   }
1078 
1079   @Override
1080   protected void sendShutdownInterrupt() {
1081     super.sendShutdownInterrupt();
1082     stopProcedureExecutor();
1083   }
1084 
1085   @Override
1086   protected void stopServiceThreads() {
1087     if (masterJettyServer != null) {
1088       LOG.info("Stopping master jetty server");
1089       try {
1090         masterJettyServer.stop();
1091       } catch (Exception e) {
1092         LOG.error("Failed to stop master jetty server", e);
1093       }
1094     }
1095     super.stopServiceThreads();
1096     stopChores();
1097 
1098     // Wait for all the remaining region servers to report in IFF we were
1099     // running a cluster shutdown AND we were NOT aborting.
1100     if (!isAborted() && this.serverManager != null &&
1101         this.serverManager.isClusterShutdown()) {
1102       this.serverManager.letRegionServersShutdown();
1103     }
1104     if (LOG.isDebugEnabled()) {
1105       LOG.debug("Stopping service threads");
1106     }
1107     // Clean up and close up shop
1108     if (this.logCleaner != null) this.logCleaner.cancel(true);
1109     if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
1110     if (this.quotaManager != null) this.quotaManager.stop();
1111     if (this.activeMasterManager != null) this.activeMasterManager.stop();
1112     if (this.serverManager != null) this.serverManager.stop();
1113     if (this.assignmentManager != null) this.assignmentManager.stop();
1114     if (this.fileSystemManager != null) this.fileSystemManager.stop();
1115     if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
1116   }
1117 
1118   private void startProcedureExecutor() throws IOException {
1119     final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
1120     final Path logDir = new Path(fileSystemManager.getRootDir(),
1121         MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
1122 
1123     procedureStore = new WALProcedureStore(conf, fileSystemManager.getFileSystem(), logDir,
1124         new MasterProcedureEnv.WALStoreLeaseRecovery(this));
1125     procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
1126     procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,
1127         procEnv.getProcedureQueue());
1128 
1129     final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
1130         Math.max(Runtime.getRuntime().availableProcessors(),
1131           MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
1132     final boolean abortOnCorruption = conf.getBoolean(
1133         MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
1134         MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
1135     procedureStore.start(numThreads);
1136     procedureExecutor.start(numThreads, abortOnCorruption);
1137   }
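
  // Illustrative sizing of the executor above: with MASTER_PROCEDURE_THREADS unset on
  // an 8-core host, numThreads = max(8, DEFAULT_MIN_MASTER_PROCEDURE_THREADS); the same
  // count is passed to both the procedure store and the executor.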
1138 
1139   private void stopProcedureExecutor() {
1140     if (procedureExecutor != null) {
1141       procedureExecutor.stop();
1142     }
1143 
1144     if (procedureStore != null) {
1145       procedureStore.stop(isAborted());
1146     }
1147   }
1148 
1149   private void stopChores() {
1150     if (this.expiredMobFileCleanerChore != null) {
1151       this.expiredMobFileCleanerChore.cancel(true);
1152     }
1153     if (this.mobCompactChore != null) {
1154       this.mobCompactChore.cancel(true);
1155     }
1156     if (this.balancerChore != null) {
1157       this.balancerChore.cancel(true);
1158     }
1159     if (this.normalizerChore != null) {
1160       this.normalizerChore.cancel(true);
1161     }
1162     if (this.clusterStatusChore != null) {
1163       this.clusterStatusChore.cancel(true);
1164     }
1165     if (this.catalogJanitorChore != null) {
1166       this.catalogJanitorChore.cancel(true);
1167     }
1168     if (this.clusterStatusPublisherChore != null){
1169       clusterStatusPublisherChore.cancel(true);
1170     }
1171     if (this.mobCompactThread != null) {
1172       this.mobCompactThread.close();
1173     }
1174   }
1175 
1176   /**
1177    * @return The remote side's InetAddress
1178    */
1179   InetAddress getRemoteInetAddress(final int port,
1180       final long serverStartCode) throws UnknownHostException {
1181     // Do it out here in its own little method so we can fake an address when
1182     // mocking up in tests.
1183     InetAddress ia = RpcServer.getRemoteIp();
1184 
1185     // The call could be from the local regionserver,
1186     // in which case, there is no remote address.
1187     if (ia == null && serverStartCode == startcode) {
1188       InetSocketAddress isa = rpcServices.getSocketAddress();
1189       if (isa != null && isa.getPort() == port) {
1190         ia = isa.getAddress();
1191       }
1192     }
1193     return ia;
1194   }
1195 
1196   /**
1197    * @return Maximum time we should run balancer for
1198    */
1199   private int getBalancerCutoffTime() {
1200     int balancerCutoffTime = getConfiguration().getInt("hbase.balancer.max.balancing", -1);
1201     if (balancerCutoffTime == -1) {
1202       // if cutoff time isn't set, defaulting it to period time
1203       int balancerPeriod = getConfiguration().getInt("hbase.balancer.period", 300000);
1204       balancerCutoffTime = balancerPeriod;
1205     }
1206     return balancerCutoffTime;
1207   }
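
  // Worked example: with hbase.balancer.max.balancing unset (so -1) and
  // hbase.balancer.period left at its 300000 ms default, the cutoff becomes 300000 ms,
  // i.e. the balancer may run for at most one full period.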
1208 
1209   public boolean balance() throws IOException {
1210     return balance(false);
1211   }
1212 
1213   public boolean balance(boolean force) throws IOException {
1214     // if master not initialized, don't run balancer.
1215     if (!isInitialized()) {
1216       LOG.debug("Master has not been initialized, don't run balancer.");
1217       return false;
1218     }
1219     // Do this call outside of synchronized block.
1220     int maximumBalanceTime = getBalancerCutoffTime();
1221     synchronized (this.balancer) {
1222       // If balance not true, don't run balancer.
1223       if (!this.loadBalancerTracker.isBalancerOn()) return false;
1224       // Only allow one balance run at a time.
1225       if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
1226         Map<String, RegionState> regionsInTransition =
1227           this.assignmentManager.getRegionStates().getRegionsInTransition();
1228         // if hbase:meta region is in transition, result of assignment cannot be recorded
1229         // ignore the force flag in that case
1230         boolean metaInTransition = assignmentManager.getRegionStates().isMetaRegionInTransition();
1231         String prefix = force && !metaInTransition ? "R" : "Not r";
1232         LOG.debug(prefix + "unning balancer because " + regionsInTransition.size() +
1233           " region(s) in transition: " + org.apache.commons.lang.StringUtils.
1234             abbreviate(regionsInTransition.toString(), 256));
1235         if (!force || metaInTransition) return false;
1236       }
1237       if (this.serverManager.areDeadServersInProgress()) {
1238         LOG.debug("Not running balancer because processing dead regionserver(s): " +
1239           this.serverManager.getDeadServers());
1240         return false;
1241       }
1242 
1243       if (this.cpHost != null) {
1244         try {
1245           if (this.cpHost.preBalance()) {
1246             LOG.debug("Coprocessor bypassing balancer request");
1247             return false;
1248           }
1249         } catch (IOException ioe) {
1250           LOG.error("Error invoking master coprocessor preBalance()", ioe);
1251           return false;
1252         }
1253       }
1254 
1255       Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
1256         this.assignmentManager.getRegionStates().getAssignmentsByTable();
1257 
1258       List<RegionPlan> plans = new ArrayList<RegionPlan>();
1259 
1260       //Give the balancer the current cluster state.
1261       this.balancer.setClusterStatus(getClusterStatus());
1262       for (Entry<TableName, Map<ServerName, List<HRegionInfo>>> e : assignmentsByTable.entrySet()) {
1263         List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
1264         if (partialPlans != null) plans.addAll(partialPlans);
1265       }
1266 
1267       long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
1268       int rpCount = 0;  // number of RegionPlans balanced so far
1269       long totalRegPlanExecTime = 0;
1270       if (plans != null && !plans.isEmpty()) {
1271         for (RegionPlan plan: plans) {
1272           LOG.info("balance " + plan);
1273           long balStartTime = System.currentTimeMillis();
1274           //TODO: bulk assign
1275           this.assignmentManager.balance(plan);
1276           totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
1277           rpCount++;
1278           if (rpCount < plans.size() &&
1279               // if performing next balance exceeds cutoff time, exit the loop
1280               (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
1281             //TODO: After balance, there should not be a cutoff time (keeping it as
1282             // a security net for now)
1283             LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
1284               maximumBalanceTime);
1285             break;
1286           }
1287         }
1288       }
1289       if (this.cpHost != null) {
1290         try {
1291           this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans);
1292         } catch (IOException ioe) {
1293           // balancing already succeeded so don't change the result
1294           LOG.error("Error invoking master coprocessor postBalance()", ioe);
1295         }
1296       }
1297     }
1298     // If LoadBalancer did not generate any plans, it means the cluster is already balanced.
1299     // Return true indicating success.
1300     return true;
1301   }
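
  /* Usage sketch: clients do not call balance() directly; they go through the
   * Admin API, which RPCs into this method. A minimal client-side trigger,
   * assuming a reachable cluster described by the default configuration:
   *
   *   try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
   *        Admin admin = conn.getAdmin()) {
   *     boolean ran = admin.balancer();  // false if the balancer is off or regions are in transition
   *   }
   */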
1302 
1303   @VisibleForTesting
1304   public RegionNormalizer getRegionNormalizer() {
1305     return this.normalizer;
1306   }
1307 
1308   /**
1309    * Perform normalization of cluster (invoked by {@link RegionNormalizerChore}).
1310    *
1311    * @return true if normalization step was performed successfully, false otherwise
1312    *    (specifically, if HMaster hasn't been initialized properly or normalization
1313    *    is globally disabled)
1314    */
1315   public boolean normalizeRegions() throws IOException {
1316     if (!isInitialized()) {
1317       LOG.debug("Master has not been initialized, don't run region normalizer.");
1318       return false;
1319     }
1320 
1321     if (!this.regionNormalizerTracker.isNormalizerOn()) {
1322       LOG.debug("Region normalization is disabled, don't run region normalizer.");
1323       return false;
1324     }
1325 
1326     synchronized (this.normalizer) {
1327       // Don't run the normalizer concurrently
1328       List<TableName> allEnabledTables = new ArrayList<>(
1329         this.tableStateManager.getTablesInStates(TableState.State.ENABLED));
1330 
1331       Collections.shuffle(allEnabledTables);
1332 
1333       for (TableName table : allEnabledTables) {
1334         TableDescriptor tblDesc = getTableDescriptors().getDescriptor(table);
1335         if (table.isSystemTable() || (tblDesc != null &&
1336             tblDesc.getHTableDescriptor() != null &&
1337             !tblDesc.getHTableDescriptor().isNormalizationEnabled())) {
1338           LOG.debug("Skipping normalization for table: " + table + ", as it's either system"
1339               + " table or doesn't have auto normalization turned on");
1340           continue;
1341         }
1342         List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
1343         if (plans != null) {
1344           for (NormalizationPlan plan : plans) {
1345             plan.execute(clusterConnection.getAdmin());
1346             if (plan.getType() == PlanType.SPLIT) {
1347               splitPlanCount++;
1348             } else if (plan.getType() == PlanType.MERGE) {
1349               mergePlanCount++;
1350             }
1351           }
1352         }
1353       }
1354     }
1355     // If the normalizer did not generate any plans, it means the cluster is already normalized.
1356     // Return true indicating success.
1357     return true;
1358   }
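
  /* Normalization is opt-in per table. A sketch of enabling it at table
   * creation time, assuming the standard client API and a hypothetical table
   * name (the chore above then considers the table on its next run, provided
   * the global normalizer switch is on):
   *
   *   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
   *   htd.addFamily(new HColumnDescriptor("cf"));
   *   htd.setNormalizationEnabled(true);
   *   admin.createTable(htd);
   */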
1359 
1360   /**
1361    * @return Client info for use as prefix on an audit log string; who did an action
1362    */
1363   String getClientIdAuditPrefix() {
1364     return "Client=" + RpcServer.getRequestUserName() + "/" + RpcServer.getRemoteAddress();
1365   }
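
  /* For illustration (hypothetical user and peer address), an audit line
   * produced with this prefix looks roughly like:
   *
   *   Client=alice/10.0.0.5:48310 create 't1' ...
   */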
1366 
1367   /**
1368    * Switch for the background CatalogJanitor thread.
1369    * Used for testing.  The thread will continue to run.  It will just be a noop
1370    * if disabled.
1371    * @param b If false, the catalog janitor won't do anything.
1372    */
1373   public void setCatalogJanitorEnabled(final boolean b) {
1374     this.catalogJanitorChore.setEnabled(b);
1375   }
1376 
1377   @Override
1378   public void dispatchMergingRegions(final HRegionInfo region_a,
1379       final HRegionInfo region_b, final boolean forcible, final User user) throws IOException {
1380     checkInitialized();
1381     this.service.submit(new DispatchMergingRegionHandler(this,
1382       this.catalogJanitorChore, region_a, region_b, forcible, user));
1383   }
1384 
1385   void move(final byte[] encodedRegionName,
1386       final byte[] destServerName) throws HBaseIOException {
1387     RegionState regionState = assignmentManager.getRegionStates().
1388       getRegionState(Bytes.toString(encodedRegionName));
1389     if (regionState == null) {
1390       throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
1391     }
1392 
1393     HRegionInfo hri = regionState.getRegion();
1394     ServerName dest;
1395     if (destServerName == null || destServerName.length == 0) {
1396       LOG.info("Passed destination servername is null/empty so " +
1397         "choosing a server at random");
1398       final List<ServerName> destServers = this.serverManager.createDestinationServersList(
1399         regionState.getServerName());
1400       dest = balancer.randomAssignment(hri, destServers);
1401       if (dest == null) {
1402         LOG.debug("Unable to determine a plan to assign " + hri);
1403         return;
1404       }
1405     } else {
1406       dest = ServerName.valueOf(Bytes.toString(destServerName));
1407       if (dest.equals(serverName) && balancer instanceof BaseLoadBalancer
1408           && !((BaseLoadBalancer)balancer).shouldBeOnMaster(hri)) {
1409         // To avoid unnecessary region moving later by balancer. Don't put user
1410         // regions on master. Regions on master could be put on other region
1411         // server intentionally by test however.
1412         LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1413           + " to avoid unnecessary region moving later by load balancer,"
1414           + " because it should not be on master");
1415         return;
1416       }
1417     }
1418 
1419     if (dest.equals(regionState.getServerName())) {
1420       LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1421         + " because region already assigned to the same server " + dest + ".");
1422       return;
1423     }
1424 
1425     // Now we can do the move
1426     RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
1427 
1428     try {
1429       checkInitialized();
1430       if (this.cpHost != null) {
1431         if (this.cpHost.preMove(hri, rp.getSource(), rp.getDestination())) {
1432           return;
1433         }
1434       }
1435       // warmup the region on the destination before initiating the move. this call
1436       // is synchronous and takes some time. doing it before the source region gets
1437       // closed
1438       serverManager.sendRegionWarmup(rp.getDestination(), hri);
1439 
1440       LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
1441       this.assignmentManager.balance(rp);
1442       if (this.cpHost != null) {
1443         this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
1444       }
1445     } catch (IOException ioe) {
1446       if (ioe instanceof HBaseIOException) {
1447         throw (HBaseIOException)ioe;
1448       }
1449       throw new HBaseIOException(ioe);
1450     }
1451   }
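
  /* Usage sketch: this is reached via Admin.move(). Moving a region to a
   * specific server (both values hypothetical; passing null or empty for the
   * destination picks a server at random, as handled above):
   *
   *   admin.move(Bytes.toBytes("2fd27c07da1eab1d5b04cf845b7cdbd1"),        // encoded region name
   *              Bytes.toBytes("host1.example.com,16020,1469419333913"));  // ServerName.toString() form
   */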
1452 
1453   @Override
1454   public long createTable(
1455       final HTableDescriptor hTableDescriptor,
1456       final byte [][] splitKeys,
1457       final long nonceGroup,
1458       final long nonce) throws IOException {
1459     if (isStopped()) {
1460       throw new MasterNotRunningException();
1461     }
1462     checkInitialized();
1463     String namespace = hTableDescriptor.getTableName().getNamespaceAsString();
1464     this.clusterSchemaService.getNamespace(namespace);
1465 
1466     HRegionInfo[] newRegions = ModifyRegionUtils.createHRegionInfos(hTableDescriptor, splitKeys);
1467     checkInitialized();
1468     sanityCheckTableDescriptor(hTableDescriptor);
1469 
1470     if (cpHost != null) {
1471       cpHost.preCreateTable(hTableDescriptor, newRegions);
1472     }
1473     LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor);
1474 
1475     // TODO: We can handle/merge duplicate requests, and differentiate the case of
1476     //       TableExistsException by saying if the schema is the same or not.
1477     ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
1478     long procId = this.procedureExecutor.submitProcedure(
1479       new CreateTableProcedure(
1480         procedureExecutor.getEnvironment(), hTableDescriptor, newRegions, latch),
1481       nonceGroup,
1482       nonce);
1483     latch.await();
1484 
1485     if (cpHost != null) {
1486       cpHost.postCreateTable(hTableDescriptor, newRegions);
1487     }
1488 
1489     return procId;
1490   }
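
  /* Client-side sketch of the call path into this method, creating a
   * pre-split table (table name and split point hypothetical):
   *
   *   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
   *   htd.addFamily(new HColumnDescriptor("cf"));
   *   byte[][] splits = { Bytes.toBytes("m") };  // two regions: [,m) and [m,)
   *   admin.createTable(htd, splits);            // synchronous: waits for regions to come online
   */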
1491 
1492   /**
1493    * Checks whether the table conforms to some sane limits, and configured
1494    * values (compression, etc) work. Throws an exception if something is wrong.
1495    * @throws IOException if the descriptor fails a check and sanity checks are enforced
1496    */
1497   private void sanityCheckTableDescriptor(final HTableDescriptor htd) throws IOException {
1498     final String CONF_KEY = "hbase.table.sanity.checks";
1499     boolean logWarn = false;
1500     if (!conf.getBoolean(CONF_KEY, true)) {
1501       logWarn = true;
1502     }
1503     String tableVal = htd.getConfigurationValue(CONF_KEY);
1504     if (tableVal != null && !Boolean.valueOf(tableVal)) {
1505       logWarn = true;
1506     }
1507 
1508     // check max file size
1509     long maxFileSizeLowerLimit = 2 * 1024 * 1024L; // 2M is the default lower limit
1510     long maxFileSize = htd.getMaxFileSize();
1511     if (maxFileSize < 0) {
1512       maxFileSize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, maxFileSizeLowerLimit);
1513     }
1514     if (maxFileSize < conf.getLong("hbase.hregion.max.filesize.limit", maxFileSizeLowerLimit)) {
1515       String message = "MAX_FILESIZE for table descriptor or "
1516           + "\"hbase.hregion.max.filesize\" (" + maxFileSize
1517           + ") is too small, which might cause over splitting into unmanageable "
1518           + "number of regions.";
1519       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1520     }
1521 
1522     // check flush size
1523     long flushSizeLowerLimit = 1024 * 1024L; // 1M is the default lower limit
1524     long flushSize = htd.getMemStoreFlushSize();
1525     if (flushSize < 0) {
1526       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeLowerLimit);
1527     }
1528     if (flushSize < conf.getLong("hbase.hregion.memstore.flush.size.limit", flushSizeLowerLimit)) {
1529       String message = "MEMSTORE_FLUSHSIZE for table descriptor or "
1530           + "\"hbase.hregion.memstore.flush.size\" ("+flushSize+") is too small, which might cause"
1531           + " very frequent flushing.";
1532       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1533     }
1534 
1535     // check that coprocessors and other specified plugin classes can be loaded
1536     try {
1537       checkClassLoading(conf, htd);
1538     } catch (Exception ex) {
1539       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, ex.getMessage(), null);
1540     }
1541 
1542     // check compression can be loaded
1543     try {
1544       checkCompression(htd);
1545     } catch (IOException e) {
1546       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
1547     }
1548 
1549     // check encryption can be loaded
1550     try {
1551       checkEncryption(conf, htd);
1552     } catch (IOException e) {
1553       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
1554     }
1555     // Verify compaction policy
1556     try {
1557       checkCompactionPolicy(conf, htd);
1558     } catch (IOException e) {
1559       warnOrThrowExceptionForFailure(false, CONF_KEY, e.getMessage(), e);
1560     }
1561     // check that we have at least 1 CF
1562     if (htd.getColumnFamilies().length == 0) {
1563       String message = "Table should have at least one column family.";
1564       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1565     }
1566 
1567     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1568       if (hcd.getTimeToLive() <= 0) {
1569         String message = "TTL for column family " + hcd.getNameAsString() + " must be positive.";
1570         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1571       }
1572 
1573       // check blockSize
1574       if (hcd.getBlocksize() < 1024 || hcd.getBlocksize() > 16 * 1024 * 1024) {
1575         String message = "Block size for column family " + hcd.getNameAsString()
1576             + "  must be between 1K and 16MB.";
1577         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1578       }
1579 
1580       // check versions
1581       if (hcd.getMinVersions() < 0) {
1582         String message = "Min versions for column family " + hcd.getNameAsString()
1583           + "  must be positive.";
1584         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1585       }
1586       // max versions already being checked
1587 
1588       // HBASE-13776 Setting illegal versions for HColumnDescriptor
1589       //  does not throw IllegalArgumentException
1590       // check minVersions <= maxVersions
1591       if (hcd.getMinVersions() > hcd.getMaxVersions()) {
1592         String message = "Min versions for column family " + hcd.getNameAsString()
1593             + " must be less than the Max versions.";
1594         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1595       }
1596 
1597       // check replication scope
1598       if (hcd.getScope() < 0) {
1599         String message = "Replication scope for column family "
1600           + hcd.getNameAsString() + " must be non-negative.";
1601         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1602       }
1603 
1604       // check data replication factor, it can be 0(default value) when user has not explicitly
1605       // set the value, in this case we use default replication factor set in the file system.
1606       if (hcd.getDFSReplication() < 0) {
1607         String message = "HFile Replication for column family " + hcd.getNameAsString()
1608             + "  must be greater than zero.";
1609         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1610       }
1611 
1612       // TODO: should we check coprocessors and encryption ?
1613     }
1614   }
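
  /* The checks above can be relaxed per table rather than cluster-wide:
   * setting the same key on the descriptor downgrades failures to warnings for
   * that table only. A sketch, table name hypothetical (use with care):
   *
   *   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
   *   htd.setConfiguration("hbase.table.sanity.checks", "false");
   */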
1615 
1616   private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd)
1617       throws IOException {
1618     // FIFO compaction has some requirements
1619     // Actually FCP ignores periodic major compactions
1620     String className =
1621         htd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
1622     if (className == null) {
1623       className =
1624           conf.get(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
1625             ExploringCompactionPolicy.class.getName());
1626     }
1627 
1628     int blockingFileCount = HStore.DEFAULT_BLOCKING_STOREFILE_COUNT;
1629     String sv = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
1630     if (sv != null) {
1631       blockingFileCount = Integer.parseInt(sv);
1632     } else {
1633       blockingFileCount = conf.getInt(HStore.BLOCKING_STOREFILES_KEY, blockingFileCount);
1634     }
1635 
1636     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1637       String compactionPolicy =
1638           hcd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
1639       if (compactionPolicy == null) {
1640         compactionPolicy = className;
1641       }
1642       if (!compactionPolicy.equals(FIFOCompactionPolicy.class.getName())) {
1643         continue;
1644       }
1645       // FIFOCompaction
1646       String message = null;
1647 
1648       // 1. Check TTL
1649       if (hcd.getTimeToLive() == HColumnDescriptor.DEFAULT_TTL) {
1650         message = "Default TTL is not supported for FIFO compaction";
1651         throw new IOException(message);
1652       }
1653 
1654       // 2. Check min versions
1655       if (hcd.getMinVersions() > 0) {
1656         message = "MIN_VERSION > 0 is not supported for FIFO compaction";
1657         throw new IOException(message);
1658       }
1659 
1660       // 3. blocking file count
1661       String sbfc = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
1662       if (sbfc != null) {
1663         blockingFileCount = Integer.parseInt(sbfc);
1664       }
1665       if (blockingFileCount < 1000) {
1666         message =
1667             "blocking file count '" + HStore.BLOCKING_STOREFILES_KEY + "' " + blockingFileCount
1668                 + " is below recommended minimum of 1000";
1669         throw new IOException(message);
1670       }
1671     }
1672   }
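
  /* A descriptor satisfying the three FIFO-compaction requirements checked
   * above (TTL set, MIN_VERSIONS == 0, blocking store files >= 1000); a sketch
   * with illustrative values:
   *
   *   HColumnDescriptor hcd = new HColumnDescriptor("cf");
   *   hcd.setTimeToLive(60 * 60 * 24);  // 1 day; the DEFAULT_TTL (forever) is rejected
   *   hcd.setMinVersions(0);            // the default; > 0 is rejected
   *   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
   *   htd.addFamily(hcd);
   *   htd.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
   *       FIFOCompactionPolicy.class.getName());
   *   htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "1000");
   */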
1673 
1674   // HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled.
1675   private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey,
1676       String message, Exception cause) throws IOException {
1677     if (!logWarn) {
1678       throw new DoNotRetryIOException(message + " Set " + confKey +
1679           " to false at conf or table descriptor if you want to bypass sanity checks", cause);
1680     }
1681     LOG.warn(message);
1682   }
1683 
1684   private void startActiveMasterManager(int infoPort) throws KeeperException {
1685     String backupZNode = ZKUtil.joinZNode(
1686       zooKeeper.backupMasterAddressesZNode, serverName.toString());
1687     /*
1688     * Add a ZNode for ourselves in the backup master directory since we
1689     * may not become the active master. If so, we want the actual active
1690     * master to know we are backup masters, so that it won't assign
1691     * regions to us if so configured.
1692     *
1693     * If we become the active master later, ActiveMasterManager will delete
1694     * this node explicitly.  If we crash before then, ZooKeeper will delete
1695     * this node for us since it is ephemeral.
1696     */
1697     LOG.info("Adding backup master ZNode " + backupZNode);
1698     if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode,
1699         serverName, infoPort)) {
1700       LOG.warn("Failed create of " + backupZNode + " by " + serverName);
1701     }
1702 
1703     activeMasterManager.setInfoPort(infoPort);
1704     // Start a thread to try to become the active master, so we won't block here
1705     Threads.setDaemonThreadRunning(new Thread(new Runnable() {
1706       @Override
1707       public void run() {
1708         int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
1709           HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
1710         // If we're a backup master, stall until the primary writes its address
1711         if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP,
1712           HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
1713           LOG.debug("HMaster started in backup mode. "
1714             + "Stalling until master znode is written.");
1715           // This will only be a minute or so while the cluster starts up,
1716           // so don't worry about setting watches on the parent znode
1717           while (!activeMasterManager.hasActiveMaster()) {
1718             LOG.debug("Waiting for master address ZNode to be written "
1719               + "(Also watching cluster state node)");
1720             Threads.sleep(timeout);
1721           }
1722         }
1723         MonitoredTask status = TaskMonitor.get().createStatus("Master startup");
1724         status.setDescription("Master startup");
1725         try {
1726           if (activeMasterManager.blockUntilBecomingActiveMaster(timeout, status)) {
1727             finishActiveMasterInitialization(status);
1728           }
1729         } catch (Throwable t) {
1730           status.setStatus("Failed to become active: " + t.getMessage());
1731           LOG.fatal("Failed to become active master", t);
1732           // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
1733           if (t instanceof NoClassDefFoundError &&
1734             t.getMessage()
1735               .contains("org/apache/hadoop/hdfs/protocol/HdfsConstants$SafeModeAction")) {
1736             // improved error message for this special case
1737             abort("HBase is having a problem with its Hadoop jars.  You may need to "
1738               + "recompile HBase against Hadoop version "
1739               + org.apache.hadoop.util.VersionInfo.getVersion()
1740               + " or change your hadoop jars to start properly", t);
1741           } else {
1742             abort("Unhandled exception. Starting shutdown.", t);
1743           }
1744         } finally {
1745           status.cleanup();
1746         }
1747       }
1748     }, getServerName().toShortString() + ".activeMasterManager"));
1749   }
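
  /* The backup-master stall above is driven by configuration; a sketch of
   * starting a master in backup mode (it then parks on ZooKeeper until the
   * active master znode appears):
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setBoolean(HConstants.MASTER_TYPE_BACKUP, true);
   */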
1750 
1751   private void checkCompression(final HTableDescriptor htd)
1752   throws IOException {
1753     if (!this.masterCheckCompression) return;
1754     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1755       checkCompression(hcd);
1756     }
1757   }
1758 
1759   private void checkCompression(final HColumnDescriptor hcd)
1760   throws IOException {
1761     if (!this.masterCheckCompression) return;
1762     CompressionTest.testCompression(hcd.getCompressionType());
1763     CompressionTest.testCompression(hcd.getCompactionCompressionType());
1764   }
1765 
1766   private void checkEncryption(final Configuration conf, final HTableDescriptor htd)
1767   throws IOException {
1768     if (!this.masterCheckEncryption) return;
1769     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1770       checkEncryption(conf, hcd);
1771     }
1772   }
1773 
1774   private void checkEncryption(final Configuration conf, final HColumnDescriptor hcd)
1775   throws IOException {
1776     if (!this.masterCheckEncryption) return;
1777     EncryptionTest.testEncryption(conf, hcd.getEncryptionType(), hcd.getEncryptionKey());
1778   }
1779 
1780   private void checkClassLoading(final Configuration conf, final HTableDescriptor htd)
1781   throws IOException {
1782     RegionSplitPolicy.getSplitPolicyClass(htd, conf);
1783     RegionCoprocessorHost.testTableCoprocessorAttrs(conf, htd);
1784   }
1785 
1786   private static boolean isCatalogTable(final TableName tableName) {
1787     return tableName.equals(TableName.META_TABLE_NAME);
1788   }
1789 
1790   @Override
1791   public long deleteTable(
1792       final TableName tableName,
1793       final long nonceGroup,
1794       final long nonce) throws IOException {
1795     checkInitialized();
1796     if (cpHost != null) {
1797       cpHost.preDeleteTable(tableName);
1798     }
1799     LOG.info(getClientIdAuditPrefix() + " delete " + tableName);
1800 
1801     // TODO: We can handle/merge duplicate request
1802     ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
1803     long procId = this.procedureExecutor.submitProcedure(
1804       new DeleteTableProcedure(procedureExecutor.getEnvironment(), tableName, latch),
1805       nonceGroup,
1806       nonce);
1807     latch.await();
1808 
1809     if (cpHost != null) {
1810       cpHost.postDeleteTable(tableName);
1811     }
1812 
1813     return procId;
1814   }
1815 
1816   @Override
1817   public long truncateTable(
1818       final TableName tableName,
1819       final boolean preserveSplits,
1820       final long nonceGroup,
1821       final long nonce) throws IOException {
1822     checkInitialized();
1823     if (cpHost != null) {
1824       cpHost.preTruncateTable(tableName);
1825     }
1826     LOG.info(getClientIdAuditPrefix() + " truncate " + tableName);
1827 
1828     long procId = this.procedureExecutor.submitProcedure(
1829       new TruncateTableProcedure(procedureExecutor.getEnvironment(), tableName, preserveSplits),
1830       nonceGroup,
1831       nonce);
1832     ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1833 
1834     if (cpHost != null) {
1835       cpHost.postTruncateTable(tableName);
1836     }
1837     return procId;
1838   }
1839 
1840   @Override
1841   public long addColumn(
1842       final TableName tableName,
1843       final HColumnDescriptor columnDescriptor,
1844       final long nonceGroup,
1845       final long nonce)
1846       throws IOException {
1847     checkInitialized();
1848     checkCompression(columnDescriptor);
1849     checkEncryption(conf, columnDescriptor);
1850     if (cpHost != null) {
1851       if (cpHost.preAddColumn(tableName, columnDescriptor)) {
1852         return -1;
1853       }
1854     }
1855     // Execute the operation synchronously - wait for the operation to complete before continuing.
1856     long procId = this.procedureExecutor.submitProcedure(
1857       new AddColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, columnDescriptor),
1858       nonceGroup,
1859       nonce);
1860     ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1861     if (cpHost != null) {
1862       cpHost.postAddColumn(tableName, columnDescriptor);
1863     }
1864     return procId;
1865   }
1866 
1867   @Override
1868   public long modifyColumn(
1869       final TableName tableName,
1870       final HColumnDescriptor descriptor,
1871       final long nonceGroup,
1872       final long nonce)
1873       throws IOException {
1874     checkInitialized();
1875     checkCompression(descriptor);
1876     checkEncryption(conf, descriptor);
1877     if (cpHost != null) {
1878       if (cpHost.preModifyColumn(tableName, descriptor)) {
1879         return -1;
1880       }
1881     }
1882     LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);
1883 
1884     // Execute the operation synchronously - wait for the operation to complete before continuing.
1885     long procId = this.procedureExecutor.submitProcedure(
1886       new ModifyColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, descriptor),
1887       nonceGroup,
1888       nonce);
1889     ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1890 
1891     if (cpHost != null) {
1892       cpHost.postModifyColumn(tableName, descriptor);
1893     }
1894     return procId;
1895   }
1896 
1897   @Override
1898   public long deleteColumn(
1899       final TableName tableName,
1900       final byte[] columnName,
1901       final long nonceGroup,
1902       final long nonce)
1903       throws IOException {
1904     checkInitialized();
1905     if (cpHost != null) {
1906       if (cpHost.preDeleteColumn(tableName, columnName)) {
1907         return -1;
1908       }
1909     }
1910     LOG.info(getClientIdAuditPrefix() + " delete " + Bytes.toString(columnName));
1911 
1912     // Execute the operation synchronously - wait for the operation to complete before continuing.
1913     long procId = this.procedureExecutor.submitProcedure(
1914       new DeleteColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, columnName),
1915       nonceGroup,
1916       nonce);
1917     ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1918 
1919     if (cpHost != null) {
1920       cpHost.postDeleteColumn(tableName, columnName);
1921     }
1922     return procId;
1923   }
1924 
1925   @Override
1926   public long enableTable(
1927       final TableName tableName,
1928       final long nonceGroup,
1929       final long nonce) throws IOException {
1930     checkInitialized();
1931     if (cpHost != null) {
1932       cpHost.preEnableTable(tableName);
1933     }
1934     LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
1935 
1936     // Execute the operation asynchronously - client will check the progress of the operation
1937     final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
1938     long procId = this.procedureExecutor.submitProcedure(
1939       new EnableTableProcedure(procedureExecutor.getEnvironment(), tableName, false, prepareLatch),
1940       nonceGroup,
1941       nonce);
1942     // Before returning to client, we want to make sure that the table is prepared to be
1943     // enabled (the table is locked and the table state is set).
1944     //
1945     // Note: if the procedure throws exception, we will catch it and rethrow.
1946     prepareLatch.await();
1947 
1948     if (cpHost != null) {
1949       cpHost.postEnableTable(tableName);
1950     }
1951 
1952     return procId;
1953   }
1954 
1955   @Override
1956   public long disableTable(
1957       final TableName tableName,
1958       final long nonceGroup,
1959       final long nonce) throws IOException {
1960     checkInitialized();
1961     if (cpHost != null) {
1962       cpHost.preDisableTable(tableName);
1963     }
1964     LOG.info(getClientIdAuditPrefix() + " disable " + tableName);
1965 
1966     // Execute the operation asynchronously - client will check the progress of the operation
1967     final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
1969     long procId = this.procedureExecutor.submitProcedure(
1970       new DisableTableProcedure(procedureExecutor.getEnvironment(), tableName, false, prepareLatch),
1971       nonceGroup,
1972       nonce);
1973     // Before returning to client, we want to make sure that the table is prepared to be
1974     // disabled (the table is locked and the table state is set).
1975     //
1976     // Note: if the procedure throws exception, we will catch it and rethrow.
1977     prepareLatch.await();
1978 
1979     if (cpHost != null) {
1980       cpHost.postDisableTable(tableName);
1981     }
1982 
1983     return procId;
1984   }
1985 
1986   /**
1987    * Return the region and current deployment for the region containing
1988    * the given row. If the region cannot be found, returns null. If it
1989    * is found, but not currently deployed, the second element of the pair
1990    * may be null.
1991    */
1992   @VisibleForTesting // Used by TestMaster.
1993   Pair<HRegionInfo, ServerName> getTableRegionForRow(
1994       final TableName tableName, final byte [] rowKey)
1995   throws IOException {
1996     final AtomicReference<Pair<HRegionInfo, ServerName>> result =
1997       new AtomicReference<Pair<HRegionInfo, ServerName>>(null);
1998 
1999     MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
2000         @Override
2001         public boolean visit(Result data) throws IOException {
2002           if (data == null || data.size() <= 0) {
2003             return true;
2004           }
2005           Pair<HRegionInfo, ServerName> pair =
2006               new Pair(MetaTableAccessor.getHRegionInfo(data),
2007                   MetaTableAccessor.getServerName(data,0));
2008           if (pair == null) {
2009             return false;
2010           }
2011           if (!pair.getFirst().getTable().equals(tableName)) {
2012             return false;
2013           }
2014           result.set(pair);
2015           return true;
2016         }
2017     };
2018 
2019     MetaTableAccessor.scanMeta(clusterConnection, visitor, tableName, rowKey, 1);
2020     return result.get();
2021   }
2022 
2023   @Override
2024   public long modifyTable(
2025       final TableName tableName,
2026       final HTableDescriptor descriptor,
2027       final long nonceGroup,
2028       final long nonce)
2029       throws IOException {
2030     checkInitialized();
2031     sanityCheckTableDescriptor(descriptor);
2032     if (cpHost != null) {
2033       cpHost.preModifyTable(tableName, descriptor);
2034     }
2035 
2036     LOG.info(getClientIdAuditPrefix() + " modify " + tableName);
2037 
2038     // Execute the operation synchronously - wait for the operation to complete before continuing.
2039     long procId = this.procedureExecutor.submitProcedure(
2040       new ModifyTableProcedure(procedureExecutor.getEnvironment(), descriptor),
2041       nonceGroup,
2042       nonce);
2043 
2044     ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
2045 
2046     if (cpHost != null) {
2047       cpHost.postModifyTable(tableName, descriptor);
2048     }
2049 
2050     return procId;
2051   }
2052 
2053   @Override
2054   public void checkTableModifiable(final TableName tableName)
2055       throws IOException, TableNotFoundException, TableNotDisabledException {
2056     if (isCatalogTable(tableName)) {
2057       throw new IOException("Can't modify catalog tables");
2058     }
2059     if (!MetaTableAccessor.tableExists(getConnection(), tableName)) {
2060       throw new TableNotFoundException(tableName);
2061     }
2062     if (!getAssignmentManager().getTableStateManager().
2063         isTableState(tableName, TableState.State.DISABLED)) {
2064       throw new TableNotDisabledException(tableName);
2065     }
2066   }
2067 
2068   /**
2069    * @return cluster status
2070    */
2071   public ClusterStatus getClusterStatus() throws InterruptedIOException {
2072     // Build Set of backup masters from ZK nodes
2073     List<String> backupMasterStrings;
2074     try {
2075       backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
2076         this.zooKeeper.backupMasterAddressesZNode);
2077     } catch (KeeperException e) {
2078       LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
2079       backupMasterStrings = null;
2080     }
2081 
2082     List<ServerName> backupMasters = null;
2083     if (backupMasterStrings != null && !backupMasterStrings.isEmpty()) {
2084       backupMasters = new ArrayList<ServerName>(backupMasterStrings.size());
2085       for (String s: backupMasterStrings) {
2086         try {
2087           byte [] bytes;
2088           try {
2089             bytes = ZKUtil.getData(this.zooKeeper, ZKUtil.joinZNode(
2090                 this.zooKeeper.backupMasterAddressesZNode, s));
2091           } catch (InterruptedException e) {
2092             throw new InterruptedIOException();
2093           }
2094           if (bytes != null) {
2095             ServerName sn;
2096             try {
2097               sn = ServerName.parseFrom(bytes);
2098             } catch (DeserializationException e) {
2099               LOG.warn("Failed parse, skipping registering backup server", e);
2100               continue;
2101             }
2102             backupMasters.add(sn);
2103           }
2104         } catch (KeeperException e) {
2105           LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
2106                    "backup servers"), e);
2107         }
2108       }
2109       Collections.sort(backupMasters, new Comparator<ServerName>() {
2110         @Override
2111         public int compare(ServerName s1, ServerName s2) {
2112           return s1.getServerName().compareTo(s2.getServerName());
2113         }});
2114     }
2115 
2116     String clusterId = fileSystemManager != null ?
2117       fileSystemManager.getClusterId().toString() : null;
2118     Map<String, RegionState> regionsInTransition = assignmentManager != null ?
2119       assignmentManager.getRegionStates().getRegionsInTransition() : null;
2120     String[] coprocessors = cpHost != null ? getMasterCoprocessors() : null;
2121     boolean balancerOn = loadBalancerTracker != null ?
2122       loadBalancerTracker.isBalancerOn() : false;
2123     Map<ServerName, ServerLoad> onlineServers = null;
2124     Set<ServerName> deadServers = null;
2125     if (serverManager != null) {
2126       deadServers = serverManager.getDeadServers().copyServerNames();
2127       onlineServers = serverManager.getOnlineServers();
2128     }
2129     return new ClusterStatus(VersionInfo.getVersion(), clusterId,
2130       onlineServers, deadServers, serverName, backupMasters,
2131       regionsInTransition, coprocessors, balancerOn);
2132   }
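
  /* Consumer-side sketch, assuming an Admin handle as elsewhere: the returned
   * snapshot bundles the server lists and balancer state assembled above.
   *
   *   ClusterStatus status = admin.getClusterStatus();
   *   for (ServerName sn : status.getServers()) {
   *     ServerLoad load = status.getLoad(sn);
   *     // e.g. load.getNumberOfRegions()
   *   }
   *   int backups = status.getBackupMasters().size();
   */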
2133 
2134   /**
2135    * The set of loaded coprocessors is stored in a static set. Since it's
2136    * statically allocated, it does not require that HMaster's cpHost be
2137    * initialized prior to accessing it.
2138    * @return a String representation of the set of names of the loaded coprocessors.
2139    */
2140   public static String getLoadedCoprocessors() {
2141     return CoprocessorHost.getLoadedCoprocessors().toString();
2142   }
2143 
2144   /**
2145    * @return timestamp in millis when HMaster was started.
2146    */
2147   public long getMasterStartTime() {
2148     return startcode;
2149   }
2150 
2151   /**
2152    * @return timestamp in millis when HMaster became the active master.
2153    */
2154   public long getMasterActiveTime() {
2155     return masterActiveTime;
2156   }
2157 
2158   public int getNumWALFiles() {
2159     return procedureStore != null ? procedureStore.getActiveLogs().size() : 0;
2160   }
2161 
2162   public int getRegionServerInfoPort(final ServerName sn) {
2163     RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2164     if (info == null || info.getInfoPort() == 0) {
2165       return conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2166         HConstants.DEFAULT_REGIONSERVER_INFOPORT);
2167     }
2168     return info.getInfoPort();
2169   }
2170 
2171   public String getRegionServerVersion(final ServerName sn) {
2172     RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2173     if (info != null && info.hasVersionInfo()) {
2174       return info.getVersionInfo().getVersion();
2175     }
2176     return "Unknown";
2177   }
2178 
2179   /**
2180    * @return array of coprocessor SimpleNames.
2181    */
2182   public String[] getMasterCoprocessors() {
2183     Set<String> masterCoprocessors = getMasterCoprocessorHost().getCoprocessors();
2184     return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
2185   }
2186 
2187   @Override
2188   public void abort(final String msg, final Throwable t) {
2189     if (isAborted() || isStopped()) {
2190       return;
2191     }
2192     if (cpHost != null) {
2193       // HBASE-4014: dump a list of loaded coprocessors.
2194       LOG.fatal("Master server abort: loaded coprocessors are: " +
2195           getLoadedCoprocessors());
2196     }
2197     if (t != null) LOG.fatal(msg, t);
2198     stopMaster();
2199   }
2200 
2201   @Override
2202   public ZooKeeperWatcher getZooKeeper() {
2203     return zooKeeper;
2204   }
2205 
2206   @Override
2207   public MasterCoprocessorHost getMasterCoprocessorHost() {
2208     return cpHost;
2209   }
2210 
2211   @Override
2212   public MasterQuotaManager getMasterQuotaManager() {
2213     return quotaManager;
2214   }
2215 
2216   @Override
2217   public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
2218     return procedureExecutor;
2219   }
2220 
2221   @Override
2222   public ServerName getServerName() {
2223     return this.serverName;
2224   }
2225 
2226   @Override
2227   public AssignmentManager getAssignmentManager() {
2228     return this.assignmentManager;
2229   }
2230 
2231   public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
2232     return rsFatals;
2233   }
2234 
2235   public void shutdown() {
2236     if (cpHost != null) {
2237       try {
2238         cpHost.preShutdown();
2239       } catch (IOException ioe) {
2240         LOG.error("Error call master coprocessor preShutdown()", ioe);
2241       }
2242     }
2243 
2244     if (this.serverManager != null) {
2245       this.serverManager.shutdownCluster();
2246     }
2247     if (this.clusterStatusTracker != null){
2248       try {
2249         this.clusterStatusTracker.setClusterDown();
2250       } catch (KeeperException e) {
2251         LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
2252       }
2253     }
2254   }
2255 
2256   public void stopMaster() {
2257     if (cpHost != null) {
2258       try {
2259         cpHost.preStopMaster();
2260       } catch (IOException ioe) {
2261         LOG.error("Error call master coprocessor preStopMaster()", ioe);
2262       }
2263     }
2264     stop("Stopped by " + Thread.currentThread().getName());
2265   }
2266 
2267   void checkServiceStarted() throws ServerNotRunningYetException {
2268     if (!serviceStarted) {
2269       throw new ServerNotRunningYetException("Server is not running yet");
2270     }
2271   }
2272 
2273   void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException {
2274     checkServiceStarted();
2275     if (!isInitialized()) throw new PleaseHoldException("Master is initializing");
2276   }
2277 
2278   /**
2279    * Report whether this master is currently the active master or not.
2280    * If not active master, we are parked on ZK waiting to become active.
2281    *
2282    * This method is used for testing.
2283    *
2284    * @return true if active master, false if not.
2285    */
2286   public boolean isActiveMaster() {
2287     return isActiveMaster;
2288   }
2289 
2290   /**
2291    * Report whether this master has completed with its initialization and is
2292    * ready.  If ready, the master is also the active master.  A standby master
2293    * is never ready.
2294    *
2295    * This method is used for testing.
2296    *
2297    * @return true if master is ready to go, false if not.
2298    */
2299   @Override
2300   public boolean isInitialized() {
2301     return initialized.isReady();
2302   }
2303 
2304   @VisibleForTesting
2305   public void setInitialized(boolean isInitialized) {
2306     procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
2307   }
2308 
2309   public ProcedureEvent getInitializedEvent() {
2310     return initialized;
2311   }
2312 
2313   /**
2314    * ServerCrashProcessingEnabled is set false before completing assignMeta to prevent processing
2315    * of crashed servers.
2316    * @return true if assignMeta has completed
2317    */
2318   @Override
2319   public boolean isServerCrashProcessingEnabled() {
2320     return serverCrashProcessingEnabled.isReady();
2321   }
2322 
2323   @VisibleForTesting
2324   public void setServerCrashProcessingEnabled(final boolean b) {
2325     procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b);
2326   }
2327 
2328   public ProcedureEvent getServerCrashProcessingEnabledEvent() {
2329     return serverCrashProcessingEnabled;
2330   }
2331 
2332   /**
2333    * Report whether this master has started initialization and is about to do meta region assignment
2334    * @return true if master is in initialization &amp; about to assign hbase:meta regions
2335    */
2336   public boolean isInitializationStartsMetaRegionAssignment() {
2337     return this.initializationBeforeMetaAssignment;
2338   }
2339 
2340   public void assignRegion(HRegionInfo hri) {
2341     assignmentManager.assign(hri);
2342   }
2343 
2344   /**
2345    * Compute the average load across all region servers.
2346    * Currently, this uses a very naive computation - just uses the number of
2347    * regions being served, ignoring stats about number of requests.
2348    * @return the average load
2349    */
2350   public double getAverageLoad() {
2351     if (this.assignmentManager == null) {
2352       return 0;
2353     }
2354 
2355     RegionStates regionStates = this.assignmentManager.getRegionStates();
2356     if (regionStates == null) {
2357       return 0;
2358     }
2359     return regionStates.getAverageLoad();
2360   }
2361   
2362   /**
2363    * @return the count of region split plans executed
2364    */
2365   public long getSplitPlanCount() {
2366     return splitPlanCount;
2367   }
2368 
2369   /**
2370    * @return the count of region merge plans executed
2371    */
2372   public long getMergePlanCount() {
2373     return mergePlanCount;
2374   }
2375 
2376   @Override
2377   public boolean registerService(Service instance) {
2378     /*
2379      * No stacking of instances is allowed for a single service name
2380      */
2381     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
2382     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
2383       LOG.error("Coprocessor service "+serviceDesc.getFullName()+
2384           " already registered, rejecting request from "+instance
2385       );
2386       return false;
2387     }
2388 
2389     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
2390     if (LOG.isDebugEnabled()) {
2391       LOG.debug("Registered master coprocessor service: service="+serviceDesc.getFullName());
2392     }
2393     return true;
2394   }
2395 
2396   /**
2397    * Utility for constructing an instance of the passed HMaster class.
2398    * @param masterClass the HMaster implementation class to instantiate
2399    * @return HMaster instance.
2400    */
2401   public static HMaster constructMaster(Class<? extends HMaster> masterClass,
2402       final Configuration conf, final CoordinatedStateManager cp)  {
2403     try {
2404       Constructor<? extends HMaster> c =
2405         masterClass.getConstructor(Configuration.class, CoordinatedStateManager.class);
2406       return c.newInstance(conf, cp);
2407     } catch(Exception e) {
2408       Throwable error = e;
2409       if (e instanceof InvocationTargetException &&
2410           ((InvocationTargetException)e).getTargetException() != null) {
2411         error = ((InvocationTargetException)e).getTargetException();
2412       }
2413       throw new RuntimeException("Failed construction of Master: " +
2414           masterClass.toString(), error);
2415     }
2416   }
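
  /* Usage sketch, essentially what HMasterCommandLine does when starting a
   * master process:
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   CoordinatedStateManager csm =
   *       CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
   *   HMaster master = HMaster.constructMaster(HMaster.class, conf, csm);
   *   master.start();
   */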
2417 
2418   /**
2419    * @see org.apache.hadoop.hbase.master.HMasterCommandLine
2420    */
2421   public static void main(String [] args) {
2422     VersionInfo.logVersion();
2423     new HMasterCommandLine(HMaster.class).doMain(args);
2424   }
2425 
2426   public HFileCleaner getHFileCleaner() {
2427     return this.hfileCleaner;
2428   }
2429 
2430   /**
2431    * Exposed for TESTING!
2432    * @return the underlying snapshot manager
2433    */
2434   public SnapshotManager getSnapshotManagerForTesting() {
2435     return this.snapshotManager;
2436   }
2437 
2438   @Override
2439   public ClusterSchema getClusterSchema() {
2440     return this.clusterSchemaService;
2441   }
2442 
2443   /**
2444    * Create a new Namespace.
2445    * @param namespaceDescriptor descriptor for new Namespace
2446    * @param nonceGroup Identifier for the source of the request, a client or process.
2447    * @param nonce A unique identifier for this operation from the client or process identified by
2448    * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2449    * @return procedure id
2450    */
2451   long createNamespace(final NamespaceDescriptor namespaceDescriptor, final long nonceGroup,
2452       final long nonce)
2453   throws IOException {
2454     checkInitialized();
2455     TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName()));
2456     if (this.cpHost != null && this.cpHost.preCreateNamespace(namespaceDescriptor)) {
2457       throw new BypassCoprocessorException();
2458     }
2459     LOG.info(getClientIdAuditPrefix() + " creating " + namespaceDescriptor);
2460     // Execute the operation synchronously - wait for the operation to complete before continuing.
2461     long procId = getClusterSchema().createNamespace(namespaceDescriptor, nonceGroup, nonce);
2462     if (this.cpHost != null) this.cpHost.postCreateNamespace(namespaceDescriptor);
2463     return procId;
2464   }
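
  /* Client-side sketch of the namespace lifecycle that funnels into this and
   * the following methods (namespace name hypothetical):
   *
   *   admin.createNamespace(NamespaceDescriptor.create("ns1").build());
   *   admin.modifyNamespace(NamespaceDescriptor.create("ns1")
   *       .addConfiguration("key", "value").build());
   *   admin.deleteNamespace("ns1");  // must contain no tables
   */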
2465 
2466   /**
2467    * Modify an existing Namespace.
2468    * @param nonceGroup Identifier for the source of the request, a client or process.
2469    * @param nonce A unique identifier for this operation from the client or process identified by
2470    * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2471    * @return procedure id
2472    */
2473   long modifyNamespace(final NamespaceDescriptor namespaceDescriptor, final long nonceGroup,
2474       final long nonce)
2475   throws IOException {
2476     checkInitialized();
2477     TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName()));
2478     if (this.cpHost != null && this.cpHost.preModifyNamespace(namespaceDescriptor)) {
2479       throw new BypassCoprocessorException();
2480     }
2481     LOG.info(getClientIdAuditPrefix() + " modify " + namespaceDescriptor);
2482     // Execute the operation synchronously - wait for the operation to complete before continuing.
2483     long procId = getClusterSchema().modifyNamespace(namespaceDescriptor, nonceGroup, nonce);
2484     if (this.cpHost != null) this.cpHost.postModifyNamespace(namespaceDescriptor);
2485     return procId;
2486   }
2487 
2488   /**
2489    * Delete an existing Namespace. Only empty Namespaces (no tables) can be removed.
2490    * @param nonceGroup Identifier for the source of the request, a client or process.
2491    * @param nonce A unique identifier for this operation from the client or process identified by
2492    * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2493    * @return procedure id
2494    */
2495   long deleteNamespace(final String name, final long nonceGroup, final long nonce)
2496   throws IOException {
2497     checkInitialized();
2498     if (this.cpHost != null && this.cpHost.preDeleteNamespace(name)) {
2499       throw new BypassCoprocessorException();
2500     }
2501     LOG.info(getClientIdAuditPrefix() + " delete " + name);
2502     // Execute the operation synchronously - wait for the operation to complete before continuing.
2503     long procId = getClusterSchema().deleteNamespace(name, nonceGroup, nonce);
2504     if (this.cpHost != null) this.cpHost.postDeleteNamespace(name);
2505     return procId;
2506   }
2507 
2508   /**
2509    * Get a Namespace
2510    * @param name Name of the Namespace
2511    * @return Namespace descriptor for <code>name</code>
2512    */
2513   NamespaceDescriptor getNamespace(String name) throws IOException {
2514     checkInitialized();
2515     if (this.cpHost != null) this.cpHost.preGetNamespaceDescriptor(name);
2516     NamespaceDescriptor nsd = this.clusterSchemaService.getNamespace(name);
2517     if (this.cpHost != null) this.cpHost.postGetNamespaceDescriptor(nsd);
2518     return nsd;
2519   }
2520 
2521   /**
2522    * Get all Namespaces
2523    * @return All Namespace descriptors
2524    */
2525   List<NamespaceDescriptor> getNamespaces() throws IOException {
2526     checkInitialized();
2527     final List<NamespaceDescriptor> nsds = new ArrayList<NamespaceDescriptor>();
2528     boolean bypass = false;
2529     if (cpHost != null) {
2530       bypass = cpHost.preListNamespaceDescriptors(nsds);
2531     }
2532     if (!bypass) {
2533       nsds.addAll(this.clusterSchemaService.getNamespaces());
2534       if (this.cpHost != null) this.cpHost.postListNamespaceDescriptors(nsds);
2535     }
2536     return nsds;
2537   }
2538 
2539   @Override
2540   public List<TableName> listTableNamesByNamespace(String name) throws IOException {
2541     checkInitialized();
2542     return listTableNames(name, null, true);
2543   }
2544 
2545   @Override
2546   public List<HTableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
2547     checkInitialized();
2548     return listTableDescriptors(name, null, null, true);
2549   }
2550 
2551   @Override
2552   public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
2553       throws IOException {
2554     if (cpHost != null) {
2555       cpHost.preAbortProcedure(this.procedureExecutor, procId);
2556     }
2557 
2558     final boolean result = this.procedureExecutor.abort(procId, mayInterruptIfRunning);
2559 
2560     if (cpHost != null) {
2561       cpHost.postAbortProcedure();
2562     }
2563 
2564     return result;
2565   }
2566 
2567   @Override
2568   public List<ProcedureInfo> listProcedures() throws IOException {
2569     if (cpHost != null) {
2570       cpHost.preListProcedures();
2571     }
2572 
2573     final List<ProcedureInfo> procInfoList = this.procedureExecutor.listProcedures();
2574 
2575     if (cpHost != null) {
2576       cpHost.postListProcedures(procInfoList);
2577     }
2578 
2579     return procInfoList;
2580   }
2581 
2582   /**
2583    * Returns the list of table descriptors that match the specified request
2584    * @param namespace the namespace to query, or null if querying for all
2585    * @param regex The regular expression to match against, or null if querying for all
2586    * @param tableNameList the list of table names, or null if querying for all
2587    * @param includeSysTables False to match only against userspace tables
2588    * @return the list of table descriptors
2589    */
2590   public List<HTableDescriptor> listTableDescriptors(final String namespace, final String regex,
2591       final List<TableName> tableNameList, final boolean includeSysTables)
2592   throws IOException {
2593     List<HTableDescriptor> htds = new ArrayList<HTableDescriptor>();
2594     boolean bypass = cpHost != null?
2595         cpHost.preGetTableDescriptors(tableNameList, htds, regex): false;
2596     if (!bypass) {
2597       htds = getTableDescriptors(htds, namespace, regex, tableNameList, includeSysTables);
2598       if (cpHost != null) {
2599         cpHost.postGetTableDescriptors(tableNameList, htds, regex);
2600       }
2601     }
2602     return htds;
2603   }
2604 
2605   /**
2606    * Returns the list of table names that match the specified request
2607    * @param regex The regular expression to match against, or null if querying for all
2608    * @param namespace the namespace to query, or null if querying for all
2609    * @param includeSysTables False to match only against userspace tables
2610    * @return the list of table names
2611    */
2612   public List<TableName> listTableNames(final String namespace, final String regex,
2613       final boolean includeSysTables) throws IOException {
2614     List<HTableDescriptor> htds = new ArrayList<HTableDescriptor>();
2615     boolean bypass = cpHost != null? cpHost.preGetTableNames(htds, regex): false;
2616     if (!bypass) {
2617       htds = getTableDescriptors(htds, namespace, regex, null, includeSysTables);
2618       if (cpHost != null) cpHost.postGetTableNames(htds, regex);
2619     }
2620     List<TableName> result = new ArrayList<TableName>(htds.size());
2621     for (HTableDescriptor htd: htds) result.add(htd.getTableName());
2622     return result;
2623   }
2624 
2625   /**
2626    * @return the list of table descriptors after filtering by regex and by whether to include
2627    *    system tables, etc.
2628    * @throws IOException if the namespace lookup or a table descriptor read fails
2629    */
2630   private List<HTableDescriptor> getTableDescriptors(final List<HTableDescriptor> htds,
2631       final String namespace, final String regex, final List<TableName> tableNameList,
2632       final boolean includeSysTables)
2633   throws IOException {
2634     if (tableNameList == null || tableNameList.size() == 0) {
2635       // request for all TableDescriptors
2636       Collection<HTableDescriptor> allHtds;
2637       if (namespace != null && namespace.length() > 0) {
2638         // Check that the namespace exists; this call fails if it does not.
2639         this.clusterSchemaService.getNamespace(namespace);
2640         allHtds = tableDescriptors.getByNamespace(namespace).values();
2641       } else {
2642         allHtds = tableDescriptors.getAll().values();
2643       }
2644       for (HTableDescriptor desc: allHtds) {
2645         if (tableStateManager.isTablePresent(desc.getTableName())
2646             && (includeSysTables || !desc.getTableName().isSystemTable())) {
2647           htds.add(desc);
2648         }
2649       }
2650     } else {
2651       for (TableName s: tableNameList) {
2652         if (tableStateManager.isTablePresent(s)) {
2653           HTableDescriptor desc = tableDescriptors.get(s);
2654           if (desc != null) {
2655             htds.add(desc);
2656           }
2657         }
2658       }
2659     }
2660 
2661     // Retain only the descriptors matched by the regular expression.
2662     if (regex != null) filterTablesByRegex(htds, Pattern.compile(regex));
2663     return htds;
2664   }
2665 
2666   /**
2667    * Removes the table descriptors that don't match the pattern.
2668    * @param descriptors list of table descriptors to filter
2669    * @param pattern the regex to use
2670    */
2671   private static void filterTablesByRegex(final Collection<HTableDescriptor> descriptors,
2672       final Pattern pattern) {
2673     final String defaultNS = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
2674     Iterator<HTableDescriptor> itr = descriptors.iterator();
2675     while (itr.hasNext()) {
2676       HTableDescriptor htd = itr.next();
2677       String tableName = htd.getTableName().getNameAsString();
2678       boolean matched = pattern.matcher(tableName).matches();
2679       if (!matched && htd.getTableName().getNamespaceAsString().equals(defaultNS)) {
2680         matched = pattern.matcher(defaultNS + TableName.NAMESPACE_DELIM + tableName).matches();
2681       }
2682       if (!matched) {
2683         itr.remove();
2684       }
2685     }
2686   }
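
  // Worked example of the default-namespace fallback above (illustrative): for
  // a table named "test" in the default namespace, both an unqualified and a
  // "default:"-qualified pattern are treated as matches.
  //
  //   Pattern p1 = Pattern.compile("test");          // matches "test" directly
  //   Pattern p2 = Pattern.compile("default:test");  // matches on the retried
  //                                                  // "default" + ":" + "test" form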
2687 
2688   @Override
2689   public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
2690     return getClusterStatus().getLastMajorCompactionTsForTable(table);
2691   }
2692 
2693   @Override
2694   public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException {
2695     return getClusterStatus().getLastMajorCompactionTsForRegion(regionName);
2696   }
2697 
2698   /**
2699    * Gets the mob file compaction state for a specific table.
2700    * Whether all the mob files are selected is only known while the compaction executes, but
2701    * this state is recorded just before the compaction starts, so the exact compaction type
2702    * cannot be known at that point and a rough classification is used instead. Only two
2703    * compaction states are reported: CompactionState.MAJOR_AND_MINOR and CompactionState.NONE.
2704    * @param tableName the name of the table to check
2705    * @return MAJOR_AND_MINOR if a mob compaction is in progress on the table, NONE otherwise
2706    */
2707   public CompactionState getMobCompactionState(TableName tableName) {
2708     AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
2709     if (compactionsCount != null && compactionsCount.get() != 0) {
2710       return CompactionState.MAJOR_AND_MINOR;
2711     }
2712     return CompactionState.NONE;
2713   }
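
  // Illustrative check (a sketch; "master" is an assumed HMaster reference):
  //
  //   if (master.getMobCompactionState(tableName) == CompactionState.MAJOR_AND_MINOR) {
  //     // at least one mob compaction is currently running against the table
  //   }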
2714 
2715   public void reportMobCompactionStart(TableName tableName) throws IOException {
2716     IdLock.Entry lockEntry = null;
2717     try {
2718       lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
2719       AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
2720       if (compactionsCount == null) {
2721         compactionsCount = new AtomicInteger(0);
2722         mobCompactionStates.put(tableName, compactionsCount);
2723       }
2724       compactionsCount.incrementAndGet();
2725     } finally {
2726       if (lockEntry != null) {
2727         mobCompactionLock.releaseLockEntry(lockEntry);
2728       }
2729     }
2730   }
2731 
2732   public void reportMobCompactionEnd(TableName tableName) throws IOException {
2733     IdLock.Entry lockEntry = null;
2734     try {
2735       lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
2736       AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
2737       if (compactionsCount != null) {
2738         int count = compactionsCount.decrementAndGet();
2739         // remove the entry if the count is 0.
2740         if (count == 0) {
2741           mobCompactionStates.remove(tableName);
2742         }
2743       }
2744     } finally {
2745       if (lockEntry != null) {
2746         mobCompactionLock.releaseLockEntry(lockEntry);
2747       }
2748     }
2749   }
2750 
2751   /**
2752    * Requests a mob compaction.
2753    * @param tableName the table to compact
2754    * @param columns the column families to compact
2755    * @param allFiles whether to select all mob files for the compaction
2756    */
2757   public void requestMobCompaction(TableName tableName,
2758     List<HColumnDescriptor> columns, boolean allFiles) throws IOException {
2759     mobCompactThread.requestMobCompaction(conf, fs, tableName, columns,
2760       tableLockManager, allFiles);
2761   }
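
  // Illustrative call (a sketch; "master" and "htd" are assumed to be an
  // HMaster reference and the table's HTableDescriptor): request a compaction
  // of all mob files across every column family of the table:
  //
  //   List<HColumnDescriptor> families = Arrays.asList(htd.getColumnFamilies());
  //   master.requestMobCompaction(htd.getTableName(), families, true /* allFiles */);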
2762 
2763   /**
2764    * Queries the state of the {@link LoadBalancerTracker}. If the balancer is not initialized,
2765    * false is returned.
2766    *
2767    * @return The state of the load balancer, or false if the load balancer isn't defined.
2768    */
2769   public boolean isBalancerOn() {
2770     if (null == loadBalancerTracker) return false;
2771     return loadBalancerTracker.isBalancerOn();
2772   }
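
  // The state read here is the one clients toggle through the balancer switch
  // (illustrative; assumes this HBase line's Admin API):
  //
  //   admin.setBalancerRunning(false, true /* synchronous */);
  //   // isBalancerOn() now reports false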
2773 
2774   /**
2775    * Queries the state of the {@link RegionNormalizerTracker}.
2776    * @return the state of the region normalizer, or false if the tracker is not initialized
2777    */
2778   public boolean isNormalizerOn() {
2779     return null == regionNormalizerTracker? false: regionNormalizerTracker.isNormalizerOn();
2780   }
2781 
2782   /**
2783    * Fetch the configured {@link LoadBalancer} class name. If none is set, a default is returned.
2784    *
2785    * @return The name of the {@link LoadBalancer} in use.
2786    */
2787   public String getLoadBalancerClassName() {
2788     return conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, LoadBalancerFactory
2789         .getDefaultLoadBalancerClass().getName());
2790   }
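
  // Illustrative configuration (a sketch): overriding the balancer class via
  // the key read above; StochasticLoadBalancer is named purely as an example:
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
  //       "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer");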
2791 
2792   /**
2793    * @return RegionNormalizerTracker instance
2794    */
2795   public RegionNormalizerTracker getRegionNormalizerTracker() {
2796     return regionNormalizerTracker;
2797   }
2798 }