1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.lang.reflect.Constructor;
23  import java.lang.reflect.InvocationTargetException;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.UnknownHostException;
27  import java.util.ArrayList;
28  import java.util.Collections;
29  import java.util.Comparator;
30  import java.util.HashSet;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.Set;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.ExecutionException;
36  import java.util.concurrent.Executors;
37  import java.util.concurrent.Future;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicReference;
40  
41  import javax.management.ObjectName;
42  
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.hadoop.classification.InterfaceAudience;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.fs.Path;
48  import org.apache.hadoop.hbase.Abortable;
49  import org.apache.hadoop.hbase.Chore;
50  import org.apache.hadoop.hbase.ClusterId;
51  import org.apache.hadoop.hbase.ClusterStatus;
52  import org.apache.hadoop.hbase.HBaseIOException;
53  import org.apache.hadoop.hbase.HColumnDescriptor;
54  import org.apache.hadoop.hbase.HConstants;
55  import org.apache.hadoop.hbase.HRegionInfo;
56  import org.apache.hadoop.hbase.HTableDescriptor;
57  import org.apache.hadoop.hbase.HealthCheckChore;
58  import org.apache.hadoop.hbase.Server;
59  import org.apache.hadoop.hbase.ServerLoad;
60  import org.apache.hadoop.hbase.ServerName;
61  import org.apache.hadoop.hbase.TableDescriptors;
62  import org.apache.hadoop.hbase.catalog.CatalogTracker;
63  import org.apache.hadoop.hbase.catalog.MetaReader;
64  import org.apache.hadoop.hbase.client.HConnectionManager;
65  import org.apache.hadoop.hbase.client.MetaScanner;
66  import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
67  import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
68  import org.apache.hadoop.hbase.client.Result;
69  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
70  import org.apache.hadoop.hbase.exceptions.DeserializationException;
71  import org.apache.hadoop.hbase.exceptions.MasterNotRunningException;
72  import org.apache.hadoop.hbase.exceptions.NotAllMetaRegionsOnlineException;
73  import org.apache.hadoop.hbase.exceptions.PleaseHoldException;
74  import org.apache.hadoop.hbase.exceptions.TableNotDisabledException;
75  import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
76  import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
77  import org.apache.hadoop.hbase.exceptions.UnknownRegionException;
78  import org.apache.hadoop.hbase.executor.ExecutorService;
79  import org.apache.hadoop.hbase.executor.ExecutorType;
80  import org.apache.hadoop.hbase.ipc.RpcServerInterface;
81  import org.apache.hadoop.hbase.ipc.RpcServer;
82  import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
83  import org.apache.hadoop.hbase.ipc.ServerRpcController;
84  import org.apache.hadoop.hbase.master.balancer.BalancerChore;
85  import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
86  import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
87  import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
88  import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
89  import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
90  import org.apache.hadoop.hbase.master.handler.DeleteTableHandler;
91  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
92  import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
93  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
94  import org.apache.hadoop.hbase.master.handler.ModifyTableHandler;
95  import org.apache.hadoop.hbase.master.handler.TableAddFamilyHandler;
96  import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler;
97  import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler;
98  import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
99  import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
100 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
101 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
102 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
103 import org.apache.hadoop.hbase.protobuf.RequestConverter;
104 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
105 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
106 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
107 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
108 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
109 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
110 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos;
111 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnRequest;
112 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnResponse;
113 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionRequest;
114 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionResponse;
115 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceRequest;
116 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceResponse;
117 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanRequest;
118 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanResponse;
119 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest;
120 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableResponse;
121 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnRequest;
122 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnResponse;
123 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteSnapshotRequest;
124 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteSnapshotResponse;
125 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableRequest;
126 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableResponse;
127 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableRequest;
128 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableResponse;
129 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DispatchMergingRegionsRequest;
130 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DispatchMergingRegionsResponse;
131 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest;
132 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorResponse;
133 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableRequest;
134 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableResponse;
135 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest;
136 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse;
137 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsRestoreSnapshotDoneRequest;
138 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsRestoreSnapshotDoneResponse;
139 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsSnapshotDoneRequest;
140 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsSnapshotDoneResponse;
141 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ListSnapshotRequest;
142 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ListSnapshotResponse;
143 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnRequest;
144 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnResponse;
145 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableRequest;
146 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableResponse;
147 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionRequest;
148 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionResponse;
149 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.OfflineRegionRequest;
150 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.OfflineRegionResponse;
151 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.RestoreSnapshotRequest;
152 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.RestoreSnapshotResponse;
153 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.SetBalancerRunningRequest;
154 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.SetBalancerRunningResponse;
155 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ShutdownRequest;
156 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ShutdownResponse;
157 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.StopMasterRequest;
158 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.StopMasterResponse;
159 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.TakeSnapshotRequest;
160 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.TakeSnapshotResponse;
161 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionRequest;
162 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionResponse;
163 import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos;
164 import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetClusterStatusRequest;
165 import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetClusterStatusResponse;
166 import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusRequest;
167 import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusResponse;
168 import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsRequest;
169 import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsResponse;
170 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
171 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
172 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
173 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
174 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
175 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
176 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportResponse;
177 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
178 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
179 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
180 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
181 import org.apache.hadoop.hbase.replication.regionserver.Replication;
182 import org.apache.hadoop.hbase.security.User;
183 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
184 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
185 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
186 import org.apache.hadoop.hbase.util.Bytes;
187 import org.apache.hadoop.hbase.util.CompressionTest;
188 import org.apache.hadoop.hbase.util.FSTableDescriptors;
189 import org.apache.hadoop.hbase.util.FSUtils;
190 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
191 import org.apache.hadoop.hbase.util.HasThread;
192 import org.apache.hadoop.hbase.util.InfoServer;
193 import org.apache.hadoop.hbase.util.Pair;
194 import org.apache.hadoop.hbase.util.Sleeper;
195 import org.apache.hadoop.hbase.util.Strings;
196 import org.apache.hadoop.hbase.util.Threads;
197 import org.apache.hadoop.hbase.util.VersionInfo;
198 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
199 import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
200 import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
201 import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
202 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
203 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
204 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
205 import org.apache.hadoop.metrics.util.MBeanUtil;
206 import org.apache.hadoop.net.DNS;
207 import org.apache.zookeeper.KeeperException;
208 import org.apache.zookeeper.Watcher;
209 
210 import com.google.common.collect.Maps;
211 import com.google.protobuf.Descriptors;
212 import com.google.protobuf.Message;
213 import com.google.protobuf.RpcCallback;
214 import com.google.protobuf.RpcController;
215 import com.google.protobuf.Service;
216 import com.google.protobuf.ServiceException;
217 
218 /**
 * HMaster is the "master server" for HBase. An HBase cluster has one active
 * master.  If many masters are started, all compete.  Whichever wins goes on to
 * run the cluster.  All others block in {@link #run()} (while trying to become the
 * active master) until master or cluster shutdown, or until the active master loses
 * its lease in zookeeper.  Thereafter, all running masters jostle to take over the
 * master role.
224  *
 * <p>The Master can be asked to shut down the cluster. See {@link #shutdown()}.  In
 * this case it will tell all regionservers to go down and then wait for all of them
 * to report in that they are down.  This master will then shut itself down.
 *
 * <p>You can also shut down just this master.  Call {@link #stopMaster()}.
230  *
231  * @see Watcher
232  */
233 @InterfaceAudience.Private
234 @SuppressWarnings("deprecation")
235 public class HMaster extends HasThread
236 implements MasterMonitorProtos.MasterMonitorService.BlockingInterface,
237 MasterAdminProtos.MasterAdminService.BlockingInterface,
238 RegionServerStatusProtos.RegionServerStatusService.BlockingInterface,
239 MasterServices, Server {
  private static final Log LOG = LogFactory.getLog(HMaster.class.getName());

  // MASTER is the name of the webapp and the attribute name used when stuffing
  // this instance into the web context.
  public static final String MASTER = "master";

  // The configuration for the Master: a private copy of the configuration passed
  // to the constructor, with master-specific tweaks applied to it.
  private final Configuration conf;
  // server for the web ui
  private InfoServer infoServer;

  // Our zk client.
  private ZooKeeperWatcher zooKeeper;
  // Manager and zk listener for master election
  private ActiveMasterManager activeMasterManager;
  // Region server tracker
  RegionServerTracker regionServerTracker;
  // Draining region server tracker
  private DrainingServerTracker drainingServerTracker;
  // Tracker for load balancer state
  private LoadBalancerTracker loadBalancerTracker;

  // RPC server for the HMaster
  private final RpcServerInterface rpcServer;
  // Set after we've called HBaseServer#openServer and ready to receive RPCs.
  // Set back to false after we stop rpcServer.  Used by tests.
  private volatile boolean rpcServerOpen = false;

  /**
   * This server's address, as reported by the RPC server's listener.
   */
  private final InetSocketAddress isa;

  // Metrics for the HMaster
  private final MetricsMaster metricsMaster;
  // file system manager for the master FS operations
  private MasterFileSystem fileSystemManager;

  // server manager to deal with region server info
  ServerManager serverManager;

  // manager of assignment nodes in zookeeper
  AssignmentManager assignmentManager;
  // manager of catalog regions
  private CatalogTracker catalogTracker;
  // Cluster status zk tracker and local setter
  private ClusterStatusTracker clusterStatusTracker;

  // buffer for "fatal error" notices from region servers
  // in the cluster. This is only used for assisting
  // operations/debugging.
  private MemoryBoundedLogMessageBuffer rsFatals;

  // This flag is for stopping this Master instance.  It's set when we are
  // stopping or aborting.
  private volatile boolean stopped = false;
  // Set on abort -- usually failure of our zk session.
  private volatile boolean abort = false;
  // flag set after we become the active master (used for testing)
  private volatile boolean isActiveMaster = false;

  // flag set after we complete initialization once active,
  // it is not private since it's used in unit tests
  volatile boolean initialized = false;

  // flag set after we complete assignMeta.
  private volatile boolean serverShutdownHandlerEnabled = false;

  // Instance of the hbase executor service.
  ExecutorService executorService;

  private LoadBalancer balancer;
  private Thread balancerChore;
  private Thread clusterStatusChore;
  // Created (and its thread started) by the constructor only when a publisher
  // class is configured; otherwise stays null.
  private ClusterStatusPublisher clusterStatusPublisherChore = null;

  private CatalogJanitor catalogJanitorChore;
  private LogCleaner logCleaner;
  private HFileCleaner hfileCleaner;

  private MasterCoprocessorHost cpHost;
  private final ServerName serverName;

  private TableDescriptors tableDescriptors;

  // Table level lock manager for schema changes
  private TableLockManager tableLockManager;

  // Time stamps for when a hmaster was started and when it became active
  private long masterStartTime;
  private long masterActiveTime;

  /**
   * Time interval for emitting metrics values. Read in the constructor from
   * "hbase.regionserver.msginterval" (default 3 * 1000).
   */
  private final int msgInterval;
  /**
   * MX Bean for MasterInfo
   */
  private ObjectName mxBean = null;

  //should we check the compression codec type at master side, default true, HBASE-6370
  private final boolean masterCheckCompression;

  private SpanReceiverHost spanReceiverHost;

  // Coprocessor services hosted by the master.
  // NOTE(review): presumably keyed by service name -- confirm against the registration code.
  private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();

  // monitor for snapshot of hbase tables
  private SnapshotManager snapshotManager;

  /** The health check chore; only created when a health checker is configured. */
  private HealthCheckChore healthCheckChore;

  /**
   * is in distributedLogReplay mode. When true, SplitLogWorker directly replays WAL edits to newly
   * assigned region servers instead of creating recovered.edits files.
   */
  private final boolean distributedLogReplay;

  /** flag used in test cases in order to simulate RS failures during master initialization */
  private volatile boolean initializationBeforeMetaAssignment = false;
360 
361   /**
362    * Initializes the HMaster. The steps are as follows:
363    * <p>
364    * <ol>
365    * <li>Initialize HMaster RPC and address
366    * <li>Connect to ZooKeeper.
367    * </ol>
368    * <p>
369    * Remaining steps of initialization occur in {@link #run()} so that they
370    * run in their own thread rather than within the context of the constructor.
371    * @throws InterruptedException
372    */
373   public HMaster(final Configuration conf)
374   throws IOException, KeeperException, InterruptedException {
375     this.conf = new Configuration(conf);
376     // Disable the block cache on the master
377     this.conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
378     // Server to handle client requests.
379     String hostname = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
380       conf.get("hbase.master.dns.interface", "default"),
381       conf.get("hbase.master.dns.nameserver", "default")));
382     int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
383     // Test that the hostname is reachable
384     InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
385     if (initialIsa.getAddress() == null) {
386       throw new IllegalArgumentException("Failed resolve of hostname " + initialIsa);
387     }
388     // Verify that the bind address is reachable if set
389     String bindAddress = conf.get("hbase.master.ipc.address");
390     if (bindAddress != null) {
391       initialIsa = new InetSocketAddress(bindAddress, port);
392       if (initialIsa.getAddress() == null) {
393         throw new IllegalArgumentException("Failed resolve of bind address " + initialIsa);
394       }
395     }
396     String name = "master/" + initialIsa.toString();
397     // Set how many times to retry talking to another server over HConnection.
398     HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG);
399     int numHandlers = conf.getInt("hbase.master.handler.count",
400       conf.getInt("hbase.regionserver.handler.count", 25));
401     this.rpcServer = new RpcServer(this, name, getServices(),
402       initialIsa, // BindAddress is IP we got for this server.
403       numHandlers,
404       0, // we dont use high priority handlers in master
405       conf,
406       0); // this is a DNC w/o high priority handlers
407     // Set our address.
408     this.isa = this.rpcServer.getListenerAddress();
409     this.serverName = new ServerName(hostname, this.isa.getPort(), System.currentTimeMillis());
410     this.rsFatals = new MemoryBoundedLogMessageBuffer(
411       conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));
412 
413     // login the zookeeper client principal (if using security)
414     ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
415       "hbase.zookeeper.client.kerberos.principal", this.isa.getHostName());
416 
417     // initialize server principal (if using secure Hadoop)
418     User.login(conf, "hbase.master.keytab.file",
419       "hbase.master.kerberos.principal", this.isa.getHostName());
420 
421     LOG.info("hbase.rootdir=" + FSUtils.getRootDir(this.conf) +
422         ", hbase.cluster.distributed=" + this.conf.getBoolean("hbase.cluster.distributed", false));
423 
424     // set the thread name now we have an address
425     setName(MASTER + "-" + this.serverName.toString());
426 
427     Replication.decorateMasterConfiguration(this.conf);
428 
429     // Hack! Maps DFSClient => Master for logs.  HDFS made this
430     // config param for task trackers, but we can piggyback off of it.
431     if (this.conf.get("mapred.task.id") == null) {
432       this.conf.set("mapred.task.id", "hb_m_" + this.serverName.toString());
433     }
434 
435     this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true);
436     this.rpcServer.startThreads();
437 
438     // metrics interval: using the same property as region server.
439     this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
440 
441     //should we check the compression codec type at master side, default true, HBASE-6370
442     this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);
443 
444     this.metricsMaster = new MetricsMaster( new MetricsMasterWrapperImpl(this));
445 
446     // Health checker thread.
447     int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
448       HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
449     if (isHealthCheckerConfigured()) {
450       healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
451     }
452 
453     // Do we publish the status?
454     Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
455         conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
456             ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
457             ClusterStatusPublisher.Publisher.class);
458 
459     if (publisherClass != null) {
460       clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
461       Threads.setDaemonThreadRunning(clusterStatusPublisherChore.getThread());
462     }
463 
464     distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, 
465       HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
466   }
467 
468   /**
469    * @return list of blocking services and their security info classes that this server supports
470    */
471   private List<BlockingServiceAndInterface> getServices() {
472     List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(3);
473     bssi.add(new BlockingServiceAndInterface(
474         MasterMonitorProtos.MasterMonitorService.newReflectiveBlockingService(this),
475         MasterMonitorProtos.MasterMonitorService.BlockingInterface.class));
476     bssi.add(new BlockingServiceAndInterface(
477         MasterAdminProtos.MasterAdminService.newReflectiveBlockingService(this),
478         MasterAdminProtos.MasterAdminService.BlockingInterface.class));
479     bssi.add(new BlockingServiceAndInterface(
480         RegionServerStatusProtos.RegionServerStatusService.newReflectiveBlockingService(this),
481         RegionServerStatusProtos.RegionServerStatusService.BlockingInterface.class));
482     return bssi;
483   }
484 
485   /**
486    * Stall startup if we are designated a backup master; i.e. we want someone
487    * else to become the master before proceeding.
488    * @param c configuration
489    * @param amm
490    * @throws InterruptedException
491    */
492   private static void stallIfBackupMaster(final Configuration c,
493       final ActiveMasterManager amm)
494   throws InterruptedException {
495     // If we're a backup master, stall until a primary to writes his address
496     if (!c.getBoolean(HConstants.MASTER_TYPE_BACKUP,
497       HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
498       return;
499     }
500     LOG.debug("HMaster started in backup mode.  " +
501       "Stalling until master znode is written.");
502     // This will only be a minute or so while the cluster starts up,
503     // so don't worry about setting watches on the parent znode
504     while (!amm.isActiveMaster()) {
505       LOG.debug("Waiting for master address ZNode to be written " +
506         "(Also watching cluster state node)");
507       Thread.sleep(
508         c.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT));
509     }
510 
511   }
512 
513   MetricsMaster getMetrics() {
514     return metricsMaster;
515   }
516 
517   /**
518    * Main processing loop for the HMaster.
519    * <ol>
520    * <li>Block until becoming active master
521    * <li>Finish initialization via finishInitialization(MonitoredTask)
522    * <li>Enter loop until we are stopped
523    * <li>Stop services and perform cleanup once stopped
524    * </ol>
525    */
526   @Override
527   public void run() {
528     MonitoredTask startupStatus =
529       TaskMonitor.get().createStatus("Master startup");
530     startupStatus.setDescription("Master startup");
531     masterStartTime = System.currentTimeMillis();
532     try {
533       /*
534        * Block on becoming the active master.
535        *
536        * We race with other masters to write our address into ZooKeeper.  If we
537        * succeed, we are the primary/active master and finish initialization.
538        *
539        * If we do not succeed, there is another active master and we should
540        * now wait until it dies to try and become the next active master.  If we
541        * do not succeed on our first attempt, this is no longer a cluster startup.
542        */
543       becomeActiveMaster(startupStatus);
544 
545       // We are either the active master or we were asked to shutdown
546       if (!this.stopped) {
547         finishInitialization(startupStatus, false);
548         loop();
549       }
550     } catch (Throwable t) {
551       // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
552       if (t instanceof NoClassDefFoundError &&
553           t.getMessage().contains("org/apache/hadoop/hdfs/protocol/FSConstants$SafeModeAction")) {
554           // improved error message for this special case
555           abort("HBase is having a problem with its Hadoop jars.  You may need to "
556               + "recompile HBase against Hadoop version "
557               +  org.apache.hadoop.util.VersionInfo.getVersion()
558               + " or change your hadoop jars to start properly", t);
559       } else {
560         abort("Unhandled exception. Starting shutdown.", t);
561       }
562     } finally {
563       startupStatus.cleanup();
564 
565       stopChores();
566       // Wait for all the remaining region servers to report in IFF we were
567       // running a cluster shutdown AND we were NOT aborting.
568       if (!this.abort && this.serverManager != null &&
569           this.serverManager.isClusterShutdown()) {
570         this.serverManager.letRegionServersShutdown();
571       }
572       stopServiceThreads();
573       // Stop services started for both backup and active masters
574       if (this.activeMasterManager != null) this.activeMasterManager.stop();
575       if (this.catalogTracker != null) this.catalogTracker.stop();
576       if (this.serverManager != null) this.serverManager.stop();
577       if (this.assignmentManager != null) this.assignmentManager.stop();
578       if (this.fileSystemManager != null) this.fileSystemManager.stop();
579       if (this.snapshotManager != null) this.snapshotManager.stop("server shutting down.");
580       this.zooKeeper.close();
581     }
582     LOG.info("HMaster main thread exiting");
583   }
584 
585   /**
586    * Try becoming active master.
587    * @param startupStatus
588    * @return True if we could successfully become the active master.
589    * @throws InterruptedException
590    */
591   private boolean becomeActiveMaster(MonitoredTask startupStatus)
592   throws InterruptedException {
593     // TODO: This is wrong!!!! Should have new servername if we restart ourselves,
594     // if we come back to life.
595     this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName,
596         this);
597     this.zooKeeper.registerListener(activeMasterManager);
598     stallIfBackupMaster(this.conf, this.activeMasterManager);
599 
600     // The ClusterStatusTracker is setup before the other
601     // ZKBasedSystemTrackers because it's needed by the activeMasterManager
602     // to check if the cluster should be shutdown.
603     this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this);
604     this.clusterStatusTracker.start();
605     return this.activeMasterManager.blockUntilBecomingActiveMaster(startupStatus);
606   }
607 
608   /**
609    * Initialize all ZK based system trackers.
610    * @throws IOException
611    * @throws InterruptedException
612    */
613   void initializeZKBasedSystemTrackers() throws IOException,
614       InterruptedException, KeeperException {
615     this.catalogTracker = createCatalogTracker(this.zooKeeper, this.conf, this);
616     this.catalogTracker.start();
617 
618     this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
619     this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
620     this.loadBalancerTracker.start();
621     this.assignmentManager = new AssignmentManager(this, serverManager,
622       this.catalogTracker, this.balancer, this.executorService, this.metricsMaster,
623       this.tableLockManager);
624     zooKeeper.registerListenerFirst(assignmentManager);
625 
626     this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
627         this.serverManager);
628     this.regionServerTracker.start();
629 
630     this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this,
631       this.serverManager);
632     this.drainingServerTracker.start();
633 
634     // Set the cluster as up.  If new RSs, they'll be waiting on this before
635     // going ahead with their startup.
636     boolean wasUp = this.clusterStatusTracker.isClusterUp();
637     if (!wasUp) this.clusterStatusTracker.setClusterUp();
638 
639     LOG.info("Server active/primary master=" + this.serverName +
640         ", sessionid=0x" +
641         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
642         ", setting cluster-up flag (Was=" + wasUp + ")");
643 
644     // create the snapshot manager
645     this.snapshotManager = new SnapshotManager(this, this.metricsMaster);
646   }
647 
648   /**
649    * Create CatalogTracker.
650    * In its own method so can intercept and mock it over in tests.
651    * @param zk If zk is null, we'll create an instance (and shut it down
652    * when {@link #stop(String)} is called) else we'll use what is passed.
653    * @param conf
654    * @param abortable If fatal exception we'll call abort on this.  May be null.
655    * If it is we'll use the Connection associated with the passed
656    * {@link Configuration} as our {@link Abortable}.
657    * ({@link Object#wait(long)} when passed a <code>0</code> waits for ever).
658    * @throws IOException
659    */
660   CatalogTracker createCatalogTracker(final ZooKeeperWatcher zk,
661       final Configuration conf, Abortable abortable)
662   throws IOException {
663     return new CatalogTracker(zk, conf, abortable);
664   }
665 
666   // Check if we should stop every 100ms
667   private Sleeper stopSleeper = new Sleeper(100, this);
668 
669   private void loop() {
670     long lastMsgTs = 0l;
671     long now = 0l;
672     while (!this.stopped) {
673       now = System.currentTimeMillis();
674       if ((now - lastMsgTs) >= this.msgInterval) {
675         doMetrics();
676         lastMsgTs = System.currentTimeMillis();
677       }
678       stopSleeper.sleep();
679     }
680   }
681 
682   /**
683    * Emit the HMaster metrics, such as region in transition metrics.
684    * Surrounding in a try block just to be sure metrics doesn't abort HMaster.
685    */
686   private void doMetrics() {
687     try {
688       this.assignmentManager.updateRegionsInTransitionMetrics();
689     } catch (Throwable e) {
690       LOG.error("Couldn't update metrics: " + e.getMessage());
691     }
692   }
693 
694   /**
695    * Finish initialization of HMaster after becoming the primary master.
696    *
697    * <ol>
698    * <li>Initialize master components - file system manager, server manager,
699    *     assignment manager, region server tracker, catalog tracker, etc</li>
700    * <li>Start necessary service threads - rpc server, info server,
701    *     executor services, etc</li>
702    * <li>Set cluster as UP in ZooKeeper</li>
703    * <li>Wait for RegionServers to check-in</li>
704    * <li>Split logs and perform data recovery, if necessary</li>
705    * <li>Ensure assignment of meta regions<li>
706    * <li>Handle either fresh cluster start or master failover</li>
707    * </ol>
708    *
709    * @param masterRecovery
710    *
711    * @throws IOException
712    * @throws InterruptedException
713    * @throws KeeperException
714    */
715   private void finishInitialization(MonitoredTask status, boolean masterRecovery)
716   throws IOException, InterruptedException, KeeperException {
717 
718     isActiveMaster = true;
719 
720     /*
721      * We are active master now... go initialize components we need to run.
722      * Note, there may be dross in zk from previous runs; it'll get addressed
723      * below after we determine if cluster startup or failover.
724      */
725 
726     status.setStatus("Initializing Master file system");
727     this.masterActiveTime = System.currentTimeMillis();
728     // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
729     this.fileSystemManager = new MasterFileSystem(this, this, metricsMaster, masterRecovery);
730 
731     this.tableDescriptors =
732       new FSTableDescriptors(this.fileSystemManager.getFileSystem(),
733       this.fileSystemManager.getRootDir());
734 
735     // publish cluster ID
736     status.setStatus("Publishing Cluster ID in ZooKeeper");
737     ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
738 
739     if (!masterRecovery) {
740       this.executorService = new ExecutorService(getServerName().toString());
741       this.serverManager = createServerManager(this, this);
742     }
743 
744     //Initialize table lock manager, and ensure that all write locks held previously
745     //are invalidated
746     this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper, serverName);
747     if (!masterRecovery) {
748       this.tableLockManager.reapWriteLocks();
749     }
750 
751     status.setStatus("Initializing ZK system trackers");
752     initializeZKBasedSystemTrackers();
753 
754     if (!masterRecovery) {
755       // initialize master side coprocessors before we start handling requests
756       status.setStatus("Initializing master coprocessors");
757       this.cpHost = new MasterCoprocessorHost(this, this.conf);
758 
759       spanReceiverHost = new SpanReceiverHost(getConfiguration());
760       spanReceiverHost.loadSpanReceivers();
761 
762       // start up all service threads.
763       status.setStatus("Initializing master service threads");
764       startServiceThreads();
765     }
766 
767     // Wait for region servers to report in.
768     this.serverManager.waitForRegionServers(status);
769     // Check zk for region servers that are up but didn't register
770     for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
771       if (!this.serverManager.isServerOnline(sn)
772           && serverManager.checkAlreadySameHostPortAndRecordNewServer(
773               sn, ServerLoad.EMPTY_SERVERLOAD)) {
774         LOG.info("Registered server found up in zk but who has not yet "
775           + "reported in: " + sn);
776       }
777     }
778 
779     if (!masterRecovery) {
780       this.assignmentManager.startTimeOutMonitor();
781     }
782 
783     // get a list for previously failed RS which need log splitting work
784     // we recover .META. region servers inside master initialization and
785     // handle other failed servers in SSH in order to start up master node ASAP
786     Set<ServerName> previouslyFailedServers = this.fileSystemManager
787         .getFailedServersFromLogFolders();
788 
789     // remove stale recovering regions from previous run
790     this.fileSystemManager.removeStaleRecoveringRegionsFromZK(previouslyFailedServers);
791 
792     // log splitting for .META. server
793     ServerName oldMetaServerLocation = this.catalogTracker.getMetaLocation();
794     if (oldMetaServerLocation != null && previouslyFailedServers.contains(oldMetaServerLocation)) {
795       splitMetaLogBeforeAssignment(oldMetaServerLocation);
796       // Note: we can't remove oldMetaServerLocation from previousFailedServers list because it
797       // may also host user regions
798     }
799 
800     this.initializationBeforeMetaAssignment = true;
801     // Make sure meta assigned before proceeding.
802     status.setStatus("Assigning Meta Region");
803     assignMeta(status);
804     // check if master is shutting down because above assignMeta could return even META isn't 
805     // assigned when master is shutting down
806     if(this.stopped) return;
807 
808     if (this.distributedLogReplay && oldMetaServerLocation != null
809         && previouslyFailedServers.contains(oldMetaServerLocation)) {
810       // replay WAL edits mode need new .META. RS is assigned firstly
811       status.setStatus("replaying log for Meta Region");
812       this.fileSystemManager.splitMetaLog(oldMetaServerLocation);
813     }
814 
815     enableServerShutdownHandler();
816 
817     status.setStatus("Submitting log splitting work for previously failed region servers");
818     // Master has recovered META region server and we put
819     // other failed region servers in a queue to be handled later by SSH
820     for (ServerName tmpServer : previouslyFailedServers) {
821       this.serverManager.processDeadServer(tmpServer, true);
822     }
823 
824     // Update meta with new PB serialization if required. i.e migrate all HRI to PB serialization
825     // in meta. This must happen before we assign all user regions or else the assignment will
826     // fail.
827     org.apache.hadoop.hbase.catalog.MetaMigrationConvertingToPB
828       .updateMetaIfNecessary(this);
829 
830     this.balancer.setMasterServices(this);
831     // Fix up assignment manager status
832     status.setStatus("Starting assignment manager");
833     this.assignmentManager.joinCluster();
834 
835     this.balancer.setClusterStatus(getClusterStatus());
836 
837     if (!masterRecovery) {
838       // Start balancer and meta catalog janitor after meta and regions have
839       // been assigned.
840       status.setStatus("Starting balancer and catalog janitor");
841       this.clusterStatusChore = getAndStartClusterStatusChore(this);
842       this.balancerChore = getAndStartBalancerChore(this);
843       this.catalogJanitorChore = new CatalogJanitor(this, this);
844       startCatalogJanitorChore();
845     }
846 
847     status.markComplete("Initialization successful");
848     LOG.info("Master has completed initialization");
849     initialized = true;
850     // clear the dead servers with same host name and port of online server because we are not
851     // removing dead server with same hostname and port of rs which is trying to check in before
852     // master initialization. See HBASE-5916.
853     this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
854 
855     if (!masterRecovery) {
856       if (this.cpHost != null) {
857         // don't let cp initialization errors kill the master
858         try {
859           this.cpHost.postStartMaster();
860         } catch (IOException ioe) {
861           LOG.error("Coprocessor postStartMaster() hook failed", ioe);
862         }
863       }
864     }
865   }
866 
867   /**
868    * Useful for testing purpose also where we have
869    * master restart scenarios.
870    */
871   protected void startCatalogJanitorChore() {
872     Threads.setDaemonThreadRunning(catalogJanitorChore.getThread());
873   }
874 
875   /**
876    * Create a {@link ServerManager} instance.
877    * @param master
878    * @param services
879    * @return An instance of {@link ServerManager}
880    * @throws org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException
881    * @throws IOException
882    */
883   ServerManager createServerManager(final Server master,
884       final MasterServices services)
885   throws IOException {
886     // We put this out here in a method so can do a Mockito.spy and stub it out
887     // w/ a mocked up ServerManager.
888     return new ServerManager(master, services);
889   }
890 
891   /**
892    * If ServerShutdownHandler is disabled, we enable it and expire those dead
893    * but not expired servers.
894    */
895   private void enableServerShutdownHandler() {
896     if (!serverShutdownHandlerEnabled) {
897       serverShutdownHandlerEnabled = true;
898       this.serverManager.processQueuedDeadServers();
899     }
900   }
901 
902   /**
903    * Check <code>.META.</code> is assigned. If not, assign it.
904    * @param status MonitoredTask
905    * @throws InterruptedException
906    * @throws IOException
907    * @throws KeeperException
908    */
909   void assignMeta(MonitoredTask status)
910       throws InterruptedException, IOException, KeeperException {
911     // Work on meta region
912     int assigned = 0;
913     long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
914     boolean beingExpired = false;
915 
916     status.setStatus("Assigning META region");
917     
918     assignmentManager.getRegionStates().createRegionState(HRegionInfo.FIRST_META_REGIONINFO);
919     boolean rit = this.assignmentManager
920         .processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO);
921     boolean metaRegionLocation = this.catalogTracker.verifyMetaRegionLocation(timeout);
922     if (!rit && !metaRegionLocation) {
923       ServerName currentMetaServer = this.catalogTracker.getMetaLocation();
924       if (currentMetaServer != null) {
925         beingExpired = expireIfOnline(currentMetaServer);
926       }
927       if (beingExpired) {
928         splitMetaLogBeforeAssignment(currentMetaServer);
929       }
930       assignmentManager.assignMeta();
931       // Make sure a .META. location is set.
932       enableSSHandWaitForMeta();
933       assigned++;
934       if (beingExpired && this.distributedLogReplay) {
935         // In Replay WAL Mode, we need the new .META. server online
936         this.fileSystemManager.splitMetaLog(currentMetaServer);
937       }
938     } else if (rit && !metaRegionLocation) {
939       // Make sure a .META. location is set.
940       enableSSHandWaitForMeta();
941       assigned++;
942     } else {
943       // Region already assigned. We didn't assign it. Add to in-memory state.
944       this.assignmentManager.regionOnline(HRegionInfo.FIRST_META_REGIONINFO,
945         this.catalogTracker.getMetaLocation());
946     }
947 
948     enableCatalogTables(Bytes.toString(HConstants.META_TABLE_NAME));
949     LOG.info(".META. assigned=" + assigned + ", rit=" + rit + ", location="
950         + catalogTracker.getMetaLocation());
951     status.setStatus("META assigned.");
952   }
953 
954   private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException {
955     if (this.distributedLogReplay) {
956       // In log replay mode, we mark META region as recovering in ZK
957       Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
958       regions.add(HRegionInfo.FIRST_META_REGIONINFO);
959       this.fileSystemManager.prepareMetaLogReplay(currentMetaServer, regions);
960     } else {
961       // In recovered.edits mode: create recovered edits file for .META. server
962       this.fileSystemManager.splitMetaLog(currentMetaServer);
963     }
964   }
965 
  /**
   * Enables the server shutdown handler, then blocks until .META. is available through the
   * catalog tracker and the AssignmentManager reports its assignment as complete.
   * @throws IOException
   * @throws InterruptedException if interrupted while waiting
   */
  private void enableSSHandWaitForMeta() throws IOException, InterruptedException {
    // Enable SSH first: this processes queued dead servers, which is presumably required
    // for a .META. region lost with a dead server to be reassigned (confirm).
    enableServerShutdownHandler();
    this.catalogTracker.waitForMeta();
    // Above check waits for general meta availability but this does not
    // guarantee that the transition has completed
    this.assignmentManager.waitForAssignment(HRegionInfo.FIRST_META_REGIONINFO);
  }
973 
974   private void enableCatalogTables(String catalogTableName) {
975     if (!this.assignmentManager.getZKTable().isEnabledTable(catalogTableName)) {
976       this.assignmentManager.setEnabledTable(catalogTableName);
977     }
978   }
979 
980   /**
981    * Expire a server if we find it is one of the online servers.
982    * @param sn ServerName to check.
983    * @return true when server <code>sn<code> is being expired by the function.
984    * @throws IOException
985    */
986   private boolean expireIfOnline(final ServerName sn)
987       throws IOException {
988     if (sn == null || !serverManager.isServerOnline(sn)) {
989       return false;
990     }
991     LOG.info("Forcing expire of " + sn);
992     serverManager.expireServer(sn);
993     return true;
994   }
995 
996   @Override
997   public TableDescriptors getTableDescriptors() {
998     return this.tableDescriptors;
999   }
1000 
1001   /** @return InfoServer object. Maybe null.*/
1002   public InfoServer getInfoServer() {
1003     return this.infoServer;
1004   }
1005 
1006   @Override
1007   public Configuration getConfiguration() {
1008     return this.conf;
1009   }
1010 
1011   @Override
1012   public ServerManager getServerManager() {
1013     return this.serverManager;
1014   }
1015 
1016   @Override
1017   public ExecutorService getExecutorService() {
1018     return this.executorService;
1019   }
1020 
1021   @Override
1022   public MasterFileSystem getMasterFileSystem() {
1023     return this.fileSystemManager;
1024   }
1025 
1026   /**
1027    * Get the ZK wrapper object - needed by master_jsp.java
1028    * @return the zookeeper wrapper
1029    */
1030   public ZooKeeperWatcher getZooKeeperWatcher() {
1031     return this.zooKeeper;
1032   }
1033 
1034   /*
1035    * Start up all services. If any of these threads gets an unhandled exception
1036    * then they just die with a logged message.  This should be fine because
1037    * in general, we do not expect the master to get such unhandled exceptions
1038    *  as OOMEs; it should be lightly loaded. See what HRegionServer does if
1039    *  need to install an unexpected exception handler.
1040    */
1041   void startServiceThreads() throws IOException{
1042    // Start the executor service pools
1043    this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
1044       conf.getInt("hbase.master.executor.openregion.threads", 5));
1045    this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
1046       conf.getInt("hbase.master.executor.closeregion.threads", 5));
1047    this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
1048       conf.getInt("hbase.master.executor.serverops.threads", 3));
1049    this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
1050       conf.getInt("hbase.master.executor.serverops.threads", 5));
1051 
1052    // We depend on there being only one instance of this executor running
1053    // at a time.  To do concurrency, would need fencing of enable/disable of
1054    // tables.
1055    this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1056 
1057    // Start log cleaner thread
1058    String n = Thread.currentThread().getName();
1059    int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
1060    this.logCleaner =
1061       new LogCleaner(cleanerInterval,
1062          this, conf, getMasterFileSystem().getFileSystem(),
1063          getMasterFileSystem().getOldLogDir());
1064          Threads.setDaemonThreadRunning(logCleaner.getThread(), n + ".oldLogCleaner");
1065 
1066    //start the hfile archive cleaner thread
1067     Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
1068     this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
1069         .getFileSystem(), archiveDir);
1070     Threads.setDaemonThreadRunning(hfileCleaner.getThread(), n + ".archivedHFileCleaner");
1071 
1072    // Put up info server.
1073    int port = this.conf.getInt(HConstants.MASTER_INFO_PORT, 60010);
1074    if (port >= 0) {
1075      String a = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
1076      this.infoServer = new InfoServer(MASTER, a, port, false, this.conf);
1077      this.infoServer.addServlet("status", "/master-status", MasterStatusServlet.class);
1078      this.infoServer.addServlet("dump", "/dump", MasterDumpServlet.class);
1079      this.infoServer.setAttribute(MASTER, this);
1080      this.infoServer.start();
1081     }
1082 
1083     // Start the health checker
1084     if (this.healthCheckChore != null) {
1085       Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker");
1086     }
1087 
1088     // Start allowing requests to happen.
1089     this.rpcServer.openServer();
1090     this.rpcServerOpen = true;
1091     if (LOG.isTraceEnabled()) {
1092       LOG.trace("Started service threads");
1093     }
1094   }
1095 
1096   /**
1097    * Use this when trying to figure when its ok to send in rpcs.  Used by tests.
1098    * @return True if we have successfully run {@link RpcServer#openServer()}
1099    */
1100   boolean isRpcServerOpen() {
1101     return this.rpcServerOpen;
1102   }
1103 
1104   private void stopServiceThreads() {
1105     if (LOG.isDebugEnabled()) {
1106       LOG.debug("Stopping service threads");
1107     }
1108     if (this.rpcServer != null) this.rpcServer.stop();
1109     this.rpcServerOpen = false;
1110     // Clean up and close up shop
1111     if (this.logCleaner!= null) this.logCleaner.interrupt();
1112     if (this.hfileCleaner != null) this.hfileCleaner.interrupt();
1113 
1114     if (this.infoServer != null) {
1115       LOG.info("Stopping infoServer");
1116       try {
1117         this.infoServer.stop();
1118       } catch (Exception ex) {
1119         ex.printStackTrace();
1120       }
1121     }
1122     if (this.executorService != null) this.executorService.shutdown();
1123     if (this.healthCheckChore != null) {
1124       this.healthCheckChore.interrupt();
1125     }
1126   }
1127 
1128   private static Thread getAndStartClusterStatusChore(HMaster master) {
1129     if (master == null || master.balancer == null) {
1130       return null;
1131     }
1132     Chore chore = new ClusterStatusChore(master, master.balancer);
1133     return Threads.setDaemonThreadRunning(chore.getThread());
1134   }
1135 
1136   private static Thread getAndStartBalancerChore(final HMaster master) {
1137     // Start up the load balancer chore
1138     Chore chore = new BalancerChore(master);
1139     return Threads.setDaemonThreadRunning(chore.getThread());
1140   }
1141 
1142   private void stopChores() {
1143     if (this.balancerChore != null) {
1144       this.balancerChore.interrupt();
1145     }
1146     if (this.clusterStatusChore != null) {
1147       this.clusterStatusChore.interrupt();
1148     }
1149     if (this.catalogJanitorChore != null) {
1150       this.catalogJanitorChore.interrupt();
1151     }
1152     if (this.clusterStatusPublisherChore != null){
1153       clusterStatusPublisherChore.interrupt();
1154     }
1155   }
1156 
1157   @Override
1158   public RegionServerStartupResponse regionServerStartup(
1159       RpcController controller, RegionServerStartupRequest request) throws ServiceException {
1160     // Register with server manager
1161     try {
1162       InetAddress ia = getRemoteInetAddress(request.getPort(), request.getServerStartCode());
1163       ServerName rs = this.serverManager.regionServerStartup(ia, request.getPort(),
1164         request.getServerStartCode(), request.getServerCurrentTime());
1165 
1166       // Send back some config info
1167       RegionServerStartupResponse.Builder resp = createConfigurationSubset();
1168       NameStringPair.Builder entry = NameStringPair.newBuilder()
1169         .setName(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)
1170         .setValue(rs.getHostname());
1171       resp.addMapEntries(entry.build());
1172 
1173       return resp.build();
1174     } catch (IOException ioe) {
1175       throw new ServiceException(ioe);
1176     }
1177   }
1178 
1179   /**
1180    * @return Get remote side's InetAddress
1181    * @throws UnknownHostException
1182    */
1183   InetAddress getRemoteInetAddress(final int port, final long serverStartCode)
1184   throws UnknownHostException {
1185     // Do it out here in its own little method so can fake an address when
1186     // mocking up in tests.
1187     return RpcServer.getRemoteIp();
1188   }
1189 
1190   /**
1191    * @return Subset of configuration to pass initializing regionservers: e.g.
1192    * the filesystem to use and root directory to use.
1193    */
1194   protected RegionServerStartupResponse.Builder createConfigurationSubset() {
1195     RegionServerStartupResponse.Builder resp = addConfig(
1196       RegionServerStartupResponse.newBuilder(), HConstants.HBASE_DIR);
1197     return addConfig(resp, "fs.default.name");
1198   }
1199 
1200   private RegionServerStartupResponse.Builder addConfig(
1201       final RegionServerStartupResponse.Builder resp, final String key) {
1202     NameStringPair.Builder entry = NameStringPair.newBuilder()
1203       .setName(key)
1204       .setValue(this.conf.get(key));
1205     resp.addMapEntries(entry.build());
1206     return resp;
1207   }
1208 
1209   @Override
1210   public GetLastFlushedSequenceIdResponse getLastFlushedSequenceId(RpcController controller,
1211       GetLastFlushedSequenceIdRequest request) throws ServiceException {
1212     byte[] regionName = request.getRegionName().toByteArray();
1213     long seqId = serverManager.getLastFlushedSequenceId(regionName);
1214     return ResponseConverter.buildGetLastFlushedSequenceIdResponse(seqId);
1215   }
1216 
1217   @Override
1218   public RegionServerReportResponse regionServerReport(
1219       RpcController controller, RegionServerReportRequest request) throws ServiceException {
1220     try {
1221       HBaseProtos.ServerLoad sl = request.getLoad();
1222       this.serverManager.regionServerReport(ProtobufUtil.toServerName(request.getServer()), new ServerLoad(sl));
1223       if (sl != null && this.metricsMaster != null) {
1224         // Up our metrics.
1225         this.metricsMaster.incrementRequests(sl.getTotalNumberOfRequests());
1226       }
1227     } catch (IOException ioe) {
1228       throw new ServiceException(ioe);
1229     }
1230 
1231     return RegionServerReportResponse.newBuilder().build();
1232   }
1233 
1234   @Override
1235   public ReportRSFatalErrorResponse reportRSFatalError(
1236       RpcController controller, ReportRSFatalErrorRequest request) throws ServiceException {
1237     String errorText = request.getErrorMessage();
1238     ServerName sn = ProtobufUtil.toServerName(request.getServer());
1239     String msg = "Region server " + sn +
1240       " reported a fatal error:\n" + errorText;
1241     LOG.error(msg);
1242     rsFatals.add(msg);
1243 
1244     return ReportRSFatalErrorResponse.newBuilder().build();
1245   }
1246 
1247   public boolean isMasterRunning() {
1248     return !isStopped();
1249   }
1250 
1251   public IsMasterRunningResponse isMasterRunning(RpcController c, IsMasterRunningRequest req)
1252   throws ServiceException {
1253     return IsMasterRunningResponse.newBuilder().setIsMasterRunning(isMasterRunning()).build();
1254   }
1255 
1256   @Override
1257   public CatalogScanResponse runCatalogScan(RpcController c,
1258       CatalogScanRequest req) throws ServiceException {
1259     try {
1260       return ResponseConverter.buildCatalogScanResponse(catalogJanitorChore.scan());
1261     } catch (IOException ioe) {
1262       throw new ServiceException(ioe);
1263     }
1264   }
1265 
1266   @Override
1267   public EnableCatalogJanitorResponse enableCatalogJanitor(RpcController c,
1268       EnableCatalogJanitorRequest req) throws ServiceException {
1269     return EnableCatalogJanitorResponse.newBuilder().
1270         setPrevValue(catalogJanitorChore.setEnabled(req.getEnable())).build();
1271   }
1272 
1273   @Override
1274   public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(RpcController c,
1275       IsCatalogJanitorEnabledRequest req) throws ServiceException {
1276     boolean isEnabled = catalogJanitorChore != null ? catalogJanitorChore.getEnabled() : false;
1277     return IsCatalogJanitorEnabledResponse.newBuilder().setValue(isEnabled).build();
1278   }
1279 
1280   /**
1281    * @return Maximum time we should run balancer for
1282    */
1283   private int getBalancerCutoffTime() {
1284     int balancerCutoffTime =
1285       getConfiguration().getInt("hbase.balancer.max.balancing", -1);
1286     if (balancerCutoffTime == -1) {
1287       // No time period set so create one
1288       int balancerPeriod =
1289         getConfiguration().getInt("hbase.balancer.period", 300000);
1290       balancerCutoffTime = balancerPeriod;
1291       // If nonsense period, set it to balancerPeriod
1292       if (balancerCutoffTime <= 0) balancerCutoffTime = balancerPeriod;
1293     }
1294     return balancerCutoffTime;
1295   }
1296 
  /**
   * Runs one pass of the load balancer, if allowed.
   * <p>
   * Returns false without balancing in any of these cases:
   * <ul>
   * <li>the master is not initialized yet;</li>
   * <li>the balancer switch is off;</li>
   * <li>regions are in transition;</li>
   * <li>dead region servers are still being processed;</li>
   * <li>a coprocessor bypasses preBalance(), or preBalance() throws.</li>
   * </ul>
   * Otherwise it asks the balancer for region plans table by table and executes them one at a
   * time. It stops early when the next plan, estimated from the average execution time so far,
   * would finish after the cutoff from {@link #getBalancerCutoffTime()}. The plans that were
   * executed are then passed to the postBalance() coprocessor hook.
   * <p>
   * Runs are serialized by synchronizing on {@code this.balancer}.
   * @return true if balancing was attempted. NOTE(review): {@code plans} is never null below,
   *   so once the early-exit checks pass this returns true even if no plan was produced.
   */
  public boolean balance() {
    // if master not initialized, don't run balancer.
    if (!this.initialized) {
      LOG.debug("Master has not been initialized, don't run balancer.");
      return false;
    }
    // If balance not true, don't run balancer.
    if (!this.loadBalancerTracker.isBalancerOn()) return false;
    // Do this call outside of synchronized block.
    int maximumBalanceTime = getBalancerCutoffTime();
    boolean balancerRan;
    synchronized (this.balancer) {
      // Only allow one balance run at a time.
      if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
        Map<String, RegionState> regionsInTransition =
          this.assignmentManager.getRegionStates().getRegionsInTransition();
        LOG.debug("Not running balancer because " + regionsInTransition.size() +
          " region(s) in transition: " + org.apache.commons.lang.StringUtils.
            abbreviate(regionsInTransition.toString(), 256));
        return false;
      }
      if (this.serverManager.areDeadServersInProgress()) {
        LOG.debug("Not running balancer because processing dead regionserver(s): " +
          this.serverManager.getDeadServers());
        return false;
      }

      if (this.cpHost != null) {
        try {
          if (this.cpHost.preBalance()) {
            LOG.debug("Coprocessor bypassing balancer request");
            return false;
          }
        } catch (IOException ioe) {
          LOG.error("Error invoking master coprocessor preBalance()", ioe);
          return false;
        }
      }

      Map<String, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
        this.assignmentManager.getRegionStates().getAssignmentsByTable();

      List<RegionPlan> plans = new ArrayList<RegionPlan>();
      //Give the balancer the current cluster state.
      this.balancer.setClusterStatus(getClusterStatus());
      // Balance each table's assignments independently and collect all resulting plans.
      for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) {
        List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
        if (partialPlans != null) plans.addAll(partialPlans);
      }
      long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
      int rpCount = 0;  // number of RegionPlans balanced so far
      long totalRegPlanExecTime = 0;
      // NOTE(review): always true, since plans is initialized to a non-null list above.
      balancerRan = plans != null;
      if (plans != null && !plans.isEmpty()) {
        for (RegionPlan plan: plans) {
          LOG.info("balance " + plan);
          long balStartTime = System.currentTimeMillis();
          //TODO: bulk assign
          this.assignmentManager.balance(plan);
          totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
          rpCount++;
          if (rpCount < plans.size() &&
              // if performing next balance exceeds cutoff time, exit the loop
              (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
            //TODO: After balance, there should not be a cutoff time (keeping it as a security net for now)
            LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
              maximumBalanceTime);
            break;
          }
        }
      }
      if (this.cpHost != null) {
        try {
          // Report only the plans actually executed (a prefix if we stopped at the cutoff).
          this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans);
        } catch (IOException ioe) {
          // balancing already succeeded so don't change the result
          LOG.error("Error invoking master coprocessor postBalance()", ioe);
        }
      }
    }
    return balancerRan;
  }
1379 
1380   @Override
1381   public BalanceResponse balance(RpcController c, BalanceRequest request) throws ServiceException {
1382     return BalanceResponse.newBuilder().setBalancerRan(balance()).build();
1383   }
1384 
1385   enum BalanceSwitchMode {
1386     SYNC,
1387     ASYNC
1388   }
1389   /**
1390    * Assigns balancer switch according to BalanceSwitchMode
1391    * @param b new balancer switch
1392    * @param mode BalanceSwitchMode
1393    * @return old balancer switch
1394    */
1395   public boolean switchBalancer(final boolean b, BalanceSwitchMode mode) throws IOException {
1396     boolean oldValue = this.loadBalancerTracker.isBalancerOn();
1397     boolean newValue = b;
1398     try {
1399       if (this.cpHost != null) {
1400         newValue = this.cpHost.preBalanceSwitch(newValue);
1401       }
1402       try {
1403         if (mode == BalanceSwitchMode.SYNC) {
1404           synchronized (this.balancer) {
1405             this.loadBalancerTracker.setBalancerOn(newValue);
1406           }
1407         } else {
1408           this.loadBalancerTracker.setBalancerOn(newValue);
1409         }
1410       } catch (KeeperException ke) {
1411         throw new IOException(ke);
1412       }
1413       LOG.info("BalanceSwitch=" + newValue);
1414       if (this.cpHost != null) {
1415         this.cpHost.postBalanceSwitch(oldValue, newValue);
1416       }
1417     } catch (IOException ioe) {
1418       LOG.warn("Error flipping balance switch", ioe);
1419     }
1420     return oldValue;
1421   }
1422 
1423   public boolean synchronousBalanceSwitch(final boolean b) throws IOException {
1424     return switchBalancer(b, BalanceSwitchMode.SYNC);
1425   }
1426 
1427   public boolean balanceSwitch(final boolean b) throws IOException {
1428     return switchBalancer(b, BalanceSwitchMode.ASYNC);
1429   }
1430 
1431   @Override
1432   public SetBalancerRunningResponse setBalancerRunning(
1433       RpcController controller, SetBalancerRunningRequest req) throws ServiceException {
1434     try {
1435       boolean prevValue = (req.getSynchronous())?
1436         synchronousBalanceSwitch(req.getOn()):balanceSwitch(req.getOn());
1437       return SetBalancerRunningResponse.newBuilder().setPrevBalanceValue(prevValue).build();
1438     } catch (IOException ioe) {
1439       throw new ServiceException(ioe);
1440     }
1441   }
1442 
1443   /**
1444    * Switch for the background CatalogJanitor thread.
1445    * Used for testing.  The thread will continue to run.  It will just be a noop
1446    * if disabled.
1447    * @param b If false, the catalog janitor won't do anything.
1448    */
1449   public void setCatalogJanitorEnabled(final boolean b) {
1450     this.catalogJanitorChore.setEnabled(b);
1451   }
1452 
1453   @Override
1454   public DispatchMergingRegionsResponse dispatchMergingRegions(
1455       RpcController controller, DispatchMergingRegionsRequest request)
1456       throws ServiceException {
1457     final byte[] encodedNameOfRegionA = request.getRegionA().getValue()
1458         .toByteArray();
1459     final byte[] encodedNameOfRegionB = request.getRegionB().getValue()
1460         .toByteArray();
1461     final boolean forcible = request.getForcible();
1462     if (request.getRegionA().getType() != RegionSpecifierType.ENCODED_REGION_NAME
1463         || request.getRegionB().getType() != RegionSpecifierType.ENCODED_REGION_NAME) {
1464       LOG.warn("mergeRegions specifier type: expected: "
1465           + RegionSpecifierType.ENCODED_REGION_NAME + " actual: region_a="
1466           + request.getRegionA().getType() + ", region_b="
1467           + request.getRegionB().getType());
1468     }
1469     RegionState regionStateA = assignmentManager.getRegionStates()
1470         .getRegionState(Bytes.toString(encodedNameOfRegionA));
1471     RegionState regionStateB = assignmentManager.getRegionStates()
1472         .getRegionState(Bytes.toString(encodedNameOfRegionB));
1473     if (regionStateA == null || regionStateB == null) {
1474       throw new ServiceException(new UnknownRegionException(
1475           Bytes.toStringBinary(regionStateA == null ? encodedNameOfRegionA
1476               : encodedNameOfRegionB)));
1477     }
1478 
1479     if (!forcible && !HRegionInfo.areAdjacent(regionStateA.getRegion(),
1480             regionStateB.getRegion())) {
1481       throw new ServiceException("Unable to merge not adjacent regions "
1482           + regionStateA.getRegion().getRegionNameAsString() + ", "
1483           + regionStateB.getRegion().getRegionNameAsString()
1484           + " where forcible = " + forcible);
1485     }
1486 
1487     try {
1488       dispatchMergingRegions(regionStateA.getRegion(), regionStateB.getRegion(), forcible);
1489     } catch (IOException ioe) {
1490       throw new ServiceException(ioe);
1491     }
1492 
1493     return DispatchMergingRegionsResponse.newBuilder().build();
1494   }
1495 
1496   @Override
1497   public void dispatchMergingRegions(final HRegionInfo region_a,
1498       final HRegionInfo region_b, final boolean forcible) throws IOException {
1499     checkInitialized();
1500     this.executorService.submit(new DispatchMergingRegionHandler(this,
1501         this.catalogJanitorChore, region_a, region_b, forcible));
1502   }
1503 
1504   @Override
1505   public MoveRegionResponse moveRegion(RpcController controller, MoveRegionRequest req)
1506   throws ServiceException {
1507     final byte [] encodedRegionName = req.getRegion().getValue().toByteArray();
1508     RegionSpecifierType type = req.getRegion().getType();
1509     final byte [] destServerName = (req.hasDestServerName())?
1510       Bytes.toBytes(ProtobufUtil.toServerName(req.getDestServerName()).getServerName()):null;
1511     MoveRegionResponse mrr = MoveRegionResponse.newBuilder().build();
1512 
1513     if (type != RegionSpecifierType.ENCODED_REGION_NAME) {
1514       LOG.warn("moveRegion specifier type: expected: " + RegionSpecifierType.ENCODED_REGION_NAME
1515         + " actual: " + type);
1516     }
1517 
1518     try {
1519       move(encodedRegionName, destServerName);
1520     } catch (HBaseIOException ioe) {
1521       throw new ServiceException(ioe);
1522     }
1523     return mrr;
1524   }
1525 
1526   void move(final byte[] encodedRegionName,
1527       final byte[] destServerName) throws HBaseIOException {
1528     RegionState regionState = assignmentManager.getRegionStates().
1529       getRegionState(Bytes.toString(encodedRegionName));
1530     if (regionState == null) {
1531       throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
1532     }
1533 
1534     HRegionInfo hri = regionState.getRegion();
1535     ServerName dest;
1536     if (destServerName == null || destServerName.length == 0) {
1537       LOG.info("Passed destination servername is null/empty so " +
1538         "choosing a server at random");
1539       final List<ServerName> destServers = this.serverManager.createDestinationServersList(
1540         regionState.getServerName());
1541       dest = balancer.randomAssignment(hri, destServers);
1542     } else {
1543       dest = new ServerName(Bytes.toString(destServerName));
1544       if (dest.equals(regionState.getServerName())) {
1545         LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1546           + " because region already assigned to the same server " + dest + ".");
1547         return;
1548       }
1549     }
1550 
1551     // Now we can do the move
1552     RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
1553 
1554     try {
1555       checkInitialized();
1556       if (this.cpHost != null) {
1557         if (this.cpHost.preMove(hri, rp.getSource(), rp.getDestination())) {
1558           return;
1559         }
1560       }
1561       LOG.info("Added move plan " + rp + ", running balancer");
1562       this.assignmentManager.balance(rp);
1563       if (this.cpHost != null) {
1564         this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
1565       }
1566     } catch (IOException ioe) {
1567       if (ioe instanceof HBaseIOException) {
1568         throw (HBaseIOException)ioe;
1569       }
1570       throw new HBaseIOException(ioe);
1571     }
1572   }
1573 
1574   @Override
1575   public void createTable(HTableDescriptor hTableDescriptor,
1576     byte [][] splitKeys)
1577   throws IOException {
1578     if (!isMasterRunning()) {
1579       throw new MasterNotRunningException();
1580     }
1581 
1582     HRegionInfo [] newRegions = getHRegionInfos(hTableDescriptor, splitKeys);
1583     checkInitialized();
1584     checkCompression(hTableDescriptor);
1585     if (cpHost != null) {
1586       cpHost.preCreateTable(hTableDescriptor, newRegions);
1587     }
1588 
1589     this.executorService.submit(new CreateTableHandler(this,
1590       this.fileSystemManager, hTableDescriptor, conf,
1591       newRegions, this).prepare());
1592     if (cpHost != null) {
1593       cpHost.postCreateTable(hTableDescriptor, newRegions);
1594     }
1595 
1596   }
1597 
1598   private void checkCompression(final HTableDescriptor htd)
1599   throws IOException {
1600     if (!this.masterCheckCompression) return;
1601     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1602       checkCompression(hcd);
1603     }
1604   }
1605 
1606   private void checkCompression(final HColumnDescriptor hcd)
1607   throws IOException {
1608     if (!this.masterCheckCompression) return;
1609     CompressionTest.testCompression(hcd.getCompression());
1610     CompressionTest.testCompression(hcd.getCompactionCompression());
1611   }
1612 
1613   @Override
1614   public CreateTableResponse createTable(RpcController controller, CreateTableRequest req)
1615   throws ServiceException {
1616     HTableDescriptor hTableDescriptor = HTableDescriptor.convert(req.getTableSchema());
1617     byte [][] splitKeys = ProtobufUtil.getSplitKeysArray(req);
1618     try {
1619       createTable(hTableDescriptor,splitKeys);
1620     } catch (IOException ioe) {
1621       throw new ServiceException(ioe);
1622     }
1623     return CreateTableResponse.newBuilder().build();
1624   }
1625 
1626   private HRegionInfo[] getHRegionInfos(HTableDescriptor hTableDescriptor,
1627     byte[][] splitKeys) {
1628     HRegionInfo[] hRegionInfos = null;
1629     if (splitKeys == null || splitKeys.length == 0) {
1630       hRegionInfos = new HRegionInfo[]{
1631           new HRegionInfo(hTableDescriptor.getName(), null, null)};
1632     } else {
1633       int numRegions = splitKeys.length + 1;
1634       hRegionInfos = new HRegionInfo[numRegions];
1635       byte[] startKey = null;
1636       byte[] endKey = null;
1637       for (int i = 0; i < numRegions; i++) {
1638         endKey = (i == splitKeys.length) ? null : splitKeys[i];
1639         hRegionInfos[i] =
1640             new HRegionInfo(hTableDescriptor.getName(), startKey, endKey);
1641         startKey = endKey;
1642       }
1643     }
1644     return hRegionInfos;
1645   }
1646 
1647   private static boolean isCatalogTable(final byte [] tableName) {
1648     return Bytes.equals(tableName, HConstants.META_TABLE_NAME);
1649   }
1650 
1651   @Override
1652   public void deleteTable(final byte[] tableName) throws IOException {
1653     checkInitialized();
1654     if (cpHost != null) {
1655       cpHost.preDeleteTable(tableName);
1656     }
1657     this.executorService.submit(new DeleteTableHandler(tableName, this, this).prepare());
1658     if (cpHost != null) {
1659       cpHost.postDeleteTable(tableName);
1660     }
1661   }
1662 
1663   @Override
1664   public DeleteTableResponse deleteTable(RpcController controller, DeleteTableRequest request)
1665   throws ServiceException {
1666     try {
1667       deleteTable(request.getTableName().toByteArray());
1668     } catch (IOException ioe) {
1669       throw new ServiceException(ioe);
1670     }
1671     return DeleteTableResponse.newBuilder().build();
1672   }
1673 
1674   /**
1675    * Get the number of regions of the table that have been updated by the alter.
1676    *
1677    * @return Pair indicating the number of regions updated Pair.getFirst is the
1678    *         regions that are yet to be updated Pair.getSecond is the total number
1679    *         of regions of the table
1680    * @throws IOException
1681    */
1682   @Override
1683   public GetSchemaAlterStatusResponse getSchemaAlterStatus(
1684       RpcController controller, GetSchemaAlterStatusRequest req) throws ServiceException {
1685     // TODO: currently, we query using the table name on the client side. this
1686     // may overlap with other table operations or the table operation may
1687     // have completed before querying this API. We need to refactor to a
1688     // transaction system in the future to avoid these ambiguities.
1689     byte [] tableName = req.getTableName().toByteArray();
1690 
1691     try {
1692       Pair<Integer,Integer> pair = this.assignmentManager.getReopenStatus(tableName);
1693       GetSchemaAlterStatusResponse.Builder ret = GetSchemaAlterStatusResponse.newBuilder();
1694       ret.setYetToUpdateRegions(pair.getFirst());
1695       ret.setTotalRegions(pair.getSecond());
1696       return ret.build();
1697     } catch (IOException ioe) {
1698       throw new ServiceException(ioe);
1699     }
1700   }
1701 
1702   @Override
1703   public void addColumn(final byte[] tableName, final HColumnDescriptor column)
1704       throws IOException {
1705     checkInitialized();
1706     if (cpHost != null) {
1707       if (cpHost.preAddColumn(tableName, column)) {
1708         return;
1709       }
1710     }
1711     //TODO: we should process this (and some others) in an executor
1712     new TableAddFamilyHandler(tableName, column, this, this)
1713       .prepare().process();
1714     if (cpHost != null) {
1715       cpHost.postAddColumn(tableName, column);
1716     }
1717   }
1718 
1719   @Override
1720   public AddColumnResponse addColumn(RpcController controller, AddColumnRequest req)
1721   throws ServiceException {
1722     try {
1723       addColumn(req.getTableName().toByteArray(),
1724         HColumnDescriptor.convert(req.getColumnFamilies()));
1725     } catch (IOException ioe) {
1726       throw new ServiceException(ioe);
1727     }
1728     return AddColumnResponse.newBuilder().build();
1729   }
1730 
1731   @Override
1732   public void modifyColumn(byte[] tableName, HColumnDescriptor descriptor)
1733       throws IOException {
1734     checkInitialized();
1735     checkCompression(descriptor);
1736     if (cpHost != null) {
1737       if (cpHost.preModifyColumn(tableName, descriptor)) {
1738         return;
1739       }
1740     }
1741     new TableModifyFamilyHandler(tableName, descriptor, this, this)
1742       .prepare().process();
1743     if (cpHost != null) {
1744       cpHost.postModifyColumn(tableName, descriptor);
1745     }
1746   }
1747 
1748   @Override
1749   public ModifyColumnResponse modifyColumn(RpcController controller, ModifyColumnRequest req)
1750   throws ServiceException {
1751     try {
1752       modifyColumn(req.getTableName().toByteArray(),
1753         HColumnDescriptor.convert(req.getColumnFamilies()));
1754     } catch (IOException ioe) {
1755       throw new ServiceException(ioe);
1756     }
1757     return ModifyColumnResponse.newBuilder().build();
1758   }
1759 
1760   @Override
1761   public void deleteColumn(final byte[] tableName, final byte[] columnName)
1762       throws IOException {
1763     checkInitialized();
1764     if (cpHost != null) {
1765       if (cpHost.preDeleteColumn(tableName, columnName)) {
1766         return;
1767       }
1768     }
1769     new TableDeleteFamilyHandler(tableName, columnName, this, this).prepare().process();
1770     if (cpHost != null) {
1771       cpHost.postDeleteColumn(tableName, columnName);
1772     }
1773   }
1774 
1775   @Override
1776   public DeleteColumnResponse deleteColumn(RpcController controller, DeleteColumnRequest req)
1777   throws ServiceException {
1778     try {
1779       deleteColumn(req.getTableName().toByteArray(), req.getColumnName().toByteArray());
1780     } catch (IOException ioe) {
1781       throw new ServiceException(ioe);
1782     }
1783     return DeleteColumnResponse.newBuilder().build();
1784   }
1785 
1786   @Override
1787   public void enableTable(final byte[] tableName) throws IOException {
1788     checkInitialized();
1789     if (cpHost != null) {
1790       cpHost.preEnableTable(tableName);
1791     }
1792     this.executorService.submit(new EnableTableHandler(this, tableName,
1793       catalogTracker, assignmentManager, tableLockManager, false).prepare());
1794     if (cpHost != null) {
1795       cpHost.postEnableTable(tableName);
1796    }
1797   }
1798 
1799   @Override
1800   public EnableTableResponse enableTable(RpcController controller, EnableTableRequest request)
1801   throws ServiceException {
1802     try {
1803       enableTable(request.getTableName().toByteArray());
1804     } catch (IOException ioe) {
1805       throw new ServiceException(ioe);
1806     }
1807     return EnableTableResponse.newBuilder().build();
1808   }
1809 
1810   @Override
1811   public void disableTable(final byte[] tableName) throws IOException {
1812     checkInitialized();
1813     if (cpHost != null) {
1814       cpHost.preDisableTable(tableName);
1815     }
1816     this.executorService.submit(new DisableTableHandler(this, tableName,
1817       catalogTracker, assignmentManager, tableLockManager, false).prepare());
1818     if (cpHost != null) {
1819       cpHost.postDisableTable(tableName);
1820     }
1821   }
1822 
1823   @Override
1824   public DisableTableResponse disableTable(RpcController controller, DisableTableRequest request)
1825   throws ServiceException {
1826     try {
1827       disableTable(request.getTableName().toByteArray());
1828     } catch (IOException ioe) {
1829       throw new ServiceException(ioe);
1830     }
1831     return DisableTableResponse.newBuilder().build();
1832   }
1833 
1834   /**
1835    * Return the region and current deployment for the region containing
1836    * the given row. If the region cannot be found, returns null. If it
1837    * is found, but not currently deployed, the second element of the pair
1838    * may be null.
1839    */
1840   Pair<HRegionInfo, ServerName> getTableRegionForRow(
1841       final byte [] tableName, final byte [] rowKey)
1842   throws IOException {
1843     final AtomicReference<Pair<HRegionInfo, ServerName>> result =
1844       new AtomicReference<Pair<HRegionInfo, ServerName>>(null);
1845 
1846     MetaScannerVisitor visitor =
1847       new MetaScannerVisitorBase() {
1848         @Override
1849         public boolean processRow(Result data) throws IOException {
1850           if (data == null || data.size() <= 0) {
1851             return true;
1852           }
1853           Pair<HRegionInfo, ServerName> pair = HRegionInfo.getHRegionInfoAndServerName(data);
1854           if (pair == null) {
1855             return false;
1856           }
1857           if (!Bytes.equals(pair.getFirst().getTableName(), tableName)) {
1858             return false;
1859           }
1860           result.set(pair);
1861           return true;
1862         }
1863     };
1864 
1865     MetaScanner.metaScan(conf, visitor, tableName, rowKey, 1);
1866     return result.get();
1867   }
1868 
1869   @Override
1870   public void modifyTable(final byte[] tableName, final HTableDescriptor descriptor)
1871       throws IOException {
1872     checkInitialized();
1873     checkCompression(descriptor);
1874     if (cpHost != null) {
1875       cpHost.preModifyTable(tableName, descriptor);
1876     }
1877     new ModifyTableHandler(tableName, descriptor, this, this).prepare().process();
1878     if (cpHost != null) {
1879       cpHost.postModifyTable(tableName, descriptor);
1880     }
1881   }
1882 
1883   @Override
1884   public ModifyTableResponse modifyTable(RpcController controller, ModifyTableRequest req)
1885   throws ServiceException {
1886     try {
1887       modifyTable(req.getTableName().toByteArray(),
1888         HTableDescriptor.convert(req.getTableSchema()));
1889     } catch (IOException ioe) {
1890       throw new ServiceException(ioe);
1891     }
1892     return ModifyTableResponse.newBuilder().build();
1893   }
1894 
1895   @Override
1896   public void checkTableModifiable(final byte [] tableName)
1897       throws IOException, TableNotFoundException, TableNotDisabledException {
1898     String tableNameStr = Bytes.toString(tableName);
1899     if (isCatalogTable(tableName)) {
1900       throw new IOException("Can't modify catalog tables");
1901     }
1902     if (!MetaReader.tableExists(getCatalogTracker(), tableNameStr)) {
1903       throw new TableNotFoundException(tableNameStr);
1904     }
1905     if (!getAssignmentManager().getZKTable().
1906         isDisabledTable(Bytes.toString(tableName))) {
1907       throw new TableNotDisabledException(tableName);
1908     }
1909   }
1910 
1911   @Override
1912   public GetClusterStatusResponse getClusterStatus(RpcController controller,
1913       GetClusterStatusRequest req)
1914   throws ServiceException {
1915     GetClusterStatusResponse.Builder response = GetClusterStatusResponse.newBuilder();
1916     response.setClusterStatus(getClusterStatus().convert());
1917     return response.build();
1918   }
1919 
1920   /**
1921    * @return cluster status
1922    */
1923   public ClusterStatus getClusterStatus() {
1924     // Build Set of backup masters from ZK nodes
1925     List<String> backupMasterStrings;
1926     try {
1927       backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
1928         this.zooKeeper.backupMasterAddressesZNode);
1929     } catch (KeeperException e) {
1930       LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
1931       backupMasterStrings = new ArrayList<String>(0);
1932     }
1933     List<ServerName> backupMasters = new ArrayList<ServerName>(
1934                                           backupMasterStrings.size());
1935     for (String s: backupMasterStrings) {
1936       try {
1937         byte [] bytes =
1938             ZKUtil.getData(this.zooKeeper, ZKUtil.joinZNode(
1939                 this.zooKeeper.backupMasterAddressesZNode, s));
1940         if (bytes != null) {
1941           ServerName sn;
1942           try {
1943             sn = ServerName.parseFrom(bytes);
1944           } catch (DeserializationException e) {
1945             LOG.warn("Failed parse, skipping registering backup server", e);
1946             continue;
1947           }
1948           backupMasters.add(sn);
1949         }
1950       } catch (KeeperException e) {
1951         LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
1952                  "backup servers"), e);
1953       }
1954     }
1955     Collections.sort(backupMasters, new Comparator<ServerName>() {
1956       public int compare(ServerName s1, ServerName s2) {
1957         return s1.getServerName().compareTo(s2.getServerName());
1958       }});
1959 
1960     return new ClusterStatus(VersionInfo.getVersion(),
1961       this.fileSystemManager.getClusterId().toString(),
1962       this.serverManager.getOnlineServers(),
1963       this.serverManager.getDeadServers().copyServerNames(),
1964       this.serverName,
1965       backupMasters,
1966       this.assignmentManager.getRegionStates().getRegionsInTransition(),
1967       this.getCoprocessors(), this.loadBalancerTracker.isBalancerOn());
1968   }
1969 
1970   public String getClusterId() {
1971     if (fileSystemManager == null) {
1972       return "";
1973     }
1974     ClusterId id = fileSystemManager.getClusterId();
1975     if (id == null) {
1976       return "";
1977     }
1978     return id.toString();
1979   }
1980 
1981   /**
1982    * The set of loaded coprocessors is stored in a static set. Since it's
1983    * statically allocated, it does not require that HMaster's cpHost be
1984    * initialized prior to accessing it.
1985    * @return a String representation of the set of names of the loaded
1986    * coprocessors.
1987    */
1988   public static String getLoadedCoprocessors() {
1989     return CoprocessorHost.getLoadedCoprocessors().toString();
1990   }
1991 
1992   /**
1993    * @return timestamp in millis when HMaster was started.
1994    */
1995   public long getMasterStartTime() {
1996     return masterStartTime;
1997   }
1998 
1999   /**
2000    * @return timestamp in millis when HMaster became the active master.
2001    */
2002   public long getMasterActiveTime() {
2003     return masterActiveTime;
2004   }
2005 
2006   /**
2007    * @return array of coprocessor SimpleNames.
2008    */
2009   public String[] getCoprocessors() {
2010     Set<String> masterCoprocessors =
2011         getCoprocessorHost().getCoprocessors();
2012     return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
2013   }
2014 
2015   @Override
2016   public void abort(final String msg, final Throwable t) {
2017     if (cpHost != null) {
2018       // HBASE-4014: dump a list of loaded coprocessors.
2019       LOG.fatal("Master server abort: loaded coprocessors are: " +
2020           getLoadedCoprocessors());
2021     }
2022 
2023     if (abortNow(msg, t)) {
2024       if (t != null) LOG.fatal(msg, t);
2025       else LOG.fatal(msg);
2026       this.abort = true;
2027       stop("Aborting");
2028     }
2029   }
2030 
2031   /**
2032    * We do the following in a different thread.  If it is not completed
2033    * in time, we will time it out and assume it is not easy to recover.
2034    *
2035    * 1. Create a new ZK session. (since our current one is expired)
2036    * 2. Try to become a primary master again
2037    * 3. Initialize all ZK based system trackers.
2038    * 4. Assign meta. (they are already assigned, but we need to update our
2039    * internal memory state to reflect it)
2040    * 5. Process any RIT if any during the process of our recovery.
2041    *
2042    * @return True if we could successfully recover from ZK session expiry.
2043    * @throws InterruptedException
2044    * @throws IOException
2045    * @throws KeeperException
2046    * @throws ExecutionException
2047    */
2048   private boolean tryRecoveringExpiredZKSession() throws InterruptedException,
2049       IOException, KeeperException, ExecutionException {
2050 
2051     this.zooKeeper.unregisterAllListeners();
2052     this.zooKeeper.reconnectAfterExpiration();
2053 
2054     Callable<Boolean> callable = new Callable<Boolean> () {
2055       public Boolean call() throws InterruptedException,
2056           IOException, KeeperException {
2057         MonitoredTask status =
2058           TaskMonitor.get().createStatus("Recovering expired ZK session");
2059         try {
2060           if (!becomeActiveMaster(status)) {
2061             return Boolean.FALSE;
2062           }
2063           serverShutdownHandlerEnabled = false;
2064           initialized = false;
2065           finishInitialization(status, true);
2066           return !stopped;
2067         } finally {
2068           status.cleanup();
2069         }
2070       }
2071     };
2072 
2073     long timeout =
2074       conf.getLong("hbase.master.zksession.recover.timeout", 300000);
2075     java.util.concurrent.ExecutorService executor =
2076       Executors.newSingleThreadExecutor();
2077     Future<Boolean> result = executor.submit(callable);
2078     executor.shutdown();
2079     if (executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)
2080         && result.isDone()) {
2081       Boolean recovered = result.get();
2082       if (recovered != null) {
2083         return recovered.booleanValue();
2084       }
2085     }
2086     executor.shutdownNow();
2087     return false;
2088   }
2089 
2090   /**
2091    * Check to see if the current trigger for abort is due to ZooKeeper session
2092    * expiry, and If yes, whether we can recover from ZK session expiry.
2093    *
2094    * @param msg Original abort message
2095    * @param t   The cause for current abort request
2096    * @return true if we should proceed with abort operation, false other wise.
2097    */
2098   private boolean abortNow(final String msg, final Throwable t) {
2099     if (!this.isActiveMaster) {
2100       return true;
2101     }
2102     if (t != null && t instanceof KeeperException.SessionExpiredException) {
2103       try {
2104         LOG.info("Primary Master trying to recover from ZooKeeper session " +
2105             "expiry.");
2106         return !tryRecoveringExpiredZKSession();
2107       } catch (Throwable newT) {
2108         LOG.error("Primary master encountered unexpected exception while " +
2109             "trying to recover from ZooKeeper session" +
2110             " expiry. Proceeding with server abort.", newT);
2111       }
2112     }
2113     return true;
2114   }
2115 
2116   @Override
2117   public ZooKeeperWatcher getZooKeeper() {
2118     return zooKeeper;
2119   }
2120 
2121   @Override
2122   public MasterCoprocessorHost getCoprocessorHost() {
2123     return cpHost;
2124   }
2125 
2126   @Override
2127   public ServerName getServerName() {
2128     return this.serverName;
2129   }
2130 
2131   @Override
2132   public CatalogTracker getCatalogTracker() {
2133     return catalogTracker;
2134   }
2135 
2136   @Override
2137   public AssignmentManager getAssignmentManager() {
2138     return this.assignmentManager;
2139   }
2140 
2141   @Override
2142   public TableLockManager getTableLockManager() {
2143     return this.tableLockManager;
2144   }
2145 
2146   public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
2147     return rsFatals;
2148   }
2149 
2150   public void shutdown() {
2151     if (spanReceiverHost != null) {
2152       spanReceiverHost.closeReceivers();
2153     }
2154     if (cpHost != null) {
2155       try {
2156         cpHost.preShutdown();
2157       } catch (IOException ioe) {
2158         LOG.error("Error call master coprocessor preShutdown()", ioe);
2159       }
2160     }
2161     if (mxBean != null) {
2162       MBeanUtil.unregisterMBean(mxBean);
2163       mxBean = null;
2164     }
2165     if (this.assignmentManager != null) this.assignmentManager.shutdown();
2166     if (this.serverManager != null) this.serverManager.shutdownCluster();
2167     try {
2168       if (this.clusterStatusTracker != null){
2169         this.clusterStatusTracker.setClusterDown();
2170       }
2171     } catch (KeeperException e) {
2172       LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
2173     }
2174   }
2175 
2176   @Override
2177   public ShutdownResponse shutdown(RpcController controller, ShutdownRequest request)
2178   throws ServiceException {
2179     shutdown();
2180     return ShutdownResponse.newBuilder().build();
2181   }
2182 
2183   public void stopMaster() {
2184     if (cpHost != null) {
2185       try {
2186         cpHost.preStopMaster();
2187       } catch (IOException ioe) {
2188         LOG.error("Error call master coprocessor preStopMaster()", ioe);
2189       }
2190     }
2191     stop("Stopped by " + Thread.currentThread().getName());
2192   }
2193 
2194   @Override
2195   public StopMasterResponse stopMaster(RpcController controller, StopMasterRequest request)
2196   throws ServiceException {
2197     stopMaster();
2198     return StopMasterResponse.newBuilder().build();
2199   }
2200 
2201   @Override
2202   public void stop(final String why) {
2203     LOG.info(why);
2204     this.stopped = true;
2205     // We wake up the stopSleeper to stop immediately
2206     stopSleeper.skipSleepCycle();
2207     // If we are a backup master, we need to interrupt wait
2208     if (this.activeMasterManager != null) {
2209       synchronized (this.activeMasterManager.clusterHasActiveMaster) {
2210         this.activeMasterManager.clusterHasActiveMaster.notifyAll();
2211       }
2212     }
2213     // If no region server is online then master may stuck waiting on .META. to come on line.
2214     // See HBASE-8422.
2215     if (this.catalogTracker != null && this.serverManager.getOnlineServers().isEmpty()) {
2216       this.catalogTracker.stop();
2217     }
2218   }
2219 
2220   @Override
2221   public boolean isStopped() {
2222     return this.stopped;
2223   }
2224 
2225   public boolean isAborted() {
2226     return this.abort;
2227   }
2228 
2229   void checkInitialized() throws PleaseHoldException {
2230     if (!this.initialized) {
2231       throw new PleaseHoldException("Master is initializing");
2232     }
2233   }
2234 
2235   /**
2236    * Report whether this master is currently the active master or not.
2237    * If not active master, we are parked on ZK waiting to become active.
2238    *
2239    * This method is used for testing.
2240    *
2241    * @return true if active master, false if not.
2242    */
2243   public boolean isActiveMaster() {
2244     return isActiveMaster;
2245   }
2246 
2247   /**
2248    * Report whether this master has completed with its initialization and is
2249    * ready.  If ready, the master is also the active master.  A standby master
2250    * is never ready.
2251    *
2252    * This method is used for testing.
2253    *
2254    * @return true if master is ready to go, false if not.
2255    */
2256   public boolean isInitialized() {
2257     return initialized;
2258   }
2259 
2260   /**
2261    * ServerShutdownHandlerEnabled is set false before completing
2262    * assignMeta to prevent processing of ServerShutdownHandler.
2263    * @return true if assignMeta has completed;
2264    */
2265   public boolean isServerShutdownHandlerEnabled() {
2266     return this.serverShutdownHandlerEnabled;
2267   }
2268 
2269   /**
2270    * Report whether this master has started initialization and is about to do meta region assignment
2271    * @return true if master is in initialization & about to assign META regions
2272    */
2273   public boolean isInitializationStartsMetaRegionAssignment() {
2274     return this.initializationBeforeMetaAssignment;
2275   }
2276 
2277   @Override
2278   public AssignRegionResponse assignRegion(RpcController controller, AssignRegionRequest req)
2279   throws ServiceException {
2280     try {
2281       final byte [] regionName = req.getRegion().getValue().toByteArray();
2282       RegionSpecifierType type = req.getRegion().getType();
2283       AssignRegionResponse arr = AssignRegionResponse.newBuilder().build();
2284 
2285       checkInitialized();
2286       if (type != RegionSpecifierType.REGION_NAME) {
2287         LOG.warn("assignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
2288           + " actual: " + type);
2289       }
2290       HRegionInfo regionInfo = assignmentManager.getRegionStates().getRegionInfo(regionName);
2291       if (regionInfo == null) throw new UnknownRegionException(Bytes.toString(regionName));
2292       if (cpHost != null) {
2293         if (cpHost.preAssign(regionInfo)) {
2294           return arr;
2295         }
2296       }
2297       assignmentManager.assign(regionInfo, true, true);
2298       if (cpHost != null) {
2299         cpHost.postAssign(regionInfo);
2300       }
2301 
2302       return arr;
2303     } catch (IOException ioe) {
2304       throw new ServiceException(ioe);
2305     }
2306   }
2307 
2308   public void assignRegion(HRegionInfo hri) {
2309     assignmentManager.assign(hri, true);
2310   }
2311 
2312   @Override
2313   public UnassignRegionResponse unassignRegion(RpcController controller, UnassignRegionRequest req)
2314   throws ServiceException {
2315     try {
2316       final byte [] regionName = req.getRegion().getValue().toByteArray();
2317       RegionSpecifierType type = req.getRegion().getType();
2318       final boolean force = req.getForce();
2319       UnassignRegionResponse urr = UnassignRegionResponse.newBuilder().build();
2320 
2321       checkInitialized();
2322       if (type != RegionSpecifierType.REGION_NAME) {
2323         LOG.warn("unassignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
2324           + " actual: " + type);
2325       }
2326       Pair<HRegionInfo, ServerName> pair =
2327         MetaReader.getRegion(this.catalogTracker, regionName);
2328       if (pair == null) throw new UnknownRegionException(Bytes.toString(regionName));
2329       HRegionInfo hri = pair.getFirst();
2330       if (cpHost != null) {
2331         if (cpHost.preUnassign(hri, force)) {
2332           return urr;
2333         }
2334       }
2335       LOG.debug("Close region " + hri.getRegionNameAsString()
2336           + " on current location if it is online and reassign.force=" + force);
2337       this.assignmentManager.unassign(hri, force);
2338       if (!this.assignmentManager.getRegionStates().isRegionInTransition(hri)
2339           && !this.assignmentManager.getRegionStates().isRegionAssigned(hri)) {
2340         LOG.debug("Region " + hri.getRegionNameAsString()
2341             + " is not online on any region server, reassigning it.");
2342         assignRegion(hri);
2343       }
2344       if (cpHost != null) {
2345         cpHost.postUnassign(hri, force);
2346       }
2347 
2348       return urr;
2349     } catch (IOException ioe) {
2350       throw new ServiceException(ioe);
2351     }
2352   }
2353 
2354   /**
2355    * Get list of TableDescriptors for requested tables.
2356    * @param controller Unused (set to null).
2357    * @param req GetTableDescriptorsRequest that contains:
2358    * - tableNames: requested tables, or if empty, all are requested
2359    * @return GetTableDescriptorsResponse
2360    * @throws ServiceException
2361    */
2362   public GetTableDescriptorsResponse getTableDescriptors(
2363 	      RpcController controller, GetTableDescriptorsRequest req) throws ServiceException {
2364     GetTableDescriptorsResponse.Builder builder = GetTableDescriptorsResponse.newBuilder();
2365     if (req.getTableNamesCount() == 0) {
2366       // request for all TableDescriptors
2367       Map<String, HTableDescriptor> descriptors = null;
2368       try {
2369         descriptors = this.tableDescriptors.getAll();
2370       } catch (IOException e) {
2371           LOG.warn("Failed getting all descriptors", e);
2372       }
2373       if (descriptors != null) {
2374         for (HTableDescriptor htd : descriptors.values()) {
2375           builder.addTableSchema(htd.convert());
2376         }
2377       }
2378     }
2379     else {
2380       for (String s: req.getTableNamesList()) {
2381         HTableDescriptor htd = null;
2382         try {
2383           htd = this.tableDescriptors.get(s);
2384         } catch (IOException e) {
2385           LOG.warn("Failed getting descriptor for " + s, e);
2386         }
2387         if (htd == null) continue;
2388         builder.addTableSchema(htd.convert());
2389       }
2390     }
2391     return builder.build();
2392   }
2393 
2394   /**
2395    * Compute the average load across all region servers.
2396    * Currently, this uses a very naive computation - just uses the number of
2397    * regions being served, ignoring stats about number of requests.
2398    * @return the average load
2399    */
2400   public double getAverageLoad() {
2401     if (this.assignmentManager == null) {
2402       return 0;
2403     }
2404 
2405     RegionStates regionStates = this.assignmentManager.getRegionStates();
2406     if (regionStates == null) {
2407       return 0;
2408     }
2409     return regionStates.getAverageLoad();
2410   }
2411 
2412   /**
2413    * Offline specified region from master's in-memory state. It will not attempt to
2414    * reassign the region as in unassign.
2415    *
2416    * This is a special method that should be used by experts or hbck.
2417    *
2418    */
2419   @Override
2420   public OfflineRegionResponse offlineRegion(RpcController controller, OfflineRegionRequest request)
2421   throws ServiceException {
2422     final byte [] regionName = request.getRegion().getValue().toByteArray();
2423     RegionSpecifierType type = request.getRegion().getType();
2424     if (type != RegionSpecifierType.REGION_NAME) {
2425       LOG.warn("moveRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
2426         + " actual: " + type);
2427     }
2428 
2429     try {
2430       Pair<HRegionInfo, ServerName> pair =
2431         MetaReader.getRegion(this.catalogTracker, regionName);
2432       if (pair == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName));
2433       HRegionInfo hri = pair.getFirst();
2434       if (cpHost != null) {
2435         cpHost.preRegionOffline(hri);
2436       }
2437       this.assignmentManager.regionOffline(hri);
2438       if (cpHost != null) {
2439         cpHost.postRegionOffline(hri);
2440       }
2441     } catch (IOException ioe) {
2442       throw new ServiceException(ioe);
2443     }
2444     return OfflineRegionResponse.newBuilder().build();
2445   }
2446 
2447   @Override
2448   public boolean registerService(Service instance) {
2449     /*
2450      * No stacking of instances is allowed for a single service name
2451      */
2452     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
2453     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
2454       LOG.error("Coprocessor service "+serviceDesc.getFullName()+
2455           " already registered, rejecting request from "+instance
2456       );
2457       return false;
2458     }
2459 
2460     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
2461     if (LOG.isDebugEnabled()) {
2462       LOG.debug("Registered master coprocessor service: service="+serviceDesc.getFullName());
2463     }
2464     return true;
2465   }
2466 
2467   @Override
2468   public ClientProtos.CoprocessorServiceResponse execMasterService(final RpcController controller,
2469       final ClientProtos.CoprocessorServiceRequest request) throws ServiceException {
2470     try {
2471       ServerRpcController execController = new ServerRpcController();
2472 
2473       ClientProtos.CoprocessorServiceCall call = request.getCall();
2474       String serviceName = call.getServiceName();
2475       String methodName = call.getMethodName();
2476       if (!coprocessorServiceHandlers.containsKey(serviceName)) {
2477         throw new UnknownProtocolException(null,
2478             "No registered master coprocessor service found for name "+serviceName);
2479       }
2480 
2481       Service service = coprocessorServiceHandlers.get(serviceName);
2482       Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
2483       Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
2484       if (methodDesc == null) {
2485         throw new UnknownProtocolException(service.getClass(),
2486             "Unknown method "+methodName+" called on master service "+serviceName);
2487       }
2488 
2489       //invoke the method
2490       Message execRequest = service.getRequestPrototype(methodDesc).newBuilderForType()
2491           .mergeFrom(call.getRequest()).build();
2492       final Message.Builder responseBuilder =
2493           service.getResponsePrototype(methodDesc).newBuilderForType();
2494       service.callMethod(methodDesc, execController, execRequest, new RpcCallback<Message>() {
2495         @Override
2496         public void run(Message message) {
2497           if (message != null) {
2498             responseBuilder.mergeFrom(message);
2499           }
2500         }
2501       });
2502       Message execResult = responseBuilder.build();
2503 
2504       if (execController.getFailedOn() != null) {
2505         throw execController.getFailedOn();
2506       }
2507       ClientProtos.CoprocessorServiceResponse.Builder builder =
2508           ClientProtos.CoprocessorServiceResponse.newBuilder();
2509       builder.setRegion(RequestConverter.buildRegionSpecifier(
2510           RegionSpecifierType.REGION_NAME, HConstants.EMPTY_BYTE_ARRAY));
2511       builder.setValue(
2512           builder.getValueBuilder().setName(execResult.getClass().getName())
2513               .setValue(execResult.toByteString()));
2514       return builder.build();
2515     } catch (IOException ie) {
2516       throw new ServiceException(ie);
2517     }
2518   }
2519 
2520   /**
2521    * Utility for constructing an instance of the passed HMaster class.
2522    * @param masterClass
2523    * @param conf
2524    * @return HMaster instance.
2525    */
2526   public static HMaster constructMaster(Class<? extends HMaster> masterClass,
2527       final Configuration conf)  {
2528     try {
2529       Constructor<? extends HMaster> c =
2530         masterClass.getConstructor(Configuration.class);
2531       return c.newInstance(conf);
2532     } catch (InvocationTargetException ite) {
2533       Throwable target = ite.getTargetException() != null?
2534         ite.getTargetException(): ite;
2535       if (target.getCause() != null) target = target.getCause();
2536       throw new RuntimeException("Failed construction of Master: " +
2537         masterClass.toString(), target);
2538     } catch (Exception e) {
2539       throw new RuntimeException("Failed construction of Master: " +
2540         masterClass.toString() + ((e.getCause() != null)?
2541           e.getCause().getMessage(): ""), e);
2542     }
2543   }
2544 
2545   /**
2546    * @see org.apache.hadoop.hbase.master.HMasterCommandLine
2547    */
2548   public static void main(String [] args) {
2549     VersionInfo.logVersion();
2550     new HMasterCommandLine(HMaster.class).doMain(args);
2551   }
2552 
2553   public HFileCleaner getHFileCleaner() {
2554     return this.hfileCleaner;
2555   }
2556 
2557   /**
2558    * Exposed for TESTING!
2559    * @return the underlying snapshot manager
2560    */
2561   public SnapshotManager getSnapshotManagerForTesting() {
2562     return this.snapshotManager;
2563   }
2564 
2565   /**
2566    * Triggers an asynchronous attempt to take a snapshot.
2567    * {@inheritDoc}
2568    */
2569   @Override
2570   public TakeSnapshotResponse snapshot(RpcController controller, TakeSnapshotRequest request)
2571       throws ServiceException {
2572     try {
2573       this.snapshotManager.checkSnapshotSupport();
2574     } catch (UnsupportedOperationException e) {
2575       throw new ServiceException(e);
2576     }
2577 
2578     LOG.debug("Submitting snapshot request for:" +
2579         ClientSnapshotDescriptionUtils.toString(request.getSnapshot()));
2580     // get the snapshot information
2581     SnapshotDescription snapshot = SnapshotDescriptionUtils.validate(request.getSnapshot(),
2582       this.conf);
2583     try {
2584       snapshotManager.takeSnapshot(snapshot);
2585     } catch (IOException e) {
2586       throw new ServiceException(e);
2587     }
2588 
2589     // send back the max amount of time the client should wait for the snapshot to complete
2590     long waitTime = SnapshotDescriptionUtils.getMaxMasterTimeout(conf, snapshot.getType(),
2591       SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME);
2592     return TakeSnapshotResponse.newBuilder().setExpectedTimeout(waitTime).build();
2593   }
2594 
2595   /**
2596    * List the currently available/stored snapshots. Any in-progress snapshots are ignored
2597    */
2598   @Override
2599   public ListSnapshotResponse getCompletedSnapshots(RpcController controller,
2600       ListSnapshotRequest request) throws ServiceException {
2601     try {
2602       ListSnapshotResponse.Builder builder = ListSnapshotResponse.newBuilder();
2603       List<SnapshotDescription> snapshots = snapshotManager.getCompletedSnapshots();
2604 
2605       // convert to protobuf
2606       for (SnapshotDescription snapshot : snapshots) {
2607         builder.addSnapshots(snapshot);
2608       }
2609       return builder.build();
2610     } catch (IOException e) {
2611       throw new ServiceException(e);
2612     }
2613   }
2614 
2615   /**
2616    * Execute Delete Snapshot operation.
2617    * @return DeleteSnapshotResponse (a protobuf wrapped void) if the snapshot existed and was
2618    *    deleted properly.
2619    * @throws ServiceException wrapping SnapshotDoesNotExistException if specified snapshot did not
2620    *    exist.
2621    */
2622   @Override
2623   public DeleteSnapshotResponse deleteSnapshot(RpcController controller,
2624       DeleteSnapshotRequest request) throws ServiceException {
2625     try {
2626       this.snapshotManager.checkSnapshotSupport();
2627     } catch (UnsupportedOperationException e) {
2628       throw new ServiceException(e);
2629     }
2630 
2631     try {
2632       snapshotManager.deleteSnapshot(request.getSnapshot());
2633       return DeleteSnapshotResponse.newBuilder().build();
2634     } catch (IOException e) {
2635       throw new ServiceException(e);
2636     }
2637   }
2638 
2639   /**
2640    * Checks if the specified snapshot is done.
2641    * @return true if the snapshot is in file system ready to use,
2642    *   false if the snapshot is in the process of completing
2643    * @throws ServiceException wrapping UnknownSnapshotException if invalid snapshot, or
2644    *  a wrapped HBaseSnapshotException with progress failure reason.
2645    */
2646   @Override
2647   public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
2648       IsSnapshotDoneRequest request) throws ServiceException {
2649     LOG.debug("Checking to see if snapshot from request:" +
2650         ClientSnapshotDescriptionUtils.toString(request.getSnapshot()) + " is done");
2651     try {
2652       IsSnapshotDoneResponse.Builder builder = IsSnapshotDoneResponse.newBuilder();
2653       boolean done = snapshotManager.isSnapshotDone(request.getSnapshot());
2654       builder.setDone(done);
2655       return builder.build();
2656     } catch (IOException e) {
2657       throw new ServiceException(e);
2658     }
2659   }
2660 
2661   /**
2662    * Execute Restore/Clone snapshot operation.
2663    *
2664    * <p>If the specified table exists a "Restore" is executed, replacing the table
2665    * schema and directory data with the content of the snapshot.
2666    * The table must be disabled, or a UnsupportedOperationException will be thrown.
2667    *
2668    * <p>If the table doesn't exist a "Clone" is executed, a new table is created
2669    * using the schema at the time of the snapshot, and the content of the snapshot.
2670    *
2671    * <p>The restore/clone operation does not require copying HFiles. Since HFiles
2672    * are immutable the table can point to and use the same files as the original one.
2673    */
2674   @Override
2675   public RestoreSnapshotResponse restoreSnapshot(RpcController controller,
2676       RestoreSnapshotRequest request) throws ServiceException {
2677     try {
2678       this.snapshotManager.checkSnapshotSupport();
2679     } catch (UnsupportedOperationException e) {
2680       throw new ServiceException(e);
2681     }
2682 
2683     try {
2684       SnapshotDescription reqSnapshot = request.getSnapshot();
2685       snapshotManager.restoreSnapshot(reqSnapshot);
2686       return RestoreSnapshotResponse.newBuilder().build();
2687     } catch (IOException e) {
2688       throw new ServiceException(e);
2689     }
2690   }
2691 
2692   /**
2693    * Returns the status of the requested snapshot restore/clone operation.
2694    * This method is not exposed to the user, it is just used internally by HBaseAdmin
2695    * to verify if the restore is completed.
2696    *
2697    * No exceptions are thrown if the restore is not running, the result will be "done".
2698    *
2699    * @return done <tt>true</tt> if the restore/clone operation is completed.
2700    * @throws ServiceException if the operation failed.
2701    */
2702   @Override
2703   public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(RpcController controller,
2704       IsRestoreSnapshotDoneRequest request) throws ServiceException {
2705     try {
2706       SnapshotDescription snapshot = request.getSnapshot();
2707       IsRestoreSnapshotDoneResponse.Builder builder = IsRestoreSnapshotDoneResponse.newBuilder();
2708       boolean done = snapshotManager.isRestoreDone(snapshot);
2709       builder.setDone(done);
2710       return builder.build();
2711     } catch (IOException e) {
2712       throw new ServiceException(e);
2713     }
2714   }
2715 
2716   private boolean isHealthCheckerConfigured() {
2717     String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
2718     return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
2719   }
2720 
2721 }