1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  import com.google.common.collect.Lists;
23  import com.google.common.collect.Maps;
24  import com.google.protobuf.Descriptors;
25  import com.google.protobuf.Service;
26  
27  
28  import java.io.IOException;
29  import java.io.InterruptedIOException;
30  import java.lang.reflect.Constructor;
31  import java.lang.reflect.InvocationTargetException;
32  import java.net.InetAddress;
33  import java.net.InetSocketAddress;
34  import java.net.UnknownHostException;
35  import java.util.ArrayList;
36  import java.util.Arrays;
37  import java.util.Collection;
38  import java.util.Collections;
39  import java.util.Comparator;
40  import java.util.HashSet;
41  import java.util.Iterator;
42  import java.util.List;
43  import java.util.Map;
44  import java.util.Map.Entry;
45  import java.util.Set;
46  import java.util.concurrent.TimeUnit;
47  import java.util.concurrent.atomic.AtomicInteger;
48  import java.util.concurrent.atomic.AtomicReference;
49  import java.util.regex.Pattern;
50  
51  import javax.servlet.ServletException;
52  import javax.servlet.http.HttpServlet;
53  import javax.servlet.http.HttpServletRequest;
54  import javax.servlet.http.HttpServletResponse;
55  
56  import org.apache.commons.logging.Log;
57  import org.apache.commons.logging.LogFactory;
58  import org.apache.hadoop.conf.Configuration;
59  import org.apache.hadoop.fs.Path;
60  import org.apache.hadoop.hbase.ClusterStatus;
61  import org.apache.hadoop.hbase.CoordinatedStateException;
62  import org.apache.hadoop.hbase.CoordinatedStateManager;
63  import org.apache.hadoop.hbase.DoNotRetryIOException;
64  import org.apache.hadoop.hbase.HBaseIOException;
65  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
66  import org.apache.hadoop.hbase.HColumnDescriptor;
67  import org.apache.hadoop.hbase.HConstants;
68  import org.apache.hadoop.hbase.HRegionInfo;
69  import org.apache.hadoop.hbase.HTableDescriptor;
70  import org.apache.hadoop.hbase.MasterNotRunningException;
71  import org.apache.hadoop.hbase.MetaTableAccessor;
72  import org.apache.hadoop.hbase.NamespaceDescriptor;
73  import org.apache.hadoop.hbase.PleaseHoldException;
74  import org.apache.hadoop.hbase.ProcedureInfo;
75  import org.apache.hadoop.hbase.RegionStateListener;
76  import org.apache.hadoop.hbase.Server;
77  import org.apache.hadoop.hbase.ServerLoad;
78  import org.apache.hadoop.hbase.ServerName;
79  import org.apache.hadoop.hbase.TableDescriptor;
80  import org.apache.hadoop.hbase.TableDescriptors;
81  import org.apache.hadoop.hbase.TableName;
82  import org.apache.hadoop.hbase.TableNotDisabledException;
83  import org.apache.hadoop.hbase.TableNotFoundException;
84  import org.apache.hadoop.hbase.UnknownRegionException;
85  import org.apache.hadoop.hbase.classification.InterfaceAudience;
86  import org.apache.hadoop.hbase.client.MasterSwitchType;
87  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
88  import org.apache.hadoop.hbase.client.Result;
89  import org.apache.hadoop.hbase.client.TableState;
90  import org.apache.hadoop.hbase.coprocessor.BypassCoprocessorException;
91  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
92  import org.apache.hadoop.hbase.exceptions.DeserializationException;
93  import org.apache.hadoop.hbase.executor.ExecutorType;
94  import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
95  import org.apache.hadoop.hbase.ipc.RpcServer;
96  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
97  import org.apache.hadoop.hbase.master.MasterRpcServices.BalanceSwitchMode;
98  import org.apache.hadoop.hbase.master.balancer.BalancerChore;
99  import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
100 import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
101 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
102 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
103 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
104 import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
105 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
106 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
107 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
108 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore;
109 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerFactory;
110 import org.apache.hadoop.hbase.master.procedure.AddColumnFamilyProcedure;
111 import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
112 import org.apache.hadoop.hbase.master.procedure.DeleteColumnFamilyProcedure;
113 import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
114 import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure;
115 import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure;
116 import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
117 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
118 import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
119 import org.apache.hadoop.hbase.master.procedure.ModifyColumnFamilyProcedure;
120 import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
121 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
122 import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
123 import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
124 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
125 import org.apache.hadoop.hbase.mob.MobConstants;
126 import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
127 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
128 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
129 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
130 import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
131 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
132 import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
133 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
134 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
135 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
136 import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
137 import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
138 import org.apache.hadoop.hbase.regionserver.HRegionServer;
139 import org.apache.hadoop.hbase.regionserver.HStore;
140 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
141 import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
142 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
143 import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
144 import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
145 import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
146 import org.apache.hadoop.hbase.replication.regionserver.Replication;
147 import org.apache.hadoop.hbase.security.User;
148 import org.apache.hadoop.hbase.security.UserProvider;
149 import org.apache.hadoop.hbase.util.Addressing;
150 import org.apache.hadoop.hbase.util.Bytes;
151 import org.apache.hadoop.hbase.util.CompressionTest;
152 import org.apache.hadoop.hbase.util.EncryptionTest;
153 import org.apache.hadoop.hbase.util.FSUtils;
154 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
155 import org.apache.hadoop.hbase.util.HasThread;
156 import org.apache.hadoop.hbase.util.IdLock;
157 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
158 import org.apache.hadoop.hbase.util.Pair;
159 import org.apache.hadoop.hbase.util.Threads;
160 import org.apache.hadoop.hbase.util.VersionInfo;
161 import org.apache.hadoop.hbase.util.ZKDataMigrator;
162 import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
163 import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
164 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
165 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
166 import org.apache.hadoop.hbase.zookeeper.RegionNormalizerTracker;
167 import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
168 import org.apache.hadoop.hbase.zookeeper.SplitOrMergeTracker;
169 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
170 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
171 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
172 import org.apache.zookeeper.KeeperException;
173 import org.mortbay.jetty.Connector;
174 import org.mortbay.jetty.nio.SelectChannelConnector;
175 import org.mortbay.jetty.servlet.Context;
176
177 /**
178  * HMaster is the "master server" for HBase. An HBase cluster has one active
179  * master.  If many masters are started, all compete.  Whichever wins goes on to
180  * run the cluster.  All others park themselves in their constructor until
181  * master or cluster shutdown or until the active master loses its lease in
182  * zookeeper.  Thereafter, all running masters jostle to take over the master role.
183  *
184  * <p>The Master can be asked to shut down the cluster. See {@link #shutdown()}.  In
185  * this case it will tell all regionservers to go down and then wait on them
186  * all reporting in that they are down.  This master will then shut itself down.
187  *
188  * <p>You can also shutdown just this master.  Call {@link #stopMaster()}.
189  *
190  * @see org.apache.zookeeper.Watcher
191  */
192 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
193 @SuppressWarnings("deprecation")
194 public class HMaster extends HRegionServer implements MasterServices {
195   private static final Log LOG = LogFactory.getLog(HMaster.class.getName());
196
197   /**
198    * Protection against a zombie master. Started once the Master accepts the active role and
199    * begins taking over responsibilities. Allows a finite time window before giving up ownership.
200    */
201   private static class InitializationMonitor extends HasThread {
202     /** The amount of time in milliseconds to sleep before checking initialization status. */
203     public static final String TIMEOUT_KEY = "hbase.master.initializationmonitor.timeout";
204     public static final long TIMEOUT_DEFAULT = TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);
205
206     /**
207      * When the timeout expires and initialization has not completed, calls {@link System#exit(int)}
208      * if true; does nothing otherwise.
209      */
210     public static final String HALT_KEY = "hbase.master.initializationmonitor.haltontimeout";
211     public static final boolean HALT_DEFAULT = false;
212
213     private final HMaster master;
214     private final long timeout;
215     private final boolean haltOnTimeout;
216
217     /** Creates a Thread that monitors the {@link #isInitialized()} state. */
218     InitializationMonitor(HMaster master) {
219       super("MasterInitializationMonitor");
220       this.master = master;
221       this.timeout = master.getConfiguration().getLong(TIMEOUT_KEY, TIMEOUT_DEFAULT);
222       this.haltOnTimeout = master.getConfiguration().getBoolean(HALT_KEY, HALT_DEFAULT);
223       this.setDaemon(true);
224     }
225
226     @Override
227     public void run() {
228       try {
229         while (!master.isStopped() && master.isActiveMaster()) {
230           Thread.sleep(timeout);
231           if (master.isInitialized()) {
232             LOG.debug("Initialization completed within allotted tolerance. Monitor exiting.");
233           } else {
234             LOG.error("Master failed to complete initialization after " + timeout + "ms. Please"
235                 + " consider submitting a bug report including a thread dump of this process.");
236             if (haltOnTimeout) {
237               LOG.error("Zombie Master exiting. Thread dump to stdout");
238               Threads.printThreadInfo(System.out, "Zombie HMaster");
239               System.exit(-1);
240             }
241           }
242         }
243       } catch (InterruptedException ie) {
244         LOG.trace("InitMonitor thread interrupted. Exiting.");
245       }
246     }
247   }
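  // A sketch (not part of the original source) of how these knobs might be set in
  // hbase-site.xml; the keys are the TIMEOUT_KEY/HALT_KEY constants above, the values
  // are hypothetical examples:
  //
  //   <property>
  //     <name>hbase.master.initializationmonitor.timeout</name>
  //     <value>900000</value>  <!-- 15 minutes, same as TIMEOUT_DEFAULT -->
  //   </property>
  //   <property>
  //     <name>hbase.master.initializationmonitor.haltontimeout</name>
  //     <value>true</value>    <!-- halt a zombie master instead of only logging -->
  //   </property>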
248
249   // MASTER is the name of the webapp and the attribute name used to stuff this
250   // instance into the web context.
251   public static final String MASTER = "master";
252
253   // Manager and zk listener for master election
254   private final ActiveMasterManager activeMasterManager;
255   // Region server tracker
256   RegionServerTracker regionServerTracker;
257   // Draining region server tracker
258   private DrainingServerTracker drainingServerTracker;
259   // Tracker for load balancer state
260   LoadBalancerTracker loadBalancerTracker;
261
262   // Tracker for split and merge state
263   private SplitOrMergeTracker splitOrMergeTracker;
264
265   // Tracker for region normalizer state
266   private RegionNormalizerTracker regionNormalizerTracker;
267
268   private ClusterSchemaService clusterSchemaService;
269
270   // Metrics for the HMaster
271   final MetricsMaster metricsMaster;
272   // file system manager for the master FS operations
273   private MasterFileSystem fileSystemManager;
274   private MasterWalManager walManager;
275
276   // server manager to deal with region server info
277   private volatile ServerManager serverManager;
278
279   // manager of assignment nodes in zookeeper
280   private AssignmentManager assignmentManager;
281
282   // buffer for "fatal error" notices from region servers
283   // in the cluster. This is only used for assisting
284   // operations/debugging.
285   MemoryBoundedLogMessageBuffer rsFatals;
286
287   // flag set after we become the active master (used for testing)
288   private volatile boolean isActiveMaster = false;
289
290   // flag set after we complete initialization once active;
291   // unit tests query this state through isInitialized()
292   private final ProcedureEvent initialized = new ProcedureEvent("master initialized");
293
294   // flag set after master services are started,
295   // initialization may have not completed yet.
296   volatile boolean serviceStarted = false;
297
298   // flag set after we complete assignMeta.
299   private final ProcedureEvent serverCrashProcessingEnabled =
300     new ProcedureEvent("server crash processing");
301
302   private LoadBalancer balancer;
303   private RegionNormalizer normalizer;
304   private BalancerChore balancerChore;
305   private RegionNormalizerChore normalizerChore;
306   private ClusterStatusChore clusterStatusChore;
307   private ClusterStatusPublisher clusterStatusPublisherChore = null;
308
309   CatalogJanitor catalogJanitorChore;
310   private LogCleaner logCleaner;
311   private HFileCleaner hfileCleaner;
312   private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
313   private MobCompactionChore mobCompactChore;
314   private MasterMobCompactionThread mobCompactThread;
315   // used to synchronize the mobCompactionStates
316   private final IdLock mobCompactionLock = new IdLock();
317   // tracks the mob compactions in progress per table:
318   // the key is the table name, the value is the number of compactions in that table.
319   private Map<TableName, AtomicInteger> mobCompactionStates = Maps.newConcurrentMap();
320 
321   MasterCoprocessorHost cpHost;
322 
323   private final boolean preLoadTableDescriptors;
324
325   // Time stamp for when this hmaster became active
326   private long masterActiveTime;
327
328   //should we check the compression codec type at master side, default true, HBASE-6370
329   private final boolean masterCheckCompression;
330
331   //should we check encryption settings at master side, default true
332   private final boolean masterCheckEncryption;
333 
334   Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
335
336   // monitor for snapshot of hbase tables
337   SnapshotManager snapshotManager;
338   // monitor for distributed procedures
339   private MasterProcedureManagerHost mpmHost;
340
341   // it is assigned after the 'initialized' guard is set to true, so it should be volatile
342   private volatile MasterQuotaManager quotaManager;
343
344   private ProcedureExecutor<MasterProcedureEnv> procedureExecutor;
345   private WALProcedureStore procedureStore;
346
347   // handle table states
348   private TableStateManager tableStateManager;
349
350   private long splitPlanCount;
351   private long mergePlanCount;
352
353   /** flag used in test cases in order to simulate RS failures during master initialization */
354   private volatile boolean initializationBeforeMetaAssignment = false;
355
356   /** jetty server for master to redirect requests to regionserver infoServer */
357   private org.mortbay.jetty.Server masterJettyServer;
358
359   public static class RedirectServlet extends HttpServlet {
360     private static final long serialVersionUID = 2894774810058302472L;
361     private static int regionServerInfoPort;
362
363     @Override
364     public void doGet(HttpServletRequest request,
365         HttpServletResponse response) throws ServletException, IOException {
366       String redirectUrl = request.getScheme() + "://"
367         + request.getServerName() + ":" + regionServerInfoPort
368         + request.getRequestURI();
369       response.sendRedirect(redirectUrl);
370     }
371   }
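  // Illustration (hypothetical host and ports): with the region server info server on
  // port 16030, a request for http://master-host:16010/master-status is answered by
  // the servlet above with a redirect to http://master-host:16030/master-status.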
372
373   /**
374    * Initializes the HMaster. The steps are as follows:
375    * <p>
376    * <ol>
377    * <li>Initialize the local HRegionServer
378    * <li>Start the ActiveMasterManager.
379    * </ol>
380    * <p>
381    * Remaining steps of initialization occur in
382    * #finishActiveMasterInitialization(MonitoredTask) after
383    * the master becomes the active one.
384    */
385   public HMaster(final Configuration conf, CoordinatedStateManager csm)
386       throws IOException, KeeperException {
387     super(conf, csm);
388     this.rsFatals = new MemoryBoundedLogMessageBuffer(
389       conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));
390
391     LOG.info("hbase.rootdir=" + FSUtils.getRootDir(this.conf) +
392       ", hbase.cluster.distributed=" + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false));
393
394     // Disable usage of meta replicas in the master
395     this.conf.setBoolean(HConstants.USE_META_REPLICAS, false);
396 
397     Replication.decorateMasterConfiguration(this.conf);
398
399     // Hack! Maps DFSClient => Master for logs.  HDFS made this
400     // config param for task trackers, but we can piggyback off of it.
401     if (this.conf.get("mapreduce.task.attempt.id") == null) {
402       this.conf.set("mapreduce.task.attempt.id", "hb_m_" + this.serverName.toString());
403     }
404
405     // should we check the compression codec type at master side, default true, HBASE-6370
406     this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);
407
408     // should we check encryption settings at master side, default true
409     this.masterCheckEncryption = conf.getBoolean("hbase.master.check.encryption", true);
410 
411     this.metricsMaster = new MetricsMaster(new MetricsMasterWrapperImpl(this));
412
413     // preload table descriptors at startup
414     this.preLoadTableDescriptors = conf.getBoolean("hbase.master.preload.tabledescriptors", true);
415 
416     // Do we publish the status?
417
418     boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
419         HConstants.STATUS_PUBLISHED_DEFAULT);
420     Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
421         conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
422             ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
423             ClusterStatusPublisher.Publisher.class);
424
425     if (shouldPublish) {
426       if (publisherClass == null) {
427         LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
428             ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
429             " is not set - not publishing status");
430       } else {
431         clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
432         getChoreService().scheduleChore(clusterStatusPublisherChore);
433       }
434     }
435
436     // Some unit tests don't need a cluster, so no zookeeper at all
437     if (!conf.getBoolean("hbase.testing.nocluster", false)) {
438       activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName, this);
439       int infoPort = putUpJettyServer();
440       startActiveMasterManager(infoPort);
441     } else {
442       activeMasterManager = null;
443     }
444   }
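  // Note for test authors (a restatement of the constructor logic above, not new
  // behavior): setting hbase.testing.nocluster=true constructs an HMaster with no
  // ActiveMasterManager and no jetty redirect server, so no ZooKeeper is required.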
445
446   // Returns the actual infoPort; -1 means the info server is disabled.
447   private int putUpJettyServer() throws IOException {
448     if (!conf.getBoolean("hbase.master.infoserver.redirect", true)) {
449       return -1;
450     }
451     int infoPort = conf.getInt("hbase.master.info.port.orig",
452       HConstants.DEFAULT_MASTER_INFOPORT);
453     // -1 is for disabling info server, so no redirecting
454     if (infoPort < 0 || infoServer == null) {
455       return -1;
456     }
457     String addr = conf.get("hbase.master.info.bindAddress", "0.0.0.0");
458     if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) {
459       String msg =
460           "Failed to start redirecting jetty server. Address " + addr
461               + " does not belong to this host. Correct configuration parameter: "
462               + "hbase.master.info.bindAddress";
463       LOG.error(msg);
464       throw new IOException(msg);
465     }
466
467     RedirectServlet.regionServerInfoPort = infoServer.getPort();
468     if (RedirectServlet.regionServerInfoPort == infoPort) {
469       return infoPort;
470     }
471     masterJettyServer = new org.mortbay.jetty.Server();
472     Connector connector = new SelectChannelConnector();
473     connector.setHost(addr);
474     connector.setPort(infoPort);
475     masterJettyServer.addConnector(connector);
476     masterJettyServer.setStopAtShutdown(true);
477     Context context = new Context(masterJettyServer, "/", Context.NO_SESSIONS);
478     context.addServlet(RedirectServlet.class, "/*");
479     try {
480       masterJettyServer.start();
481     } catch (Exception e) {
482       throw new IOException("Failed to start redirecting jetty server", e);
483     }
484     return connector.getLocalPort();
485   }
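  // Worked example (ports are illustrative, not guaranteed): if
  // hbase.master.info.port.orig resolves to 16010 while the shared info server is
  // bound to 16030, a jetty instance is started on 16010 whose RedirectServlet
  // forwards every request to 16030; if both ports were equal, infoPort is returned
  // directly and no extra jetty instance is created.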
486
487   @Override
488   protected TableDescriptors getFsTableDescriptors() throws IOException {
489     return super.getFsTableDescriptors();
490   }
491
492   /**
493    * For compatibility, if login with the regionserver credentials fails, try the master ones
494    */
495   @Override
496   protected void login(UserProvider user, String host) throws IOException {
497     try {
498       super.login(user, host);
499     } catch (IOException ie) {
500       user.login("hbase.master.keytab.file",
501         "hbase.master.kerberos.principal", host);
502     }
503   }
504
505   /**
506    * If configured to put regions on active master,
507    * wait till a backup master becomes active.
508    * Otherwise, loop till the server is stopped or aborted.
509    */
510   @Override
511   protected void waitForMasterActive(){
512     boolean tablesOnMaster = BaseLoadBalancer.tablesOnMaster(conf);
513     while (!(tablesOnMaster && isActiveMaster)
514         && !isStopped() && !isAborted()) {
515       sleeper.sleep();
516     }
517   }
518
519   @VisibleForTesting
520   public MasterRpcServices getMasterRpcServices() {
521     return (MasterRpcServices)rpcServices;
522   }
523
524   public boolean balanceSwitch(final boolean b) throws IOException {
525     return getMasterRpcServices().switchBalancer(b, BalanceSwitchMode.ASYNC);
526   }
527
528   @Override
529   protected String getProcessName() {
530     return MASTER;
531   }
532
533   @Override
534   protected boolean canCreateBaseZNode() {
535     return true;
536   }
537
538   @Override
539   protected boolean canUpdateTableDescriptor() {
540     return true;
541   }
542
543   @Override
544   protected RSRpcServices createRpcServices() throws IOException {
545     return new MasterRpcServices(this);
546   }
547
548   @Override
549   protected void configureInfoServer() {
550     infoServer.addServlet("master-status", "/master-status", MasterStatusServlet.class);
551     infoServer.setAttribute(MASTER, this);
552     if (BaseLoadBalancer.tablesOnMaster(conf)) {
553       super.configureInfoServer();
554     }
555   }
556
557   @Override
558   protected Class<? extends HttpServlet> getDumpServlet() {
559     return MasterDumpServlet.class;
560   }
561
562   /**
563    * Emit the HMaster metrics, such as region in transition metrics.
564    * Wrapped in a try block just to be sure a metrics failure doesn't abort the HMaster.
565    */
566   @Override
567   protected void doMetrics() {
568     try {
569       if (assignmentManager != null) {
570         assignmentManager.updateRegionsInTransitionMetrics();
571       }
572     } catch (Throwable e) {
573       LOG.error("Couldn't update metrics: " + e.getMessage());
574     }
575   }
576
577   MetricsMaster getMasterMetrics() {
578     return metricsMaster;
579   }
580
581   /**
582    * Initialize all ZK based system trackers.
583    */
584   void initializeZKBasedSystemTrackers() throws IOException,
585       InterruptedException, KeeperException, CoordinatedStateException {
586     this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
587     this.normalizer = RegionNormalizerFactory.getRegionNormalizer(conf);
588     this.normalizer.setMasterServices(this);
589     this.normalizer.setMasterRpcServices((MasterRpcServices)rpcServices);
590     this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
591     this.loadBalancerTracker.start();
592
593     this.regionNormalizerTracker = new RegionNormalizerTracker(zooKeeper, this);
594     this.regionNormalizerTracker.start();
595
596     this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
597     this.splitOrMergeTracker.start();
598
599     this.assignmentManager = new AssignmentManager(this, serverManager,
600       this.balancer, this.service, this.metricsMaster,
601       this.tableLockManager, tableStateManager);
602
603     this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager);
604     this.regionServerTracker.start();
605
606     this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
607     this.drainingServerTracker.start();
608
609     // Set the cluster as up.  If new RSs, they'll be waiting on this before
610     // going ahead with their startup.
611     boolean wasUp = this.clusterStatusTracker.isClusterUp();
612     if (!wasUp) this.clusterStatusTracker.setClusterUp();
613
614     LOG.info("Server active/primary master=" + this.serverName +
615         ", sessionid=0x" +
616         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
617         ", setting cluster-up flag (Was=" + wasUp + ")");
618
619     // create/initialize the snapshot manager and other procedure managers
620     this.snapshotManager = new SnapshotManager();
621     this.mpmHost = new MasterProcedureManagerHost();
622     this.mpmHost.register(this.snapshotManager);
623     this.mpmHost.register(new MasterFlushTableProcedureManager());
624     this.mpmHost.loadProcedures(conf);
625     this.mpmHost.initialize(this, this.metricsMaster);
626 
627   }
628
629   /**
630    * Finish initialization of HMaster after becoming the primary master.
631    *
632    * <ol>
633    * <li>Initialize master components - file system manager, server manager,
634    *     assignment manager, region server tracker, etc</li>
635    * <li>Start necessary service threads - balancer, catalog janitor,
636    *     executor services, etc</li>
637    * <li>Set cluster as UP in ZooKeeper</li>
638    * <li>Wait for RegionServers to check-in</li>
639    * <li>Split logs and perform data recovery, if necessary</li>
640    * <li>Ensure assignment of meta/namespace regions</li>
641    * <li>Handle either fresh cluster start or master failover</li>
642    * </ol>
643    */
644   private void finishActiveMasterInitialization(MonitoredTask status)
645       throws IOException, InterruptedException, KeeperException, CoordinatedStateException {
646
647     isActiveMaster = true;
648     Thread zombieDetector = new Thread(new InitializationMonitor(this));
649     zombieDetector.start();
650
651     /*
652      * We are active master now... go initialize components we need to run.
653      * Note, there may be dross in zk from previous runs; it'll get addressed
654      * below after we determine if cluster startup or failover.
655      */
656 
657     status.setStatus("Initializing Master file system");
658
659     this.masterActiveTime = System.currentTimeMillis();
660     // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
661     this.fileSystemManager = new MasterFileSystem(this);
662     this.walManager = new MasterWalManager(this);
663
664     // enable table descriptors cache
665     this.tableDescriptors.setCacheOn();
666     // set the META's descriptor to the correct replication
667     this.tableDescriptors.get(TableName.META_TABLE_NAME).setRegionReplication(
668         conf.getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM));
669     // warm-up HTDs cache on master initialization
670     if (preLoadTableDescriptors) {
671       status.setStatus("Pre-loading table descriptors");
672       this.tableDescriptors.getAll();
673     }
674
675     // publish cluster ID
676     status.setStatus("Publishing Cluster ID in ZooKeeper");
677     ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
678
679     this.serverManager = createServerManager(this, this);
680
681     // Invalidate all write locks held previously
682     this.tableLockManager.reapWriteLocks();
683     this.tableStateManager = new TableStateManager(this);
684 
685     status.setStatus("Initializing ZK system trackers");
686     initializeZKBasedSystemTrackers();
687
688     // This is for backwards compatibility
689     // See HBASE-11393
690     status.setStatus("Update TableCFs node in ZNode");
691     TableCFsUpdater tableCFsUpdater = new TableCFsUpdater(zooKeeper,
692             conf, this.clusterConnection);
693     tableCFsUpdater.update();
694
695     // initialize master side coprocessors before we start handling requests
696     status.setStatus("Initializing master coprocessors");
697     this.cpHost = new MasterCoprocessorHost(this, this.conf);
698
699     // start up all service threads.
700     status.setStatus("Initializing master service threads");
701     startServiceThreads();
702 
703     // Wake up this server to check in
704     sleeper.skipSleepCycle();
705
706     // Wait for region servers to report in
707     this.serverManager.waitForRegionServers(status);
708     // Check zk for region servers that are up but didn't register
709     for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
710       // The isServerOnline check is opportunistic, correctness is handled inside
711       if (!this.serverManager.isServerOnline(sn)
712           && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
713         LOG.info("Registered server found up in zk but which has not yet reported in: " + sn);
714       }
715     }
716
717     // get a list of previously failed RSes which need log splitting work
718     // we recover hbase:meta region servers inside master initialization and
719     // handle other failed servers in SSH in order to start up master node ASAP
720     Set<ServerName> previouslyFailedServers =
721       this.walManager.getFailedServersFromLogFolders();
722
723     // log splitting for hbase:meta server
724     ServerName oldMetaServerLocation = metaTableLocator.getMetaRegionLocation(this.getZooKeeper());
725     if (oldMetaServerLocation != null && previouslyFailedServers.contains(oldMetaServerLocation)) {
726       splitMetaLogBeforeAssignment(oldMetaServerLocation);
727     // Note: we can't remove oldMetaServerLocation from the previouslyFailedServers list because it
728       // may also host user regions
729     }
730     Set<ServerName> previouslyFailedMetaRSs = getPreviouselyFailedMetaServersFromZK();
731     // need to use the union of previouslyFailedMetaRSs recorded in ZK and previouslyFailedServers
732     // instead of previouslyFailedMetaRSs alone to address the following two situations:
733     // 1) the chained failure situation (recovery failed multiple times in a row);
734     // 2) the master gets killed right before it can delete the recovering hbase:meta znode from
735     // ZK while the same server still has non-meta WALs to be replayed, so that
736     // removeStaleRecoveringRegionsFromZK can't delete the stale hbase:meta region.
737     // Passing more servers into splitMetaLog is all right: if a server doesn't have hbase:meta
738     // WALs, it is a no-op for that server.
739     previouslyFailedMetaRSs.addAll(previouslyFailedServers);
740
741     this.initializationBeforeMetaAssignment = true;
742
743     // Wait for regionserver to finish initialization.
744     if (BaseLoadBalancer.tablesOnMaster(conf)) {
745       waitForServerOnline();
746     }
747
748     //initialize load balancer
749     this.balancer.setClusterStatus(getClusterStatus());
750     this.balancer.setMasterServices(this);
751     this.balancer.initialize();
752
753     // Check if master is shutting down because of some issue
754     // in initializing the regionserver or the balancer.
755     if (isStopped()) return;
756
757     // Make sure meta assigned before proceeding.
758     status.setStatus("Assigning Meta Region");
759     assignMeta(status, previouslyFailedMetaRSs, HRegionInfo.DEFAULT_REPLICA_ID);
760     // check if master is shutting down because above assignMeta could return even hbase:meta isn't
761     // assigned when master is shutting down
762     if (isStopped()) return;
763
764     // migrate existing table states from zk so that splitters
765     // and the recovery process treat the states properly.
766     for (Map.Entry<TableName, TableState.State> entry : ZKDataMigrator
767         .queryForTableStates(getZooKeeper()).entrySet()) {
768       LOG.info("Converting state from zk to new states:" + entry);
769       tableStateManager.setTableState(entry.getKey(), entry.getValue());
770     }
771     ZKUtil.deleteChildrenRecursively(getZooKeeper(), getZooKeeper().tableZNode);
772
773     status.setStatus("Submitting log splitting work for previously failed region servers");
774     // Master has recovered hbase:meta region server and we put
775     // other failed region servers in a queue to be handled later by SSH
776     for (ServerName tmpServer : previouslyFailedServers) {
777       this.serverManager.processDeadServer(tmpServer, true);
778     }
779
780     // Fix up assignment manager status
781     status.setStatus("Starting assignment manager");
782     this.assignmentManager.joinCluster();
783 
784     // set cluster status again after user regions are assigned
785     this.balancer.setClusterStatus(getClusterStatus());
786
787     // Start balancer and meta catalog janitor after meta and regions have been assigned.
788     status.setStatus("Starting balancer and catalog janitor");
789     this.clusterStatusChore = new ClusterStatusChore(this, balancer);
790     getChoreService().scheduleChore(clusterStatusChore);
791     this.balancerChore = new BalancerChore(this);
792     getChoreService().scheduleChore(balancerChore);
793     this.normalizerChore = new RegionNormalizerChore(this);
794     getChoreService().scheduleChore(normalizerChore);
795     this.catalogJanitorChore = new CatalogJanitor(this, this);
796     getChoreService().scheduleChore(catalogJanitorChore);
797 
798     status.setStatus("Starting cluster schema service");
799     initClusterSchemaService();
800
801     if (this.cpHost != null) {
802       try {
803         this.cpHost.preMasterInitialization();
804       } catch (IOException e) {
805         LOG.error("Coprocessor preMasterInitialization() hook failed", e);
806       }
807     }
808
809     status.markComplete("Initialization successful");
810     LOG.info("Master has completed initialization");
811     configurationManager.registerObserver(this.balancer);
812 
813     // Set master as 'initialized'.
814     setInitialized(true);
815
816     // assign the meta replicas
817     Set<ServerName> EMPTY_SET = new HashSet<ServerName>();
818     int numReplicas = conf.getInt(HConstants.META_REPLICAS_NUM,
819            HConstants.DEFAULT_META_REPLICA_NUM);
820     for (int i = 1; i < numReplicas; i++) {
821       assignMeta(status, EMPTY_SET, i);
822     }
823     unassignExcessMetaReplica(zooKeeper, numReplicas);
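    // For instance (illustrative): with hbase.meta.replicas.num=3, the loop above
    // assigns replicas 1 and 2, and unassignExcessMetaReplica closes and removes any
    // replica znode with id >= 3 left over from a previously larger configuration.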
824 
825     status.setStatus("Starting quota manager");
826     initQuotaManager();
827
828     // Clear dead servers that share the host name and port of an online server, because we do
829     // not remove a dead server matching the hostname and port of an RS which is trying to check
830     // in before master initialization. See HBASE-5916.
831     this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
832
833     // Check and set the znode ACLs if needed in case we are overtaking a non-secure configuration
834     status.setStatus("Checking ZNode ACLs");
835     zooKeeper.checkAndSetZNodeAcls();
836
837     status.setStatus("Calling postStartMaster coprocessors");
838 
839     this.expiredMobFileCleanerChore = new ExpiredMobFileCleanerChore(this);
840     getChoreService().scheduleChore(expiredMobFileCleanerChore);
841
842     int mobCompactionPeriod = conf.getInt(MobConstants.MOB_COMPACTION_CHORE_PERIOD,
843       MobConstants.DEFAULT_MOB_COMPACTION_CHORE_PERIOD);
844     if (mobCompactionPeriod > 0) {
845       this.mobCompactChore = new MobCompactionChore(this, mobCompactionPeriod);
846       getChoreService().scheduleChore(mobCompactChore);
847     } else {
848       LOG.info("The period is " + mobCompactionPeriod
849         + " seconds, MobCompactionChore is disabled");
850     }
851     this.mobCompactThread = new MasterMobCompactionThread(this);
852
853     if (this.cpHost != null) {
854       // don't let cp initialization errors kill the master
855       try {
856         this.cpHost.postStartMaster();
857       } catch (IOException ioe) {
858         LOG.error("Coprocessor postStartMaster() hook failed", ioe);
859       }
860     }
861
862     zombieDetector.interrupt();
863   }
864   /**
865    * Create a {@link ServerManager} instance.
866    */
867   ServerManager createServerManager(final Server master,
868       final MasterServices services)
869   throws IOException {
870     // We put this out here in a method so we can do a Mockito.spy and stub it out
871     // w/ a mocked up ServerManager.
872     setupClusterConnection();
873     return new ServerManager(master, services);
874   }
875
876   private void unassignExcessMetaReplica(ZooKeeperWatcher zkw, int numMetaReplicasConfigured) {
877     // unassign the unneeded replicas (e.g., if the previous master was configured
878     // with a replication of 3 and now it is 2, we need to unassign the 1 unneeded replica)
879     try {
880       List<String> metaReplicaZnodes = zooKeeper.getMetaReplicaNodes();
881       for (String metaReplicaZnode : metaReplicaZnodes) {
882         int replicaId = zooKeeper.getMetaReplicaIdFromZnode(metaReplicaZnode);
883         if (replicaId >= numMetaReplicasConfigured) {
884           RegionState r = MetaTableLocator.getMetaRegionState(zkw, replicaId);
885           LOG.info("Closing excess replica of meta region " + r.getRegion());
886           // send a close and wait for a max of 30 seconds
887           ServerManager.closeRegionSilentlyAndWait(getClusterConnection(), r.getServerName(),
888               r.getRegion(), 30000);
889           ZKUtil.deleteNode(zkw, zkw.getZNodeForReplica(replicaId));
890         }
891       }
892     } catch (Exception ex) {
893       // ignore the exception since we don't want the master to be wedged due to potential
894       // issues in the cleanup of the extra regions. We can do that cleanup via hbck or manually
895       LOG.warn("Ignoring exception " + ex);
896     }
897   }
898
899   /**
900    * Check that <code>hbase:meta</code> is assigned; if not, assign it.
901    */
902   void assignMeta(MonitoredTask status, Set<ServerName> previouslyFailedMetaRSs, int replicaId)
903       throws InterruptedException, IOException, KeeperException {
904     // Work on meta region
905     int assigned = 0;
906     long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
907     if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
908       status.setStatus("Assigning hbase:meta region");
909     } else {
910       status.setStatus("Assigning hbase:meta region, replicaId " + replicaId);
911     }
912
913     // Get current meta state from zk.
914     RegionState metaState = MetaTableLocator.getMetaRegionState(getZooKeeper(), replicaId);
915     HRegionInfo hri = RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO,
916         replicaId);
917     RegionStates regionStates = assignmentManager.getRegionStates();
918     regionStates.createRegionState(hri, metaState.getState(),
919         metaState.getServerName(), null);
920
921     if (!metaState.isOpened() || !metaTableLocator.verifyMetaRegionLocation(
922         this.getClusterConnection(), this.getZooKeeper(), timeout, replicaId)) {
923       ServerName currentMetaServer = metaState.getServerName();
924       if (serverManager.isServerOnline(currentMetaServer)) {
925         if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
926           LOG.info("Meta was in transition on " + currentMetaServer);
927         } else {
928           LOG.info("Meta with replicaId " + replicaId + " was in transition on " +
929                     currentMetaServer);
930         }
931         assignmentManager.processRegionsInTransition(Arrays.asList(metaState));
932       } else {
933         if (currentMetaServer != null) {
934           if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
935             splitMetaLogBeforeAssignment(currentMetaServer);
936             regionStates.logSplit(HRegionInfo.FIRST_META_REGIONINFO);
937             previouslyFailedMetaRSs.add(currentMetaServer);
938           }
939         }
940         LOG.info("Re-assigning hbase:meta with replicaId " + replicaId +
941             ", it was on " + currentMetaServer);
942         assignmentManager.assignMeta(hri);
943       }
944       assigned++;
945     }
946
947     if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) {
948       // TODO: should we prevent from using state manager before meta was initialized?
949       // tableStateManager.start();
950       getTableStateManager().setTableState(TableName.META_TABLE_NAME, TableState.State.ENABLED);
951     }
952
953     if ((RecoveryMode.LOG_REPLAY == this.getMasterWalManager().getLogRecoveryMode())
954         && (!previouslyFailedMetaRSs.isEmpty())) {
955       // log replay mode requires that the new hbase:meta RS be assigned first
956       status.setStatus("replaying log for Meta Region");
957       this.walManager.splitMetaLog(previouslyFailedMetaRSs);
958     }
959
960     this.assignmentManager.setEnabledTable(TableName.META_TABLE_NAME);
961     tableStateManager.start();
962
963     // Make sure a hbase:meta location is set. We need to enable SSH here since
964     // if the meta region server has died at this time, we need it to be re-assigned
965     // by SSH so that system tables can be assigned.
966     // No need to wait for meta when assigned == 0, i.e. when meta was just verified.
967     if (replicaId == HRegionInfo.DEFAULT_REPLICA_ID) enableCrashedServerProcessing(assigned != 0);
968     LOG.info("hbase:meta with replicaId " + replicaId + " assigned=" + assigned + ", location="
969       + metaTableLocator.getMetaRegionLocation(this.getZooKeeper(), replicaId));
970     status.setStatus("META assigned.");
971   }
972
973   void initClusterSchemaService() throws IOException, InterruptedException {
974     this.clusterSchemaService = new ClusterSchemaServiceImpl(this);
975     this.clusterSchemaService.startAndWait();
976     if (!this.clusterSchemaService.isRunning()) throw new HBaseIOException("Failed start");
977   }
978
979   void initQuotaManager() throws IOException {
980     MasterQuotaManager quotaManager = new MasterQuotaManager(this);
981     this.assignmentManager.setRegionStateListener((RegionStateListener)quotaManager);
982     quotaManager.start();
983     this.quotaManager = quotaManager;
984   }
985
986   boolean isCatalogJanitorEnabled() {
987     return catalogJanitorChore != null ?
988       catalogJanitorChore.getEnabled() : false;
989   }
990
991   private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException {
992     if (RecoveryMode.LOG_REPLAY == this.getMasterWalManager().getLogRecoveryMode()) {
993       // In log replay mode, we mark hbase:meta region as recovering in ZK
994       Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
995       regions.add(HRegionInfo.FIRST_META_REGIONINFO);
996       this.walManager.prepareLogReplay(currentMetaServer, regions);
997     } else {
998       // In recovered.edits mode: create recovered edits file for hbase:meta server
999       this.walManager.splitMetaLog(currentMetaServer);
1000     }
1001   }
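  // Summary of the two branches above (a restatement, not new behavior): LOG_REPLAY
  // marks hbase:meta as recovering in ZK so its edits are replayed later, while the
  // default recovered.edits mode splits the meta WAL into recovered-edits files up front.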
1002
1003   private void enableCrashedServerProcessing(final boolean waitForMeta)
1004   throws IOException, InterruptedException {
1005     // If crashed server processing is disabled, we enable it and expire those dead but not expired
1006     // servers. This is required so that if meta is being assigned to a server which dies after
1007     // assignMeta starts assignment, ServerCrashProcedure can re-assign it. Otherwise, we will be
1008     // stuck here waiting forever if waitForMeta is specified.
1009     if (!isServerCrashProcessingEnabled()) {
1010       setServerCrashProcessingEnabled(true);
1011       this.serverManager.processQueuedDeadServers();
1012     }
1013
1014     if (waitForMeta) {
1015       metaTableLocator.waitMetaRegionLocation(this.getZooKeeper());
1016     }
1017   }
1018
1019   /**
1020    * Returns the set of region server names recorded under the hbase:meta recovering-region ZK node.
1021    * @return Set of meta server names which were recorded in ZK
1022    */
1023   private Set<ServerName> getPreviouselyFailedMetaServersFromZK() throws KeeperException {
1024     Set<ServerName> result = new HashSet<ServerName>();
1025     String metaRecoveringZNode = ZKUtil.joinZNode(zooKeeper.recoveringRegionsZNode,
1026       HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1027     List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(zooKeeper, metaRecoveringZNode);
1028     if (regionFailedServers == null) return result;
1029
1030     for(String failedServer : regionFailedServers) {
1031       ServerName server = ServerName.parseServerName(failedServer);
1032       result.add(server);
1033     }
1034     return result;
1035   }
1036
1037   @Override
1038   public TableDescriptors getTableDescriptors() {
1039     return this.tableDescriptors;
1040   }
1041
1042   @Override
1043   public ServerManager getServerManager() {
1044     return this.serverManager;
1045   }
1046
1047   @Override
1048   public MasterFileSystem getMasterFileSystem() {
1049     return this.fileSystemManager;
1050   }
1051
1052   @Override
1053   public MasterWalManager getMasterWalManager() {
1054     return this.walManager;
1055   }
1056
1057   @Override
1058   public TableStateManager getTableStateManager() {
1059     return tableStateManager;
1060   }
1061
1062   /*
1063    * Start up all services. If any of these threads gets an unhandled exception
1064    * then they just die with a logged message.  This should be fine because
1065    * in general, we do not expect the master to get such unhandled exceptions
1066    * as OOMEs; it should be lightly loaded. See what HRegionServer does if we
1067    * need to install an unhandled exception handler.
1068    */
1069   private void startServiceThreads() throws IOException {
1070     // Start the executor service pools
1071     this.service.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
1072       conf.getInt("hbase.master.executor.openregion.threads", 5));
1073     this.service.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
1074       conf.getInt("hbase.master.executor.closeregion.threads", 5));
1075     this.service.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
1076       conf.getInt("hbase.master.executor.serverops.threads", 5));
1077     this.service.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
1078       conf.getInt("hbase.master.executor.meta.serverops.threads", 5));
1079     this.service.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
1080       conf.getInt("hbase.master.executor.logreplayops.threads", 10));
1081 
1082     // We depend on there being only one instance of this executor running
1083     // at a time.  To do concurrency, would need fencing of enable/disable of
1084     // tables.
1085     // Any time changing this maxThreads to > 1, pls see the comment at
1086     // AccessController#postCompletedCreateTableAction
1087     this.service.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1088     startProcedureExecutor();
1089 
1090     // Start log cleaner thread
1091     int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
1092     this.logCleaner =
1093       new LogCleaner(cleanerInterval,
1094         this, conf, getMasterWalManager().getFileSystem(),
1095         getMasterWalManager().getOldLogDir());
1096     getChoreService().scheduleChore(logCleaner);
1097 
1098     // start the hfile archive cleaner thread
1099     Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
1100     this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
1101         .getFileSystem(), archiveDir);
1102     getChoreService().scheduleChore(hfileCleaner);
1103     serviceStarted = true;
1104     if (LOG.isTraceEnabled()) {
1105       LOG.trace("Started service threads");
1106     }
1107   }
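  // The pool sizes and cleaner interval above are configurable; a hypothetical
  // tuning example (values illustrative, not recommendations):
  //
  //   hbase.master.executor.openregion.threads = 10
  //   hbase.master.cleaner.interval = 120000   // run log/hfile cleaners every 2 minutes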
1108
1109   @Override
1110   protected void sendShutdownInterrupt() {
1111     super.sendShutdownInterrupt();
1112     stopProcedureExecutor();
1113   }
1114
1115   @Override
1116   protected void stopServiceThreads() {
1117     if (masterJettyServer != null) {
1118       LOG.info("Stopping master jetty server");
1119       try {
1120         masterJettyServer.stop();
1121       } catch (Exception e) {
1122         LOG.error("Failed to stop master jetty server", e);
1123       }
1124     }
1125     super.stopServiceThreads();
1126     stopChores();
1127
1128     // Wait for all the remaining region servers to report in IFF we were
1129     // running a cluster shutdown AND we were NOT aborting.
1130     if (!isAborted() && this.serverManager != null &&
1131         this.serverManager.isClusterShutdown()) {
1132       this.serverManager.letRegionServersShutdown();
1133     }
1134     if (LOG.isDebugEnabled()) {
1135       LOG.debug("Stopping service threads");
1136     }
1137     // Clean up and close up shop
1138     if (this.logCleaner != null) this.logCleaner.cancel(true);
1139     if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
1140     if (this.quotaManager != null) this.quotaManager.stop();
1141     if (this.activeMasterManager != null) this.activeMasterManager.stop();
1142     if (this.serverManager != null) this.serverManager.stop();
1143     if (this.assignmentManager != null) this.assignmentManager.stop();
1144     if (this.walManager != null) this.walManager.stop();
1145     if (this.fileSystemManager != null) this.fileSystemManager.stop();
1146     if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
1147   }
1148
1149   private void startProcedureExecutor() throws IOException {
1150     final MasterProcedureEnv procEnv = new MasterProcedureEnv(this);
1151     final Path logDir = new Path(fileSystemManager.getRootDir(),
1152         MasterProcedureConstants.MASTER_PROCEDURE_LOGDIR);
1153
1154     procedureStore = new WALProcedureStore(conf, fileSystemManager.getFileSystem(), logDir,
1155         new MasterProcedureEnv.WALStoreLeaseRecovery(this));
1156     procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
1157     procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,
1158         procEnv.getProcedureQueue());
1159
1160     final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
1161         Math.max(Runtime.getRuntime().availableProcessors(),
1162           MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
1163     final boolean abortOnCorruption = conf.getBoolean(
1164         MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
1165         MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
1166     procedureStore.start(numThreads);
1167     procedureExecutor.start(numThreads, abortOnCorruption);
1168   }
1169
1170   private void stopProcedureExecutor() {
1171     if (procedureExecutor != null) {
1172       procedureExecutor.stop();
1173     }
1174
1175     if (procedureStore != null) {
1176       procedureStore.stop(isAborted());
1177     }
1178   }
1179
1180   private void stopChores() {
1181     if (this.expiredMobFileCleanerChore != null) {
1182       this.expiredMobFileCleanerChore.cancel(true);
1183     }
1184     if (this.mobCompactChore != null) {
1185       this.mobCompactChore.cancel(true);
1186     }
1187     if (this.balancerChore != null) {
1188       this.balancerChore.cancel(true);
1189     }
1190     if (this.normalizerChore != null) {
1191       this.normalizerChore.cancel(true);
1192     }
1193     if (this.clusterStatusChore != null) {
1194       this.clusterStatusChore.cancel(true);
1195     }
1196     if (this.catalogJanitorChore != null) {
1197       this.catalogJanitorChore.cancel(true);
1198     }
1199     if (this.clusterStatusPublisherChore != null){
1200       clusterStatusPublisherChore.cancel(true);
1201     }
1202     if (this.mobCompactThread != null) {
1203       this.mobCompactThread.close();
1204     }
1205   }
1206
1207   /**
1208    * @return The remote side's InetAddress
1209    */
1210   InetAddress getRemoteInetAddress(final int port,
1211       final long serverStartCode) throws UnknownHostException {
1212     // Do it out here in its own little method so we can fake an address when
1213     // mocking up in tests.
1214     InetAddress ia = RpcServer.getRemoteIp();
1215
1216     // The call could be from the local regionserver,
1217     // in which case, there is no remote address.
1218     if (ia == null && serverStartCode == startcode) {
1219       InetSocketAddress isa = rpcServices.getSocketAddress();
1220       if (isa != null && isa.getPort() == port) {
1221         ia = isa.getAddress();
1222       }
1223     }
1224     return ia;
1225   }
1226
1227   /**
1228    * @return Maximum time we should run balancer for
1229    */
1230   private int getBalancerCutoffTime() {
1231     int balancerCutoffTime = getConfiguration().getInt("hbase.balancer.max.balancing", -1);
1232     if (balancerCutoffTime == -1) {
1233       // if the cutoff time isn't set, default it to the balancer period
1234       int balancerPeriod = getConfiguration().getInt("hbase.balancer.period", 300000);
1235       balancerCutoffTime = balancerPeriod;
1236     }
1237     return balancerCutoffTime;
1238   }
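       // Illustrative arithmetic (property names taken from the reads above, values
       // hypothetical): with hbase.balancer.max.balancing unset and
       // hbase.balancer.period=300000, this returns 300000, i.e. a single balancer
       // run may use up to one full period.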
1239
1240   public boolean balance() throws IOException {
1241     return balance(false);
1242   }
1243
1244   public boolean balance(boolean force) throws IOException {
1245     // if master not initialized, don't run balancer.
1246     if (!isInitialized()) {
1247       LOG.debug("Master has not been initialized, don't run balancer.");
1248       return false;
1249     }
1250     // Do this call outside of synchronized block.
1251     int maximumBalanceTime = getBalancerCutoffTime();
1252     synchronized (this.balancer) {
1253       // If the balancer switch is off, don't run the balancer.
1254       if (!this.loadBalancerTracker.isBalancerOn()) return false;
1255       // Only allow one balance run at a time.
1256       if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
1257         Set<RegionState> regionsInTransition =
1258           this.assignmentManager.getRegionStates().getRegionsInTransition();
1259         // if hbase:meta region is in transition, result of assignment cannot be recorded
1260         // ignore the force flag in that case
1261         boolean metaInTransition = assignmentManager.getRegionStates().isMetaRegionInTransition();
1262         String prefix = force && !metaInTransition ? "R" : "Not r";
1263         LOG.debug(prefix + "unning balancer because " + regionsInTransition.size() +
1264           " region(s) in transition: " + org.apache.commons.lang.StringUtils.
1265             abbreviate(regionsInTransition.toString(), 256));
1266         if (!force || metaInTransition) return false;
1267       }
1268       if (this.serverManager.areDeadServersInProgress()) {
1269         LOG.debug("Not running balancer because processing dead regionserver(s): " +
1270           this.serverManager.getDeadServers());
1271         return false;
1272       }
1273
1274       if (this.cpHost != null) {
1275         try {
1276           if (this.cpHost.preBalance()) {
1277             LOG.debug("Coprocessor bypassing balancer request");
1278             return false;
1279           }
1280         } catch (IOException ioe) {
1281           LOG.error("Error invoking master coprocessor preBalance()", ioe);
1282           return false;
1283         }
1284       }
1285
1286       Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
1287         this.assignmentManager.getRegionStates().getAssignmentsByTable();
1288
1289       List<RegionPlan> plans = new ArrayList<RegionPlan>();
1290
1291       //Give the balancer the current cluster state.
1292       this.balancer.setClusterStatus(getClusterStatus());
1293       for (Entry<TableName, Map<ServerName, List<HRegionInfo>>> e : assignmentsByTable.entrySet()) {
1294         List<RegionPlan> partialPlans = this.balancer.balanceCluster(e.getKey(), e.getValue());
1295         if (partialPlans != null) plans.addAll(partialPlans);
1296       }
1297
1298       long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
1299       int rpCount = 0;  // number of RegionPlans balanced so far
1300       long totalRegPlanExecTime = 0;
1301       if (plans != null && !plans.isEmpty()) {
1302         for (RegionPlan plan: plans) {
1303           LOG.info("balance " + plan);
1304           long balStartTime = System.currentTimeMillis();
1305           //TODO: bulk assign
1306           this.assignmentManager.balance(plan);
1307           totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
1308           rpCount++;
1309           if (rpCount < plans.size() &&
1310               // if performing next balance exceeds cutoff time, exit the loop
1311               (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
1312             //TODO: After balance, there should not be a cutoff time (keeping it as
1313           // a safety net for now)
1314             LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
1315               maximumBalanceTime);
1316             break;
1317           }
1318         }
1319       }
1320       if (this.cpHost != null) {
1321         try {
1322           this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans);
1323         } catch (IOException ioe) {
1324           // balancing already succeeded so don't change the result
1325           LOG.error("Error invoking master coprocessor postBalance()", ioe);
1326         }
1327       }
1328     }
1329     // If LoadBalancer did not generate any plans, it means the cluster is already balanced.
1330     // Return true to indicate success.
1331     return true;
1332   }
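       // Hedged client-side sketch (not part of this class): a balancer run is
       // normally triggered through the 1.x Admin API, which RPCs into balance() above:
       //   try (Connection conn = ConnectionFactory.createConnection(conf)) {
       //     boolean ran = conn.getAdmin().balancer();
       //   }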
1333
1334   @VisibleForTesting
1335   public RegionNormalizer getRegionNormalizer() {
1336     return this.normalizer;
1337   }
1338
1339   /**
1340    * Perform normalization of cluster (invoked by {@link RegionNormalizerChore}).
1341    *
1342    * @return true if normalization step was performed successfully, false otherwise
1343    *    (specifically, if HMaster hasn't been initialized properly or normalization
1344    *    is globally disabled)
1345    */
1346   public boolean normalizeRegions() throws IOException {
1347     if (!isInitialized()) {
1348       LOG.debug("Master has not been initialized, don't run region normalizer.");
1349       return false;
1350     }
1351
1352     if (!this.regionNormalizerTracker.isNormalizerOn()) {
1353       LOG.debug("Region normalization is disabled, don't run region normalizer.");
1354       return false;
1355     }
1356
1357     synchronized (this.normalizer) {
1358       // Don't run the normalizer concurrently
1359       List<TableName> allEnabledTables = new ArrayList<>(
1360         this.tableStateManager.getTablesInStates(TableState.State.ENABLED));
1361
1362       Collections.shuffle(allEnabledTables);
1363
1364       for (TableName table : allEnabledTables) {
1365         TableDescriptor tblDesc = getTableDescriptors().getDescriptor(table);
1366         if (table.isSystemTable() || (tblDesc != null &&
1367             tblDesc.getHTableDescriptor() != null &&
1368             !tblDesc.getHTableDescriptor().isNormalizationEnabled())) {
1369           LOG.debug("Skipping normalization for table: " + table + ", as it's either system"
1370               + " table or doesn't have auto normalization turned on");
1371           continue;
1372         }
1373         List<NormalizationPlan> plans = this.normalizer.computePlanForTable(table);
1374         if (plans != null) {
1375           for (NormalizationPlan plan : plans) {
1376             plan.execute(clusterConnection.getAdmin());
1377             if (plan.getType() == PlanType.SPLIT) {
1378               splitPlanCount++;
1379             } else if (plan.getType() == PlanType.MERGE) {
1380               mergePlanCount++;
1381             }
1382           }
1383         }
1384       }
1385     }
1386     // If the normalizer did not generate any plans, the regions are already normalized.
1387     // Return true to indicate success.
1388     return true;
1389   }
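       // Hedged sketch: opting a table into normalization via the public client API;
       // the table name "t1" is made up:
       //   HTableDescriptor htd = admin.getTableDescriptor(TableName.valueOf("t1"));
       //   htd.setNormalizationEnabled(true);
       //   admin.modifyTable(htd.getTableName(), htd);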
1390
1391   /**
1392    * @return Client info for use as prefix on an audit log string; who did an action
1393    */
1394   String getClientIdAuditPrefix() {
1395     return "Client=" + RpcServer.getRequestUserName() + "/" + RpcServer.getRemoteAddress();
1396   }
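       // Example of the resulting prefix (values illustrative): "Client=alice/10.0.0.5"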
1397
1398   /**
1399    * Switch for the background CatalogJanitor thread.
1400    * Used for testing.  The thread will continue to run.  It will just be a noop
1401    * if disabled.
1402    * @param b If false, the catalog janitor won't do anything.
1403    */
1404   public void setCatalogJanitorEnabled(final boolean b) {
1405     this.catalogJanitorChore.setEnabled(b);
1406   }
1407
1408   @Override
1409   public void dispatchMergingRegions(final HRegionInfo region_a,
1410       final HRegionInfo region_b, final boolean forcible, final User user) throws IOException {
1411     checkInitialized();
1412     this.service.submit(new DispatchMergingRegionHandler(this,
1413       this.catalogJanitorChore, region_a, region_b, forcible, user));
1414   }
1415
1416   void move(final byte[] encodedRegionName,
1417       final byte[] destServerName) throws HBaseIOException {
1418     RegionState regionState = assignmentManager.getRegionStates().
1419       getRegionState(Bytes.toString(encodedRegionName));
1420
1421     HRegionInfo hri;
1422     if (regionState != null) {
1423       hri = regionState.getRegion();
1424     } else {
1425       throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
1426     }
1427
1428     ServerName dest;
1429     if (destServerName == null || destServerName.length == 0) {
1430       LOG.info("Passed destination servername is null/empty so " +
1431         "choosing a server at random");
1432       final List<ServerName> destServers = this.serverManager.createDestinationServersList(
1433         regionState.getServerName());
1434       dest = balancer.randomAssignment(hri, destServers);
1435       if (dest == null) {
1436         LOG.debug("Unable to determine a plan to assign " + hri);
1437         return;
1438       }
1439     } else {
1440       ServerName candidate = ServerName.valueOf(Bytes.toString(destServerName));
1441       dest = balancer.randomAssignment(hri, Lists.newArrayList(candidate));
1442       if (dest == null) {
1443         LOG.debug("Unable to determine a plan to assign " + hri);
1444         return;
1445       }
1446       if (dest.equals(serverName) && balancer instanceof BaseLoadBalancer
1447           && !((BaseLoadBalancer)balancer).shouldBeOnMaster(hri)) {
1448         // Don't put user regions on the master, to avoid the balancer having to
1449         // move them again later. Tests may still place regions on the master
1450         // intentionally.
1451         LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1452           + " to avoid unnecessary region moving later by load balancer,"
1453           + " because it should not be on master");
1454         return;
1455       }
1456     }
1457
1458     if (dest.equals(regionState.getServerName())) {
1459       LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1460         + " because region already assigned to the same server " + dest + ".");
1461       return;
1462     }
1463
1464     // Now we can do the move
1465     RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
1466
1467     try {
1468       checkInitialized();
1469       if (this.cpHost != null) {
1470         if (this.cpHost.preMove(hri, rp.getSource(), rp.getDestination())) {
1471           return;
1472         }
1473       }
1474       // Warm up the region on the destination before initiating the move. This call
1475       // is synchronous and takes some time, so do it before the source region gets
1476       // closed.
1477       serverManager.sendRegionWarmup(rp.getDestination(), hri);
1478
1479       LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
1480       this.assignmentManager.balance(rp);
1481       if (this.cpHost != null) {
1482         this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
1483       }
1484     } catch (IOException ioe) {
1485       if (ioe instanceof HBaseIOException) {
1486         throw (HBaseIOException)ioe;
1487       }
1488       throw new HBaseIOException(ioe);
1489     }
1490   }
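       // Hedged client-side sketch of the request that reaches this method; the
       // server name below is made up:
       //   admin.move(Bytes.toBytes(hri.getEncodedName()),
       //       Bytes.toBytes("host1.example.com,16020,1500000000000"));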
1491
1492   @Override
1493   public long createTable(
1494       final HTableDescriptor hTableDescriptor,
1495       final byte [][] splitKeys,
1496       final long nonceGroup,
1497       final long nonce) throws IOException {
1498     if (isStopped()) {
1499       throw new MasterNotRunningException();
1500     }
1501     checkInitialized();
1502     String namespace = hTableDescriptor.getTableName().getNamespaceAsString();
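         // Ensure the namespace exists; getNamespace() throws if it does not.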
1503     this.clusterSchemaService.getNamespace(namespace);
1504
1505     HRegionInfo[] newRegions = ModifyRegionUtils.createHRegionInfos(hTableDescriptor, splitKeys);
1506     checkInitialized();
1507     sanityCheckTableDescriptor(hTableDescriptor);
1508     if (cpHost != null) {
1509       cpHost.preCreateTable(hTableDescriptor, newRegions);
1510     }
1511     LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor);
1512
1513     // TODO: We can handle/merge duplicate requests, and differentiate the case of
1514     //       TableExistsException by saying if the schema is the same or not.
1515     ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
1516     long procId = this.procedureExecutor.submitProcedure(
1517       new CreateTableProcedure(
1518         procedureExecutor.getEnvironment(), hTableDescriptor, newRegions, latch),
1519       nonceGroup,
1520       nonce);
1521     latch.await();
1522
1523     if (cpHost != null) {
1524       cpHost.postCreateTable(hTableDescriptor, newRegions);
1525     }
1526
1527     return procId;
1528   }
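       // Hedged client-side sketch (names hypothetical) of a request that lands here:
       //   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
       //   htd.addFamily(new HColumnDescriptor("cf"));
       //   admin.createTable(htd, splitKeys);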
1529
1530   /**
1531    * Checks whether the table conforms to some sane limits, and whether configured
1532    * values (compression, etc) can be loaded. Throws an exception if something is wrong.
1533    * @throws IOException
1534    */
1535   private void sanityCheckTableDescriptor(final HTableDescriptor htd) throws IOException {
1536     final String CONF_KEY = "hbase.table.sanity.checks";
1537     boolean logWarn = false;
1538     if (!conf.getBoolean(CONF_KEY, true)) {
1539       logWarn = true;
1540     }
1541     String tableVal = htd.getConfigurationValue(CONF_KEY);
1542     if (tableVal != null && !Boolean.valueOf(tableVal)) {
1543       logWarn = true;
1544     }
1545
1546     // check max file size
1547     long maxFileSizeLowerLimit = 2 * 1024 * 1024L; // 2M is the default lower limit
1548     long maxFileSize = htd.getMaxFileSize();
1549     if (maxFileSize < 0) {
1550       maxFileSize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, maxFileSizeLowerLimit);
1551     }
1552     if (maxFileSize < conf.getLong("hbase.hregion.max.filesize.limit", maxFileSizeLowerLimit)) {
1553       String message = "MAX_FILESIZE for table descriptor or "
1554           + "\"hbase.hregion.max.filesize\" (" + maxFileSize
1555           + ") is too small, which might cause over splitting into unmanageable "
1556           + "number of regions.";
1557       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1558     }
1559
1560     // check flush size
1561     long flushSizeLowerLimit = 1024 * 1024L; // 1M is the default lower limit
1562     long flushSize = htd.getMemStoreFlushSize();
1563     if (flushSize < 0) {
1564       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeLowerLimit);
1565     }
1566     if (flushSize < conf.getLong("hbase.hregion.memstore.flush.size.limit", flushSizeLowerLimit)) {
1567       String message = "MEMSTORE_FLUSHSIZE for table descriptor or "
1568           + "\"hbase.hregion.memstore.flush.size\" ("+flushSize+") is too small, which might cause"
1569           + " very frequent flushing.";
1570       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1571     }
1572
1573     // check that coprocessors and other specified plugin classes can be loaded
1574     try {
1575       checkClassLoading(conf, htd);
1576     } catch (Exception ex) {
1577       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, ex.getMessage(), null);
1578     }
1579
1580     // check compression can be loaded
1581     try {
1582       checkCompression(htd);
1583     } catch (IOException e) {
1584       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
1585     }
1586
1587     // check encryption can be loaded
1588     try {
1589       checkEncryption(conf, htd);
1590     } catch (IOException e) {
1591       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, e.getMessage(), e);
1592     }
1593     // Verify compaction policy
1594     try {
1595       checkCompactionPolicy(conf, htd);
1596     } catch (IOException e) {
1597       warnOrThrowExceptionForFailure(false, CONF_KEY, e.getMessage(), e);
1598     }
1599     // check that we have at least 1 CF
1600     if (htd.getColumnFamilies().length == 0) {
1601       String message = "Table should have at least one column family.";
1602       warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1603     }
1604
1605     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1606       if (hcd.getTimeToLive() <= 0) {
1607         String message = "TTL for column family " + hcd.getNameAsString() + " must be positive.";
1608         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1609       }
1610
1611       // check blockSize
1612       if (hcd.getBlocksize() < 1024 || hcd.getBlocksize() > 16 * 1024 * 1024) {
1613         String message = "Block size for column family " + hcd.getNameAsString()
1614             + "  must be between 1K and 16MB.";
1615         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1616       }
1617
1618       // check versions
1619       if (hcd.getMinVersions() < 0) {
1620         String message = "Min versions for column family " + hcd.getNameAsString()
1621           + "  must be positive.";
1622         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1623       }
1624       // max versions already being checked
1625
1626       // HBASE-13776 Setting illegal versions for HColumnDescriptor
1627       //  does not throw IllegalArgumentException
1628       // check minVersions <= maxVersions
1629       if (hcd.getMinVersions() > hcd.getMaxVersions()) {
1630         String message = "Min versions for column family " + hcd.getNameAsString()
1631             + " must be less than the Max versions.";
1632         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1633       }
1634
1635       // check replication scope
1636       if (hcd.getScope() < 0) {
1637         String message = "Replication scope for column family "
1638           + hcd.getNameAsString() + " must be non-negative.";
1639         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1640       }
1641
1642       // check data replication factor, it can be 0 (the default) when the user has not
1643       // explicitly set the value, in which case we use the file system's default replication.
1644       if (hcd.getDFSReplication() < 0) {
1645         String message = "HFile Replication for column family " + hcd.getNameAsString()
1646             + " must be non-negative.";
1647         warnOrThrowExceptionForFailure(logWarn, CONF_KEY, message, null);
1648       }
1649
1650       // TODO: should we check coprocessors and encryption ?
1651     }
1652   }
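       // Hedged sketch: the checks above can be relaxed for a single table by setting
       // the key read at the top of sanityCheckTableDescriptor(), so failures only warn:
       //   htd.setConfiguration("hbase.table.sanity.checks", "false");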
1653
1654   private void checkCompactionPolicy(Configuration conf, HTableDescriptor htd)
1655       throws IOException {
1656     // FIFO compaction has some requirements.
1657     // Note that FIFOCompactionPolicy ignores periodic major compactions.
1658     String className =
1659         htd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
1660     if (className == null) {
1661       className =
1662           conf.get(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
1663             ExploringCompactionPolicy.class.getName());
1664     }
1665
1666     int blockingFileCount = HStore.DEFAULT_BLOCKING_STOREFILE_COUNT;
1667     String sv = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
1668     if (sv != null) {
1669       blockingFileCount = Integer.parseInt(sv);
1670     } else {
1671       blockingFileCount = conf.getInt(HStore.BLOCKING_STOREFILES_KEY, blockingFileCount);
1672     }
1673
1674     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1675       String compactionPolicy =
1676           hcd.getConfigurationValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY);
1677       if (compactionPolicy == null) {
1678         compactionPolicy = className;
1679       }
1680       if (!compactionPolicy.equals(FIFOCompactionPolicy.class.getName())) {
1681         continue;
1682       }
1683       // FIFOCompaction
1684       String message = null;
1685
1686       // 1. Check TTL
1687       if (hcd.getTimeToLive() == HColumnDescriptor.DEFAULT_TTL) {
1688         message = "Default TTL is not supported for FIFO compaction";
1689         throw new IOException(message);
1690       }
1691
1692       // 2. Check min versions
1693       if (hcd.getMinVersions() > 0) {
1694         message = "MIN_VERSION > 0 is not supported for FIFO compaction";
1695         throw new IOException(message);
1696       }
1697
1698       // 3. blocking file count
1699       String sbfc = htd.getConfigurationValue(HStore.BLOCKING_STOREFILES_KEY);
1700       if (sbfc != null) {
1701         blockingFileCount = Integer.parseInt(sbfc);
1702       }
1703       if (blockingFileCount < 1000) {
1704         message =
1705             "blocking file count '" + HStore.BLOCKING_STOREFILES_KEY + "' " + blockingFileCount
1706                 + " is below recommended minimum of 1000";
1707         throw new IOException(message);
1708       }
1709     }
1710   }
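       // Hedged sketch of a column family that satisfies the FIFO checks above; the
       // family name and TTL are made up:
       //   HColumnDescriptor hcd = new HColumnDescriptor("cf");
       //   hcd.setTimeToLive(7 * 24 * 60 * 60);  // a non-default TTL, in seconds
       //   hcd.setConfiguration(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
       //       FIFOCompactionPolicy.class.getName());
       //   htd.setConfiguration(HStore.BLOCKING_STOREFILES_KEY, "1000");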
1711
1712   // HBASE-13350 - Helper method to log warning on sanity check failures if checks disabled.
1713   private static void warnOrThrowExceptionForFailure(boolean logWarn, String confKey,
1714       String message, Exception cause) throws IOException {
1715     if (!logWarn) {
1716       throw new DoNotRetryIOException(message + " Set " + confKey +
1717           " to false at conf or table descriptor if you want to bypass sanity checks", cause);
1718     }
1719     LOG.warn(message);
1720   }
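       // Example: when sanity checks are disabled (logWarn == true) the failure above is
       // only logged; otherwise it is rethrown as a DoNotRetryIOException with the
       // bypass hint attached.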
1721
1722   private void startActiveMasterManager(int infoPort) throws KeeperException {
1723     String backupZNode = ZKUtil.joinZNode(
1724       zooKeeper.backupMasterAddressesZNode, serverName.toString());
1725     /*
1726     * Add a ZNode for ourselves in the backup master directory since we
1727     * may not become the active master. In that case, we want the actual
1728     * active master to know we are a backup master, so that it won't assign
1729     * regions to us if so configured.
1730     *
1731     * If we become the active master later, ActiveMasterManager will delete
1732     * this node explicitly.  If we crash before then, ZooKeeper will delete
1733     * this node for us since it is ephemeral.
1734     */
1735     LOG.info("Adding backup master ZNode " + backupZNode);
1736     if (!MasterAddressTracker.setMasterAddress(zooKeeper, backupZNode,
1737         serverName, infoPort)) {
1738       LOG.warn("Failed create of " + backupZNode + " by " + serverName);
1739     }
1740
1741     activeMasterManager.setInfoPort(infoPort);
1742     // Start a thread to try to become the active master, so we won't block here
1743     Threads.setDaemonThreadRunning(new Thread(new Runnable() {
1744       @Override
1745       public void run() {
1746         int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
1747           HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
1748         // If we're a backup master, stall until a primary writes its address
1749         if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP,
1750           HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
1751           LOG.debug("HMaster started in backup mode. "
1752             + "Stalling until master znode is written.");
1753           // This will only be a minute or so while the cluster starts up,
1754           // so don't worry about setting watches on the parent znode
1755           while (!activeMasterManager.hasActiveMaster()) {
1756             LOG.debug("Waiting for master address ZNode to be written "
1757               + "(Also watching cluster state node)");
1758             Threads.sleep(timeout);
1759           }
1760         }
1761         MonitoredTask status = TaskMonitor.get().createStatus("Master startup");
1762         status.setDescription("Master startup");
1763         try {
1764           if (activeMasterManager.blockUntilBecomingActiveMaster(timeout, status)) {
1765             finishActiveMasterInitialization(status);
1766           }
1767         } catch (Throwable t) {
1768           status.setStatus("Failed to become active: " + t.getMessage());
1769           LOG.fatal("Failed to become active master", t);
1770           // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
1771           if (t instanceof NoClassDefFoundError &&
1772             t.getMessage()
1773               .contains("org/apache/hadoop/hdfs/protocol/HdfsConstants$SafeModeAction")) {
1774             // improved error message for this special case
1775             abort("HBase is having a problem with its Hadoop jars.  You may need to "
1776               + "recompile HBase against Hadoop version "
1777               + org.apache.hadoop.util.VersionInfo.getVersion()
1778               + " or change your hadoop jars to start properly", t);
1779           } else {
1780             abort("Unhandled exception. Starting shutdown.", t);
1781           }
1782         } finally {
1783           status.cleanup();
1784         }
1785       }
1786     }, getServerName().toShortString() + ".activeMasterManager"));
1787   }
1788
1789   private void checkCompression(final HTableDescriptor htd)
1790   throws IOException {
1791     if (!this.masterCheckCompression) return;
1792     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1793       checkCompression(hcd);
1794     }
1795   }
1796
1797   private void checkCompression(final HColumnDescriptor hcd)
1798   throws IOException {
1799     if (!this.masterCheckCompression) return;
1800     CompressionTest.testCompression(hcd.getCompressionType());
1801     CompressionTest.testCompression(hcd.getCompactionCompressionType());
1802   }
1803
1804   private void checkEncryption(final Configuration conf, final HTableDescriptor htd)
1805   throws IOException {
1806     if (!this.masterCheckEncryption) return;
1807     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1808       checkEncryption(conf, hcd);
1809     }
1810   }
1811
1812   private void checkEncryption(final Configuration conf, final HColumnDescriptor hcd)
1813   throws IOException {
1814     if (!this.masterCheckEncryption) return;
1815     EncryptionTest.testEncryption(conf, hcd.getEncryptionType(), hcd.getEncryptionKey());
1816   }
1817
1818   private void checkClassLoading(final Configuration conf, final HTableDescriptor htd)
1819   throws IOException {
1820     RegionSplitPolicy.getSplitPolicyClass(htd, conf);
1821     RegionCoprocessorHost.testTableCoprocessorAttrs(conf, htd);
1822   }
1823
1824   private static boolean isCatalogTable(final TableName tableName) {
1825     return tableName.equals(TableName.META_TABLE_NAME);
1826   }
1827
1828   @Override
1829   public long deleteTable(
1830       final TableName tableName,
1831       final long nonceGroup,
1832       final long nonce) throws IOException {
1833     checkInitialized();
1834     if (cpHost != null) {
1835       cpHost.preDeleteTable(tableName);
1836     }
1837     LOG.info(getClientIdAuditPrefix() + " delete " + tableName);
1838
1839     // TODO: We can handle/merge duplicate request
1840     ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
1841     long procId = this.procedureExecutor.submitProcedure(
1842       new DeleteTableProcedure(procedureExecutor.getEnvironment(), tableName, latch),
1843       nonceGroup,
1844       nonce);
1845     latch.await();
1846
1847     if (cpHost != null) {
1848       cpHost.postDeleteTable(tableName);
1849     }
1850
1851     return procId;
1852   }
1853
1854   @Override
1855   public long truncateTable(
1856       final TableName tableName,
1857       final boolean preserveSplits,
1858       final long nonceGroup,
1859       final long nonce) throws IOException {
1860     checkInitialized();
1861     if (cpHost != null) {
1862       cpHost.preTruncateTable(tableName);
1863     }
1864     LOG.info(getClientIdAuditPrefix() + " truncate " + tableName);
1865
1866     long procId = this.procedureExecutor.submitProcedure(
1867       new TruncateTableProcedure(procedureExecutor.getEnvironment(), tableName, preserveSplits),
1868       nonceGroup,
1869       nonce);
1870     ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1871
1872     if (cpHost != null) {
1873       cpHost.postTruncateTable(tableName);
1874     }
1875     return procId;
1876   }
1877
1878   @Override
1879   public long addColumn(
1880       final TableName tableName,
1881       final HColumnDescriptor columnDescriptor,
1882       final long nonceGroup,
1883       final long nonce)
1884       throws IOException {
1885     checkInitialized();
1886     checkCompression(columnDescriptor);
1887     checkEncryption(conf, columnDescriptor);
1888     if (cpHost != null) {
1889       if (cpHost.preAddColumn(tableName, columnDescriptor)) {
1890         return -1;
1891       }
1892     }
1893     // Execute the operation synchronously - wait for the operation to complete before continuing.
1894     long procId = this.procedureExecutor.submitProcedure(
1895       new AddColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, columnDescriptor),
1896       nonceGroup,
1897       nonce);
1898     ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1899     if (cpHost != null) {
1900       cpHost.postAddColumn(tableName, columnDescriptor);
1901     }
1902     return procId;
1903   }
1904
1905   @Override
1906   public long modifyColumn(
1907       final TableName tableName,
1908       final HColumnDescriptor descriptor,
1909       final long nonceGroup,
1910       final long nonce)
1911       throws IOException {
1912     checkInitialized();
1913     checkCompression(descriptor);
1914     checkEncryption(conf, descriptor);
1915     if (cpHost != null) {
1916       if (cpHost.preModifyColumn(tableName, descriptor)) {
1917         return -1;
1918       }
1919     }
1920     LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);
1921
1922     // Execute the operation synchronously - wait for the operation to complete before continuing.
1923     long procId = this.procedureExecutor.submitProcedure(
1924       new ModifyColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, descriptor),
1925       nonceGroup,
1926       nonce);
1927     ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1928
1929     if (cpHost != null) {
1930       cpHost.postModifyColumn(tableName, descriptor);
1931     }
1932     return procId;
1933   }
1934
1935   @Override
1936   public long deleteColumn(
1937       final TableName tableName,
1938       final byte[] columnName,
1939       final long nonceGroup,
1940       final long nonce)
1941       throws IOException {
1942     checkInitialized();
1943     if (cpHost != null) {
1944       if (cpHost.preDeleteColumn(tableName, columnName)) {
1945         return -1;
1946       }
1947     }
1948     LOG.info(getClientIdAuditPrefix() + " delete " + Bytes.toString(columnName));
1949
1950     // Execute the operation synchronously - wait for the operation to complete before continuing.
1951     long procId = this.procedureExecutor.submitProcedure(
1952       new DeleteColumnFamilyProcedure(procedureExecutor.getEnvironment(), tableName, columnName),
1953       nonceGroup,
1954       nonce);
1955     ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
1956
1957     if (cpHost != null) {
1958       cpHost.postDeleteColumn(tableName, columnName);
1959     }
1960     return procId;
1961   }
1962
1963   @Override
1964   public long enableTable(
1965       final TableName tableName,
1966       final long nonceGroup,
1967       final long nonce) throws IOException {
1968     checkInitialized();
1969     if (cpHost != null) {
1970       cpHost.preEnableTable(tableName);
1971     }
1972     LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
1973
1974     // Execute the operation asynchronously - client will check the progress of the operation
1975     final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
1976     long procId = this.procedureExecutor.submitProcedure(
1977       new EnableTableProcedure(procedureExecutor.getEnvironment(), tableName, false, prepareLatch),
1978       nonceGroup,
1979       nonce);
1980     // Before returning to client, we want to make sure that the table is prepared to be
1981     // enabled (the table is locked and the table state is set).
1982     //
1983     // Note: if the procedure throws exception, we will catch it and rethrow.
1984     prepareLatch.await();
1985
1986     if (cpHost != null) {
1987       cpHost.postEnableTable(tableName);
1988     }
1989
1990     return procId;
1991   }
1992
1993   @Override
1994   public long disableTable(
1995       final TableName tableName,
1996       final long nonceGroup,
1997       final long nonce) throws IOException {
1998     checkInitialized();
1999     if (cpHost != null) {
2000       cpHost.preDisableTable(tableName);
2001     }
2002     LOG.info(getClientIdAuditPrefix() + " disable " + tableName);
2003
2004     // Execute the operation asynchronously - client will check the progress of the operation
2005     final ProcedurePrepareLatch prepareLatch = ProcedurePrepareLatch.createLatch();
2007     long procId = this.procedureExecutor.submitProcedure(
2008       new DisableTableProcedure(procedureExecutor.getEnvironment(), tableName, false, prepareLatch),
2009       nonceGroup,
2010       nonce);
2011     // Before returning to client, we want to make sure that the table is prepared to be
2012     // disabled (the table is locked and the table state is set).
2013     //
2014     // Note: if the procedure throws exception, we will catch it and rethrow.
2015     prepareLatch.await();
2016
2017     if (cpHost != null) {
2018       cpHost.postDisableTable(tableName);
2019     }
2020
2021     return procId;
2022   }
2023
2024   /**
2025    * Return the region and current deployment for the region containing
2026    * the given row. If the region cannot be found, returns null. If it
2027    * is found, but not currently deployed, the second element of the pair
2028    * may be null.
2029    */
2030   @VisibleForTesting // Used by TestMaster.
2031   Pair<HRegionInfo, ServerName> getTableRegionForRow(
2032       final TableName tableName, final byte [] rowKey)
2033   throws IOException {
2034     final AtomicReference<Pair<HRegionInfo, ServerName>> result =
2035       new AtomicReference<Pair<HRegionInfo, ServerName>>(null);
2036
2037     MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
2038         @Override
2039         public boolean visit(Result data) throws IOException {
2040           if (data == null || data.size() <= 0) {
2041             return true;
2042           }
2043           Pair<HRegionInfo, ServerName> pair =
2044               new Pair<HRegionInfo, ServerName>(MetaTableAccessor.getHRegionInfo(data),
2045                   MetaTableAccessor.getServerName(data, 0));
2046           if (pair.getFirst() == null) {
2047             return false;
2048           }
2049           if (!pair.getFirst().getTable().equals(tableName)) {
2050             return false;
2051           }
2052           result.set(pair);
2053           return true;
2054         }
2055     };
2056
2057     MetaTableAccessor.scanMeta(clusterConnection, visitor, tableName, rowKey, 1);
2058     return result.get();
2059   }
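       // Hedged test-side sketch (used by TestMaster); row and table names are made up:
       //   Pair<HRegionInfo, ServerName> p =
       //       master.getTableRegionForRow(TableName.valueOf("t1"), Bytes.toBytes("row1"));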
2060
2061   @Override
2062   public long modifyTable(
2063       final TableName tableName,
2064       final HTableDescriptor descriptor,
2065       final long nonceGroup,
2066       final long nonce)
2067       throws IOException {
2068     checkInitialized();
2069     sanityCheckTableDescriptor(descriptor);
2070     if (cpHost != null) {
2071       cpHost.preModifyTable(tableName, descriptor);
2072     }
2073
2074     LOG.info(getClientIdAuditPrefix() + " modify " + tableName);
2075
2076     // Execute the operation synchronously - wait for the operation to complete before continuing.
2077     long procId = this.procedureExecutor.submitProcedure(
2078       new ModifyTableProcedure(procedureExecutor.getEnvironment(), descriptor),
2079       nonceGroup,
2080       nonce);
2081
2082     ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
2083
2084     if (cpHost != null) {
2085       cpHost.postModifyTable(tableName, descriptor);
2086     }
2087
2088     return procId;
2089   }
2090
2091   @Override
2092   public void checkTableModifiable(final TableName tableName)
2093       throws IOException, TableNotFoundException, TableNotDisabledException {
2094     if (isCatalogTable(tableName)) {
2095       throw new IOException("Can't modify catalog tables");
2096     }
2097     if (!MetaTableAccessor.tableExists(getConnection(), tableName)) {
2098       throw new TableNotFoundException(tableName);
2099     }
2100     if (!getTableStateManager().isTableState(tableName, TableState.State.DISABLED)) {
2101       throw new TableNotDisabledException(tableName);
2102     }
2103   }
2104
2105   /**
2106    * @return cluster status
2107    */
2108   public ClusterStatus getClusterStatus() throws InterruptedIOException {
2109     // Build Set of backup masters from ZK nodes
2110     List<String> backupMasterStrings;
2111     try {
2112       backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
2113         this.zooKeeper.backupMasterAddressesZNode);
2114     } catch (KeeperException e) {
2115       LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
2116       backupMasterStrings = null;
2117     }
2118
2119     List<ServerName> backupMasters = null;
2120     if (backupMasterStrings != null && !backupMasterStrings.isEmpty()) {
2121       backupMasters = new ArrayList<ServerName>(backupMasterStrings.size());
2122       for (String s: backupMasterStrings) {
2123         try {
2124           byte [] bytes;
2125           try {
2126             bytes = ZKUtil.getData(this.zooKeeper, ZKUtil.joinZNode(
2127                 this.zooKeeper.backupMasterAddressesZNode, s));
2128           } catch (InterruptedException e) {
2129             throw new InterruptedIOException();
2130           }
2131           if (bytes != null) {
2132             ServerName sn;
2133             try {
2134               sn = ServerName.parseFrom(bytes);
2135             } catch (DeserializationException e) {
2136               LOG.warn("Failed parse, skipping registering backup server", e);
2137               continue;
2138             }
2139             backupMasters.add(sn);
2140           }
2141         } catch (KeeperException e) {
2142           LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
2143                    "backup servers"), e);
2144         }
2145       }
2146       Collections.sort(backupMasters, new Comparator<ServerName>() {
2147         @Override
2148         public int compare(ServerName s1, ServerName s2) {
2149           return s1.getServerName().compareTo(s2.getServerName());
2150         }});
2151     }
2152
2153     String clusterId = fileSystemManager != null ?
2154       fileSystemManager.getClusterId().toString() : null;
2155     Set<RegionState> regionsInTransition = assignmentManager != null ?
2156       assignmentManager.getRegionStates().getRegionsInTransition() : null;
2157     String[] coprocessors = cpHost != null ? getMasterCoprocessors() : null;
2158     boolean balancerOn = loadBalancerTracker != null ?
2159       loadBalancerTracker.isBalancerOn() : false;
2160     Map<ServerName, ServerLoad> onlineServers = null;
2161     Set<ServerName> deadServers = null;
2162     if (serverManager != null) {
2163       deadServers = serverManager.getDeadServers().copyServerNames();
2164       onlineServers = serverManager.getOnlineServers();
2165     }
2166     return new ClusterStatus(VersionInfo.getVersion(), clusterId,
2167       onlineServers, deadServers, serverName, backupMasters,
2168       regionsInTransition, coprocessors, balancerOn);
2169   }
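       // Hedged usage sketch of the returned snapshot:
       //   ClusterStatus cs = master.getClusterStatus();
       //   LOG.info("version=" + cs.getHBaseVersion() + ", live servers="
       //       + cs.getServersSize() + ", dead servers=" + cs.getDeadServers());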
2170
2171   /**
2172    * The set of loaded coprocessors is stored in a static set. Since it's
2173    * statically allocated, it does not require that HMaster's cpHost be
2174    * initialized prior to accessing it.
2175    * @return a String representation of the set of names of the loaded coprocessors.
2176    */
2177   public static String getLoadedCoprocessors() {
2178     return CoprocessorHost.getLoadedCoprocessors().toString();
2179   }
2180
2181   /**
2182    * @return timestamp in millis when HMaster was started.
2183    */
2184   public long getMasterStartTime() {
2185     return startcode;
2186   }
2187
2188   /**
2189    * @return timestamp in millis when HMaster became the active master.
2190    */
2191   public long getMasterActiveTime() {
2192     return masterActiveTime;
2193   }
2194
2195   public int getNumWALFiles() {
2196     return procedureStore != null ? procedureStore.getActiveLogs().size() : 0;
2197   }
2198
2199   public WALProcedureStore getWalProcedureStore() {
2200     return procedureStore;
2201   }
2202 
2203   public int getRegionServerInfoPort(final ServerName sn) {
2204     RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2205     if (info == null || info.getInfoPort() == 0) {
2206       return conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2207         HConstants.DEFAULT_REGIONSERVER_INFOPORT);
2208     }
2209     return info.getInfoPort();
2210   }
2211
2212   public String getRegionServerVersion(final ServerName sn) {
2213     RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2214     if (info != null && info.hasVersionInfo()) {
2215       return info.getVersionInfo().getVersion();
2216     }
2217     return "Unknown";
2218   }
2219
2220   /**
2221    * @return array of coprocessor SimpleNames.
2222    */
2223   public String[] getMasterCoprocessors() {
2224     Set<String> masterCoprocessors = getMasterCoprocessorHost().getCoprocessors();
2225     return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
2226   }
2227
2228   @Override
2229   public void abort(final String msg, final Throwable t) {
2230     if (isAborted() || isStopped()) {
2231       return;
2232     }
2233     if (cpHost != null) {
2234       // HBASE-4014: dump a list of loaded coprocessors.
2235       LOG.fatal("Master server abort: loaded coprocessors are: " +
2236           getLoadedCoprocessors());
2237     }
2238     if (t != null) LOG.fatal(msg, t);
2239     stopMaster();
2240   }
2241
2242   @Override
2243   public ZooKeeperWatcher getZooKeeper() {
2244     return zooKeeper;
2245   }
2246
2247   @Override
2248   public MasterCoprocessorHost getMasterCoprocessorHost() {
2249     return cpHost;
2250   }
2251
2252   @Override
2253   public MasterQuotaManager getMasterQuotaManager() {
2254     return quotaManager;
2255   }
2256
2257   @Override
2258   public ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
2259     return procedureExecutor;
2260   }
2261
2262   @Override
2263   public ServerName getServerName() {
2264     return this.serverName;
2265   }
2266 
2267   @Override
2268   public AssignmentManager getAssignmentManager() {
2269     return this.assignmentManager;
2270   }
2271
2272   public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
2273     return rsFatals;
2274   }
2275 
2276   public void shutdown() {
2277     if (cpHost != null) {
2278       try {
2279         cpHost.preShutdown();
2280       } catch (IOException ioe) {
2281         LOG.error("Error call master coprocessor preShutdown()", ioe);
2282       }
2283     }
2284
2285     if (this.serverManager != null) {
2286       this.serverManager.shutdownCluster();
2287     }
2288     if (this.clusterStatusTracker != null) {
2289       try {
2290         this.clusterStatusTracker.setClusterDown();
2291       } catch (KeeperException e) {
2292         LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
2293       }
2294     }
2295   }
2296
2297   public void stopMaster() {
2298     if (cpHost != null) {
2299       try {
2300         cpHost.preStopMaster();
2301       } catch (IOException ioe) {
2302         LOG.error("Error call master coprocessor preStopMaster()", ioe);
2303       }
2304     }
2305     stop("Stopped by " + Thread.currentThread().getName());
2306   }
2307
2308   void checkServiceStarted() throws ServerNotRunningYetException {
2309     if (!serviceStarted) {
2310       throw new ServerNotRunningYetException("Server is not running yet");
2311     }
2312   }
2313
2314   void checkInitialized() throws PleaseHoldException, ServerNotRunningYetException {
2315     checkServiceStarted();
2316     if (!isInitialized()) throw new PleaseHoldException("Master is initializing");
2317   }
2318
2319   /**
2320    * Report whether this master is currently the active master or not.
2321    * If not active master, we are parked on ZK waiting to become active.
2322    *
2323    * This method is used for testing.
2324    *
2325    * @return true if active master, false if not.
2326    */
2327   public boolean isActiveMaster() {
2328     return isActiveMaster;
2329   }
2330
2331   /**
2332    * Report whether this master has completed with its initialization and is
2333    * ready.  If ready, the master is also the active master.  A standby master
2334    * is never ready.
2335    *
2336    * This method is used for testing.
2337    *
2338    * @return true if master is ready to go, false if not.
2339    */
2340   @Override
2341   public boolean isInitialized() {
2342     return initialized.isReady();
2343   }
2344 
2345   @VisibleForTesting
2346   public void setInitialized(boolean isInitialized) {
2347     procedureExecutor.getEnvironment().setEventReady(initialized, isInitialized);
2348   }
2349
2350   public ProcedureEvent getInitializedEvent() {
2351     return initialized;
2352   }
2353
2354   /**
2355    * ServerCrashProcessingEnabled is set to false before completing assignMeta to prevent
2356    * the processing of crashed servers.
2357    * @return true if assignMeta has completed
2358    */
2359   @Override
2360   public boolean isServerCrashProcessingEnabled() {
2361     return serverCrashProcessingEnabled.isReady();
2362   }
2363 
2364   @VisibleForTesting
2365   public void setServerCrashProcessingEnabled(final boolean b) {
2366     procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b);
2367   }
2368
2369   public ProcedureEvent getServerCrashProcessingEnabledEvent() {
2370     return serverCrashProcessingEnabled;
2371   }
2372
2373   /**
2374    * Report whether this master has started initialization and is about to do meta region assignment.
2375    * @return true if master is in initialization &amp; about to assign hbase:meta regions
2376    */
2377   public boolean isInitializationStartsMetaRegionAssignment() {
2378     return this.initializationBeforeMetaAssignment;
2379   }
2380
2381   public void assignRegion(HRegionInfo hri) {
2382     assignmentManager.assign(hri);
2383   }
2384
2385   /**
2386    * Compute the average load across all region servers.
2387    * Currently, this uses a very naive computation - just uses the number of
2388    * regions being served, ignoring stats about number of requests.
2389    * @return the average load
2390    */
2391   public double getAverageLoad() {
2392     if (this.assignmentManager == null) {
2393       return 0;
2394     }
2395
2396     RegionStates regionStates = this.assignmentManager.getRegionStates();
2397     if (regionStates == null) {
2398       return 0;
2399     }
2400     return regionStates.getAverageLoad();
2401   }
2402
2403   /**
2404    * @return the count of region split plans executed
2405    */
2406   public long getSplitPlanCount() {
2407     return splitPlanCount;
2408   }
2409
2410   /**
2411    * @return the count of region merge plans executed
2412    */
2413   public long getMergePlanCount() {
2414     return mergePlanCount;
2415   }
2416
2417   @Override
2418   public boolean registerService(Service instance) {
2419     /*
2420      * No stacking of instances is allowed for a single service name
2421      */
2422     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
2423     String serviceName = CoprocessorRpcUtils.getServiceName(serviceDesc);
2424     if (coprocessorServiceHandlers.containsKey(serviceName)) {
2425       LOG.error("Coprocessor service "+serviceName+
2426           " already registered, rejecting request from "+instance
2427       );
2428       return false;
2429     }
2430
2431     coprocessorServiceHandlers.put(serviceName, instance);
2432     if (LOG.isDebugEnabled()) {
2433       LOG.debug("Registered master coprocessor service: service="+serviceName);
2434     }
2435     return true;
2436   }
2437
2438   /**
2439    * Utility for constructing an instance of the passed HMaster class.
2440    * @param masterClass the HMaster subclass to instantiate
2441    * @return HMaster instance.
2442    */
2443   public static HMaster constructMaster(Class<? extends HMaster> masterClass,
2444       final Configuration conf, final CoordinatedStateManager cp)  {
2445     try {
2446       Constructor<? extends HMaster> c =
2447         masterClass.getConstructor(Configuration.class, CoordinatedStateManager.class);
2448       return c.newInstance(conf, cp);
2449     } catch(Exception e) {
2450       Throwable error = e;
2451       if (e instanceof InvocationTargetException &&
2452           ((InvocationTargetException)e).getTargetException() != null) {
2453         error = ((InvocationTargetException)e).getTargetException();
2454       }
2455       throw new RuntimeException(
2456         "Failed construction of Master: " + masterClass.toString(), error);
2457     }
2458   }
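       // Hedged usage sketch (mirrors what HMasterCommandLine does):
       //   CoordinatedStateManager csm =
       //       CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
       //   HMaster master = HMaster.constructMaster(HMaster.class, conf, csm);
       //   master.start();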
2459
2460   /**
2461    * @see org.apache.hadoop.hbase.master.HMasterCommandLine
2462    */
2463   public static void main(String [] args) {
2464     VersionInfo.logVersion();
2465     new HMasterCommandLine(HMaster.class).doMain(args);
2466   }
2467
2468   public HFileCleaner getHFileCleaner() {
2469     return this.hfileCleaner;
2470   }
2471
2472   /**
2473    * @return the underlying snapshot manager
2474    */
2475   public SnapshotManager getSnapshotManager() {
2476     return this.snapshotManager;
2477   }
2478
2479   /**
2480    * @return the underlying MasterProcedureManagerHost
2481    */
2482   public MasterProcedureManagerHost getMasterProcedureManagerHost() {
2483     return mpmHost;
2484   }
2485
2486   @Override
2487   public ClusterSchema getClusterSchema() {
2488     return this.clusterSchemaService;
2489   }
2490
2491   /**
2492    * Create a new Namespace.
2493    * @param namespaceDescriptor descriptor for new Namespace
2494    * @param nonceGroup Identifier for the source of the request, a client or process.
2495    * @param nonce A unique identifier for this operation from the client or process identified by
2496    * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2497    * @return procedure id
2498    */
2499   long createNamespace(final NamespaceDescriptor namespaceDescriptor, final long nonceGroup,
2500       final long nonce)
2501   throws IOException {
2502     checkInitialized();
2503     TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName()));
2504     if (this.cpHost != null && this.cpHost.preCreateNamespace(namespaceDescriptor)) {
2505       throw new BypassCoprocessorException();
2506     }
2507     LOG.info(getClientIdAuditPrefix() + " creating " + namespaceDescriptor);
2508     // Execute the operation synchronously - wait for the operation to complete before continuing.
2509     long procId = getClusterSchema().createNamespace(namespaceDescriptor, nonceGroup, nonce);
2510     if (this.cpHost != null) this.cpHost.postCreateNamespace(namespaceDescriptor);
2511     return procId;
2512   }
2513
2514   /**
2515    * Modify an existing Namespace.
2516    * @param nonceGroup Identifier for the source of the request, a client or process.
2517    * @param nonce A unique identifier for this operation from the client or process identified by
2518    * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2519    * @return procedure id
2520    */
2521   long modifyNamespace(final NamespaceDescriptor namespaceDescriptor, final long nonceGroup,
2522       final long nonce)
2523   throws IOException {
2524     checkInitialized();
2525     TableName.isLegalNamespaceName(Bytes.toBytes(namespaceDescriptor.getName()));
2526     if (this.cpHost != null && this.cpHost.preModifyNamespace(namespaceDescriptor)) {
2527       throw new BypassCoprocessorException();
2528     }
2529     LOG.info(getClientIdAuditPrefix() + " modify " + namespaceDescriptor);
2530     // Execute the operation synchronously - wait for the operation to complete before continuing.
2531     long procId = getClusterSchema().modifyNamespace(namespaceDescriptor, nonceGroup, nonce);
2532     if (this.cpHost != null) this.cpHost.postModifyNamespace(namespaceDescriptor);
2533     return procId;
2534   }
2535
2536   /**
2537    * Delete an existing Namespace. Only empty Namespaces (no tables) can be removed.
2538    * @param nonceGroup Identifier for the source of the request, a client or process.
2539    * @param nonce A unique identifier for this operation from the client or process identified by
2540    * <code>nonceGroup</code> (the source must ensure each operation gets a unique id).
2541    * @return procedure id
2542    */
2543   long deleteNamespace(final String name, final long nonceGroup, final long nonce)
2544   throws IOException {
2545     checkInitialized();
2546     if (this.cpHost != null && this.cpHost.preDeleteNamespace(name)) {
2547       throw new BypassCoprocessorException();
2548     }
2549     LOG.info(getClientIdAuditPrefix() + " delete " + name);
2550     // Execute the operation synchronously - wait for the operation to complete before continuing.
2551     long procId = getClusterSchema().deleteNamespace(name, nonceGroup, nonce);
2552     if (this.cpHost != null) this.cpHost.postDeleteNamespace(name);
2553     return procId;
2554   }
2555
2556   /**
2557    * Get a Namespace
2558    * @param name Name of the Namespace
2559    * @return Namespace descriptor for <code>name</code>
2560    */
2561   NamespaceDescriptor getNamespace(String name) throws IOException {
2562     checkInitialized();
2563     if (this.cpHost != null) this.cpHost.preGetNamespaceDescriptor(name);
2564     NamespaceDescriptor nsd = this.clusterSchemaService.getNamespace(name);
2565     if (this.cpHost != null) this.cpHost.postGetNamespaceDescriptor(nsd);
2566     return nsd;
2567   }
2568
2569   /**
2570    * Get all Namespaces
2571    * @return All Namespace descriptors
2572    */
2573   List<NamespaceDescriptor> getNamespaces() throws IOException {
2574     checkInitialized();
2575     final List<NamespaceDescriptor> nsds = new ArrayList<NamespaceDescriptor>();
2576     boolean bypass = false;
2577     if (cpHost != null) {
2578       bypass = cpHost.preListNamespaceDescriptors(nsds);
2579     }
2580     if (!bypass) {
2581       nsds.addAll(this.clusterSchemaService.getNamespaces());
2582       if (this.cpHost != null) this.cpHost.postListNamespaceDescriptors(nsds);
2583     }
2584     return nsds;
2585   }
2586
2587   @Override
2588   public List<TableName> listTableNamesByNamespace(String name) throws IOException {
2589     checkInitialized();
2590     return listTableNames(name, null, true);
2591   }
2592
2593   @Override
2594   public List<HTableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
2595     checkInitialized();
2596     return listTableDescriptors(name, null, null, true);
2597   }
2598
2599   @Override
2600   public boolean abortProcedure(final long procId, final boolean mayInterruptIfRunning)
2601       throws IOException {
2602     if (cpHost != null) {
2603       cpHost.preAbortProcedure(this.procedureExecutor, procId);
2604     }
2605
2606     final boolean result = this.procedureExecutor.abort(procId, mayInterruptIfRunning);
2607
2608     if (cpHost != null) {
2609       cpHost.postAbortProcedure();
2610     }
2611
2612     return result;
2613   }
2614
2615   @Override
2616   public List<ProcedureInfo> listProcedures() throws IOException {
2617     if (cpHost != null) {
2618       cpHost.preListProcedures();
2619     }
2620
2621     final List<ProcedureInfo> procInfoList = this.procedureExecutor.listProcedures();
2622
2623     if (cpHost != null) {
2624       cpHost.postListProcedures(procInfoList);
2625     }
2626
2627     return procInfoList;
2628   }
2629
2630   /**
2631    * Returns the list of table descriptors that match the specified request
2632    * @param namespace the namespace to query, or null if querying for all
2633    * @param regex The regular expression to match against, or null if querying for all
2634    * @param tableNameList the list of table names, or null if querying for all
2635    * @param includeSysTables False to match only against userspace tables
2636    * @return the list of table descriptors
2637    */
2638   public List<HTableDescriptor> listTableDescriptors(final String namespace, final String regex,
2639       final List<TableName> tableNameList, final boolean includeSysTables)
2640   throws IOException {
2641     List<HTableDescriptor> htds = new ArrayList<HTableDescriptor>();
2642     boolean bypass = cpHost != null?
2643         cpHost.preGetTableDescriptors(tableNameList, htds, regex): false;
2644     if (!bypass) {
2645       htds = getTableDescriptors(htds, namespace, regex, tableNameList, includeSysTables);
2646       if (cpHost != null) {
2647         cpHost.postGetTableDescriptors(tableNameList, htds, regex);
2648       }
2649     }
2650     return htds;
2651   }
2652
2653   /**
2654    * Returns the list of table names that match the specified request
2655    * @param regex The regular expression to match against, or null if querying for all
2656    * @param namespace the namespace to query, or null if querying for all
2657    * @param includeSysTables False to match only against userspace tables
2658    * @return the list of table names
2659    */
2660   public List<TableName> listTableNames(final String namespace, final String regex,
2661       final boolean includeSysTables) throws IOException {
2662     List<HTableDescriptor> htds = new ArrayList<HTableDescriptor>();
2663     boolean bypass = cpHost != null? cpHost.preGetTableNames(htds, regex): false;
2664     if (!bypass) {
2665       htds = getTableDescriptors(htds, namespace, regex, null, includeSysTables);
2666       if (cpHost != null) cpHost.postGetTableNames(htds, regex);
2667     }
2668     List<TableName> result = new ArrayList<TableName>(htds.size());
2669     for (HTableDescriptor htd: htds) result.add(htd.getTableName());
2670     return result;
2671   }
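       // Usage sketch (illustrative; "master" is an assumption for this example). Note
       // that the regex is applied to the fully qualified "namespace:qualifier" name
       // for non-default namespaces (see filterTablesByRegex below):
       //
       //   // User tables in namespace "ns1" whose qualifier starts with "web".
       //   List<TableName> webTables = master.listTableNames("ns1", "ns1:web.*", false);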
2672
2673   /**
2674    * @return list of table descriptors after filtering by regex and whether to include system
2675    *    tables, etc.
2676    * @throws IOException
2677    */
2678   private List<HTableDescriptor> getTableDescriptors(final List<HTableDescriptor> htds,
2679       final String namespace, final String regex, final List<TableName> tableNameList,
2680       final boolean includeSysTables)
2681   throws IOException {
2682     if (tableNameList == null || tableNameList.size() == 0) {
2683       // request for all TableDescriptors
2684       Collection<HTableDescriptor> allHtds;
2685       if (namespace != null && namespace.length() > 0) {
2686        // Check that the namespace exists. This will fail if it does not.
2687         this.clusterSchemaService.getNamespace(namespace);
2688         allHtds = tableDescriptors.getByNamespace(namespace).values();
2689       } else {
2690         allHtds = tableDescriptors.getAll().values();
2691       }
2692       for (HTableDescriptor desc: allHtds) {
2693         if (tableStateManager.isTablePresent(desc.getTableName())
2694             && (includeSysTables || !desc.getTableName().isSystemTable())) {
2695           htds.add(desc);
2696         }
2697       }
2698     } else {
2699       for (TableName s: tableNameList) {
2700         if (tableStateManager.isTablePresent(s)) {
2701           HTableDescriptor desc = tableDescriptors.get(s);
2702           if (desc != null) {
2703             htds.add(desc);
2704           }
2705         }
2706       }
2707     }
2708
2709     // Retains only those matched by regular expression.
2710     if (regex != null) filterTablesByRegex(htds, Pattern.compile(regex));
2711     return htds;
2712   }
2713
2714   /**
2715    * Removes the table descriptors that don't match the pattern.
2716    * @param descriptors list of table descriptors to filter
2717    * @param pattern the regex to use
2718    */
2719   private static void filterTablesByRegex(final Collection<HTableDescriptor> descriptors,
2720       final Pattern pattern) {
2721     final String defaultNS = NamespaceDescriptor.DEFAULT_NAMESPACE_NAME_STR;
2722     Iterator<HTableDescriptor> itr = descriptors.iterator();
2723     while (itr.hasNext()) {
2724       HTableDescriptor htd = itr.next();
2725       String tableName = htd.getTableName().getNameAsString();
2726       boolean matched = pattern.matcher(tableName).matches();
2727       if (!matched && htd.getTableName().getNamespaceAsString().equals(defaultNS)) {
2728         matched = pattern.matcher(defaultNS + TableName.NAMESPACE_DELIM + tableName).matches();
2729       }
2730       if (!matched) {
2731         itr.remove();
2732       }
2733     }
2734   }
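       // Worked example of the default-namespace fallback above: for a table "web1" in
       // the default namespace, getNameAsString() is simply "web1", so the pattern
       // "web.*" matches directly, while "default:web.*" matches on the second attempt,
       // which prepends the namespace and delimiter. Tables in other namespaces appear
       // as "ns:qualifier" (e.g. "ns1:web1") and must be matched in that form.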
2735 
2736   @Override
2737   public long getLastMajorCompactionTimestamp(TableName table) throws IOException {
2738     return getClusterStatus().getLastMajorCompactionTsForTable(table);
2739   }
2740
2741   @Override
2742   public long getLastMajorCompactionTimestampForRegion(byte[] regionName) throws IOException {
2743     return getClusterStatus().getLastMajorCompactionTsForRegion(regionName);
2744   }
2745
2746   /**
2747    * Gets the mob file compaction state for a specific table.
2748    * Whether all the mob files are selected is only known during compaction execution, and
2749    * the statistics are gathered just before the compaction starts, so the compaction type is
2750    * hard to know at that time; rough statistics are therefore used for mob file compaction.
2751    * Only two compaction states are reported, CompactionState.MAJOR_AND_MINOR and CompactionState.NONE.
2752    * @param tableName The table to check.
2753    * @return CompactionState.MAJOR_AND_MINOR if the table is in mob file compaction now, CompactionState.NONE otherwise.
2754    */
2755   public CompactionState getMobCompactionState(TableName tableName) {
2756     AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
2757     if (compactionsCount != null && compactionsCount.get() != 0) {
2758       return CompactionState.MAJOR_AND_MINOR;
2759     }
2760     return CompactionState.NONE;
2761   }
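       // Polling sketch (illustrative; the helper name and one-second interval are
       // assumptions for this example):
       //
       //   static void waitForMobCompaction(HMaster master, TableName tn)
       //       throws InterruptedException {
       //     // Spin until the rough state reports that no mob compaction is running.
       //     while (master.getMobCompactionState(tn) != CompactionState.NONE) {
       //       Thread.sleep(1000);
       //     }
       //   }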
2762
2763   public void reportMobCompactionStart(TableName tableName) throws IOException {
2764     IdLock.Entry lockEntry = null;
2765     try {
2766       lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
2767       AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
2768       if (compactionsCount == null) {
2769         compactionsCount = new AtomicInteger(0);
2770         mobCompactionStates.put(tableName, compactionsCount);
2771       }
2772       compactionsCount.incrementAndGet();
2773     } finally {
2774       if (lockEntry != null) {
2775         mobCompactionLock.releaseLockEntry(lockEntry);
2776       }
2777     }
2778   }
2779
2780   public void reportMobCompactionEnd(TableName tableName) throws IOException {
2781     IdLock.Entry lockEntry = null;
2782     try {
2783       lockEntry = mobCompactionLock.getLockEntry(tableName.hashCode());
2784       AtomicInteger compactionsCount = mobCompactionStates.get(tableName);
2785       if (compactionsCount != null) {
2786         int count = compactionsCount.decrementAndGet();
2787         // remove the entry if the count is 0.
2788         if (count == 0) {
2789           mobCompactionStates.remove(tableName);
2790         }
2791       }
2792     } finally {
2793       if (lockEntry != null) {
2794         mobCompactionLock.releaseLockEntry(lockEntry);
2795       }
2796     }
2797   }
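       // Usage note (illustrative): reportMobCompactionStart/End are meant to bracket
       // a mob compaction so the per-table counter stays balanced even if the
       // compaction fails, e.g.:
       //
       //   master.reportMobCompactionStart(tableName);
       //   try {
       //     // ... run the mob compaction ...
       //   } finally {
       //     master.reportMobCompactionEnd(tableName); // decrements; removes the entry at 0
       //   }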
2798
2799   /**
2800    * Requests mob compaction.
2801    * @param tableName The table to compact.
2802    * @param columns The column families to compact.
2803    * @param allFiles Whether to include all mob files in the compaction.
2804    */
2805   public void requestMobCompaction(TableName tableName,
2806     List<HColumnDescriptor> columns, boolean allFiles) throws IOException {
2807     mobCompactThread.requestMobCompaction(conf, fs, tableName, columns,
2808       tableLockManager, allFiles);
2809   }
2810
2811   /**
2812    * Queries the state of the {@link LoadBalancerTracker}. If the balancer is not initialized,
2813    * false is returned.
2814    *
2815    * @return The state of the load balancer, or false if the load balancer isn't defined.
2816    */
2817   public boolean isBalancerOn() {
2818     if (null == loadBalancerTracker) return false;
2819     return loadBalancerTracker.isBalancerOn();
2820   }
2821
2822   /**
2823    * Queries the state of the {@link RegionNormalizerTracker}. If it's not initialized,
2824    * false is returned.
2825    */
2826   public boolean isNormalizerOn() {
2827     return null == regionNormalizerTracker? false: regionNormalizerTracker.isNormalizerOn();
2828   }
2829
2830
2831   /**
2832    * Queries the state of the {@link SplitOrMergeTracker}. If it is not initialized,
2833    * false is returned. If switchType is illegal, false is returned as well.
2834    * @param switchType see {@link org.apache.hadoop.hbase.client.MasterSwitchType}
2835    * @return The state of the switch
2836    */
2837   public boolean isSplitOrMergeEnabled(MasterSwitchType switchType) {
2838     if (null == splitOrMergeTracker) {
2839       return false;
2840     }
2841     return splitOrMergeTracker.isSplitOrMergeEnabled(switchType);
2842   }
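       // Illustrative check (the "master" variable is an assumption for this example);
       // each switch type is queried separately:
       //
       //   boolean splitEnabled = master.isSplitOrMergeEnabled(MasterSwitchType.SPLIT);
       //   boolean mergeEnabled = master.isSplitOrMergeEnabled(MasterSwitchType.MERGE);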
2843 
2844   /**
2845    * Fetch the configured {@link LoadBalancer} class name. If none is set, a default is returned.
2846    *
2847    * @return The name of the {@link LoadBalancer} in use.
2848    */
2849   public String getLoadBalancerClassName() {
2850     return conf.get(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, LoadBalancerFactory
2851         .getDefaultLoadBalancerClass().getName());
2852   }
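       // Configuration sketch (illustrative): the balancer implementation read above
       // can be overridden under HConstants.HBASE_MASTER_LOADBALANCER_CLASS
       // ("hbase.master.loadbalancer.class"), for example:
       //
       //   Configuration conf = HBaseConfiguration.create();
       //   conf.set(HConstants.HBASE_MASTER_LOADBALANCER_CLASS,
       //       "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer");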
2853
2854   /**
2855    * @return RegionNormalizerTracker instance
2856    */
2857   public RegionNormalizerTracker getRegionNormalizerTracker() {
2858     return regionNormalizerTracker;
2859   }
2860
2861   public SplitOrMergeTracker getSplitOrMergeTracker() {
2862     return splitOrMergeTracker;
2863   }
2864
2865   @Override
2866   public LoadBalancer getLoadBalancer() {
2867     return balancer;
2868   }
2869 }