View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.lang.reflect.Constructor;
24  import java.lang.reflect.InvocationTargetException;
25  import java.net.InetAddress;
26  import java.net.InetSocketAddress;
27  import java.net.UnknownHostException;
28  import java.util.ArrayList;
29  import java.util.Collection;
30  import java.util.Collections;
31  import java.util.Comparator;
32  import java.util.HashSet;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.Set;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.ExecutionException;
38  import java.util.concurrent.Executors;
39  import java.util.concurrent.Future;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.atomic.AtomicReference;
42  
43  import javax.management.ObjectName;
44  
45  import org.apache.commons.logging.Log;
46  import org.apache.commons.logging.LogFactory;
47  import org.apache.hadoop.classification.InterfaceAudience;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.fs.Path;
50  import org.apache.hadoop.hbase.Abortable;
51  import org.apache.hadoop.hbase.Chore;
52  import org.apache.hadoop.hbase.ClusterId;
53  import org.apache.hadoop.hbase.ClusterStatus;
54  import org.apache.hadoop.hbase.DoNotRetryIOException;
55  import org.apache.hadoop.hbase.HBaseIOException;
56  import org.apache.hadoop.hbase.HColumnDescriptor;
57  import org.apache.hadoop.hbase.HConstants;
58  import org.apache.hadoop.hbase.HRegionInfo;
59  import org.apache.hadoop.hbase.HTableDescriptor;
60  import org.apache.hadoop.hbase.HealthCheckChore;
61  import org.apache.hadoop.hbase.MasterNotRunningException;
62  import org.apache.hadoop.hbase.NamespaceDescriptor;
63  import org.apache.hadoop.hbase.NamespaceNotFoundException;
64  import org.apache.hadoop.hbase.PleaseHoldException;
65  import org.apache.hadoop.hbase.Server;
66  import org.apache.hadoop.hbase.ServerLoad;
67  import org.apache.hadoop.hbase.ServerName;
68  import org.apache.hadoop.hbase.TableDescriptors;
69  import org.apache.hadoop.hbase.TableName;
70  import org.apache.hadoop.hbase.TableNotDisabledException;
71  import org.apache.hadoop.hbase.TableNotFoundException;
72  import org.apache.hadoop.hbase.UnknownRegionException;
73  import org.apache.hadoop.hbase.catalog.CatalogTracker;
74  import org.apache.hadoop.hbase.catalog.MetaReader;
75  import org.apache.hadoop.hbase.client.ConnectionUtils;
76  import org.apache.hadoop.hbase.client.MetaScanner;
77  import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
78  import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
79  import org.apache.hadoop.hbase.client.Result;
80  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
81  import org.apache.hadoop.hbase.exceptions.DeserializationException;
82  import org.apache.hadoop.hbase.exceptions.MergeRegionException;
83  import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
84  import org.apache.hadoop.hbase.executor.ExecutorService;
85  import org.apache.hadoop.hbase.executor.ExecutorType;
86  import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
87  import org.apache.hadoop.hbase.ipc.RequestContext;
88  import org.apache.hadoop.hbase.ipc.RpcServer;
89  import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
90  import org.apache.hadoop.hbase.ipc.RpcServerInterface;
91  import org.apache.hadoop.hbase.ipc.ServerRpcController;
92  import org.apache.hadoop.hbase.master.RegionState.State;
93  import org.apache.hadoop.hbase.master.balancer.BalancerChore;
94  import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
95  import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
96  import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
97  import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
98  import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
99  import org.apache.hadoop.hbase.master.handler.DeleteTableHandler;
100 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
101 import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
102 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
103 import org.apache.hadoop.hbase.master.handler.ModifyTableHandler;
104 import org.apache.hadoop.hbase.master.handler.TableAddFamilyHandler;
105 import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler;
106 import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler;
107 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
108 import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
109 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
110 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
111 import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
112 import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
113 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
114 import org.apache.hadoop.hbase.protobuf.RequestConverter;
115 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
116 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
117 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
118 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
119 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
120 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
121 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
122 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
123 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
124 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
125 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
126 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
127 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
128 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse;
129 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
130 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse;
131 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
132 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse;
133 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
134 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
135 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
136 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
137 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
138 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
139 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
140 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
141 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
142 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
143 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
144 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
145 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
146 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse;
147 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
148 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
149 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
150 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
151 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
152 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
153 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
154 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse;
155 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
156 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
157 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
158 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
159 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
160 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
161 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
162 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
163 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
164 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesResponse;
165 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
166 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
167 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
168 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
169 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
170 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
171 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest;
172 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
173 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
174 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
175 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
176 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
177 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
178 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
179 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
180 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
181 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest;
182 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
183 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
184 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
185 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest;
186 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
187 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
188 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse;
189 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest;
190 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse;
191 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
192 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
193 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
194 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
195 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
196 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
197 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest;
198 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse;
199 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
200 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
201 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
202 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
203 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
204 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
205 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
206 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
207 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
208 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
209 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportResponse;
210 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
211 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
212 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
213 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
214 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
215 import org.apache.hadoop.hbase.replication.regionserver.Replication;
216 import org.apache.hadoop.hbase.security.UserProvider;
217 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
218 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
219 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
220 import org.apache.hadoop.hbase.util.Bytes;
221 import org.apache.hadoop.hbase.util.CompressionTest;
222 import org.apache.hadoop.hbase.util.FSTableDescriptors;
223 import org.apache.hadoop.hbase.util.FSUtils;
224 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
225 import org.apache.hadoop.hbase.util.HasThread;
226 import org.apache.hadoop.hbase.util.InfoServer;
227 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
228 import org.apache.hadoop.hbase.util.Pair;
229 import org.apache.hadoop.hbase.util.Sleeper;
230 import org.apache.hadoop.hbase.util.Strings;
231 import org.apache.hadoop.hbase.util.Threads;
232 import org.apache.hadoop.hbase.util.VersionInfo;
233 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
234 import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
235 import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
236 import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
237 import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
238 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
239 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
240 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
241 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
242 import org.apache.hadoop.metrics.util.MBeanUtil;
243 import org.apache.hadoop.net.DNS;
244 import org.apache.zookeeper.KeeperException;
245 import org.apache.zookeeper.Watcher;
246 
247 import com.google.common.collect.Lists;
248 import com.google.common.collect.Maps;
249 import com.google.protobuf.Descriptors;
250 import com.google.protobuf.Message;
251 import com.google.protobuf.RpcCallback;
252 import com.google.protobuf.RpcController;
253 import com.google.protobuf.Service;
254 import com.google.protobuf.ServiceException;
255 
256 /**
257  * HMaster is the "master server" for HBase. An HBase cluster has one active
258  * master.  If many masters are started, all compete.  Whichever wins goes on to
259  * run the cluster.  All others park themselves in their constructor until
260  * master or cluster shutdown or until the active master loses its lease in
261  * zookeeper.  Thereafter, all running master jostle to take over master role.
262  *
263  * <p>The Master can be asked shutdown the cluster. See {@link #shutdown()}.  In
264  * this case it will tell all regionservers to go down and then wait on them
265  * all reporting in that they are down.  This master will then shut itself down.
266  *
267  * <p>You can also shutdown just this master.  Call {@link #stopMaster()}.
268  *
269  * @see Watcher
270  */
271 @InterfaceAudience.Private
272 @SuppressWarnings("deprecation")
273 public class HMaster extends HasThread implements MasterProtos.MasterService.BlockingInterface,
274 RegionServerStatusProtos.RegionServerStatusService.BlockingInterface,
275 MasterServices, Server {
276   private static final Log LOG = LogFactory.getLog(HMaster.class.getName());
277 
278   // MASTER is name of the webapp and the attribute name used stuffing this
279   //instance into web context.
280   public static final String MASTER = "master";
281 
282   // The configuration for the Master
283   private final Configuration conf;
284   // server for the web ui
285   private InfoServer infoServer;
286 
287   // Our zk client.
288   private ZooKeeperWatcher zooKeeper;
289   // Manager and zk listener for master election
290   private ActiveMasterManager activeMasterManager;
291   // Region server tracker
292   RegionServerTracker regionServerTracker;
293   // Draining region server tracker
294   private DrainingServerTracker drainingServerTracker;
295   // Tracker for load balancer state
296   private LoadBalancerTracker loadBalancerTracker;
297   // master address tracker
298   private MasterAddressTracker masterAddressTracker;
299 
300   // RPC server for the HMaster
301   private final RpcServerInterface rpcServer;
302   private JvmPauseMonitor pauseMonitor;
303   // Set after we've called HBaseServer#openServer and ready to receive RPCs.
304   // Set back to false after we stop rpcServer.  Used by tests.
305   private volatile boolean rpcServerOpen = false;
306 
307   /** Namespace stuff */
308   private TableNamespaceManager tableNamespaceManager;
309   private NamespaceJanitor namespaceJanitorChore;
310 
311   /**
312    * This servers address.
313    */
314   private final InetSocketAddress isa;
315 
316   // Metrics for the HMaster
317   private final MetricsMaster metricsMaster;
318   // file system manager for the master FS operations
319   private MasterFileSystem fileSystemManager;
320 
321   // server manager to deal with region server info
322   ServerManager serverManager;
323 
324   // manager of assignment nodes in zookeeper
325   AssignmentManager assignmentManager;
326   // manager of catalog regions
327   private CatalogTracker catalogTracker;
328   // Cluster status zk tracker and local setter
329   private ClusterStatusTracker clusterStatusTracker;
330 
331   // buffer for "fatal error" notices from region servers
332   // in the cluster. This is only used for assisting
333   // operations/debugging.
334   private MemoryBoundedLogMessageBuffer rsFatals;
335 
336   // This flag is for stopping this Master instance.  Its set when we are
337   // stopping or aborting
338   private volatile boolean stopped = false;
339   // Set on abort -- usually failure of our zk session.
340   private volatile boolean abort = false;
341   // flag set after we become the active master (used for testing)
342   private volatile boolean isActiveMaster = false;
343 
344   // flag set after we complete initialization once active,
345   // it is not private since it's used in unit tests
346   volatile boolean initialized = false;
347 
348   // flag set after we complete assignMeta.
349   private volatile boolean serverShutdownHandlerEnabled = false;
350 
351   // Instance of the hbase executor service.
352   ExecutorService executorService;
353 
354   private LoadBalancer balancer;
355   private Thread balancerChore;
356   private Thread clusterStatusChore;
357   private ClusterStatusPublisher clusterStatusPublisherChore = null;
358 
359   private CatalogJanitor catalogJanitorChore;
360   private LogCleaner logCleaner;
361   private HFileCleaner hfileCleaner;
362 
363   private MasterCoprocessorHost cpHost;
364   private final ServerName serverName;
365 
366   private TableDescriptors tableDescriptors;
367 
368   // Table level lock manager for schema changes
369   private TableLockManager tableLockManager;
370 
371   // Time stamps for when a hmaster was started and when it became active
372   private long masterStartTime;
373   private long masterActiveTime;
374 
375   /** time interval for emitting metrics values */
376   private final int msgInterval;
377   /**
378    * MX Bean for MasterInfo
379    */
380   private ObjectName mxBean = null;
381 
382   //should we check the compression codec type at master side, default true, HBASE-6370
383   private final boolean masterCheckCompression;
384 
385   private SpanReceiverHost spanReceiverHost;
386 
387   private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
388 
389   // monitor for snapshot of hbase tables
390   private SnapshotManager snapshotManager;
391   // monitor for distributed procedures
392   private MasterProcedureManagerHost mpmHost;
393 
394   /** The health check chore. */
395   private HealthCheckChore healthCheckChore;
396 
397   /**
398    * is in distributedLogReplay mode. When true, SplitLogWorker directly replays WAL edits to newly
399    * assigned region servers instead of creating recovered.edits files.
400    */
401   private final boolean distributedLogReplay;
402 
403   /** flag used in test cases in order to simulate RS failures during master initialization */
404   private volatile boolean initializationBeforeMetaAssignment = false;
405 
406   /** The following is used in master recovery scenario to re-register listeners */
407   private List<ZooKeeperListener> registeredZKListenersBeforeRecovery;
408 
409   /**
410    * Initializes the HMaster. The steps are as follows:
411    * <p>
412    * <ol>
413    * <li>Initialize HMaster RPC and address
414    * <li>Connect to ZooKeeper.
415    * </ol>
416    * <p>
417    * Remaining steps of initialization occur in {@link #run()} so that they
418    * run in their own thread rather than within the context of the constructor.
419    * @throws InterruptedException
420    */
421   public HMaster(final Configuration conf)
422   throws IOException, KeeperException, InterruptedException {
423     this.conf = new Configuration(conf);
424     // Disable the block cache on the master
425     this.conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
426     FSUtils.setupShortCircuitRead(conf);
427     // Server to handle client requests.
428     String hostname = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
429       conf.get("hbase.master.dns.interface", "default"),
430       conf.get("hbase.master.dns.nameserver", "default")));
431     int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
432     // Test that the hostname is reachable
433     InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
434     if (initialIsa.getAddress() == null) {
435       throw new IllegalArgumentException("Failed resolve of hostname " + initialIsa);
436     }
437     // Verify that the bind address is reachable if set
438     String bindAddress = conf.get("hbase.master.ipc.address");
439     if (bindAddress != null) {
440       initialIsa = new InetSocketAddress(bindAddress, port);
441       if (initialIsa.getAddress() == null) {
442         throw new IllegalArgumentException("Failed resolve of bind address " + initialIsa);
443       }
444     }
445     String name = "master/" + initialIsa.toString();
446     // Set how many times to retry talking to another server over HConnection.
447     ConnectionUtils.setServerSideHConnectionRetriesConfig(this.conf, name, LOG);
448     int numHandlers = conf.getInt(HConstants.MASTER_HANDLER_COUNT,
449       conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, HConstants.DEFAULT_MASTER_HANLDER_COUNT));
450     this.rpcServer = new RpcServer(this, name, getServices(),
451       initialIsa, // BindAddress is IP we got for this server.
452       conf,
453       new FifoRpcScheduler(conf, numHandlers));
454     // Set our address.
455     this.isa = this.rpcServer.getListenerAddress();
456     // We don't want to pass isa's hostname here since it could be 0.0.0.0
457     this.serverName = ServerName.valueOf(hostname, this.isa.getPort(), System.currentTimeMillis());
458     this.rsFatals = new MemoryBoundedLogMessageBuffer(
459       conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));
460 
461     // login the zookeeper client principal (if using security)
462     ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
463       "hbase.zookeeper.client.kerberos.principal", this.isa.getHostName());
464 
465     // initialize server principal (if using secure Hadoop)
466     UserProvider provider = UserProvider.instantiate(conf);
467     provider.login("hbase.master.keytab.file",
468       "hbase.master.kerberos.principal", this.isa.getHostName());
469 
470     LOG.info("hbase.rootdir=" + FSUtils.getRootDir(this.conf) +
471         ", hbase.cluster.distributed=" + this.conf.getBoolean("hbase.cluster.distributed", false));
472 
473     // set the thread name now we have an address
474     setName(MASTER + ":" + this.serverName.toShortString());
475 
476     Replication.decorateMasterConfiguration(this.conf);
477 
478     // Hack! Maps DFSClient => Master for logs.  HDFS made this
479     // config param for task trackers, but we can piggyback off of it.
480     if (this.conf.get("mapred.task.id") == null) {
481       this.conf.set("mapred.task.id", "hb_m_" + this.serverName.toString());
482     }
483 
484     this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true);
485     this.rpcServer.startThreads();
486     this.pauseMonitor = new JvmPauseMonitor(conf);
487     this.pauseMonitor.start();
488 
489     // metrics interval: using the same property as region server.
490     this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
491 
492     //should we check the compression codec type at master side, default true, HBASE-6370
493     this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);
494 
495     this.metricsMaster = new MetricsMaster( new MetricsMasterWrapperImpl(this));
496 
497     // Health checker thread.
498     int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
499       HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
500     if (isHealthCheckerConfigured()) {
501       healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
502     }
503 
504     // Do we publish the status?
505     boolean shouldPublish = conf.getBoolean(HConstants.STATUS_PUBLISHED,
506         HConstants.STATUS_PUBLISHED_DEFAULT);
507     Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
508         conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
509             ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
510             ClusterStatusPublisher.Publisher.class);
511 
512     if (shouldPublish) {
513       if (publisherClass == null) {
514         LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
515             ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS +
516             " is not set - not publishing status");
517       } else {
518         clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
519         Threads.setDaemonThreadRunning(clusterStatusPublisherChore.getThread());
520       }
521     }
522 
523     distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
524       HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
525   }
526 
527   /**
528    * @return list of blocking services and their security info classes that this server supports
529    */
530   private List<BlockingServiceAndInterface> getServices() {
531     List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(3);
532     bssi.add(new BlockingServiceAndInterface(
533         MasterProtos.MasterService.newReflectiveBlockingService(this),
534         MasterProtos.MasterService.BlockingInterface.class));
535     bssi.add(new BlockingServiceAndInterface(
536         RegionServerStatusProtos.RegionServerStatusService.newReflectiveBlockingService(this),
537         RegionServerStatusProtos.RegionServerStatusService.BlockingInterface.class));
538     return bssi;
539   }
540 
541   /**
542    * Stall startup if we are designated a backup master; i.e. we want someone
543    * else to become the master before proceeding.
544    * @param c configuration
545    * @param amm
546    * @throws InterruptedException
547    */
548   private static void stallIfBackupMaster(final Configuration c,
549       final ActiveMasterManager amm)
550   throws InterruptedException {
551     // If we're a backup master, stall until a primary to writes his address
552     if (!c.getBoolean(HConstants.MASTER_TYPE_BACKUP,
553       HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
554       return;
555     }
556     LOG.debug("HMaster started in backup mode.  " +
557       "Stalling until master znode is written.");
558     // This will only be a minute or so while the cluster starts up,
559     // so don't worry about setting watches on the parent znode
560     while (!amm.isActiveMaster()) {
561       LOG.debug("Waiting for master address ZNode to be written " +
562         "(Also watching cluster state node)");
563       Thread.sleep(
564         c.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT));
565     }
566 
567   }
568 
569   MetricsMaster getMetrics() {
570     return metricsMaster;
571   }
572 
573   /**
574    * Main processing loop for the HMaster.
575    * <ol>
576    * <li>Block until becoming active master
577    * <li>Finish initialization via finishInitialization(MonitoredTask)
578    * <li>Enter loop until we are stopped
579    * <li>Stop services and perform cleanup once stopped
580    * </ol>
581    */
582   @Override
583   public void run() {
584     MonitoredTask startupStatus =
585       TaskMonitor.get().createStatus("Master startup");
586     startupStatus.setDescription("Master startup");
587     masterStartTime = System.currentTimeMillis();
588     try {
589       this.masterAddressTracker = new MasterAddressTracker(getZooKeeperWatcher(), this);
590       this.masterAddressTracker.start();
591 
592       // Put up info server.
593       int port = this.conf.getInt("hbase.master.info.port", HConstants.DEFAULT_MASTER_INFOPORT);
594       if (port >= 0) {
595         String a = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
596         this.infoServer = new InfoServer(MASTER, a, port, false, this.conf);
597         this.infoServer.addServlet("status", "/master-status", MasterStatusServlet.class);
598         this.infoServer.addServlet("dump", "/dump", MasterDumpServlet.class);
599         this.infoServer.setAttribute(MASTER, this);
600         this.infoServer.start();
601       }
602 
603       this.registeredZKListenersBeforeRecovery = this.zooKeeper.getListeners();
604       /*
605        * Block on becoming the active master.
606        *
607        * We race with other masters to write our address into ZooKeeper.  If we
608        * succeed, we are the primary/active master and finish initialization.
609        *
610        * If we do not succeed, there is another active master and we should
611        * now wait until it dies to try and become the next active master.  If we
612        * do not succeed on our first attempt, this is no longer a cluster startup.
613        */
614       becomeActiveMaster(startupStatus);
615 
616       // We are either the active master or we were asked to shutdown
617       if (!this.stopped) {
618         finishInitialization(startupStatus, false);
619         loop();
620       }
621     } catch (Throwable t) {
622       // HBASE-5680: Likely hadoop23 vs hadoop 20.x/1.x incompatibility
623       if (t instanceof NoClassDefFoundError &&
624           t.getMessage().contains("org/apache/hadoop/hdfs/protocol/FSConstants$SafeModeAction")) {
625           // improved error message for this special case
626           abort("HBase is having a problem with its Hadoop jars.  You may need to "
627               + "recompile HBase against Hadoop version "
628               +  org.apache.hadoop.util.VersionInfo.getVersion()
629               + " or change your hadoop jars to start properly", t);
630       } else {
631         abort("Unhandled exception. Starting shutdown.", t);
632       }
633     } finally {
634       startupStatus.cleanup();
635 
636       stopChores();
637       // Wait for all the remaining region servers to report in IFF we were
638       // running a cluster shutdown AND we were NOT aborting.
639       if (!this.abort && this.serverManager != null &&
640           this.serverManager.isClusterShutdown()) {
641         this.serverManager.letRegionServersShutdown();
642       }
643       stopServiceThreads();
644       // Stop services started for both backup and active masters
645       if (this.activeMasterManager != null) this.activeMasterManager.stop();
646       if (this.catalogTracker != null) this.catalogTracker.stop();
647       if (this.serverManager != null) this.serverManager.stop();
648       if (this.assignmentManager != null) this.assignmentManager.stop();
649       if (this.fileSystemManager != null) this.fileSystemManager.stop();
650       if (this.mpmHost != null) this.mpmHost.stop("server shutting down.");
651       this.zooKeeper.close();
652     }
653     LOG.info("HMaster main thread exiting");
654   }
655 
656   /**
657    * Try becoming active master.
658    * @param startupStatus
659    * @return True if we could successfully become the active master.
660    * @throws InterruptedException
661    */
662   private boolean becomeActiveMaster(MonitoredTask startupStatus)
663   throws InterruptedException {
664     // TODO: This is wrong!!!! Should have new servername if we restart ourselves,
665     // if we come back to life.
666     this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName,
667         this);
668     this.zooKeeper.registerListener(activeMasterManager);
669     stallIfBackupMaster(this.conf, this.activeMasterManager);
670 
671     // The ClusterStatusTracker is setup before the other
672     // ZKBasedSystemTrackers because it's needed by the activeMasterManager
673     // to check if the cluster should be shutdown.
674     this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this);
675     this.clusterStatusTracker.start();
676     return this.activeMasterManager.blockUntilBecomingActiveMaster(startupStatus);
677   }
678 
679   /**
680    * Initialize all ZK based system trackers.
681    * @throws IOException
682    * @throws InterruptedException
683    */
684   void initializeZKBasedSystemTrackers() throws IOException,
685       InterruptedException, KeeperException {
686     this.catalogTracker = createCatalogTracker(this.zooKeeper, this.conf, this);
687     this.catalogTracker.start();
688 
689     this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
690     this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
691     this.loadBalancerTracker.start();
692     this.assignmentManager = new AssignmentManager(this, serverManager,
693       this.catalogTracker, this.balancer, this.executorService, this.metricsMaster,
694       this.tableLockManager);
695     zooKeeper.registerListenerFirst(assignmentManager);
696 
697     this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
698         this.serverManager);
699     this.regionServerTracker.start();
700 
701     this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this,
702       this.serverManager);
703     this.drainingServerTracker.start();
704 
705     // Set the cluster as up.  If new RSs, they'll be waiting on this before
706     // going ahead with their startup.
707     boolean wasUp = this.clusterStatusTracker.isClusterUp();
708     if (!wasUp) this.clusterStatusTracker.setClusterUp();
709 
710     LOG.info("Server active/primary master=" + this.serverName +
711         ", sessionid=0x" +
712         Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
713         ", setting cluster-up flag (Was=" + wasUp + ")");
714 
715     // create/initialize the snapshot manager and other procedure managers
716     this.snapshotManager = new SnapshotManager();
717     this.mpmHost = new MasterProcedureManagerHost();
718     this.mpmHost.register(this.snapshotManager);
719     this.mpmHost.loadProcedures(conf);
720     this.mpmHost.initialize(this, this.metricsMaster);
721   }
722 
723   /**
724    * Create CatalogTracker.
725    * In its own method so can intercept and mock it over in tests.
726    * @param zk If zk is null, we'll create an instance (and shut it down
727    * when {@link #stop(String)} is called) else we'll use what is passed.
728    * @param conf
729    * @param abortable If fatal exception we'll call abort on this.  May be null.
730    * If it is we'll use the Connection associated with the passed
731    * {@link Configuration} as our {@link Abortable}.
732    * ({@link Object#wait(long)} when passed a <code>0</code> waits for ever).
733    * @throws IOException
734    */
735   CatalogTracker createCatalogTracker(final ZooKeeperWatcher zk,
736       final Configuration conf, Abortable abortable)
737   throws IOException {
738     return new CatalogTracker(zk, conf, abortable);
739   }
740 
741   // Check if we should stop every 100ms
742   private Sleeper stopSleeper = new Sleeper(100, this);
743 
744   private void loop() {
745     long lastMsgTs = 0l;
746     long now = 0l;
747     while (!this.stopped) {
748       now = System.currentTimeMillis();
749       if ((now - lastMsgTs) >= this.msgInterval) {
750         doMetrics();
751         lastMsgTs = System.currentTimeMillis();
752       }
753       stopSleeper.sleep();
754     }
755   }
756 
757   /**
758    * Emit the HMaster metrics, such as region in transition metrics.
759    * Surrounding in a try block just to be sure metrics doesn't abort HMaster.
760    */
761   private void doMetrics() {
762     try {
763       this.assignmentManager.updateRegionsInTransitionMetrics();
764     } catch (Throwable e) {
765       LOG.error("Couldn't update metrics: " + e.getMessage());
766     }
767   }
768 
769   /**
770    * Finish initialization of HMaster after becoming the primary master.
771    *
772    * <ol>
773    * <li>Initialize master components - file system manager, server manager,
774    *     assignment manager, region server tracker, catalog tracker, etc</li>
775    * <li>Start necessary service threads - rpc server, info server,
776    *     executor services, etc</li>
777    * <li>Set cluster as UP in ZooKeeper</li>
778    * <li>Wait for RegionServers to check-in</li>
779    * <li>Split logs and perform data recovery, if necessary</li>
780    * <li>Ensure assignment of meta regions<li>
781    * <li>Handle either fresh cluster start or master failover</li>
782    * </ol>
783    *
784    * @param masterRecovery
785    *
786    * @throws IOException
787    * @throws InterruptedException
788    * @throws KeeperException
789    */
790   private void finishInitialization(MonitoredTask status, boolean masterRecovery)
791   throws IOException, InterruptedException, KeeperException {
792 
793     isActiveMaster = true;
794 
795     /*
796      * We are active master now... go initialize components we need to run.
797      * Note, there may be dross in zk from previous runs; it'll get addressed
798      * below after we determine if cluster startup or failover.
799      */
800 
801     status.setStatus("Initializing Master file system");
802 
803     this.masterActiveTime = System.currentTimeMillis();
804     // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
805     this.fileSystemManager = new MasterFileSystem(this, this, masterRecovery);
806 
807     this.tableDescriptors =
808       new FSTableDescriptors(this.fileSystemManager.getFileSystem(),
809       this.fileSystemManager.getRootDir());
810 
811     // publish cluster ID
812     status.setStatus("Publishing Cluster ID in ZooKeeper");
813     ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
814 
815     if (!masterRecovery) {
816       this.executorService = new ExecutorService(getServerName().toShortString());
817       this.serverManager = createServerManager(this, this);
818     }
819 
820     //Initialize table lock manager, and ensure that all write locks held previously
821     //are invalidated
822     this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper, serverName);
823     if (!masterRecovery) {
824       this.tableLockManager.reapWriteLocks();
825     }
826 
827     status.setStatus("Initializing ZK system trackers");
828     initializeZKBasedSystemTrackers();
829 
830     if (!masterRecovery) {
831       // initialize master side coprocessors before we start handling requests
832       status.setStatus("Initializing master coprocessors");
833       this.cpHost = new MasterCoprocessorHost(this, this.conf);
834 
835       spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
836 
837       // start up all service threads.
838       status.setStatus("Initializing master service threads");
839       startServiceThreads();
840     }
841 
842     // Wait for region servers to report in.
843     this.serverManager.waitForRegionServers(status);
844     // Check zk for region servers that are up but didn't register
845     for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
846       // The isServerOnline check is opportunistic, correctness is handled inside
847       if (!this.serverManager.isServerOnline(sn)
848           && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
849         LOG.info("Registered server found up in zk but who has not yet reported in: " + sn);
850       }
851     }
852 
853     if (!masterRecovery) {
854       this.assignmentManager.startTimeOutMonitor();
855     }
856 
857     // get a list for previously failed RS which need log splitting work
858     // we recover hbase:meta region servers inside master initialization and
859     // handle other failed servers in SSH in order to start up master node ASAP
860     Set<ServerName> previouslyFailedServers = this.fileSystemManager
861         .getFailedServersFromLogFolders();
862 
863     // remove stale recovering regions from previous run
864     this.fileSystemManager.removeStaleRecoveringRegionsFromZK(previouslyFailedServers);
865 
866     // log splitting for hbase:meta server
867     ServerName oldMetaServerLocation = this.catalogTracker.getMetaLocation();
868     if (oldMetaServerLocation != null && previouslyFailedServers.contains(oldMetaServerLocation)) {
869       splitMetaLogBeforeAssignment(oldMetaServerLocation);
870       // Note: we can't remove oldMetaServerLocation from previousFailedServers list because it
871       // may also host user regions
872     }
873     Set<ServerName> previouslyFailedMetaRSs = getPreviouselyFailedMetaServersFromZK();
874     // need to use union of previouslyFailedMetaRSs recorded in ZK and previouslyFailedServers
875     // instead of previouslyFailedMetaRSs alone to address the following two situations:
876     // 1) the chained failure situation(recovery failed multiple times in a row).
877     // 2) master get killed right before it could delete the recovering hbase:meta from ZK while the
878     // same server still has non-meta wals to be replayed so that
879     // removeStaleRecoveringRegionsFromZK can't delete the stale hbase:meta region
880     // Passing more servers into splitMetaLog is all right. If a server doesn't have hbase:meta wal,
881     // there is no op for the server.
882     previouslyFailedMetaRSs.addAll(previouslyFailedServers);
883 
884     this.initializationBeforeMetaAssignment = true;
885 
886     //initialize load balancer
887     this.balancer.setClusterStatus(getClusterStatus());
888     this.balancer.setMasterServices(this);
889     this.balancer.initialize();
890 
891     // Make sure meta assigned before proceeding.
892     status.setStatus("Assigning Meta Region");
893     assignMeta(status, previouslyFailedMetaRSs);
894     // check if master is shutting down because above assignMeta could return even hbase:meta isn't
895     // assigned when master is shutting down
896     if(this.stopped) return;
897 
898     status.setStatus("Submitting log splitting work for previously failed region servers");
899     // Master has recovered hbase:meta region server and we put
900     // other failed region servers in a queue to be handled later by SSH
901     for (ServerName tmpServer : previouslyFailedServers) {
902       this.serverManager.processDeadServer(tmpServer, true);
903     }
904 
905     // Update meta with new PB serialization if required. i.e migrate all HRI to PB serialization
906     // in meta. This must happen before we assign all user regions or else the assignment will
907     // fail.
908     org.apache.hadoop.hbase.catalog.MetaMigrationConvertingToPB
909       .updateMetaIfNecessary(this);
910 
911     // Fix up assignment manager status
912     status.setStatus("Starting assignment manager");
913     this.assignmentManager.joinCluster();
914 
915     //set cluster status again after user regions are assigned
916     this.balancer.setClusterStatus(getClusterStatus());
917 
918     if (!masterRecovery) {
919       // Start balancer and meta catalog janitor after meta and regions have
920       // been assigned.
921       status.setStatus("Starting balancer and catalog janitor");
922       this.clusterStatusChore = getAndStartClusterStatusChore(this);
923       this.balancerChore = getAndStartBalancerChore(this);
924       this.catalogJanitorChore = new CatalogJanitor(this, this);
925       startCatalogJanitorChore();
926     }
927 
928     status.setStatus("Starting namespace manager");
929     initNamespace();
930 
931     if (this.cpHost != null) {
932       try {
933         this.cpHost.preMasterInitialization();
934       } catch (IOException e) {
935         LOG.error("Coprocessor preMasterInitialization() hook failed", e);
936       }
937     }
938 
939     status.markComplete("Initialization successful");
940     LOG.info("Master has completed initialization");
941     initialized = true;
942     // clear the dead servers with same host name and port of online server because we are not
943     // removing dead server with same hostname and port of rs which is trying to check in before
944     // master initialization. See HBASE-5916.
945     this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
946 
947     if (!masterRecovery) {
948       if (this.cpHost != null) {
949         // don't let cp initialization errors kill the master
950         try {
951           this.cpHost.postStartMaster();
952         } catch (IOException ioe) {
953           LOG.error("Coprocessor postStartMaster() hook failed", ioe);
954         }
955       }
956     }
957   }
958 
959   /**
960    * Useful for testing purpose also where we have
961    * master restart scenarios.
962    */
963   protected void startCatalogJanitorChore() {
964     Threads.setDaemonThreadRunning(catalogJanitorChore.getThread());
965   }
966 
967   /**
968    * Useful for testing purpose also where we have
969    * master restart scenarios.
970    */
971   protected void startNamespaceJanitorChore() {
972     Threads.setDaemonThreadRunning(namespaceJanitorChore.getThread());
973   }
974 
975   /**
976    * Create a {@link ServerManager} instance.
977    * @param master
978    * @param services
979    * @return An instance of {@link ServerManager}
980    * @throws org.apache.hadoop.hbase.ZooKeeperConnectionException
981    * @throws IOException
982    */
983   ServerManager createServerManager(final Server master,
984       final MasterServices services)
985   throws IOException {
986     // We put this out here in a method so can do a Mockito.spy and stub it out
987     // w/ a mocked up ServerManager.
988     return new ServerManager(master, services);
989   }
990 
991   /**
992    * Check <code>hbase:meta</code> is assigned. If not, assign it.
993    * @param status MonitoredTask
994    * @param previouslyFailedMetaRSs
995    * @throws InterruptedException
996    * @throws IOException
997    * @throws KeeperException
998    */
999   void assignMeta(MonitoredTask status, Set<ServerName> previouslyFailedMetaRSs)
1000       throws InterruptedException, IOException, KeeperException {
1001     // Work on meta region
1002     int assigned = 0;
1003     long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
1004     status.setStatus("Assigning hbase:meta region");
1005 
1006     RegionStates regionStates = assignmentManager.getRegionStates();
1007     regionStates.createRegionState(HRegionInfo.FIRST_META_REGIONINFO);
1008     boolean rit = this.assignmentManager
1009       .processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO);
1010     boolean metaRegionLocation = this.catalogTracker.verifyMetaRegionLocation(timeout);
1011     ServerName currentMetaServer = this.catalogTracker.getMetaLocation();
1012     if (!metaRegionLocation) {
1013       // Meta location is not verified. It should be in transition, or offline.
1014       // We will wait for it to be assigned in enableSSHandWaitForMeta below.
1015       assigned++;
1016       if (!rit) {
1017         // Assign meta since not already in transition
1018         if (currentMetaServer != null) {
1019           // If the meta server is not known to be dead or online,
1020           // just split the meta log, and don't expire it since this
1021           // could be a full cluster restart. Otherwise, we will think
1022           // this is a failover and lose previous region locations.
1023           // If it is really a failover case, AM will find out in rebuilding
1024           // user regions. Otherwise, we are good since all logs are split
1025           // or known to be replayed before user regions are assigned.
1026           if (serverManager.isServerOnline(currentMetaServer)) {
1027             LOG.info("Forcing expire of " + currentMetaServer);
1028             serverManager.expireServer(currentMetaServer);
1029           }
1030           splitMetaLogBeforeAssignment(currentMetaServer);
1031           previouslyFailedMetaRSs.add(currentMetaServer);
1032         }
1033         assignmentManager.assignMeta();
1034       }
1035     } else {
1036       // Region already assigned. We didn't assign it. Add to in-memory state.
1037       regionStates.updateRegionState(
1038         HRegionInfo.FIRST_META_REGIONINFO, State.OPEN, currentMetaServer);
1039       this.assignmentManager.regionOnline(
1040         HRegionInfo.FIRST_META_REGIONINFO, currentMetaServer);
1041     }
1042 
1043     enableMeta(TableName.META_TABLE_NAME);
1044 
1045     if (this.distributedLogReplay && (!previouslyFailedMetaRSs.isEmpty())) {
1046       // replay WAL edits mode need new hbase:meta RS is assigned firstly
1047       status.setStatus("replaying log for Meta Region");
1048       this.fileSystemManager.splitMetaLog(previouslyFailedMetaRSs);
1049     }
1050 
1051     // Make sure a hbase:meta location is set. We need to enable SSH here since
1052     // if the meta region server is died at this time, we need it to be re-assigned
1053     // by SSH so that system tables can be assigned.
1054     // No need to wait for meta is assigned = 0 when meta is just verified.
1055     enableServerShutdownHandler(assigned != 0);
1056 
1057     LOG.info("hbase:meta assigned=" + assigned + ", rit=" + rit +
1058       ", location=" + catalogTracker.getMetaLocation());
1059     status.setStatus("META assigned.");
1060   }
1061 
1062   void initNamespace() throws IOException {
1063     //create namespace manager
1064     tableNamespaceManager = new TableNamespaceManager(this);
1065     tableNamespaceManager.start();
1066   }
1067 
1068   private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException {
1069     if (this.distributedLogReplay) {
1070       // In log replay mode, we mark hbase:meta region as recovering in ZK
1071       Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
1072       regions.add(HRegionInfo.FIRST_META_REGIONINFO);
1073       this.fileSystemManager.prepareLogReplay(currentMetaServer, regions);
1074     } else {
1075       // In recovered.edits mode: create recovered edits file for hbase:meta server
1076       this.fileSystemManager.splitMetaLog(currentMetaServer);
1077     }
1078   }
1079 
1080   private void enableServerShutdownHandler(
1081       final boolean waitForMeta) throws IOException, InterruptedException {
1082     // If ServerShutdownHandler is disabled, we enable it and expire those dead
1083     // but not expired servers. This is required so that if meta is assigning to
1084     // a server which dies after assignMeta starts assignment,
1085     // SSH can re-assign it. Otherwise, we will be
1086     // stuck here waiting forever if waitForMeta is specified.
1087     if (!serverShutdownHandlerEnabled) {
1088       serverShutdownHandlerEnabled = true;
1089       this.serverManager.processQueuedDeadServers();
1090     }
1091 
1092     if (waitForMeta) {
1093       this.catalogTracker.waitForMeta();
1094       // Above check waits for general meta availability but this does not
1095       // guarantee that the transition has completed
1096       this.assignmentManager.waitForAssignment(HRegionInfo.FIRST_META_REGIONINFO);
1097     }
1098   }
1099 
1100   private void enableMeta(TableName metaTableName) {
1101     if (!this.assignmentManager.getZKTable().isEnabledTable(metaTableName)) {
1102       this.assignmentManager.setEnabledTable(metaTableName);
1103     }
1104   }
1105 
1106   /**
1107    * This function returns a set of region server names under hbase:meta recovering region ZK node
1108    * @return Set of meta server names which were recorded in ZK
1109    * @throws KeeperException
1110    */
1111   private Set<ServerName> getPreviouselyFailedMetaServersFromZK() throws KeeperException {
1112     Set<ServerName> result = new HashSet<ServerName>();
1113     String metaRecoveringZNode = ZKUtil.joinZNode(zooKeeper.recoveringRegionsZNode,
1114       HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
1115     List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(zooKeeper, metaRecoveringZNode);
1116     if (regionFailedServers == null) return result;
1117 
1118     for(String failedServer : regionFailedServers) {
1119       ServerName server = ServerName.parseServerName(failedServer);
1120       result.add(server);
1121     }
1122     return result;
1123   }
1124 
1125   @Override
1126   public TableDescriptors getTableDescriptors() {
1127     return this.tableDescriptors;
1128   }
1129 
1130   /** @return InfoServer object. May be null. */
1131   public InfoServer getInfoServer() {
1132     return this.infoServer;
1133   }
1134 
1135   @Override
1136   public Configuration getConfiguration() {
1137     return this.conf;
1138   }
1139 
1140   @Override
1141   public ServerManager getServerManager() {
1142     return this.serverManager;
1143   }
1144 
1145   @Override
1146   public ExecutorService getExecutorService() {
1147     return this.executorService;
1148   }
1149 
1150   @Override
1151   public MasterFileSystem getMasterFileSystem() {
1152     return this.fileSystemManager;
1153   }
1154 
1155   /**
1156    * Get the ZK wrapper object - needed by master_jsp.java
1157    * @return the zookeeper wrapper
1158    */
1159   public ZooKeeperWatcher getZooKeeperWatcher() {
1160     return this.zooKeeper;
1161   }
1162 
1163   public ActiveMasterManager getActiveMasterManager() {
1164     return this.activeMasterManager;
1165   }
1166 
1167   public MasterAddressTracker getMasterAddressTracker() {
1168     return this.masterAddressTracker;
1169   }
1170 
1171   /*
1172    * Start up all services. If any of these threads gets an unhandled exception
1173    * then they just die with a logged message.  This should be fine because
1174    * in general, we do not expect the master to get unhandled exceptions such
1175    * as OOMEs; it should be lightly loaded. See HRegionServer for an example of
1176    * installing a handler for unexpected exceptions, should one be needed.
1177    */
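       /*
        * Pool sizes come from configuration; an illustrative override (a sketch
        * only; the keys are the ones read below):
        *   conf.setInt("hbase.master.executor.openregion.threads", 10);
        */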
1178   void startServiceThreads() throws IOException {
1179     // Start the executor service pools
1180     this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
1181       conf.getInt("hbase.master.executor.openregion.threads", 5));
1182     this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
1183       conf.getInt("hbase.master.executor.closeregion.threads", 5));
1184     this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
1185       conf.getInt("hbase.master.executor.serverops.threads", 5));
1186     this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
1187       conf.getInt("hbase.master.executor.serverops.threads", 5));
1188     this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
1189       conf.getInt("hbase.master.executor.logreplayops.threads", 10));
1190 
1191     // We depend on there being only one instance of this executor running
1192     // at a time.  To allow concurrency, we would need fencing of enable/disable
1193     // of tables.
1194     this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1195 
1196     // Start the log cleaner thread
1197     String n = Thread.currentThread().getName();
1198     int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
1199     this.logCleaner =
1200       new LogCleaner(cleanerInterval, this, conf,
1201         getMasterFileSystem().getFileSystem(),
1202         getMasterFileSystem().getOldLogDir());
1203     Threads.setDaemonThreadRunning(logCleaner.getThread(), n + ".oldLogCleaner");
1204 
1205     // Start the hfile archive cleaner thread
1206     Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
1207     this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
1208         .getFileSystem(), archiveDir);
1209     Threads.setDaemonThreadRunning(hfileCleaner.getThread(), n + ".archivedHFileCleaner");
1210 
1211     // Start the health checker
1212     if (this.healthCheckChore != null) {
1213       Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker");
1214     }
1215 
1216     // Start allowing requests to happen.
1217     this.rpcServer.openServer();
1218     this.rpcServerOpen = true;
1219     if (LOG.isTraceEnabled()) {
1220       LOG.trace("Started service threads");
1221     }
1222   }
1223 
1224   /**
1225    * Use this when trying to figure out when it is OK to send in RPCs.  Used by tests.
1226    * @return True if we have successfully run {@link RpcServer#openServer()}
1227    */
1228   boolean isRpcServerOpen() {
1229     return this.rpcServerOpen;
1230   }
1231 
1232   private void stopServiceThreads() {
1233     if (LOG.isDebugEnabled()) {
1234       LOG.debug("Stopping service threads");
1235     }
1236     if (this.rpcServer != null) this.rpcServer.stop();
1237     this.rpcServerOpen = false;
1238     // Clean up and close up shop
1239     if (this.logCleaner != null) this.logCleaner.interrupt();
1240     if (this.hfileCleaner != null) this.hfileCleaner.interrupt();
1241 
1242     if (this.infoServer != null) {
1243       LOG.info("Stopping infoServer");
1244       try {
1245         this.infoServer.stop();
1246       } catch (Exception ex) {
1247         LOG.error("Failed to stop infoServer", ex);
1248       }
1249     }
1250     if (this.executorService != null) this.executorService.shutdown();
1251     if (this.healthCheckChore != null) {
1252       this.healthCheckChore.interrupt();
1253     }
1254     if (this.pauseMonitor != null) {
1255       this.pauseMonitor.stop();
1256     }
1257   }
1258 
1259   private static Thread getAndStartClusterStatusChore(HMaster master) {
1260     if (master == null || master.balancer == null) {
1261       return null;
1262     }
1263     Chore chore = new ClusterStatusChore(master, master.balancer);
1264     return Threads.setDaemonThreadRunning(chore.getThread());
1265   }
1266 
1267   private static Thread getAndStartBalancerChore(final HMaster master) {
1268     // Start up the load balancer chore
1269     Chore chore = new BalancerChore(master);
1270     return Threads.setDaemonThreadRunning(chore.getThread());
1271   }
1272 
1273   private void stopChores() {
1274     if (this.balancerChore != null) {
1275       this.balancerChore.interrupt();
1276     }
1277     if (this.clusterStatusChore != null) {
1278       this.clusterStatusChore.interrupt();
1279     }
1280     if (this.catalogJanitorChore != null) {
1281       this.catalogJanitorChore.interrupt();
1282     }
1283     if (this.clusterStatusPublisherChore != null){
1284       clusterStatusPublisherChore.interrupt();
1285     }
1286     if (this.namespaceJanitorChore != null){
1287       namespaceJanitorChore.interrupt();
1288     }
1289   }
1290 
1291   @Override
1292   public RegionServerStartupResponse regionServerStartup(
1293       RpcController controller, RegionServerStartupRequest request) throws ServiceException {
1294     // Register with server manager
1295     try {
1296       InetAddress ia = getRemoteInetAddress(request.getPort(), request.getServerStartCode());
1297       ServerName rs = this.serverManager.regionServerStartup(ia, request.getPort(),
1298         request.getServerStartCode(), request.getServerCurrentTime());
1299 
1300       // Send back some config info
1301       RegionServerStartupResponse.Builder resp = createConfigurationSubset();
1302       NameStringPair.Builder entry = NameStringPair.newBuilder()
1303         .setName(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)
1304         .setValue(rs.getHostname());
1305       resp.addMapEntries(entry.build());
1306 
1307       return resp.build();
1308     } catch (IOException ioe) {
1309       throw new ServiceException(ioe);
1310     }
1311   }
1312 
1313   /**
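        * <p>Tests can override this to fake an address; an illustrative sketch
        * (hypothetical test subclass, not part of this class):
        * <pre>
        *   InetAddress getRemoteInetAddress(int port, long serverStartCode)
        *       throws UnknownHostException {
        *     return InetAddress.getByName("127.0.0.1"); // fixed test address
        *   }
        * </pre>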
1314    * @return The remote side's InetAddress
1315    * @throws UnknownHostException
1316    */
1317   InetAddress getRemoteInetAddress(final int port, final long serverStartCode)
1318   throws UnknownHostException {
1319     // Do it out here in its own little method so we can fake an address
1320     // when mocking up in tests.
1321     return RpcServer.getRemoteIp();
1322   }
1323 
1324   /**
1325    * @return Subset of configuration to pass to initializing regionservers: e.g.
1326    * the filesystem and root directory to use.
1327    */
1328   protected RegionServerStartupResponse.Builder createConfigurationSubset() {
1329     RegionServerStartupResponse.Builder resp = addConfig(
1330       RegionServerStartupResponse.newBuilder(), HConstants.HBASE_DIR);
1331     return addConfig(resp, "fs.default.name");
1332   }
1333 
1334   private RegionServerStartupResponse.Builder addConfig(
1335       final RegionServerStartupResponse.Builder resp, final String key) {
1336     NameStringPair.Builder entry = NameStringPair.newBuilder()
1337       .setName(key)
1338       .setValue(this.conf.get(key));
1339     resp.addMapEntries(entry.build());
1340     return resp;
1341   }
1342 
1343   @Override
1344   public GetLastFlushedSequenceIdResponse getLastFlushedSequenceId(RpcController controller,
1345       GetLastFlushedSequenceIdRequest request) throws ServiceException {
1346     byte[] regionName = request.getRegionName().toByteArray();
1347     long seqId = serverManager.getLastFlushedSequenceId(regionName);
1348     return ResponseConverter.buildGetLastFlushedSequenceIdResponse(seqId);
1349   }
1350 
1351   @Override
1352   public RegionServerReportResponse regionServerReport(
1353       RpcController controller, RegionServerReportRequest request) throws ServiceException {
1354     try {
1355       ClusterStatusProtos.ServerLoad sl = request.getLoad();
1356       ServerName serverName = ProtobufUtil.toServerName(request.getServer());
1357       ServerLoad oldLoad = serverManager.getLoad(serverName);
1358       this.serverManager.regionServerReport(serverName, new ServerLoad(sl));
1359       if (sl != null && this.metricsMaster != null) {
1360         // Up our metrics.
1361         this.metricsMaster.incrementRequests(sl.getTotalNumberOfRequests()
1362           - (oldLoad != null ? oldLoad.getTotalNumberOfRequests() : 0));
1363       }
1364     } catch (IOException ioe) {
1365       throw new ServiceException(ioe);
1366     }
1367 
1368     return RegionServerReportResponse.newBuilder().build();
1369   }
1370 
1371   @Override
1372   public ReportRSFatalErrorResponse reportRSFatalError(
1373       RpcController controller, ReportRSFatalErrorRequest request) throws ServiceException {
1374     String errorText = request.getErrorMessage();
1375     ServerName sn = ProtobufUtil.toServerName(request.getServer());
1376     String msg = "Region server " + sn +
1377       " reported a fatal error:\n" + errorText;
1378     LOG.error(msg);
1379     rsFatals.add(msg);
1380 
1381     return ReportRSFatalErrorResponse.newBuilder().build();
1382   }
1383 
1384   public boolean isMasterRunning() {
1385     return !isStopped();
1386   }
1387 
1388   @Override
1389   public IsMasterRunningResponse isMasterRunning(RpcController c, IsMasterRunningRequest req)
1390   throws ServiceException {
1391     return IsMasterRunningResponse.newBuilder().setIsMasterRunning(isMasterRunning()).build();
1392   }
1393 
1394   @Override
1395   public RunCatalogScanResponse runCatalogScan(RpcController c,
1396       RunCatalogScanRequest req) throws ServiceException {
1397     try {
1398       return ResponseConverter.buildRunCatalogScanResponse(catalogJanitorChore.scan());
1399     } catch (IOException ioe) {
1400       throw new ServiceException(ioe);
1401     }
1402   }
1403 
1404   @Override
1405   public EnableCatalogJanitorResponse enableCatalogJanitor(RpcController c,
1406       EnableCatalogJanitorRequest req) throws ServiceException {
1407     return EnableCatalogJanitorResponse.newBuilder().
1408         setPrevValue(catalogJanitorChore.setEnabled(req.getEnable())).build();
1409   }
1410 
1411   @Override
1412   public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(RpcController c,
1413       IsCatalogJanitorEnabledRequest req) throws ServiceException {
1414     boolean isEnabled = catalogJanitorChore != null ? catalogJanitorChore.getEnabled() : false;
1415     return IsCatalogJanitorEnabledResponse.newBuilder().setValue(isEnabled).build();
1416   }
1417 
1418   /**
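        * Reads the cutoff from {@code hbase.balancer.max.balancing}, falling back
        * to {@code hbase.balancer.period} when unset. A configuration sketch
        * (keys as read by this method):
        * <pre>
        *   conf.setInt("hbase.balancer.max.balancing", 30000); // cap a run at 30s
        * </pre>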
1419    * @return Maximum time, in milliseconds, that a single balancer run may take
1420    */
1421   private int getBalancerCutoffTime() {
1422     int balancerCutoffTime =
1423       getConfiguration().getInt("hbase.balancer.max.balancing", -1);
1424     if (balancerCutoffTime == -1) {
1425       // No cutoff set; fall back to the balancer period
1426       balancerCutoffTime =
1427         getConfiguration().getInt("hbase.balancer.period", 300000);
1431     }
1432     return balancerCutoffTime;
1433   }
1434 
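     /**
      * Runs one pass of the load balancer. The pass is skipped if the master is
      * not initialized, the balancer switch is off, regions are in transition,
      * or dead servers are still being processed. Illustrative trigger
      * (hypothetical {@code master} handle; a sketch only):
      * <pre>
      *   boolean ran = master.balance();
      * </pre>
      * @return true if a balancing pass was run
      */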
1435   public boolean balance() throws IOException {
1436     // if master not initialized, don't run balancer.
1437     if (!this.initialized) {
1438       LOG.debug("Master has not been initialized, don't run balancer.");
1439       return false;
1440     }
1441     // Do this call outside of synchronized block.
1442     int maximumBalanceTime = getBalancerCutoffTime();
1443     boolean balancerRan;
1444     synchronized (this.balancer) {
1445       // If balance not true, don't run balancer.
1446       if (!this.loadBalancerTracker.isBalancerOn()) return false;
1447       // Only allow one balance run at a time.
1448       if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
1449         Map<String, RegionState> regionsInTransition =
1450           this.assignmentManager.getRegionStates().getRegionsInTransition();
1451         LOG.debug("Not running balancer because " + regionsInTransition.size() +
1452           " region(s) in transition: " + org.apache.commons.lang.StringUtils.
1453             abbreviate(regionsInTransition.toString(), 256));
1454         return false;
1455       }
1456       if (this.serverManager.areDeadServersInProgress()) {
1457         LOG.debug("Not running balancer because processing dead regionserver(s): " +
1458           this.serverManager.getDeadServers());
1459         return false;
1460       }
1461 
1462       if (this.cpHost != null) {
1463         try {
1464           if (this.cpHost.preBalance()) {
1465             LOG.debug("Coprocessor bypassing balancer request");
1466             return false;
1467           }
1468         } catch (IOException ioe) {
1469           LOG.error("Error invoking master coprocessor preBalance()", ioe);
1470           return false;
1471         }
1472       }
1473 
1474       Map<TableName, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
1475         this.assignmentManager.getRegionStates().getAssignmentsByTable();
1476 
1477       List<RegionPlan> plans = new ArrayList<RegionPlan>();
1478       //Give the balancer the current cluster state.
1479       this.balancer.setClusterStatus(getClusterStatus());
1480       for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) {
1481         List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
1482         if (partialPlans != null) plans.addAll(partialPlans);
1483       }
1484       long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
1485       int rpCount = 0;  // number of RegionPlans balanced so far
1486       long totalRegPlanExecTime = 0;
1487       balancerRan = plans != null;
1488       if (plans != null && !plans.isEmpty()) {
1489         for (RegionPlan plan: plans) {
1490           LOG.info("balance " + plan);
1491           long balStartTime = System.currentTimeMillis();
1492           //TODO: bulk assign
1493           this.assignmentManager.balance(plan);
1494           totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
1495           rpCount++;
1496           if (rpCount < plans.size() &&
1497               // if performing next balance exceeds cutoff time, exit the loop
1498               (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
1499             // TODO: After balance, there should not be a cutoff time (keeping it as a safety net for now)
1500             LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
1501               maximumBalanceTime);
1502             break;
1503           }
1504         }
1505       }
1506       if (this.cpHost != null) {
1507         try {
1508           this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans);
1509         } catch (IOException ioe) {
1510           // balancing already succeeded so don't change the result
1511           LOG.error("Error invoking master coprocessor postBalance()", ioe);
1512         }
1513       }
1514     }
1515     return balancerRan;
1516   }
1517 
1518   @Override
1519   public BalanceResponse balance(RpcController c, BalanceRequest request) throws ServiceException {
1520     try {
1521       return BalanceResponse.newBuilder().setBalancerRan(balance()).build();
1522     } catch (IOException ex) {
1523       throw new ServiceException(ex);
1524     }
1525   }
1526 
1527   enum BalanceSwitchMode {
1528     SYNC,
1529     ASYNC
1530   }
1531 
1532   /**
1533    * Sets the balancer on/off switch according to the given BalanceSwitchMode.
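        * <p>Illustrative usage (a sketch; {@code master} is a hypothetical handle):
        * <pre>
        *   boolean wasOn = master.switchBalancer(false, BalanceSwitchMode.SYNC);
        * </pre>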
1534    * @param b new balancer switch
1535    * @param mode BalanceSwitchMode
1536    * @return old balancer switch
1537    */
1538   public boolean switchBalancer(final boolean b, BalanceSwitchMode mode) throws IOException {
1539     boolean oldValue = this.loadBalancerTracker.isBalancerOn();
1540     boolean newValue = b;
1541     try {
1542       if (this.cpHost != null) {
1543         newValue = this.cpHost.preBalanceSwitch(newValue);
1544       }
1545       try {
1546         if (mode == BalanceSwitchMode.SYNC) {
1547           synchronized (this.balancer) {
1548             this.loadBalancerTracker.setBalancerOn(newValue);
1549           }
1550         } else {
1551           this.loadBalancerTracker.setBalancerOn(newValue);
1552         }
1553       } catch (KeeperException ke) {
1554         throw new IOException(ke);
1555       }
1556       LOG.info(getClientIdAuditPrefix() + " set balanceSwitch=" + newValue);
1557       if (this.cpHost != null) {
1558         this.cpHost.postBalanceSwitch(oldValue, newValue);
1559       }
1560     } catch (IOException ioe) {
1561       LOG.warn("Error flipping balance switch", ioe);
1562     }
1563     return oldValue;
1564   }
1565 
1566   /**
1567    * @return Client info for use as prefix on an audit log string; who did an action
1568    */
1569   String getClientIdAuditPrefix() {
1570     return "Client=" + RequestContext.getRequestUserName() + "/" +
1571       RequestContext.get().getRemoteAddress();
1572   }
1573 
1574   public boolean synchronousBalanceSwitch(final boolean b) throws IOException {
1575     return switchBalancer(b, BalanceSwitchMode.SYNC);
1576   }
1577 
1578   public boolean balanceSwitch(final boolean b) throws IOException {
1579     return switchBalancer(b, BalanceSwitchMode.ASYNC);
1580   }
1581 
1582   @Override
1583   public SetBalancerRunningResponse setBalancerRunning(
1584       RpcController controller, SetBalancerRunningRequest req) throws ServiceException {
1585     try {
1586       boolean prevValue = req.getSynchronous() ?
1587         synchronousBalanceSwitch(req.getOn()) : balanceSwitch(req.getOn());
1588       return SetBalancerRunningResponse.newBuilder().setPrevBalanceValue(prevValue).build();
1589     } catch (IOException ioe) {
1590       throw new ServiceException(ioe);
1591     }
1592   }
1593 
1594   /**
1595    * Switch for the background CatalogJanitor thread.
1596    * Used for testing.  The thread will continue to run.  It will just be a no-op
1597    * if disabled.
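        * <p>Test sketch (hypothetical {@code master} handle):
        * <pre>
        *   master.setCatalogJanitorEnabled(false); // thread keeps running; scans become no-ops
        * </pre>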
1598    * @param b If false, the catalog janitor won't do anything.
1599    */
1600   public void setCatalogJanitorEnabled(final boolean b) {
1601     this.catalogJanitorChore.setEnabled(b);
1602   }
1603 
1604   @Override
1605   public DispatchMergingRegionsResponse dispatchMergingRegions(
1606       RpcController controller, DispatchMergingRegionsRequest request)
1607       throws ServiceException {
1608     final byte[] encodedNameOfRegionA = request.getRegionA().getValue()
1609         .toByteArray();
1610     final byte[] encodedNameOfRegionB = request.getRegionB().getValue()
1611         .toByteArray();
1612     final boolean forcible = request.getForcible();
1613     if (request.getRegionA().getType() != RegionSpecifierType.ENCODED_REGION_NAME
1614         || request.getRegionB().getType() != RegionSpecifierType.ENCODED_REGION_NAME) {
1615       LOG.warn("mergeRegions specifier type: expected: "
1616           + RegionSpecifierType.ENCODED_REGION_NAME + " actual: region_a="
1617           + request.getRegionA().getType() + ", region_b="
1618           + request.getRegionB().getType());
1619     }
1620     RegionState regionStateA = assignmentManager.getRegionStates()
1621         .getRegionState(Bytes.toString(encodedNameOfRegionA));
1622     RegionState regionStateB = assignmentManager.getRegionStates()
1623         .getRegionState(Bytes.toString(encodedNameOfRegionB));
1624     if (regionStateA == null || regionStateB == null) {
1625       throw new ServiceException(new UnknownRegionException(
1626           Bytes.toStringBinary(regionStateA == null ? encodedNameOfRegionA
1627               : encodedNameOfRegionB)));
1628     }
1629 
1630     if (!regionStateA.isOpened() || !regionStateB.isOpened()) {
1631       throw new ServiceException(new MergeRegionException(
1632         "Unable to merge regions not online " + regionStateA + ", " + regionStateB));
1633     }
1634 
1635     HRegionInfo regionInfoA = regionStateA.getRegion();
1636     HRegionInfo regionInfoB = regionStateB.getRegion();
1637     if (regionInfoA.compareTo(regionInfoB) == 0) {
1638       throw new ServiceException(new MergeRegionException(
1639         "Unable to merge a region to itself " + regionInfoA + ", " + regionInfoB));
1640     }
1641 
1642     if (!forcible && !HRegionInfo.areAdjacent(regionInfoA, regionInfoB)) {
1643       throw new ServiceException(new MergeRegionException(
1644         "Unable to merge not adjacent regions "
1645           + regionInfoA.getRegionNameAsString() + ", "
1646           + regionInfoB.getRegionNameAsString()
1647           + " where forcible = " + forcible));
1648     }
1649 
1650     try {
1651       dispatchMergingRegions(regionInfoA, regionInfoB, forcible);
1652     } catch (IOException ioe) {
1653       throw new ServiceException(ioe);
1654     }
1655 
1656     return DispatchMergingRegionsResponse.newBuilder().build();
1657   }
1658 
1659   @Override
1660   public void dispatchMergingRegions(final HRegionInfo region_a,
1661       final HRegionInfo region_b, final boolean forcible) throws IOException {
1662     checkInitialized();
1663     this.executorService.submit(new DispatchMergingRegionHandler(this,
1664         this.catalogJanitorChore, region_a, region_b, forcible));
1665   }
1666 
1667   @Override
1668   public MoveRegionResponse moveRegion(RpcController controller, MoveRegionRequest req)
1669   throws ServiceException {
1670     final byte [] encodedRegionName = req.getRegion().getValue().toByteArray();
1671     RegionSpecifierType type = req.getRegion().getType();
1672     final byte [] destServerName = req.hasDestServerName() ?
1673       Bytes.toBytes(ProtobufUtil.toServerName(req.getDestServerName()).getServerName()) : null;
1674     MoveRegionResponse mrr = MoveRegionResponse.newBuilder().build();
1675 
1676     if (type != RegionSpecifierType.ENCODED_REGION_NAME) {
1677       LOG.warn("moveRegion specifier type: expected: " + RegionSpecifierType.ENCODED_REGION_NAME
1678         + " actual: " + type);
1679     }
1680 
1681     try {
1682       move(encodedRegionName, destServerName);
1683     } catch (HBaseIOException ioe) {
1684       throw new ServiceException(ioe);
1685     }
1686     return mrr;
1687   }
1688 
1689   void move(final byte[] encodedRegionName,
1690       final byte[] destServerName) throws HBaseIOException {
1691     RegionState regionState = assignmentManager.getRegionStates().
1692       getRegionState(Bytes.toString(encodedRegionName));
1693     if (regionState == null) {
1694       throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
1695     }
1696 
1697     HRegionInfo hri = regionState.getRegion();
1698     ServerName dest;
1699     if (destServerName == null || destServerName.length == 0) {
1700       LOG.info("Passed destination servername is null/empty so " +
1701         "choosing a server at random");
1702       final List<ServerName> destServers = this.serverManager.createDestinationServersList(
1703         regionState.getServerName());
1704       dest = balancer.randomAssignment(hri, destServers);
1705     } else {
1706       dest = ServerName.valueOf(Bytes.toString(destServerName));
1707       if (dest.equals(regionState.getServerName())) {
1708         LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1709           + " because region already assigned to the same server " + dest + ".");
1710         return;
1711       }
1712     }
1713 
1714     // Now we can do the move
1715     RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
1716 
1717     try {
1718       checkInitialized();
1719       if (this.cpHost != null) {
1720         if (this.cpHost.preMove(hri, rp.getSource(), rp.getDestination())) {
1721           return;
1722         }
1723       }
1724       LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
1725       this.assignmentManager.balance(rp);
1726       if (this.cpHost != null) {
1727         this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
1728       }
1729     } catch (IOException ioe) {
1730       if (ioe instanceof HBaseIOException) {
1731         throw (HBaseIOException)ioe;
1732       }
1733       throw new HBaseIOException(ioe);
1734     }
1735   }
1736 
1737   @Override
1738   public void createTable(HTableDescriptor hTableDescriptor,
1739     byte [][] splitKeys)
1740   throws IOException {
1741     if (!isMasterRunning()) {
1742       throw new MasterNotRunningException();
1743     }
1744 
1745     String namespace = hTableDescriptor.getTableName().getNamespaceAsString();
1746     getNamespaceDescriptor(namespace); // ensure namespace exists
1747 
1748     HRegionInfo[] newRegions = getHRegionInfos(hTableDescriptor, splitKeys);
1749     checkInitialized();
1750     sanityCheckTableDescriptor(hTableDescriptor);
1751     if (cpHost != null) {
1752       cpHost.preCreateTable(hTableDescriptor, newRegions);
1753     }
1754     LOG.info(getClientIdAuditPrefix() + " create " + hTableDescriptor);
1755     this.executorService.submit(new CreateTableHandler(this,
1756       this.fileSystemManager, hTableDescriptor, conf,
1757       newRegions, this).prepare());
1758     if (cpHost != null) {
1759       cpHost.postCreateTable(hTableDescriptor, newRegions);
1760     }
1761 
1762   }
1763 
1764   /**
1765    * Checks whether the table conforms to some sane limits, and that configured
1766    * values (compression, etc.) work. Throws an exception if something is wrong.
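        * <p>Sketch of bypassing the checks for a single table (hypothetical
        * descriptor and table name):
        * <pre>
        *   HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("t1"));
        *   htd.setConfiguration("hbase.table.sanity.checks", "false");
        * </pre>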
1767    * @throws IOException
1768    */
1769   private void sanityCheckTableDescriptor(final HTableDescriptor htd) throws IOException {
1770     final String CONF_KEY = "hbase.table.sanity.checks";
1771     if (!conf.getBoolean(CONF_KEY, true)) {
1772       return;
1773     }
1774     String tableVal = htd.getConfigurationValue(CONF_KEY);
1775     if (tableVal != null && !Boolean.valueOf(tableVal)) {
1776       return;
1777     }
1778 
1779     // check max file size
1780     long maxFileSizeLowerLimit = 2 * 1024 * 1024L; // 2M is the default lower limit
1781     long maxFileSize = htd.getMaxFileSize();
1782     if (maxFileSize < 0) {
1783       maxFileSize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, maxFileSizeLowerLimit);
1784     }
1785     if (maxFileSize < conf.getLong("hbase.hregion.max.filesize.limit", maxFileSizeLowerLimit)) {
1786       throw new DoNotRetryIOException("MAX_FILESIZE for table descriptor or "
1787         + "\"hbase.hregion.max.filesize\" (" + maxFileSize
1788         + ") is too small, which might cause over splitting into unmanageable "
1789         + "number of regions. Set " + CONF_KEY + " to false at conf or table descriptor "
1790           + "if you want to bypass sanity checks");
1791     }
1792 
1793     // check flush size
1794     long flushSizeLowerLimit = 1024 * 1024L; // 1M is the default lower limit
1795     long flushSize = htd.getMemStoreFlushSize();
1796     if (flushSize < 0) {
1797       flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeLowerLimit);
1798     }
1799     if (flushSize < conf.getLong("hbase.hregion.memstore.flush.size.limit", flushSizeLowerLimit)) {
1800       throw new DoNotRetryIOException("MEMSTORE_FLUSHSIZE for table descriptor or "
1801           + "\"hbase.hregion.memstore.flush.size\" ("+flushSize+") is too small, which might cause"
1802           + " very frequent flushing. Set " + CONF_KEY + " to false at conf or table descriptor "
1803           + "if you want to bypass sanity checks");
1804     }
1805 
1806     // check split policy class can be loaded
1807     try {
1808       RegionSplitPolicy.getSplitPolicyClass(htd, conf);
1809     } catch (Exception ex) {
1810       throw new DoNotRetryIOException(ex);
1811     }
1812 
1813     // check compression can be loaded
1814     checkCompression(htd);
1815 
1816     // check that we have at least 1 CF
1817     if (htd.getColumnFamilies().length == 0) {
1818       throw new DoNotRetryIOException("Table should have at least one column family "
1819           + "Set "+CONF_KEY+" at conf or table descriptor if you want to bypass sanity checks");
1820     }
1821 
1822     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1823       if (hcd.getTimeToLive() <= 0) {
1824         throw new DoNotRetryIOException("TTL for column family " + hcd.getNameAsString()
1825           + "  must be positive. Set " + CONF_KEY + " to false at conf or table descriptor "
1826           + "if you want to bypass sanity checks");
1827       }
1828 
1829       // check blockSize
1830       if (hcd.getBlocksize() < 1024 || hcd.getBlocksize() > 16 * 1024 * 1024) {
1831         throw new DoNotRetryIOException("Block size for column family " + hcd.getNameAsString()
1832           + "  must be between 1K and 16MB Set "+CONF_KEY+" to false at conf or table descriptor "
1833           + "if you want to bypass sanity checks");
1834       }
1835 
1836       // check versions
1837       if (hcd.getMinVersions() < 0) {
1838         throw new DoNotRetryIOException("Min versions for column family " + hcd.getNameAsString()
1839           + "  must be positive. Set " + CONF_KEY + " to false at conf or table descriptor "
1840           + "if you want to bypass sanity checks");
1841       }
1842       // max versions already being checked
1843 
1844       // check replication scope
1845       if (hcd.getScope() < 0) {
1846         throw new DoNotRetryIOException("Replication scope for column family "
1847           + hcd.getNameAsString() + "  must be positive. Set " + CONF_KEY + " to false at conf "
1848           + "or table descriptor if you want to bypass sanity checks");
1849       }
1850 
1851       // TODO: should we check coprocessors and encryption?
1852     }
1853   }
1854 
1855   private void checkCompression(final HTableDescriptor htd)
1856   throws IOException {
1857     if (!this.masterCheckCompression) return;
1858     for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1859       checkCompression(hcd);
1860     }
1861   }
1862 
1863   private void checkCompression(final HColumnDescriptor hcd)
1864   throws IOException {
1865     if (!this.masterCheckCompression) return;
1866     CompressionTest.testCompression(hcd.getCompression());
1867     CompressionTest.testCompression(hcd.getCompactionCompression());
1868   }
1869 
1870   @Override
1871   public CreateTableResponse createTable(RpcController controller, CreateTableRequest req)
1872   throws ServiceException {
1873     HTableDescriptor hTableDescriptor = HTableDescriptor.convert(req.getTableSchema());
1874     byte [][] splitKeys = ProtobufUtil.getSplitKeysArray(req);
1875     try {
1876       createTable(hTableDescriptor, splitKeys);
1877     } catch (IOException ioe) {
1878       throw new ServiceException(ioe);
1879     }
1880     return CreateTableResponse.newBuilder().build();
1881   }
1882 
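     /**
      * Builds the initial region layout for a new table: n split keys yield
      * n + 1 regions. Illustrative (hypothetical keys):
      * <pre>
      *   // splitKeys = {"b", "d"} produces regions
      *   //   [null, "b"), ["b", "d"), ["d", null)
      * </pre>
      */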
1883   private HRegionInfo[] getHRegionInfos(HTableDescriptor hTableDescriptor,
1884     byte[][] splitKeys) {
1885     HRegionInfo[] hRegionInfos = null;
1886     if (splitKeys == null || splitKeys.length == 0) {
1887       hRegionInfos = new HRegionInfo[]{
1888           new HRegionInfo(hTableDescriptor.getTableName(), null, null)};
1889     } else {
1890       int numRegions = splitKeys.length + 1;
1891       hRegionInfos = new HRegionInfo[numRegions];
1892       byte[] startKey = null;
1893       byte[] endKey = null;
1894       for (int i = 0; i < numRegions; i++) {
1895         endKey = (i == splitKeys.length) ? null : splitKeys[i];
1896         hRegionInfos[i] =
1897             new HRegionInfo(hTableDescriptor.getTableName(), startKey, endKey);
1898         startKey = endKey;
1899       }
1900     }
1901     return hRegionInfos;
1902   }
1903 
1904   private static boolean isCatalogTable(final TableName tableName) {
1905     return tableName.equals(TableName.META_TABLE_NAME);
1906   }
1907 
1908   @Override
1909   public void deleteTable(final TableName tableName) throws IOException {
1910     checkInitialized();
1911     if (cpHost != null) {
1912       cpHost.preDeleteTable(tableName);
1913     }
1914     LOG.info(getClientIdAuditPrefix() + " delete " + tableName);
1915     this.executorService.submit(new DeleteTableHandler(tableName, this, this).prepare());
1916     if (cpHost != null) {
1917       cpHost.postDeleteTable(tableName);
1918     }
1919   }
1920 
1921   @Override
1922   public DeleteTableResponse deleteTable(RpcController controller, DeleteTableRequest request)
1923   throws ServiceException {
1924     try {
1925       deleteTable(ProtobufUtil.toTableName(request.getTableName()));
1926     } catch (IOException ioe) {
1927       throw new ServiceException(ioe);
1928     }
1929     return DeleteTableResponse.newBuilder().build();
1930   }
1931 
1932   /**
1933    * Get the number of regions of the table that have been updated by the alter.
1934    *
1935    * @return Pair indicating the update progress: Pair.getFirst is the number of
1936    *         regions yet to be updated; Pair.getSecond is the total number of
1937    *         regions of the table
1938    * @throws IOException
1939    */
1940   @Override
1941   public GetSchemaAlterStatusResponse getSchemaAlterStatus(
1942       RpcController controller, GetSchemaAlterStatusRequest req) throws ServiceException {
1943     // TODO: currently, we query using the table name on the client side. this
1944     // may overlap with other table operations or the table operation may
1945     // have completed before querying this API. We need to refactor to a
1946     // transaction system in the future to avoid these ambiguities.
1947     TableName tableName = ProtobufUtil.toTableName(req.getTableName());
1948 
1949     try {
1950       Pair<Integer,Integer> pair = this.assignmentManager.getReopenStatus(tableName);
1951       GetSchemaAlterStatusResponse.Builder ret = GetSchemaAlterStatusResponse.newBuilder();
1952       ret.setYetToUpdateRegions(pair.getFirst());
1953       ret.setTotalRegions(pair.getSecond());
1954       return ret.build();
1955     } catch (IOException ioe) {
1956       throw new ServiceException(ioe);
1957     }
1958   }
1959 
1960   @Override
1961   public void addColumn(final TableName tableName, final HColumnDescriptor column)
1962       throws IOException {
1963     checkInitialized();
1964     if (cpHost != null) {
1965       if (cpHost.preAddColumn(tableName, column)) {
1966         return;
1967       }
1968     }
1969     //TODO: we should process this (and some others) in an executor
1970     new TableAddFamilyHandler(tableName, column, this, this).prepare().process();
1971     if (cpHost != null) {
1972       cpHost.postAddColumn(tableName, column);
1973     }
1974   }
1975 
1976   @Override
1977   public AddColumnResponse addColumn(RpcController controller, AddColumnRequest req)
1978   throws ServiceException {
1979     try {
1980       addColumn(ProtobufUtil.toTableName(req.getTableName()),
1981         HColumnDescriptor.convert(req.getColumnFamilies()));
1982     } catch (IOException ioe) {
1983       throw new ServiceException(ioe);
1984     }
1985     return AddColumnResponse.newBuilder().build();
1986   }
1987 
1988   @Override
1989   public void modifyColumn(TableName tableName, HColumnDescriptor descriptor)
1990       throws IOException {
1991     checkInitialized();
1992     checkCompression(descriptor);
1993     if (cpHost != null) {
1994       if (cpHost.preModifyColumn(tableName, descriptor)) {
1995         return;
1996       }
1997     }
1998     LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);
1999     new TableModifyFamilyHandler(tableName, descriptor, this, this)
2000       .prepare().process();
2001     if (cpHost != null) {
2002       cpHost.postModifyColumn(tableName, descriptor);
2003     }
2004   }
2005 
2006   @Override
2007   public ModifyColumnResponse modifyColumn(RpcController controller, ModifyColumnRequest req)
2008   throws ServiceException {
2009     try {
2010       modifyColumn(ProtobufUtil.toTableName(req.getTableName()),
2011         HColumnDescriptor.convert(req.getColumnFamilies()));
2012     } catch (IOException ioe) {
2013       throw new ServiceException(ioe);
2014     }
2015     return ModifyColumnResponse.newBuilder().build();
2016   }
2017 
2018   @Override
2019   public void deleteColumn(final TableName tableName, final byte[] columnName)
2020       throws IOException {
2021     checkInitialized();
2022     if (cpHost != null) {
2023       if (cpHost.preDeleteColumn(tableName, columnName)) {
2024         return;
2025       }
2026     }
2027     LOG.info(getClientIdAuditPrefix() + " delete " + Bytes.toString(columnName));
2028     new TableDeleteFamilyHandler(tableName, columnName, this, this).prepare().process();
2029     if (cpHost != null) {
2030       cpHost.postDeleteColumn(tableName, columnName);
2031     }
2032   }
2033 
2034   @Override
2035   public DeleteColumnResponse deleteColumn(RpcController controller, DeleteColumnRequest req)
2036   throws ServiceException {
2037     try {
2038       deleteColumn(ProtobufUtil.toTableName(req.getTableName()),
2039           req.getColumnName().toByteArray());
2040     } catch (IOException ioe) {
2041       throw new ServiceException(ioe);
2042     }
2043     return DeleteColumnResponse.newBuilder().build();
2044   }
2045 
2046   @Override
2047   public void enableTable(final TableName tableName) throws IOException {
2048     checkInitialized();
2049     if (cpHost != null) {
2050       cpHost.preEnableTable(tableName);
2051     }
2052     LOG.info(getClientIdAuditPrefix() + " enable " + tableName);
2053     this.executorService.submit(new EnableTableHandler(this, tableName,
2054       catalogTracker, assignmentManager, tableLockManager, false).prepare());
2055     if (cpHost != null) {
2056       cpHost.postEnableTable(tableName);
2057     }
2058   }
2059 
2060   @Override
2061   public EnableTableResponse enableTable(RpcController controller, EnableTableRequest request)
2062   throws ServiceException {
2063     try {
2064       enableTable(ProtobufUtil.toTableName(request.getTableName()));
2065     } catch (IOException ioe) {
2066       throw new ServiceException(ioe);
2067     }
2068     return EnableTableResponse.newBuilder().build();
2069   }
2070 
2071   @Override
2072   public void disableTable(final TableName tableName) throws IOException {
2073     checkInitialized();
2074     if (cpHost != null) {
2075       cpHost.preDisableTable(tableName);
2076     }
2077     LOG.info(getClientIdAuditPrefix() + " disable " + tableName);
2078     this.executorService.submit(new DisableTableHandler(this, tableName,
2079       catalogTracker, assignmentManager, tableLockManager, false).prepare());
2080     if (cpHost != null) {
2081       cpHost.postDisableTable(tableName);
2082     }
2083   }
2084 
2085   @Override
2086   public DisableTableResponse disableTable(RpcController controller, DisableTableRequest request)
2087   throws ServiceException {
2088     try {
2089       disableTable(ProtobufUtil.toTableName(request.getTableName()));
2090     } catch (IOException ioe) {
2091       throw new ServiceException(ioe);
2092     }
2093     return DisableTableResponse.newBuilder().build();
2094   }
2095 
2096   /**
2097    * Return the region and current deployment for the region containing
2098    * the given row. If the region cannot be found, returns null. If it
2099    * is found, but not currently deployed, the second element of the pair
2100    * may be null.
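        * <p>Illustrative call (hypothetical table and row key):
        * <pre>
        *   Pair&lt;HRegionInfo, ServerName&gt; p =
        *       getTableRegionForRow(TableName.valueOf("t1"), Bytes.toBytes("row1"));
        *   ServerName location = (p == null) ? null : p.getSecond(); // may be null
        * </pre>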
2101    */
2102   Pair<HRegionInfo, ServerName> getTableRegionForRow(
2103       final TableName tableName, final byte [] rowKey)
2104   throws IOException {
2105     final AtomicReference<Pair<HRegionInfo, ServerName>> result =
2106       new AtomicReference<Pair<HRegionInfo, ServerName>>(null);
2107 
2108     MetaScannerVisitor visitor =
2109       new MetaScannerVisitorBase() {
2110         @Override
2111         public boolean processRow(Result data) throws IOException {
2112           if (data == null || data.size() <= 0) {
2113             return true;
2114           }
2115           Pair<HRegionInfo, ServerName> pair = HRegionInfo.getHRegionInfoAndServerName(data);
2116           if (pair == null) {
2117             return false;
2118           }
2119           if (!pair.getFirst().getTable().equals(tableName)) {
2120             return false;
2121           }
2122           result.set(pair);
2123           return true;
2124         }
2125     };
2126 
2127     MetaScanner.metaScan(conf, visitor, tableName, rowKey, 1);
2128     return result.get();
2129   }
2130 
2131   @Override
2132   public void modifyTable(final TableName tableName, final HTableDescriptor descriptor)
2133       throws IOException {
2134     checkInitialized();
2135     sanityCheckTableDescriptor(descriptor);
2136     if (cpHost != null) {
2137       cpHost.preModifyTable(tableName, descriptor);
2138     }
2139     LOG.info(getClientIdAuditPrefix() + " modify " + tableName);
2140     new ModifyTableHandler(tableName, descriptor, this, this).prepare().process();
2141     if (cpHost != null) {
2142       cpHost.postModifyTable(tableName, descriptor);
2143     }
2144   }
2145 
2146   @Override
2147   public ModifyTableResponse modifyTable(RpcController controller, ModifyTableRequest req)
2148   throws ServiceException {
2149     try {
2150       modifyTable(ProtobufUtil.toTableName(req.getTableName()),
2151         HTableDescriptor.convert(req.getTableSchema()));
2152     } catch (IOException ioe) {
2153       throw new ServiceException(ioe);
2154     }
2155     return ModifyTableResponse.newBuilder().build();
2156   }
2157 
2158   @Override
2159   public void checkTableModifiable(final TableName tableName)
2160       throws IOException, TableNotFoundException, TableNotDisabledException {
2161     if (isCatalogTable(tableName)) {
2162       throw new IOException("Can't modify catalog tables");
2163     }
2164     if (!MetaReader.tableExists(getCatalogTracker(), tableName)) {
2165       throw new TableNotFoundException(tableName);
2166     }
2167     if (!getAssignmentManager().getZKTable().
2168         isDisabledTable(tableName)) {
2169       throw new TableNotDisabledException(tableName);
2170     }
2171   }
2172 
2173   @Override
2174   public GetClusterStatusResponse getClusterStatus(RpcController controller,
2175       GetClusterStatusRequest req)
2176   throws ServiceException {
2177     GetClusterStatusResponse.Builder response = GetClusterStatusResponse.newBuilder();
2178     try {
2179       response.setClusterStatus(getClusterStatus().convert());
2180     } catch (InterruptedIOException e) {
2181       throw new ServiceException(e);
2182     }
2183     return response.build();
2184   }
2185 
2186   /**
2187    * @return cluster status
2188    */
2189   public ClusterStatus getClusterStatus() throws InterruptedIOException {
2190     // Build Set of backup masters from ZK nodes
2191     List<String> backupMasterStrings;
2192     try {
2193       backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
2194         this.zooKeeper.backupMasterAddressesZNode);
2195     } catch (KeeperException e) {
2196       LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
2197       backupMasterStrings = new ArrayList<String>(0);
2198     }
2199     List<ServerName> backupMasters = new ArrayList<ServerName>(
2200                                           backupMasterStrings.size());
2201     for (String s: backupMasterStrings) {
2202       try {
2203         byte [] bytes;
2204         try {
2205           bytes = ZKUtil.getData(this.zooKeeper, ZKUtil.joinZNode(
2206               this.zooKeeper.backupMasterAddressesZNode, s));
2207         } catch (InterruptedException e) {
2208           throw new InterruptedIOException();
2209         }
2210         if (bytes != null) {
2211           ServerName sn;
2212           try {
2213             sn = ServerName.parseFrom(bytes);
2214           } catch (DeserializationException e) {
2215             LOG.warn("Failed parse, skipping registering backup server", e);
2216             continue;
2217           }
2218           backupMasters.add(sn);
2219         }
2220       } catch (KeeperException e) {
2221         LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
2222                  "backup servers"), e);
2223       }
2224     }
2225     Collections.sort(backupMasters, new Comparator<ServerName>() {
2226       @Override
2227       public int compare(ServerName s1, ServerName s2) {
2228         return s1.getServerName().compareTo(s2.getServerName());
2229       }});
2230 
2231     return new ClusterStatus(VersionInfo.getVersion(),
2232       this.fileSystemManager.getClusterId().toString(),
2233       this.serverManager.getOnlineServers(),
2234       this.serverManager.getDeadServers().copyServerNames(),
2235       this.serverName,
2236       backupMasters,
2237       this.assignmentManager.getRegionStates().getRegionsInTransition(),
2238       this.getCoprocessors(), this.loadBalancerTracker.isBalancerOn());
2239   }
2240 
2241   public String getClusterId() {
2242     if (fileSystemManager == null) {
2243       return "";
2244     }
2245     ClusterId id = fileSystemManager.getClusterId();
2246     if (id == null) {
2247       return "";
2248     }
2249     return id.toString();
2250   }
2251 
2252   /**
2253    * The set of loaded coprocessors is stored in a static set. Since it's
2254    * statically allocated, it does not require that HMaster's cpHost be
2255    * initialized prior to accessing it.
2256    * @return a String representation of the set of names of the loaded
2257    * coprocessors.
2258    */
2259   public static String getLoadedCoprocessors() {
2260     return CoprocessorHost.getLoadedCoprocessors().toString();
2261   }
2262 
2263   /**
2264    * @return timestamp in millis when HMaster was started.
2265    */
2266   public long getMasterStartTime() {
2267     return masterStartTime;
2268   }
2269 
2270   /**
2271    * @return timestamp in millis when HMaster became the active master.
2272    */
2273   public long getMasterActiveTime() {
2274     return masterActiveTime;
2275   }
2276 
2277   public int getRegionServerInfoPort(final ServerName sn) {
2278     RegionServerInfo info = this.regionServerTracker.getRegionServerInfo(sn);
2279     if (info == null || info.getInfoPort() == 0) {
2280       return conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
2281         HConstants.DEFAULT_REGIONSERVER_INFOPORT);
2282     }
2283     return info.getInfoPort();
2284   }
2285 
2286   /**
2287    * @return array of coprocessor SimpleNames.
2288    */
2289   public String[] getCoprocessors() {
2290     Set<String> masterCoprocessors =
2291         getCoprocessorHost().getCoprocessors();
2292     return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
2293   }
2294 
2295   @Override
2296   public void abort(final String msg, final Throwable t) {
2297     if (cpHost != null) {
2298       // HBASE-4014: dump a list of loaded coprocessors.
2299       LOG.fatal("Master server abort: loaded coprocessors are: " +
2300           getLoadedCoprocessors());
2301     }
2302 
2303     if (abortNow(msg, t)) {
2304       if (t != null) LOG.fatal(msg, t);
2305       else LOG.fatal(msg);
2306       this.abort = true;
2307       stop("Aborting");
2308     }
2309   }
2310 
2311   /**
2312    * We do the following in a different thread.  If it is not completed
2313    * in time, we will time it out and assume it is not easy to recover.
2314    *
2315    * 1. Create a new ZK session. (since our current one is expired)
2316    * 2. Try to become a primary master again
2317    * 3. Initialize all ZK based system trackers.
2318    * 4. Assign meta. (they are already assigned, but we need to update our
2319    * internal memory state to reflect it)
2320    * 5. Process any regions in transition (RIT) encountered during our recovery.
2321    *
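        * The attempt is bounded by {@code hbase.master.zksession.recover.timeout}
        * (milliseconds; default 300000). A sketch of shortening the wait
        * (configuration key as read below):
        * <pre>
        *   conf.setLong("hbase.master.zksession.recover.timeout", 120000);
        * </pre>
        *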
2322    * @return True if we could successfully recover from ZK session expiry.
2323    * @throws InterruptedException
2324    * @throws IOException
2325    * @throws KeeperException
2326    * @throws ExecutionException
2327    */
2328   private boolean tryRecoveringExpiredZKSession() throws InterruptedException,
2329       IOException, KeeperException, ExecutionException {
2330 
2331     this.zooKeeper.unregisterAllListeners();
2332     // add back listeners which were registered before master initialization
2333     // because they won't be added back in below Master re-initialization code
2334     if (this.registeredZKListenersBeforeRecovery != null) {
2335       for (ZooKeeperListener curListener : this.registeredZKListenersBeforeRecovery) {
2336         this.zooKeeper.registerListener(curListener);
2337       }
2338     }
2339 
2340     this.zooKeeper.reconnectAfterExpiration();
2341 
2342     Callable<Boolean> callable = new Callable<Boolean> () {
2343       @Override
2344       public Boolean call() throws InterruptedException,
2345           IOException, KeeperException {
2346         MonitoredTask status =
2347           TaskMonitor.get().createStatus("Recovering expired ZK session");
2348         try {
2349           if (!becomeActiveMaster(status)) {
2350             return Boolean.FALSE;
2351           }
2352           serverShutdownHandlerEnabled = false;
2353           initialized = false;
2354           finishInitialization(status, true);
2355           return !stopped;
2356         } finally {
2357           status.cleanup();
2358         }
2359       }
2360     };
2361 
2362     long timeout =
2363       conf.getLong("hbase.master.zksession.recover.timeout", 300000);
2364     java.util.concurrent.ExecutorService executor =
2365       Executors.newSingleThreadExecutor();
2366     Future<Boolean> result = executor.submit(callable);
2367     executor.shutdown();
2368     if (executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)
2369         && result.isDone()) {
2370       Boolean recovered = result.get();
2371       if (recovered != null) {
2372         return recovered.booleanValue();
2373       }
2374     }
2375     executor.shutdownNow();
2376     return false;
2377   }
2378 
2379   /**
2380    * Check to see if the current trigger for abort is due to ZooKeeper session
2381    * expiry, and if so, whether we can recover from it.
2382    *
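        * <p>Recovery can be disabled so the master aborts immediately on session
        * expiry (sketch; key as read in this method):
        * <pre>
        *   conf.setBoolean("fail.fast.expired.active.master", true);
        * </pre>
        *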
2383    * @param msg Original abort message
2384    * @param t   The cause for current abort request
2385    * @return true if we should proceed with the abort operation, false otherwise.
2386    */
2387   private boolean abortNow(final String msg, final Throwable t) {
2388     if (!this.isActiveMaster || this.stopped) {
2389       return true;
2390     }
2391 
2392     boolean failFast = conf.getBoolean("fail.fast.expired.active.master", false);
2393     if (t != null && t instanceof KeeperException.SessionExpiredException
2394         && !failFast) {
2395       try {
2396         LOG.info("Primary Master trying to recover from ZooKeeper session " +
2397             "expiry.");
2398         return !tryRecoveringExpiredZKSession();
2399       } catch (Throwable newT) {
2400         LOG.error("Primary master encountered unexpected exception while " +
2401             "trying to recover from ZooKeeper session" +
2402             " expiry. Proceeding with server abort.", newT);
2403       }
2404     }
2405     return true;
2406   }
2407 
2408   @Override
2409   public ZooKeeperWatcher getZooKeeper() {
2410     return zooKeeper;
2411   }
2412 
2413   @Override
2414   public MasterCoprocessorHost getCoprocessorHost() {
2415     return cpHost;
2416   }
2417 
2418   @Override
2419   public ServerName getServerName() {
2420     return this.serverName;
2421   }
2422 
2423   @Override
2424   public CatalogTracker getCatalogTracker() {
2425     return catalogTracker;
2426   }
2427 
2428   @Override
2429   public AssignmentManager getAssignmentManager() {
2430     return this.assignmentManager;
2431   }
2432 
2433   @Override
2434   public TableLockManager getTableLockManager() {
2435     return this.tableLockManager;
2436   }
2437 
2438   public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
2439     return rsFatals;
2440   }
2441 
2442   public void shutdown() {
2443     if (spanReceiverHost != null) {
2444       spanReceiverHost.closeReceivers();
2445     }
2446     if (cpHost != null) {
2447       try {
2448         cpHost.preShutdown();
2449       } catch (IOException ioe) {
2450         LOG.error("Error call master coprocessor preShutdown()", ioe);
2451       }
2452     }
2453     if (mxBean != null) {
2454       MBeanUtil.unregisterMBean(mxBean);
2455       mxBean = null;
2456     }
2457     if (this.assignmentManager != null) this.assignmentManager.shutdown();
2458     if (this.serverManager != null) this.serverManager.shutdownCluster();
2459     try {
2460       if (this.clusterStatusTracker != null){
2461         this.clusterStatusTracker.setClusterDown();
2462       }
2463     } catch (KeeperException e) {
2464       LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
2465     }
2466   }
2467 
2468   @Override
2469   public ShutdownResponse shutdown(RpcController controller, ShutdownRequest request)
2470   throws ServiceException {
2471     LOG.info(getClientIdAuditPrefix() + " shutdown");
2472     shutdown();
2473     return ShutdownResponse.newBuilder().build();
2474   }
2475 
2476   public void stopMaster() {
2477     if (cpHost != null) {
2478       try {
2479         cpHost.preStopMaster();
2480       } catch (IOException ioe) {
2481         LOG.error("Error call master coprocessor preStopMaster()", ioe);
2482       }
2483     }
2484     stop("Stopped by " + Thread.currentThread().getName());
2485   }
2486 
2487   @Override
2488   public StopMasterResponse stopMaster(RpcController controller, StopMasterRequest request)
2489   throws ServiceException {
2490     LOG.info(getClientIdAuditPrefix() + " stop");
2491     stopMaster();
2492     return StopMasterResponse.newBuilder().build();
2493   }
2494 
2495   @Override
2496   public void stop(final String why) {
2497     LOG.info(why);
2498     this.stopped = true;
2499     // We wake up the stopSleeper to stop immediately
2500     stopSleeper.skipSleepCycle();
2501     // If we are a backup master, we need to interrupt wait
2502     if (this.activeMasterManager != null) {
2503       synchronized (this.activeMasterManager.clusterHasActiveMaster) {
2504         this.activeMasterManager.clusterHasActiveMaster.notifyAll();
2505       }
2506     }
2507     // If no region server is online then the master may get stuck waiting on hbase:meta to come online.
2508     // See HBASE-8422.
2509     if (this.catalogTracker != null && this.serverManager.getOnlineServers().isEmpty()) {
2510       this.catalogTracker.stop();
2511     }
2512   }
2513 
2514   @Override
2515   public boolean isStopped() {
2516     return this.stopped;
2517   }
2518 
2519   @Override
2520   public boolean isAborted() {
2521     return this.abort;
2522   }
2523 
2524   void checkInitialized() throws PleaseHoldException {
2525     if (!this.initialized) {
2526       throw new PleaseHoldException("Master is initializing");
2527     }
2528   }
2529 
2530   /**
2531    * Report whether this master is currently the active master or not.
2532    * If not active master, we are parked on ZK waiting to become active.
2533    *
2534    * This method is used for testing.
2535    *
2536    * @return true if active master, false if not.
2537    */
2538   public boolean isActiveMaster() {
2539     return isActiveMaster;
2540   }
2541 
2542   /**
2543    * Report whether this master has completed its initialization and is
2544    * ready.  If ready, the master is also the active master.  A standby master
2545    * is never ready.
2546    *
2547    * This method is used for testing.
2548    *
2549    * @return true if master is ready to go, false if not.
2550    */
2551   @Override
2552   public boolean isInitialized() {
2553     return initialized;
2554   }
2555 
2556   /**
2557    * ServerShutdownHandlerEnabled is kept false until assignMeta completes,
2558    * to prevent the ServerShutdownHandler from running prematurely.
2559    * @return true if assignMeta has completed
2560    */
2561   @Override
2562   public boolean isServerShutdownHandlerEnabled() {
2563     return this.serverShutdownHandlerEnabled;
2564   }
2565 
2566   /**
2567    * Report whether this master has started initialization and is about to do meta region assignment
2568    * @return true if master is in initialization and about to assign hbase:meta regions
2569    */
2570   public boolean isInitializationStartsMetaRegionAssignment() {
2571     return this.initializationBeforeMetaAssignment;
2572   }
2573 
2574   @Override
2575   public AssignRegionResponse assignRegion(RpcController controller, AssignRegionRequest req)
2576   throws ServiceException {
2577     try {
2578       final byte [] regionName = req.getRegion().getValue().toByteArray();
2579       RegionSpecifierType type = req.getRegion().getType();
2580       AssignRegionResponse arr = AssignRegionResponse.newBuilder().build();
2581 
2582       checkInitialized();
2583       if (type != RegionSpecifierType.REGION_NAME) {
2584         LOG.warn("assignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
2585           + " actual: " + type);
2586       }
2587       HRegionInfo regionInfo = assignmentManager.getRegionStates().getRegionInfo(regionName);
2588       if (regionInfo == null) throw new UnknownRegionException(Bytes.toString(regionName));
2589       if (cpHost != null) {
2590         if (cpHost.preAssign(regionInfo)) {
2591           return arr;
2592         }
2593       }
2594       LOG.info(getClientIdAuditPrefix() + " assign " + regionInfo.getRegionNameAsString());
2595       assignmentManager.assign(regionInfo, true, true);
2596       if (cpHost != null) {
2597         cpHost.postAssign(regionInfo);
2598       }
2599 
2600       return arr;
2601     } catch (IOException ioe) {
2602       throw new ServiceException(ioe);
2603     }
2604   }
2605 
2606   public void assignRegion(HRegionInfo hri) {
2607     assignmentManager.assign(hri, true);
2608   }
2609 
2610   @Override
2611   public UnassignRegionResponse unassignRegion(RpcController controller, UnassignRegionRequest req)
2612   throws ServiceException {
2613     try {
2614       final byte [] regionName = req.getRegion().getValue().toByteArray();
2615       RegionSpecifierType type = req.getRegion().getType();
2616       final boolean force = req.getForce();
2617       UnassignRegionResponse urr = UnassignRegionResponse.newBuilder().build();
2618 
2619       checkInitialized();
2620       if (type != RegionSpecifierType.REGION_NAME) {
2621         LOG.warn("unassignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
2622           + " actual: " + type);
2623       }
2624       Pair<HRegionInfo, ServerName> pair =
2625         MetaReader.getRegion(this.catalogTracker, regionName);
2626       if (pair == null) throw new UnknownRegionException(Bytes.toString(regionName));
2627       HRegionInfo hri = pair.getFirst();
2628       if (cpHost != null) {
2629         if (cpHost.preUnassign(hri, force)) {
2630           return urr;
2631         }
2632       }
2633       LOG.debug(getClientIdAuditPrefix() + " unassign " + hri.getRegionNameAsString()
2634           + " in current location if it is online and reassign.force=" + force);
2635       this.assignmentManager.unassign(hri, force);
2636       if (this.assignmentManager.getRegionStates().isRegionOffline(hri)) {
2637         LOG.debug("Region " + hri.getRegionNameAsString()
2638             + " is not online on any region server, reassigning it.");
2639         assignRegion(hri);
2640       }
2641       if (cpHost != null) {
2642         cpHost.postUnassign(hri, force);
2643       }
2644 
2645       return urr;
2646     } catch (IOException ioe) {
2647       throw new ServiceException(ioe);
2648     }
2649   }
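
  /*
   * A minimal client-side sketch of driving the assign/unassign RPCs above
   * through HBaseAdmin, which wraps these calls; the region name source is
   * hypothetical and error handling is elided.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   HBaseAdmin admin = new HBaseAdmin(conf);
   *   try {
   *     byte[] regionName = ...; // e.g. from HRegionInfo#getRegionName()
   *     admin.unassign(regionName, false); // close; master decides on reassignment
   *     admin.assign(regionName);          // or explicitly request assignment
   *   } finally {
   *     admin.close();
   *   }
   */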
2650 
2651   /**
2652    * Get list of TableDescriptors for requested tables.
2653    * @param controller Unused (set to null).
2654    * @param req GetTableDescriptorsRequest that contains:
2655    * - tableNames: requested tables, or if empty, all are requested
2656    * @return GetTableDescriptorsResponse
2657    * @throws ServiceException
2658    */
2659   @Override
2660   public GetTableDescriptorsResponse getTableDescriptors(
2661       RpcController controller, GetTableDescriptorsRequest req) throws ServiceException {
2662     List<HTableDescriptor> descriptors = new ArrayList<HTableDescriptor>();
2663     List<TableName> tableNameList = new ArrayList<TableName>();
2664     for(HBaseProtos.TableName tableNamePB: req.getTableNamesList()) {
2665       tableNameList.add(ProtobufUtil.toTableName(tableNamePB));
2666     }
2667     boolean bypass = false;
2668     if (this.cpHost != null) {
2669       try {
2670         bypass = this.cpHost.preGetTableDescriptors(tableNameList, descriptors);
2671       } catch (IOException ioe) {
2672         throw new ServiceException(ioe);
2673       }
2674     }
2675 
2676     if (!bypass) {
2677       if (req.getTableNamesCount() == 0) {
2678         // request for all TableDescriptors
2679         Map<String, HTableDescriptor> descriptorMap = null;
2680         try {
2681           descriptorMap = this.tableDescriptors.getAll();
2682         } catch (IOException e) {
2683           LOG.warn("Failed getting all descriptors", e);
2684         }
2685         if (descriptorMap != null) {
2686           for(HTableDescriptor desc: descriptorMap.values()) {
2687             if(!desc.getTableName().isSystemTable()) {
2688               descriptors.add(desc);
2689             }
2690           }
2691         }
2692       } else {
2693         for (TableName s: tableNameList) {
2694           try {
2695             HTableDescriptor desc = this.tableDescriptors.get(s);
2696             if (desc != null) {
2697               descriptors.add(desc);
2698             }
2699           } catch (IOException e) {
2700             LOG.warn("Failed getting descriptor for " + s, e);
2701           }
2702         }
2703       }
2704 
2705       if (this.cpHost != null) {
2706         try {
2707           this.cpHost.postGetTableDescriptors(descriptors);
2708         } catch (IOException ioe) {
2709           throw new ServiceException(ioe);
2710         }
2711       }
2712     }
2713 
2714     GetTableDescriptorsResponse.Builder builder = GetTableDescriptorsResponse.newBuilder();
2715     for (HTableDescriptor htd: descriptors) {
2716       builder.addTableSchema(htd.convert());
2717     }
2718     return builder.build();
2719   }
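
  /*
   * A minimal client-side sketch of the descriptor fetch this RPC backs,
   * assuming the HBaseAdmin API; the table name "t1" is hypothetical.
   *
   *   HBaseAdmin admin = new HBaseAdmin(HBaseConfiguration.create());
   *   try {
   *     HTableDescriptor one = admin.getTableDescriptor(TableName.valueOf("t1"));
   *     HTableDescriptor[] all = admin.listTables(); // empty request: all user tables
   *   } finally {
   *     admin.close();
   *   }
   */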
2720 
2721   /**
2722    * Get list of userspace table names
2723    * @param controller Unused (set to null).
2724    * @param req GetTableNamesRequest
2725    * @return GetTableNamesResponse
2726    * @throws ServiceException
2727    */
2728   @Override
2729   public GetTableNamesResponse getTableNames(
2730         RpcController controller, GetTableNamesRequest req) throws ServiceException {
2731     try {
2732       Collection<HTableDescriptor> descriptors = this.tableDescriptors.getAll().values();
2733       GetTableNamesResponse.Builder builder = GetTableNamesResponse.newBuilder();
2734       for (HTableDescriptor descriptor: descriptors) {
2735         if (descriptor.getTableName().isSystemTable()) {
2736           continue;
2737         }
2738         builder.addTableNames(ProtobufUtil.toProtoTableName(descriptor.getTableName()));
2739       }
2740       return builder.build();
2741     } catch (IOException e) {
2742       throw new ServiceException(e);
2743     }
2744   }
2745 
2746   /**
2747    * Compute the average load across all region servers.
2748    * Currently, this uses a very naive computation: it just counts the number of
2749    * regions being served, ignoring statistics about the number of requests.
2750    * @return the average load
2751    */
2752   public double getAverageLoad() {
2753     if (this.assignmentManager == null) {
2754       return 0;
2755     }
2756 
2757     RegionStates regionStates = this.assignmentManager.getRegionStates();
2758     if (regionStates == null) {
2759       return 0;
2760     }
2761     return regionStates.getAverageLoad();
2762   }
2763 
2764   /**
2765    * Offline specified region from master's in-memory state. It will not attempt to
2766    * reassign the region as in unassign.
2767    *
2768    * This is a special method that should be used by experts or hbck.
2769    *
2770    */
2771   @Override
2772   public OfflineRegionResponse offlineRegion(RpcController controller, OfflineRegionRequest request)
2773   throws ServiceException {
2774     final byte [] regionName = request.getRegion().getValue().toByteArray();
2775     RegionSpecifierType type = request.getRegion().getType();
2776     if (type != RegionSpecifierType.REGION_NAME) {
2777       LOG.warn("moveRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
2778         + " actual: " + type);
2779     }
2780 
2781     try {
2782       Pair<HRegionInfo, ServerName> pair =
2783         MetaReader.getRegion(this.catalogTracker, regionName);
2784       if (pair == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName));
2785       HRegionInfo hri = pair.getFirst();
2786       if (cpHost != null) {
2787         cpHost.preRegionOffline(hri);
2788       }
2789       LOG.info(getClientIdAuditPrefix() + " offline " + hri.getRegionNameAsString());
2790       this.assignmentManager.regionOffline(hri);
2791       if (cpHost != null) {
2792         cpHost.postRegionOffline(hri);
2793       }
2794     } catch (IOException ioe) {
2795       throw new ServiceException(ioe);
2796     }
2797     return OfflineRegionResponse.newBuilder().build();
2798   }
2799 
2800   @Override
2801   public boolean registerService(Service instance) {
2802     /*
2803      * No stacking of instances is allowed for a single service name
2804      */
2805     Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
2806     if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
2807       LOG.error("Coprocessor service "+serviceDesc.getFullName()+
2808           " already registered, rejecting request from "+instance
2809       );
2810       return false;
2811     }
2812 
2813     coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
2814     if (LOG.isDebugEnabled()) {
2815       LOG.debug("Registered master coprocessor service: service="+serviceDesc.getFullName());
2816     }
2817     return true;
2818   }
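
  /*
   * Illustrative sketch of how a service typically reaches registerService():
   * a master coprocessor implementing CoprocessorService hands its
   * protobuf-generated Service to the coprocessor host, which registers it
   * here. PingProtos below is a hypothetical generated class.
   *
   *   public class PingMasterEndpoint extends PingProtos.PingService
   *       implements Coprocessor, CoprocessorService {
   *     @Override
   *     public Service getService() { return this; }
   *     @Override
   *     public void start(CoprocessorEnvironment env) { }
   *     @Override
   *     public void stop(CoprocessorEnvironment env) { }
   *     // ... generated service methods implemented here ...
   *   }
   */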
2819 
2820   @Override
2821   public ClientProtos.CoprocessorServiceResponse execMasterService(final RpcController controller,
2822       final ClientProtos.CoprocessorServiceRequest request) throws ServiceException {
2823     try {
2824       ServerRpcController execController = new ServerRpcController();
2825 
2826       ClientProtos.CoprocessorServiceCall call = request.getCall();
2827       String serviceName = call.getServiceName();
2828       String methodName = call.getMethodName();
2829       if (!coprocessorServiceHandlers.containsKey(serviceName)) {
2830         throw new UnknownProtocolException(null,
2831             "No registered master coprocessor service found for name "+serviceName);
2832       }
2833 
2834       Service service = coprocessorServiceHandlers.get(serviceName);
2835       Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
2836       Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
2837       if (methodDesc == null) {
2838         throw new UnknownProtocolException(service.getClass(),
2839             "Unknown method "+methodName+" called on master service "+serviceName);
2840       }
2841 
2842       //invoke the method
2843       Message execRequest = service.getRequestPrototype(methodDesc).newBuilderForType()
2844           .mergeFrom(call.getRequest()).build();
2845       final Message.Builder responseBuilder =
2846           service.getResponsePrototype(methodDesc).newBuilderForType();
2847       service.callMethod(methodDesc, execController, execRequest, new RpcCallback<Message>() {
2848         @Override
2849         public void run(Message message) {
2850           if (message != null) {
2851             responseBuilder.mergeFrom(message);
2852           }
2853         }
2854       });
2855       Message execResult = responseBuilder.build();
2856 
2857       if (execController.getFailedOn() != null) {
2858         throw execController.getFailedOn();
2859       }
2860       ClientProtos.CoprocessorServiceResponse.Builder builder =
2861           ClientProtos.CoprocessorServiceResponse.newBuilder();
2862       builder.setRegion(RequestConverter.buildRegionSpecifier(
2863           RegionSpecifierType.REGION_NAME, HConstants.EMPTY_BYTE_ARRAY));
2864       builder.setValue(
2865           builder.getValueBuilder().setName(execResult.getClass().getName())
2866               .setValue(execResult.toByteString()));
2867       return builder.build();
2868     } catch (IOException ie) {
2869       throw new ServiceException(ie);
2870     }
2871   }
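
  /*
   * A hedged client-side sketch of invoking a master coprocessor endpoint,
   * assuming HBaseAdmin#coprocessorService() is available in the client
   * version in use; PingProtos and its stub are hypothetical generated classes.
   *
   *   HBaseAdmin admin = new HBaseAdmin(HBaseConfiguration.create());
   *   try {
   *     CoprocessorRpcChannel channel = admin.coprocessorService();
   *     PingProtos.PingService.BlockingInterface stub =
   *         PingProtos.PingService.newBlockingStub(channel);
   *     stub.ping(null, PingProtos.PingRequest.getDefaultInstance());
   *   } finally {
   *     admin.close();
   *   }
   */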
2872 
2873   /**
2874    * Utility for constructing an instance of the passed HMaster class.
2875    * @param masterClass the HMaster subclass to instantiate
2876    * @param conf the configuration to pass to the masterClass constructor
2877    * @return HMaster instance.
2878    */
2879   public static HMaster constructMaster(Class<? extends HMaster> masterClass,
2880       final Configuration conf)  {
2881     try {
2882       Constructor<? extends HMaster> c =
2883         masterClass.getConstructor(Configuration.class);
2884       return c.newInstance(conf);
2885     } catch (InvocationTargetException ite) {
2886       Throwable target = ite.getTargetException() != null?
2887         ite.getTargetException(): ite;
2888       if (target.getCause() != null) target = target.getCause();
2889       throw new RuntimeException("Failed construction of Master: " +
2890         masterClass.toString(), target);
2891     } catch (Exception e) {
2892       throw new RuntimeException("Failed construction of Master: " +
2893         masterClass.toString() + ((e.getCause() != null)?
2894           e.getCause().getMessage(): ""), e);
2895     }
2896   }
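
  /*
   * A minimal sketch of how constructMaster() is typically used (compare
   * HMasterCommandLine): build the master reflectively, then run it as a
   * thread. Assumes the caller supplies a fully populated Configuration.
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   HMaster master = HMaster.constructMaster(HMaster.class, conf);
   *   master.start();
   *   master.join();
   */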
2897 
2898   /**
2899    * @see org.apache.hadoop.hbase.master.HMasterCommandLine
2900    */
2901   public static void main(String [] args) {
2902     VersionInfo.logVersion();
2903     new HMasterCommandLine(HMaster.class).doMain(args);
2904   }
2905 
2906   public HFileCleaner getHFileCleaner() {
2907     return this.hfileCleaner;
2908   }
2909 
2910   /**
2911    * Exposed for TESTING!
2912    * @return the underlying snapshot manager
2913    */
2914   public SnapshotManager getSnapshotManagerForTesting() {
2915     return this.snapshotManager;
2916   }
2917 
2918   /**
2919    * Triggers an asynchronous attempt to take a snapshot.
2920    * {@inheritDoc}
2921    */
2922   @Override
2923   public SnapshotResponse snapshot(RpcController controller, SnapshotRequest request)
2924       throws ServiceException {
2925     try {
2926       this.snapshotManager.checkSnapshotSupport();
2927     } catch (UnsupportedOperationException e) {
2928       throw new ServiceException(e);
2929     }
2930 
2931     LOG.info(getClientIdAuditPrefix() + " snapshot request for: " +
2932         ClientSnapshotDescriptionUtils.toString(request.getSnapshot()));
2933     // get the snapshot information
2934     SnapshotDescription snapshot = SnapshotDescriptionUtils.validate(request.getSnapshot(),
2935       this.conf);
2936     try {
2937       snapshotManager.takeSnapshot(snapshot);
2938     } catch (IOException e) {
2939       throw new ServiceException(e);
2940     }
2941 
2942     // send back the max amount of time the client should wait for the snapshot to complete
2943     long waitTime = SnapshotDescriptionUtils.getMaxMasterTimeout(conf, snapshot.getType(),
2944       SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME);
2945     return SnapshotResponse.newBuilder().setExpectedTimeout(waitTime).build();
2946   }
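
  /*
   * A minimal client-side sketch of the asynchronous snapshot flow this RPC
   * implements, assuming the HBaseAdmin async API; the snapshot and table
   * names are hypothetical.
   *
   *   HBaseAdmin admin = new HBaseAdmin(HBaseConfiguration.create());
   *   try {
   *     SnapshotDescription snap = SnapshotDescription.newBuilder()
   *         .setName("snap1").setTable("t1").build();
   *     admin.takeSnapshotAsync(snap);            // returns once triggered
   *     while (!admin.isSnapshotFinished(snap)) { // polls isSnapshotDone()
   *       Thread.sleep(500);
   *     }
   *   } finally {
   *     admin.close();
   *   }
   */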
2947 
2948   /**
2949    * List the currently available/stored snapshots. Any in-progress snapshots are ignored.
2950    */
2951   @Override
2952   public GetCompletedSnapshotsResponse getCompletedSnapshots(RpcController controller,
2953       GetCompletedSnapshotsRequest request) throws ServiceException {
2954     try {
2955       GetCompletedSnapshotsResponse.Builder builder = GetCompletedSnapshotsResponse.newBuilder();
2956       List<SnapshotDescription> snapshots = snapshotManager.getCompletedSnapshots();
2957 
2958       // convert to protobuf
2959       for (SnapshotDescription snapshot : snapshots) {
2960         builder.addSnapshots(snapshot);
2961       }
2962       return builder.build();
2963     } catch (IOException e) {
2964       throw new ServiceException(e);
2965     }
2966   }
2967 
2968   /**
2969    * Execute Delete Snapshot operation.
2970    * @return DeleteSnapshotResponse (a protobuf wrapped void) if the snapshot existed and was
2971    *    deleted properly.
2972    * @throws ServiceException wrapping SnapshotDoesNotExistException if the specified snapshot
2973    *    does not exist.
2974    */
2975   @Override
2976   public DeleteSnapshotResponse deleteSnapshot(RpcController controller,
2977       DeleteSnapshotRequest request) throws ServiceException {
2978     try {
2979       this.snapshotManager.checkSnapshotSupport();
2980     } catch (UnsupportedOperationException e) {
2981       throw new ServiceException(e);
2982     }
2983 
2984     try {
2985       LOG.info(getClientIdAuditPrefix() + " delete " + request.getSnapshot());
2986       snapshotManager.deleteSnapshot(request.getSnapshot());
2987       return DeleteSnapshotResponse.newBuilder().build();
2988     } catch (IOException e) {
2989       throw new ServiceException(e);
2990     }
2991   }
2992 
2993   /**
2994    * Checks if the specified snapshot is done.
2995    * @return true if the snapshot is in the file system and ready to use,
2996    *   false if the snapshot is still in the process of completing
2997    * @throws ServiceException wrapping UnknownSnapshotException if invalid snapshot, or
2998    *  a wrapped HBaseSnapshotException with progress failure reason.
2999    */
3000   @Override
3001   public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
3002       IsSnapshotDoneRequest request) throws ServiceException {
3003     LOG.debug("Checking to see if snapshot from request:" +
3004         ClientSnapshotDescriptionUtils.toString(request.getSnapshot()) + " is done");
3005     try {
3006       IsSnapshotDoneResponse.Builder builder = IsSnapshotDoneResponse.newBuilder();
3007       boolean done = snapshotManager.isSnapshotDone(request.getSnapshot());
3008       builder.setDone(done);
3009       return builder.build();
3010     } catch (IOException e) {
3011       throw new ServiceException(e);
3012     }
3013   }
3014 
3015   /**
3016    * Execute Restore/Clone snapshot operation.
3017    *
3018    * <p>If the specified table exists, a "Restore" is executed, replacing the table
3019    * schema and directory data with the content of the snapshot.
3020    * The table must be disabled, or an UnsupportedOperationException will be thrown.
3021    *
3022    * <p>If the table doesn't exist, a "Clone" is executed: a new table is created
3023    * using the schema at the time of the snapshot, and the content of the snapshot.
3024    *
3025    * <p>The restore/clone operation does not require copying HFiles. Since HFiles
3026    * are immutable the table can point to and use the same files as the original one.
3027    */
3028   @Override
3029   public RestoreSnapshotResponse restoreSnapshot(RpcController controller,
3030       RestoreSnapshotRequest request) throws ServiceException {
3031     try {
3032       this.snapshotManager.checkSnapshotSupport();
3033     } catch (UnsupportedOperationException e) {
3034       throw new ServiceException(e);
3035     }
3036 
3037     // ensure namespace exists
3038     try {
3039       TableName dstTable = TableName.valueOf(request.getSnapshot().getTable());
3040       getNamespaceDescriptor(dstTable.getNamespaceAsString());
3041     } catch (IOException ioe) {
3042       throw new ServiceException(ioe);
3043     }
3044 
3045     try {
3046       SnapshotDescription reqSnapshot = request.getSnapshot();
3047       snapshotManager.restoreSnapshot(reqSnapshot);
3048       return RestoreSnapshotResponse.newBuilder().build();
3049     } catch (IOException e) {
3050       throw new ServiceException(e);
3051     }
3052   }
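
  /*
   * A minimal client-side sketch of the restore-vs-clone distinction described
   * above, assuming the HBaseAdmin API; table and snapshot names are
   * hypothetical.
   *
   *   HBaseAdmin admin = new HBaseAdmin(HBaseConfiguration.create());
   *   try {
   *     admin.disableTable("t1");               // restore requires a disabled table
   *     admin.restoreSnapshot("snap1");         // table exists: restore in place
   *     admin.cloneSnapshot("snap1", "t1copy"); // table absent: clone to a new table
   *   } finally {
   *     admin.close();
   *   }
   */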
3053 
3054   /**
3055    * Returns the status of the requested snapshot restore/clone operation.
3056    * This method is not exposed to the user; it is just used internally by HBaseAdmin
3057    * to verify whether the restore has completed.
3058    *
3059    * No exception is thrown if the restore is not running; the result will simply be "done".
3060    *
3061    * @return <tt>true</tt> if the restore/clone operation has completed.
3062    * @throws ServiceException if the operation failed.
3063    */
3064   @Override
3065   public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(RpcController controller,
3066       IsRestoreSnapshotDoneRequest request) throws ServiceException {
3067     try {
3068       SnapshotDescription snapshot = request.getSnapshot();
3069       IsRestoreSnapshotDoneResponse.Builder builder = IsRestoreSnapshotDoneResponse.newBuilder();
3070       boolean done = snapshotManager.isRestoreDone(snapshot);
3071       builder.setDone(done);
3072       return builder.build();
3073     } catch (IOException e) {
3074       throw new ServiceException(e);
3075     }
3076   }
3077 
3078   /**
3079    * Triggers an asynchronous attempt to run a distributed procedure.
3080    * {@inheritDoc}
3081    */
3082   @Override
3083   public ExecProcedureResponse execProcedure(RpcController controller,
3084       ExecProcedureRequest request) throws ServiceException {
3085     ProcedureDescription desc = request.getProcedure();
3086     MasterProcedureManager mpm = this.mpmHost.getProcedureManager(desc
3087         .getSignature());
3088     if (mpm == null) {
3089       throw new ServiceException("The procedure is not registered: "
3090           + desc.getSignature());
3091     }
3092 
3093     LOG.info(getClientIdAuditPrefix() + " procedure request for: "
3094         + desc.getSignature());
3095 
3096     try {
3097       mpm.execProcedure(desc);
3098     } catch (IOException e) {
3099       throw new ServiceException(e);
3100     }
3101 
3102     // send back the max amount of time the client should wait for the procedure
3103     // to complete
3104     long waitTime = SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME;
3105     return ExecProcedureResponse.newBuilder().setExpectedTimeout(waitTime)
3106         .build();
3107   }
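
  /*
   * Illustrative sketch of the ProcedureDescription this RPC consumes; the
   * signature must match a registered MasterProcedureManager, so the values
   * below are hypothetical.
   *
   *   ProcedureDescription desc = ProcedureDescription.newBuilder()
   *       .setSignature("my-proc")  // selects the procedure manager
   *       .setInstance("instance1") // names this particular run
   *       .build();
   *   ExecProcedureRequest req =
   *       ExecProcedureRequest.newBuilder().setProcedure(desc).build();
   */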
3108 
3109   /**
3110    * Checks if the specified procedure is done.
3111    * @return true if the procedure is done,
3112    *   false if the procedure is in the process of completing
3113    * @throws ServiceException if invalid procedure, or
3114    *  a failed procedure with progress failure reason.
3115    */
3116   @Override
3117   public IsProcedureDoneResponse isProcedureDone(RpcController controller,
3118       IsProcedureDoneRequest request) throws ServiceException {
3119     ProcedureDescription desc = request.getProcedure();
3120     MasterProcedureManager mpm = this.mpmHost.getProcedureManager(desc
3121         .getSignature());
3122     if (mpm == null) {
3123       throw new ServiceException("The procedure is not registered: "
3124           + desc.getSignature());
3125     }
3126     LOG.debug("Checking to see if procedure from request:"
3127         + desc.getSignature() + " is done");
3128 
3129     try {
3130       IsProcedureDoneResponse.Builder builder = IsProcedureDoneResponse
3131           .newBuilder();
3132       boolean done = mpm.isProcedureDone(desc);
3133       builder.setDone(done);
3134       return builder.build();
3135     } catch (IOException e) {
3136       throw new ServiceException(e);
3137     }
3138   }
3139 
3140   @Override
3141   public ModifyNamespaceResponse modifyNamespace(RpcController controller,
3142       ModifyNamespaceRequest request) throws ServiceException {
3143     try {
3144       modifyNamespace(ProtobufUtil.toNamespaceDescriptor(request.getNamespaceDescriptor()));
3145       return ModifyNamespaceResponse.getDefaultInstance();
3146     } catch (IOException e) {
3147       throw new ServiceException(e);
3148     }
3149   }
3150 
3151   @Override
3152   public CreateNamespaceResponse createNamespace(RpcController controller,
3153      CreateNamespaceRequest request) throws ServiceException {
3154     try {
3155       createNamespace(ProtobufUtil.toNamespaceDescriptor(request.getNamespaceDescriptor()));
3156       return CreateNamespaceResponse.getDefaultInstance();
3157     } catch (IOException e) {
3158       throw new ServiceException(e);
3159     }
3160   }
3161 
3162   @Override
3163   public DeleteNamespaceResponse deleteNamespace(RpcController controller,
3164       DeleteNamespaceRequest request) throws ServiceException {
3165     try {
3166       deleteNamespace(request.getNamespaceName());
3167       return DeleteNamespaceResponse.getDefaultInstance();
3168     } catch (IOException e) {
3169       throw new ServiceException(e);
3170     }
3171   }
3172 
3173   @Override
3174   public GetNamespaceDescriptorResponse getNamespaceDescriptor(
3175       RpcController controller, GetNamespaceDescriptorRequest request)
3176       throws ServiceException {
3177     try {
3178       return GetNamespaceDescriptorResponse.newBuilder()
3179           .setNamespaceDescriptor(
3180               ProtobufUtil.toProtoNamespaceDescriptor(getNamespaceDescriptor(request.getNamespaceName())))
3181           .build();
3182     } catch (IOException e) {
3183       throw new ServiceException(e);
3184     }
3185   }
3186 
3187   @Override
3188   public ListNamespaceDescriptorsResponse listNamespaceDescriptors(
3189       RpcController controller, ListNamespaceDescriptorsRequest request)
3190       throws ServiceException {
3191     try {
3192       ListNamespaceDescriptorsResponse.Builder response =
3193           ListNamespaceDescriptorsResponse.newBuilder();
3194       for(NamespaceDescriptor ns: listNamespaceDescriptors()) {
3195         response.addNamespaceDescriptor(ProtobufUtil.toProtoNamespaceDescriptor(ns));
3196       }
3197       return response.build();
3198     } catch (IOException e) {
3199       throw new ServiceException(e);
3200     }
3201   }
3202 
3203   @Override
3204   public ListTableDescriptorsByNamespaceResponse listTableDescriptorsByNamespace(
3205       RpcController controller, ListTableDescriptorsByNamespaceRequest request)
3206       throws ServiceException {
3207     try {
3208       ListTableDescriptorsByNamespaceResponse.Builder b =
3209           ListTableDescriptorsByNamespaceResponse.newBuilder();
3210       for(HTableDescriptor htd: listTableDescriptorsByNamespace(request.getNamespaceName())) {
3211         b.addTableSchema(htd.convert());
3212       }
3213       return b.build();
3214     } catch (IOException e) {
3215       throw new ServiceException(e);
3216     }
3217   }
3218 
3219   @Override
3220   public ListTableNamesByNamespaceResponse listTableNamesByNamespace(
3221       RpcController controller, ListTableNamesByNamespaceRequest request)
3222       throws ServiceException {
3223     try {
3224       ListTableNamesByNamespaceResponse.Builder b =
3225           ListTableNamesByNamespaceResponse.newBuilder();
3226       for (TableName tableName: listTableNamesByNamespace(request.getNamespaceName())) {
3227         b.addTableName(ProtobufUtil.toProtoTableName(tableName));
3228       }
3229       return b.build();
3230     } catch (IOException e) {
3231       throw new ServiceException(e);
3232     }
3233   }
3234 
3235   private boolean isHealthCheckerConfigured() {
3236     String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
3237     return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
3238   }
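
  /*
   * Example hbase-site.xml snippet that would enable the health check chore,
   * assuming HConstants.HEALTH_SCRIPT_LOC maps to the standard
   * "hbase.node.health.script.location" key; the script path is hypothetical.
   *
   *   <property>
   *     <name>hbase.node.health.script.location</name>
   *     <value>/opt/hbase/bin/health_check.sh</value>
   *   </property>
   */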
3239 
3240   @Override
3241   public void createNamespace(NamespaceDescriptor descriptor) throws IOException {
3242     TableName.isLegalNamespaceName(Bytes.toBytes(descriptor.getName()));
3243     if (cpHost != null) {
3244       if (cpHost.preCreateNamespace(descriptor)) {
3245         return;
3246       }
3247     }
3248     LOG.info(getClientIdAuditPrefix() + " creating " + descriptor);
3249     tableNamespaceManager.create(descriptor);
3250     if (cpHost != null) {
3251       cpHost.postCreateNamespace(descriptor);
3252     }
3253   }
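
  /*
   * A minimal sketch of building the NamespaceDescriptor this method expects;
   * the namespace name and configuration entry are hypothetical.
   *
   *   NamespaceDescriptor ns = NamespaceDescriptor.create("ns1")
   *       .addConfiguration("key", "value") // optional per-namespace property
   *       .build();
   *   master.createNamespace(ns); // or client-side: admin.createNamespace(ns)
   */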
3254 
3255   @Override
3256   public void modifyNamespace(NamespaceDescriptor descriptor) throws IOException {
3257     TableName.isLegalNamespaceName(Bytes.toBytes(descriptor.getName()));
3258     if (cpHost != null) {
3259       if (cpHost.preModifyNamespace(descriptor)) {
3260         return;
3261       }
3262     }
3263     LOG.info(getClientIdAuditPrefix() + " modify " + descriptor);
3264     tableNamespaceManager.update(descriptor);
3265     if (cpHost != null) {
3266       cpHost.postModifyNamespace(descriptor);
3267     }
3268   }
3269 
3270   @Override
3271   public void deleteNamespace(String name) throws IOException {
3272     if (cpHost != null) {
3273       if (cpHost.preDeleteNamespace(name)) {
3274         return;
3275       }
3276     }
3277     LOG.info(getClientIdAuditPrefix() + " delete " + name);
3278     tableNamespaceManager.remove(name);
3279     if (cpHost != null) {
3280       cpHost.postDeleteNamespace(name);
3281     }
3282   }
3283 
3284   @Override
3285   public NamespaceDescriptor getNamespaceDescriptor(String name) throws IOException {
3286     boolean ready = tableNamespaceManager != null &&
3287         tableNamespaceManager.isTableAvailableAndInitialized();
3288     if (!ready) {
3289       throw new IOException("Table Namespace Manager not ready yet, try again later");
3290     }
3291     NamespaceDescriptor nsd = tableNamespaceManager.get(name);
3292     if (nsd == null) {
3293       throw new NamespaceNotFoundException(name);
3294     }
3295     return nsd;
3296   }
3297 
3298   @Override
3299   public List<NamespaceDescriptor> listNamespaceDescriptors() throws IOException {
3300     return Lists.newArrayList(tableNamespaceManager.list());
3301   }
3302 
3303   @Override
3304   public List<HTableDescriptor> listTableDescriptorsByNamespace(String name) throws IOException {
3305     getNamespaceDescriptor(name); // check that namespace exists
3306     return Lists.newArrayList(tableDescriptors.getByNamespace(name).values());
3307   }
3308 
3309   @Override
3310   public List<TableName> listTableNamesByNamespace(String name) throws IOException {
3311     List<TableName> tableNames = Lists.newArrayList();
3312     getNamespaceDescriptor(name); // check that namespace exists
3313     for (HTableDescriptor descriptor: tableDescriptors.getByNamespace(name).values()) {
3314       tableNames.add(descriptor.getTableName());
3315     }
3316     return tableNames;
3317   }
3318 
3319 }