1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import java.io.IOException;
22 import java.lang.reflect.Constructor;
23 import java.lang.reflect.InvocationTargetException;
24 import java.net.InetAddress;
25 import java.net.InetSocketAddress;
26 import java.net.UnknownHostException;
27 import java.util.ArrayList;
28 import java.util.Collections;
29 import java.util.Comparator;
30 import java.util.HashSet;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.Set;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.ExecutionException;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.Future;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicReference;
40
41 import javax.management.ObjectName;
42
43 import org.apache.commons.logging.Log;
44 import org.apache.commons.logging.LogFactory;
45 import org.apache.hadoop.classification.InterfaceAudience;
46 import org.apache.hadoop.conf.Configuration;
47 import org.apache.hadoop.fs.Path;
48 import org.apache.hadoop.hbase.Abortable;
49 import org.apache.hadoop.hbase.Chore;
50 import org.apache.hadoop.hbase.ClusterId;
51 import org.apache.hadoop.hbase.ClusterStatus;
52 import org.apache.hadoop.hbase.HBaseIOException;
53 import org.apache.hadoop.hbase.HColumnDescriptor;
54 import org.apache.hadoop.hbase.HConstants;
55 import org.apache.hadoop.hbase.HRegionInfo;
56 import org.apache.hadoop.hbase.HTableDescriptor;
57 import org.apache.hadoop.hbase.HealthCheckChore;
58 import org.apache.hadoop.hbase.Server;
59 import org.apache.hadoop.hbase.ServerLoad;
60 import org.apache.hadoop.hbase.ServerName;
61 import org.apache.hadoop.hbase.TableDescriptors;
62 import org.apache.hadoop.hbase.catalog.CatalogTracker;
63 import org.apache.hadoop.hbase.catalog.MetaReader;
64 import org.apache.hadoop.hbase.client.HConnectionManager;
65 import org.apache.hadoop.hbase.client.MetaScanner;
66 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
67 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
68 import org.apache.hadoop.hbase.client.Result;
69 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
70 import org.apache.hadoop.hbase.exceptions.DeserializationException;
71 import org.apache.hadoop.hbase.exceptions.MasterNotRunningException;
72 import org.apache.hadoop.hbase.exceptions.NotAllMetaRegionsOnlineException;
73 import org.apache.hadoop.hbase.exceptions.PleaseHoldException;
74 import org.apache.hadoop.hbase.exceptions.TableNotDisabledException;
75 import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
76 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
77 import org.apache.hadoop.hbase.exceptions.UnknownRegionException;
78 import org.apache.hadoop.hbase.executor.ExecutorService;
79 import org.apache.hadoop.hbase.executor.ExecutorType;
80 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
81 import org.apache.hadoop.hbase.ipc.RpcServer;
82 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
83 import org.apache.hadoop.hbase.ipc.ServerRpcController;
84 import org.apache.hadoop.hbase.master.balancer.BalancerChore;
85 import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
86 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
87 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
88 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
89 import org.apache.hadoop.hbase.master.handler.CreateTableHandler;
90 import org.apache.hadoop.hbase.master.handler.DeleteTableHandler;
91 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
92 import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
93 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
94 import org.apache.hadoop.hbase.master.handler.ModifyTableHandler;
95 import org.apache.hadoop.hbase.master.handler.TableAddFamilyHandler;
96 import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler;
97 import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler;
98 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
99 import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
100 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
101 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
102 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
103 import org.apache.hadoop.hbase.protobuf.RequestConverter;
104 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
105 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
106 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
107 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
108 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
109 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
110 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos;
111 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnRequest;
112 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnResponse;
113 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionRequest;
114 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionResponse;
115 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceRequest;
116 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.BalanceResponse;
117 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanRequest;
118 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CatalogScanResponse;
119 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableRequest;
120 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.CreateTableResponse;
121 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnRequest;
122 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteColumnResponse;
123 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteSnapshotRequest;
124 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteSnapshotResponse;
125 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableRequest;
126 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DeleteTableResponse;
127 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableRequest;
128 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DisableTableResponse;
129 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DispatchMergingRegionsRequest;
130 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.DispatchMergingRegionsResponse;
131 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorRequest;
132 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableCatalogJanitorResponse;
133 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableRequest;
134 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.EnableTableResponse;
135 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledRequest;
136 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsCatalogJanitorEnabledResponse;
137 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsRestoreSnapshotDoneRequest;
138 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsRestoreSnapshotDoneResponse;
139 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsSnapshotDoneRequest;
140 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.IsSnapshotDoneResponse;
141 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ListSnapshotRequest;
142 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ListSnapshotResponse;
143 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnRequest;
144 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyColumnResponse;
145 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableRequest;
146 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ModifyTableResponse;
147 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionRequest;
148 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MoveRegionResponse;
149 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.OfflineRegionRequest;
150 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.OfflineRegionResponse;
151 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.RestoreSnapshotRequest;
152 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.RestoreSnapshotResponse;
153 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.SetBalancerRunningRequest;
154 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.SetBalancerRunningResponse;
155 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ShutdownRequest;
156 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.ShutdownResponse;
157 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.StopMasterRequest;
158 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.StopMasterResponse;
159 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.TakeSnapshotRequest;
160 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.TakeSnapshotResponse;
161 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionRequest;
162 import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.UnassignRegionResponse;
163 import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos;
164 import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetClusterStatusRequest;
165 import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetClusterStatusResponse;
166 import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusRequest;
167 import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetSchemaAlterStatusResponse;
168 import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsRequest;
169 import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDescriptorsResponse;
170 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
171 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
172 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
173 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
174 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
175 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
176 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportResponse;
177 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
178 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
179 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
180 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
181 import org.apache.hadoop.hbase.replication.regionserver.Replication;
182 import org.apache.hadoop.hbase.security.User;
183 import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
184 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
185 import org.apache.hadoop.hbase.trace.SpanReceiverHost;
186 import org.apache.hadoop.hbase.util.Bytes;
187 import org.apache.hadoop.hbase.util.CompressionTest;
188 import org.apache.hadoop.hbase.util.FSTableDescriptors;
189 import org.apache.hadoop.hbase.util.FSUtils;
190 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
191 import org.apache.hadoop.hbase.util.HasThread;
192 import org.apache.hadoop.hbase.util.InfoServer;
193 import org.apache.hadoop.hbase.util.Pair;
194 import org.apache.hadoop.hbase.util.Sleeper;
195 import org.apache.hadoop.hbase.util.Strings;
196 import org.apache.hadoop.hbase.util.Threads;
197 import org.apache.hadoop.hbase.util.VersionInfo;
198 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
199 import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
200 import org.apache.hadoop.hbase.zookeeper.LoadBalancerTracker;
201 import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
202 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
203 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
204 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
205 import org.apache.hadoop.metrics.util.MBeanUtil;
206 import org.apache.hadoop.net.DNS;
207 import org.apache.zookeeper.KeeperException;
208 import org.apache.zookeeper.Watcher;
209
210 import com.google.common.collect.Maps;
211 import com.google.protobuf.Descriptors;
212 import com.google.protobuf.Message;
213 import com.google.protobuf.RpcCallback;
214 import com.google.protobuf.RpcController;
215 import com.google.protobuf.Service;
216 import com.google.protobuf.ServiceException;
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233 @InterfaceAudience.Private
234 @SuppressWarnings("deprecation")
235 public class HMaster extends HasThread
236 implements MasterMonitorProtos.MasterMonitorService.BlockingInterface,
237 MasterAdminProtos.MasterAdminService.BlockingInterface,
238 RegionServerStatusProtos.RegionServerStatusService.BlockingInterface,
239 MasterServices, Server {
// Logger shared by every HMaster instance.
240 private static final Log LOG = LogFactory.getLog(HMaster.class.getName());
241
242
243
// Prefix used by the constructor for this master's thread name and for its
// ZooKeeper watcher identifier.
244 public static final String MASTER = "master";
245
246
// Private copy of the configuration handed to the constructor.
247 private final Configuration conf;
248
249 private InfoServer infoServer;
250
251
// Created in the constructor and closed at the very end of run().
252 private ZooKeeperWatcher zooKeeper;
253
// Created in becomeActiveMaster() and registered as a ZooKeeper listener there.
254 private ActiveMasterManager activeMasterManager;
255
// The three trackers below are created and started in initializeZKBasedSystemTrackers().
256 RegionServerTracker regionServerTracker;
257
258 private DrainingServerTracker drainingServerTracker;
259
260 private LoadBalancerTracker loadBalancerTracker;
261
262
// RPC server built in the constructor from the services returned by getServices().
263 private final RpcServerInterface rpcServer;
264
265
266 private volatile boolean rpcServerOpen = false;
267
268
269
270
// Listener address reported by rpcServer once it has been constructed.
271 private final InetSocketAddress isa;
272
273
274 private final MetricsMaster metricsMaster;
275
// Created in finishInitialization().
276 private MasterFileSystem fileSystemManager;
277
278
// Created through createServerManager() in finishInitialization() unless it runs in master-recovery mode.
279 ServerManager serverManager;
280
281
// Created in initializeZKBasedSystemTrackers().
282 AssignmentManager assignmentManager;
283
// Created via createCatalogTracker() and started in initializeZKBasedSystemTrackers().
284 private CatalogTracker catalogTracker;
285
// Created and started in becomeActiveMaster().
286 private ClusterStatusTracker clusterStatusTracker;
287
288
289
290
// Buffer whose size limit is read from "hbase.master.buffer.for.rs.fatals" in the constructor.
291 private MemoryBoundedLogMessageBuffer rsFatals;
292
293
294
// Checked by run(), loop() and finishInitialization() to end the master's main thread.
295 private volatile boolean stopped = false;
296
// When set, run() skips letting region servers shut down during its cleanup.
297 private volatile boolean abort = false;
298
// Set to true at the very start of finishInitialization().
299 private volatile boolean isActiveMaster = false;
300
301
302
// Set to true once finishInitialization() has marked the startup status complete.
303 volatile boolean initialized = false;
304
305
// Flipped to true by enableServerShutdownHandler(), which then processes queued dead servers.
306 private volatile boolean serverShutdownHandlerEnabled = false;
307
308
// Created in finishInitialization() unless it runs in master-recovery mode.
309 ExecutorService executorService;
310
// Obtained from LoadBalancerFactory in initializeZKBasedSystemTrackers().
311 private LoadBalancer balancer;
// The chore threads below are started near the end of finishInitialization().
312 private Thread balancerChore;
313 private Thread clusterStatusChore;
// Created and started in the constructor when a publisher class is configured.
314 private ClusterStatusPublisher clusterStatusPublisherChore = null;
315
316 private CatalogJanitor catalogJanitorChore;
317 private LogCleaner logCleaner;
318 private HFileCleaner hfileCleaner;
319
// Created in finishInitialization(); its postStartMaster() hook is invoked at the end of startup.
320 private MasterCoprocessorHost cpHost;
// Built in the constructor from the resolved host name, the RPC listener port and the current time.
321 private final ServerName serverName;
322
323 private TableDescriptors tableDescriptors;
324
325
// Created in finishInitialization(), before the ZooKeeper-based trackers are set up.
326 private TableLockManager tableLockManager;
327
328
// Wall-clock millis captured at the start of run() and of finishInitialization() respectively.
329 private long masterStartTime;
330 private long masterActiveTime;
331
332
// Read from "hbase.regionserver.msginterval"; loop() waits at least this long between doMetrics() calls.
333 private final int msgInterval;
334
335
336
337 private ObjectName mxBean = null;
338
339
// Read from "hbase.master.check.compression" (default true) in the constructor.
340 private final boolean masterCheckCompression;
341
342 private SpanReceiverHost spanReceiverHost;
343
344 private Map<String, Service> coprocessorServiceHandlers = Maps.newHashMap();
345
346
// Created at the end of initializeZKBasedSystemTrackers(); stopped in run()'s cleanup.
347 private SnapshotManager snapshotManager;
348
349
// Only created when isHealthCheckerConfigured() returns true in the constructor.
350 private HealthCheckChore healthCheckChore;
351
352
353
354
355
// Read from HConstants.DISTRIBUTED_LOG_REPLAY_KEY; selects between prepareMetaLogReplay()
// and splitMetaLog() in splitMetaLogBeforeAssignment().
356 private final boolean distributedLogReplay;
357
358
// Set to true in finishInitialization() immediately before the meta region is assigned.
359 private volatile boolean initializationBeforeMetaAssignment = false;
360
361
362
363
364
365
366
367
368
369
370
371
372
373 public HMaster(final Configuration conf)
374 throws IOException, KeeperException, InterruptedException {
375 this.conf = new Configuration(conf);
376
377 this.conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
378
379 String hostname = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
380 conf.get("hbase.master.dns.interface", "default"),
381 conf.get("hbase.master.dns.nameserver", "default")));
382 int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
383
384 InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
385 if (initialIsa.getAddress() == null) {
386 throw new IllegalArgumentException("Failed resolve of hostname " + initialIsa);
387 }
388
389 String bindAddress = conf.get("hbase.master.ipc.address");
390 if (bindAddress != null) {
391 initialIsa = new InetSocketAddress(bindAddress, port);
392 if (initialIsa.getAddress() == null) {
393 throw new IllegalArgumentException("Failed resolve of bind address " + initialIsa);
394 }
395 }
396 String name = "master/" + initialIsa.toString();
397
398 HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG);
399 int numHandlers = conf.getInt("hbase.master.handler.count",
400 conf.getInt("hbase.regionserver.handler.count", 25));
401 this.rpcServer = new RpcServer(this, name, getServices(),
402 initialIsa,
403 numHandlers,
404 0,
405 conf,
406 0);
407
408 this.isa = this.rpcServer.getListenerAddress();
409 this.serverName = new ServerName(hostname, this.isa.getPort(), System.currentTimeMillis());
410 this.rsFatals = new MemoryBoundedLogMessageBuffer(
411 conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));
412
413
414 ZKUtil.loginClient(this.conf, "hbase.zookeeper.client.keytab.file",
415 "hbase.zookeeper.client.kerberos.principal", this.isa.getHostName());
416
417
418 User.login(conf, "hbase.master.keytab.file",
419 "hbase.master.kerberos.principal", this.isa.getHostName());
420
421 LOG.info("hbase.rootdir=" + FSUtils.getRootDir(this.conf) +
422 ", hbase.cluster.distributed=" + this.conf.getBoolean("hbase.cluster.distributed", false));
423
424
425 setName(MASTER + "-" + this.serverName.toString());
426
427 Replication.decorateMasterConfiguration(this.conf);
428
429
430
431 if (this.conf.get("mapred.task.id") == null) {
432 this.conf.set("mapred.task.id", "hb_m_" + this.serverName.toString());
433 }
434
435 this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true);
436 this.rpcServer.startThreads();
437
438
439 this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
440
441
442 this.masterCheckCompression = conf.getBoolean("hbase.master.check.compression", true);
443
444 this.metricsMaster = new MetricsMaster( new MetricsMasterWrapperImpl(this));
445
446
447 int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
448 HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
449 if (isHealthCheckerConfigured()) {
450 healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
451 }
452
453
454 Class<? extends ClusterStatusPublisher.Publisher> publisherClass =
455 conf.getClass(ClusterStatusPublisher.STATUS_PUBLISHER_CLASS,
456 ClusterStatusPublisher.DEFAULT_STATUS_PUBLISHER_CLASS,
457 ClusterStatusPublisher.Publisher.class);
458
459 if (publisherClass != null) {
460 clusterStatusPublisherChore = new ClusterStatusPublisher(this, conf, publisherClass);
461 Threads.setDaemonThreadRunning(clusterStatusPublisherChore.getThread());
462 }
463
464 distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
465 HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
466 }
467
468
469
470
471 private List<BlockingServiceAndInterface> getServices() {
472 List<BlockingServiceAndInterface> bssi = new ArrayList<BlockingServiceAndInterface>(3);
473 bssi.add(new BlockingServiceAndInterface(
474 MasterMonitorProtos.MasterMonitorService.newReflectiveBlockingService(this),
475 MasterMonitorProtos.MasterMonitorService.BlockingInterface.class));
476 bssi.add(new BlockingServiceAndInterface(
477 MasterAdminProtos.MasterAdminService.newReflectiveBlockingService(this),
478 MasterAdminProtos.MasterAdminService.BlockingInterface.class));
479 bssi.add(new BlockingServiceAndInterface(
480 RegionServerStatusProtos.RegionServerStatusService.newReflectiveBlockingService(this),
481 RegionServerStatusProtos.RegionServerStatusService.BlockingInterface.class));
482 return bssi;
483 }
484
485
486
487
488
489
490
491
492 private static void stallIfBackupMaster(final Configuration c,
493 final ActiveMasterManager amm)
494 throws InterruptedException {
495
496 if (!c.getBoolean(HConstants.MASTER_TYPE_BACKUP,
497 HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
498 return;
499 }
500 LOG.debug("HMaster started in backup mode. " +
501 "Stalling until master znode is written.");
502
503
504 while (!amm.isActiveMaster()) {
505 LOG.debug("Waiting for master address ZNode to be written " +
506 "(Also watching cluster state node)");
507 Thread.sleep(
508 c.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT));
509 }
510
511 }
512
513 MetricsMaster getMetrics() {
514 return metricsMaster;
515 }
516
517
518
519
520
521
522
523
524
525
526 @Override
527 public void run() {
528 MonitoredTask startupStatus =
529 TaskMonitor.get().createStatus("Master startup");
530 startupStatus.setDescription("Master startup");
531 masterStartTime = System.currentTimeMillis();
532 try {
533
534
535
536
537
538
539
540
541
542
543 becomeActiveMaster(startupStatus);
544
545
546 if (!this.stopped) {
547 finishInitialization(startupStatus, false);
548 loop();
549 }
550 } catch (Throwable t) {
551
552 if (t instanceof NoClassDefFoundError &&
553 t.getMessage().contains("org/apache/hadoop/hdfs/protocol/FSConstants$SafeModeAction")) {
554
555 abort("HBase is having a problem with its Hadoop jars. You may need to "
556 + "recompile HBase against Hadoop version "
557 + org.apache.hadoop.util.VersionInfo.getVersion()
558 + " or change your hadoop jars to start properly", t);
559 } else {
560 abort("Unhandled exception. Starting shutdown.", t);
561 }
562 } finally {
563 startupStatus.cleanup();
564
565 stopChores();
566
567
568 if (!this.abort && this.serverManager != null &&
569 this.serverManager.isClusterShutdown()) {
570 this.serverManager.letRegionServersShutdown();
571 }
572 stopServiceThreads();
573
574 if (this.activeMasterManager != null) this.activeMasterManager.stop();
575 if (this.catalogTracker != null) this.catalogTracker.stop();
576 if (this.serverManager != null) this.serverManager.stop();
577 if (this.assignmentManager != null) this.assignmentManager.stop();
578 if (this.fileSystemManager != null) this.fileSystemManager.stop();
579 if (this.snapshotManager != null) this.snapshotManager.stop("server shutting down.");
580 this.zooKeeper.close();
581 }
582 LOG.info("HMaster main thread exiting");
583 }
584
585
586
587
588
589
590
591 private boolean becomeActiveMaster(MonitoredTask startupStatus)
592 throws InterruptedException {
593
594
595 this.activeMasterManager = new ActiveMasterManager(zooKeeper, this.serverName,
596 this);
597 this.zooKeeper.registerListener(activeMasterManager);
598 stallIfBackupMaster(this.conf, this.activeMasterManager);
599
600
601
602
603 this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this);
604 this.clusterStatusTracker.start();
605 return this.activeMasterManager.blockUntilBecomingActiveMaster(startupStatus);
606 }
607
608
609
610
611
612
613 void initializeZKBasedSystemTrackers() throws IOException,
614 InterruptedException, KeeperException {
615 this.catalogTracker = createCatalogTracker(this.zooKeeper, this.conf, this);
616 this.catalogTracker.start();
617
618 this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
619 this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
620 this.loadBalancerTracker.start();
621 this.assignmentManager = new AssignmentManager(this, serverManager,
622 this.catalogTracker, this.balancer, this.executorService, this.metricsMaster,
623 this.tableLockManager);
624 zooKeeper.registerListenerFirst(assignmentManager);
625
626 this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
627 this.serverManager);
628 this.regionServerTracker.start();
629
630 this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this,
631 this.serverManager);
632 this.drainingServerTracker.start();
633
634
635
636 boolean wasUp = this.clusterStatusTracker.isClusterUp();
637 if (!wasUp) this.clusterStatusTracker.setClusterUp();
638
639 LOG.info("Server active/primary master=" + this.serverName +
640 ", sessionid=0x" +
641 Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
642 ", setting cluster-up flag (Was=" + wasUp + ")");
643
644
645 this.snapshotManager = new SnapshotManager(this, this.metricsMaster);
646 }
647
648
649
650
651
652
653
654
655
656
657
658
659
660 CatalogTracker createCatalogTracker(final ZooKeeperWatcher zk,
661 final Configuration conf, Abortable abortable)
662 throws IOException {
663 return new CatalogTracker(zk, conf, abortable);
664 }
665
666
667 private Sleeper stopSleeper = new Sleeper(100, this);
668
669 private void loop() {
670 long lastMsgTs = 0l;
671 long now = 0l;
672 while (!this.stopped) {
673 now = System.currentTimeMillis();
674 if ((now - lastMsgTs) >= this.msgInterval) {
675 doMetrics();
676 lastMsgTs = System.currentTimeMillis();
677 }
678 stopSleeper.sleep();
679 }
680 }
681
682
683
684
685
686 private void doMetrics() {
687 try {
688 this.assignmentManager.updateRegionsInTransitionMetrics();
689 } catch (Throwable e) {
690 LOG.error("Couldn't update metrics: " + e.getMessage());
691 }
692 }
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715 private void finishInitialization(MonitoredTask status, boolean masterRecovery)
716 throws IOException, InterruptedException, KeeperException {
717
718 isActiveMaster = true;
719
720
721
722
723
724
725
726 status.setStatus("Initializing Master file system");
727 this.masterActiveTime = System.currentTimeMillis();
728
729 this.fileSystemManager = new MasterFileSystem(this, this, metricsMaster, masterRecovery);
730
731 this.tableDescriptors =
732 new FSTableDescriptors(this.fileSystemManager.getFileSystem(),
733 this.fileSystemManager.getRootDir());
734
735
736 status.setStatus("Publishing Cluster ID in ZooKeeper");
737 ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
738
739 if (!masterRecovery) {
740 this.executorService = new ExecutorService(getServerName().toString());
741 this.serverManager = createServerManager(this, this);
742 }
743
744
745
746 this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper, serverName);
747 if (!masterRecovery) {
748 this.tableLockManager.reapWriteLocks();
749 }
750
751 status.setStatus("Initializing ZK system trackers");
752 initializeZKBasedSystemTrackers();
753
754 if (!masterRecovery) {
755
756 status.setStatus("Initializing master coprocessors");
757 this.cpHost = new MasterCoprocessorHost(this, this.conf);
758
759 spanReceiverHost = new SpanReceiverHost(getConfiguration());
760 spanReceiverHost.loadSpanReceivers();
761
762
763 status.setStatus("Initializing master service threads");
764 startServiceThreads();
765 }
766
767
768 this.serverManager.waitForRegionServers(status);
769
770 for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
771 if (!this.serverManager.isServerOnline(sn)
772 && serverManager.checkAlreadySameHostPortAndRecordNewServer(
773 sn, ServerLoad.EMPTY_SERVERLOAD)) {
774 LOG.info("Registered server found up in zk but who has not yet "
775 + "reported in: " + sn);
776 }
777 }
778
779 if (!masterRecovery) {
780 this.assignmentManager.startTimeOutMonitor();
781 }
782
783
784
785
786 Set<ServerName> previouslyFailedServers = this.fileSystemManager
787 .getFailedServersFromLogFolders();
788
789
790 this.fileSystemManager.removeStaleRecoveringRegionsFromZK(previouslyFailedServers);
791
792
793 ServerName oldMetaServerLocation = this.catalogTracker.getMetaLocation();
794 if (oldMetaServerLocation != null && previouslyFailedServers.contains(oldMetaServerLocation)) {
795 splitMetaLogBeforeAssignment(oldMetaServerLocation);
796
797
798 }
799
800 this.initializationBeforeMetaAssignment = true;
801
802 status.setStatus("Assigning Meta Region");
803 assignMeta(status);
804
805
806 if(this.stopped) return;
807
808 if (this.distributedLogReplay && oldMetaServerLocation != null
809 && previouslyFailedServers.contains(oldMetaServerLocation)) {
810
811 status.setStatus("replaying log for Meta Region");
812 this.fileSystemManager.splitMetaLog(oldMetaServerLocation);
813 }
814
815 enableServerShutdownHandler();
816
817 status.setStatus("Submitting log splitting work for previously failed region servers");
818
819
820 for (ServerName tmpServer : previouslyFailedServers) {
821 this.serverManager.processDeadServer(tmpServer, true);
822 }
823
824
825
826
827 org.apache.hadoop.hbase.catalog.MetaMigrationConvertingToPB
828 .updateMetaIfNecessary(this);
829
830 this.balancer.setMasterServices(this);
831
832 status.setStatus("Starting assignment manager");
833 this.assignmentManager.joinCluster();
834
835 this.balancer.setClusterStatus(getClusterStatus());
836
837 if (!masterRecovery) {
838
839
840 status.setStatus("Starting balancer and catalog janitor");
841 this.clusterStatusChore = getAndStartClusterStatusChore(this);
842 this.balancerChore = getAndStartBalancerChore(this);
843 this.catalogJanitorChore = new CatalogJanitor(this, this);
844 startCatalogJanitorChore();
845 }
846
847 status.markComplete("Initialization successful");
848 LOG.info("Master has completed initialization");
849 initialized = true;
850
851
852
853 this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();
854
855 if (!masterRecovery) {
856 if (this.cpHost != null) {
857
858 try {
859 this.cpHost.postStartMaster();
860 } catch (IOException ioe) {
861 LOG.error("Coprocessor postStartMaster() hook failed", ioe);
862 }
863 }
864 }
865 }
866
867
868
869
870
871 protected void startCatalogJanitorChore() {
872 Threads.setDaemonThreadRunning(catalogJanitorChore.getThread());
873 }
874
875
876
877
878
879
880
881
882
883 ServerManager createServerManager(final Server master,
884 final MasterServices services)
885 throws IOException {
886
887
888 return new ServerManager(master, services);
889 }
890
891
892
893
894
895 private void enableServerShutdownHandler() {
896 if (!serverShutdownHandlerEnabled) {
897 serverShutdownHandlerEnabled = true;
898 this.serverManager.processQueuedDeadServers();
899 }
900 }
901
902
903
904
905
906
907
908
909 void assignMeta(MonitoredTask status)
910 throws InterruptedException, IOException, KeeperException {
911
912 int assigned = 0;
913 long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
914 boolean beingExpired = false;
915
916 status.setStatus("Assigning META region");
917
918 assignmentManager.getRegionStates().createRegionState(HRegionInfo.FIRST_META_REGIONINFO);
919 boolean rit = this.assignmentManager
920 .processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO);
921 boolean metaRegionLocation = this.catalogTracker.verifyMetaRegionLocation(timeout);
922 if (!rit && !metaRegionLocation) {
923 ServerName currentMetaServer = this.catalogTracker.getMetaLocation();
924 if (currentMetaServer != null) {
925 beingExpired = expireIfOnline(currentMetaServer);
926 }
927 if (beingExpired) {
928 splitMetaLogBeforeAssignment(currentMetaServer);
929 }
930 assignmentManager.assignMeta();
931
932 enableSSHandWaitForMeta();
933 assigned++;
934 if (beingExpired && this.distributedLogReplay) {
935
936 this.fileSystemManager.splitMetaLog(currentMetaServer);
937 }
938 } else if (rit && !metaRegionLocation) {
939
940 enableSSHandWaitForMeta();
941 assigned++;
942 } else {
943
944 this.assignmentManager.regionOnline(HRegionInfo.FIRST_META_REGIONINFO,
945 this.catalogTracker.getMetaLocation());
946 }
947
948 enableCatalogTables(Bytes.toString(HConstants.META_TABLE_NAME));
949 LOG.info(".META. assigned=" + assigned + ", rit=" + rit + ", location="
950 + catalogTracker.getMetaLocation());
951 status.setStatus("META assigned.");
952 }
953
954 private void splitMetaLogBeforeAssignment(ServerName currentMetaServer) throws IOException {
955 if (this.distributedLogReplay) {
956
957 Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
958 regions.add(HRegionInfo.FIRST_META_REGIONINFO);
959 this.fileSystemManager.prepareMetaLogReplay(currentMetaServer, regions);
960 } else {
961
962 this.fileSystemManager.splitMetaLog(currentMetaServer);
963 }
964 }
965
966 private void enableSSHandWaitForMeta() throws IOException, InterruptedException {
967 enableServerShutdownHandler();
968 this.catalogTracker.waitForMeta();
969
970
971 this.assignmentManager.waitForAssignment(HRegionInfo.FIRST_META_REGIONINFO);
972 }
973
974 private void enableCatalogTables(String catalogTableName) {
975 if (!this.assignmentManager.getZKTable().isEnabledTable(catalogTableName)) {
976 this.assignmentManager.setEnabledTable(catalogTableName);
977 }
978 }
979
980
981
982
983
984
985
986 private boolean expireIfOnline(final ServerName sn)
987 throws IOException {
988 if (sn == null || !serverManager.isServerOnline(sn)) {
989 return false;
990 }
991 LOG.info("Forcing expire of " + sn);
992 serverManager.expireServer(sn);
993 return true;
994 }
995
996 @Override
997 public TableDescriptors getTableDescriptors() {
998 return this.tableDescriptors;
999 }
1000
1001
1002 public InfoServer getInfoServer() {
1003 return this.infoServer;
1004 }
1005
1006 @Override
1007 public Configuration getConfiguration() {
1008 return this.conf;
1009 }
1010
1011 @Override
1012 public ServerManager getServerManager() {
1013 return this.serverManager;
1014 }
1015
1016 @Override
1017 public ExecutorService getExecutorService() {
1018 return this.executorService;
1019 }
1020
1021 @Override
1022 public MasterFileSystem getMasterFileSystem() {
1023 return this.fileSystemManager;
1024 }
1025
1026
1027
1028
1029
1030 public ZooKeeperWatcher getZooKeeperWatcher() {
1031 return this.zooKeeper;
1032 }
1033
1034
1035
1036
1037
1038
1039
1040
1041 void startServiceThreads() throws IOException{
1042
1043 this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
1044 conf.getInt("hbase.master.executor.openregion.threads", 5));
1045 this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
1046 conf.getInt("hbase.master.executor.closeregion.threads", 5));
1047 this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
1048 conf.getInt("hbase.master.executor.serverops.threads", 3));
1049 this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
1050 conf.getInt("hbase.master.executor.serverops.threads", 5));
1051
1052
1053
1054
1055 this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
1056
1057
1058 String n = Thread.currentThread().getName();
1059 int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
1060 this.logCleaner =
1061 new LogCleaner(cleanerInterval,
1062 this, conf, getMasterFileSystem().getFileSystem(),
1063 getMasterFileSystem().getOldLogDir());
1064 Threads.setDaemonThreadRunning(logCleaner.getThread(), n + ".oldLogCleaner");
1065
1066
1067 Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
1068 this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
1069 .getFileSystem(), archiveDir);
1070 Threads.setDaemonThreadRunning(hfileCleaner.getThread(), n + ".archivedHFileCleaner");
1071
1072
1073 int port = this.conf.getInt(HConstants.MASTER_INFO_PORT, 60010);
1074 if (port >= 0) {
1075 String a = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0");
1076 this.infoServer = new InfoServer(MASTER, a, port, false, this.conf);
1077 this.infoServer.addServlet("status", "/master-status", MasterStatusServlet.class);
1078 this.infoServer.addServlet("dump", "/dump", MasterDumpServlet.class);
1079 this.infoServer.setAttribute(MASTER, this);
1080 this.infoServer.start();
1081 }
1082
1083
1084 if (this.healthCheckChore != null) {
1085 Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker");
1086 }
1087
1088
1089 this.rpcServer.openServer();
1090 this.rpcServerOpen = true;
1091 if (LOG.isTraceEnabled()) {
1092 LOG.trace("Started service threads");
1093 }
1094 }
1095
1096
1097
1098
1099
1100 boolean isRpcServerOpen() {
1101 return this.rpcServerOpen;
1102 }
1103
1104 private void stopServiceThreads() {
1105 if (LOG.isDebugEnabled()) {
1106 LOG.debug("Stopping service threads");
1107 }
1108 if (this.rpcServer != null) this.rpcServer.stop();
1109 this.rpcServerOpen = false;
1110
1111 if (this.logCleaner!= null) this.logCleaner.interrupt();
1112 if (this.hfileCleaner != null) this.hfileCleaner.interrupt();
1113
1114 if (this.infoServer != null) {
1115 LOG.info("Stopping infoServer");
1116 try {
1117 this.infoServer.stop();
1118 } catch (Exception ex) {
1119 ex.printStackTrace();
1120 }
1121 }
1122 if (this.executorService != null) this.executorService.shutdown();
1123 if (this.healthCheckChore != null) {
1124 this.healthCheckChore.interrupt();
1125 }
1126 }
1127
1128 private static Thread getAndStartClusterStatusChore(HMaster master) {
1129 if (master == null || master.balancer == null) {
1130 return null;
1131 }
1132 Chore chore = new ClusterStatusChore(master, master.balancer);
1133 return Threads.setDaemonThreadRunning(chore.getThread());
1134 }
1135
1136 private static Thread getAndStartBalancerChore(final HMaster master) {
1137
1138 Chore chore = new BalancerChore(master);
1139 return Threads.setDaemonThreadRunning(chore.getThread());
1140 }
1141
1142 private void stopChores() {
1143 if (this.balancerChore != null) {
1144 this.balancerChore.interrupt();
1145 }
1146 if (this.clusterStatusChore != null) {
1147 this.clusterStatusChore.interrupt();
1148 }
1149 if (this.catalogJanitorChore != null) {
1150 this.catalogJanitorChore.interrupt();
1151 }
1152 if (this.clusterStatusPublisherChore != null){
1153 clusterStatusPublisherChore.interrupt();
1154 }
1155 }
1156
1157 @Override
1158 public RegionServerStartupResponse regionServerStartup(
1159 RpcController controller, RegionServerStartupRequest request) throws ServiceException {
1160
1161 try {
1162 InetAddress ia = getRemoteInetAddress(request.getPort(), request.getServerStartCode());
1163 ServerName rs = this.serverManager.regionServerStartup(ia, request.getPort(),
1164 request.getServerStartCode(), request.getServerCurrentTime());
1165
1166
1167 RegionServerStartupResponse.Builder resp = createConfigurationSubset();
1168 NameStringPair.Builder entry = NameStringPair.newBuilder()
1169 .setName(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)
1170 .setValue(rs.getHostname());
1171 resp.addMapEntries(entry.build());
1172
1173 return resp.build();
1174 } catch (IOException ioe) {
1175 throw new ServiceException(ioe);
1176 }
1177 }
1178
1179
1180
1181
1182
1183 InetAddress getRemoteInetAddress(final int port, final long serverStartCode)
1184 throws UnknownHostException {
1185
1186
1187 return RpcServer.getRemoteIp();
1188 }
1189
1190
1191
1192
1193
1194 protected RegionServerStartupResponse.Builder createConfigurationSubset() {
1195 RegionServerStartupResponse.Builder resp = addConfig(
1196 RegionServerStartupResponse.newBuilder(), HConstants.HBASE_DIR);
1197 return addConfig(resp, "fs.default.name");
1198 }
1199
1200 private RegionServerStartupResponse.Builder addConfig(
1201 final RegionServerStartupResponse.Builder resp, final String key) {
1202 NameStringPair.Builder entry = NameStringPair.newBuilder()
1203 .setName(key)
1204 .setValue(this.conf.get(key));
1205 resp.addMapEntries(entry.build());
1206 return resp;
1207 }
1208
1209 @Override
1210 public GetLastFlushedSequenceIdResponse getLastFlushedSequenceId(RpcController controller,
1211 GetLastFlushedSequenceIdRequest request) throws ServiceException {
1212 byte[] regionName = request.getRegionName().toByteArray();
1213 long seqId = serverManager.getLastFlushedSequenceId(regionName);
1214 return ResponseConverter.buildGetLastFlushedSequenceIdResponse(seqId);
1215 }
1216
1217 @Override
1218 public RegionServerReportResponse regionServerReport(
1219 RpcController controller, RegionServerReportRequest request) throws ServiceException {
1220 try {
1221 HBaseProtos.ServerLoad sl = request.getLoad();
1222 this.serverManager.regionServerReport(ProtobufUtil.toServerName(request.getServer()), new ServerLoad(sl));
1223 if (sl != null && this.metricsMaster != null) {
1224
1225 this.metricsMaster.incrementRequests(sl.getTotalNumberOfRequests());
1226 }
1227 } catch (IOException ioe) {
1228 throw new ServiceException(ioe);
1229 }
1230
1231 return RegionServerReportResponse.newBuilder().build();
1232 }
1233
1234 @Override
1235 public ReportRSFatalErrorResponse reportRSFatalError(
1236 RpcController controller, ReportRSFatalErrorRequest request) throws ServiceException {
1237 String errorText = request.getErrorMessage();
1238 ServerName sn = ProtobufUtil.toServerName(request.getServer());
1239 String msg = "Region server " + sn +
1240 " reported a fatal error:\n" + errorText;
1241 LOG.error(msg);
1242 rsFatals.add(msg);
1243
1244 return ReportRSFatalErrorResponse.newBuilder().build();
1245 }
1246
1247 public boolean isMasterRunning() {
1248 return !isStopped();
1249 }
1250
1251 public IsMasterRunningResponse isMasterRunning(RpcController c, IsMasterRunningRequest req)
1252 throws ServiceException {
1253 return IsMasterRunningResponse.newBuilder().setIsMasterRunning(isMasterRunning()).build();
1254 }
1255
1256 @Override
1257 public CatalogScanResponse runCatalogScan(RpcController c,
1258 CatalogScanRequest req) throws ServiceException {
1259 try {
1260 return ResponseConverter.buildCatalogScanResponse(catalogJanitorChore.scan());
1261 } catch (IOException ioe) {
1262 throw new ServiceException(ioe);
1263 }
1264 }
1265
1266 @Override
1267 public EnableCatalogJanitorResponse enableCatalogJanitor(RpcController c,
1268 EnableCatalogJanitorRequest req) throws ServiceException {
1269 return EnableCatalogJanitorResponse.newBuilder().
1270 setPrevValue(catalogJanitorChore.setEnabled(req.getEnable())).build();
1271 }
1272
1273 @Override
1274 public IsCatalogJanitorEnabledResponse isCatalogJanitorEnabled(RpcController c,
1275 IsCatalogJanitorEnabledRequest req) throws ServiceException {
1276 boolean isEnabled = catalogJanitorChore != null ? catalogJanitorChore.getEnabled() : false;
1277 return IsCatalogJanitorEnabledResponse.newBuilder().setValue(isEnabled).build();
1278 }
1279
1280
1281
1282
1283 private int getBalancerCutoffTime() {
1284 int balancerCutoffTime =
1285 getConfiguration().getInt("hbase.balancer.max.balancing", -1);
1286 if (balancerCutoffTime == -1) {
1287
1288 int balancerPeriod =
1289 getConfiguration().getInt("hbase.balancer.period", 300000);
1290 balancerCutoffTime = balancerPeriod;
1291
1292 if (balancerCutoffTime <= 0) balancerCutoffTime = balancerPeriod;
1293 }
1294 return balancerCutoffTime;
1295 }
1296
1297 public boolean balance() {
1298
1299 if (!this.initialized) {
1300 LOG.debug("Master has not been initialized, don't run balancer.");
1301 return false;
1302 }
1303
1304 if (!this.loadBalancerTracker.isBalancerOn()) return false;
1305
1306 int maximumBalanceTime = getBalancerCutoffTime();
1307 boolean balancerRan;
1308 synchronized (this.balancer) {
1309
1310 if (this.assignmentManager.getRegionStates().isRegionsInTransition()) {
1311 Map<String, RegionState> regionsInTransition =
1312 this.assignmentManager.getRegionStates().getRegionsInTransition();
1313 LOG.debug("Not running balancer because " + regionsInTransition.size() +
1314 " region(s) in transition: " + org.apache.commons.lang.StringUtils.
1315 abbreviate(regionsInTransition.toString(), 256));
1316 return false;
1317 }
1318 if (this.serverManager.areDeadServersInProgress()) {
1319 LOG.debug("Not running balancer because processing dead regionserver(s): " +
1320 this.serverManager.getDeadServers());
1321 return false;
1322 }
1323
1324 if (this.cpHost != null) {
1325 try {
1326 if (this.cpHost.preBalance()) {
1327 LOG.debug("Coprocessor bypassing balancer request");
1328 return false;
1329 }
1330 } catch (IOException ioe) {
1331 LOG.error("Error invoking master coprocessor preBalance()", ioe);
1332 return false;
1333 }
1334 }
1335
1336 Map<String, Map<ServerName, List<HRegionInfo>>> assignmentsByTable =
1337 this.assignmentManager.getRegionStates().getAssignmentsByTable();
1338
1339 List<RegionPlan> plans = new ArrayList<RegionPlan>();
1340
1341 this.balancer.setClusterStatus(getClusterStatus());
1342 for (Map<ServerName, List<HRegionInfo>> assignments : assignmentsByTable.values()) {
1343 List<RegionPlan> partialPlans = this.balancer.balanceCluster(assignments);
1344 if (partialPlans != null) plans.addAll(partialPlans);
1345 }
1346 long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
1347 int rpCount = 0;
1348 long totalRegPlanExecTime = 0;
1349 balancerRan = plans != null;
1350 if (plans != null && !plans.isEmpty()) {
1351 for (RegionPlan plan: plans) {
1352 LOG.info("balance " + plan);
1353 long balStartTime = System.currentTimeMillis();
1354
1355 this.assignmentManager.balance(plan);
1356 totalRegPlanExecTime += System.currentTimeMillis()-balStartTime;
1357 rpCount++;
1358 if (rpCount < plans.size() &&
1359
1360 (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
1361
1362 LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
1363 maximumBalanceTime);
1364 break;
1365 }
1366 }
1367 }
1368 if (this.cpHost != null) {
1369 try {
1370 this.cpHost.postBalance(rpCount < plans.size() ? plans.subList(0, rpCount) : plans);
1371 } catch (IOException ioe) {
1372
1373 LOG.error("Error invoking master coprocessor postBalance()", ioe);
1374 }
1375 }
1376 }
1377 return balancerRan;
1378 }
1379
1380 @Override
1381 public BalanceResponse balance(RpcController c, BalanceRequest request) throws ServiceException {
1382 return BalanceResponse.newBuilder().setBalancerRan(balance()).build();
1383 }
1384
/**
 * How a balancer on/off switch is applied by {@code switchBalancer}.
 */
enum BalanceSwitchMode {
  /** The switch is flipped while holding the balancer's monitor. */
  SYNC,

  /** The switch is flipped without taking the balancer's monitor. */
  ASYNC
}
1389
1390
1391
1392
1393
1394
1395 public boolean switchBalancer(final boolean b, BalanceSwitchMode mode) throws IOException {
1396 boolean oldValue = this.loadBalancerTracker.isBalancerOn();
1397 boolean newValue = b;
1398 try {
1399 if (this.cpHost != null) {
1400 newValue = this.cpHost.preBalanceSwitch(newValue);
1401 }
1402 try {
1403 if (mode == BalanceSwitchMode.SYNC) {
1404 synchronized (this.balancer) {
1405 this.loadBalancerTracker.setBalancerOn(newValue);
1406 }
1407 } else {
1408 this.loadBalancerTracker.setBalancerOn(newValue);
1409 }
1410 } catch (KeeperException ke) {
1411 throw new IOException(ke);
1412 }
1413 LOG.info("BalanceSwitch=" + newValue);
1414 if (this.cpHost != null) {
1415 this.cpHost.postBalanceSwitch(oldValue, newValue);
1416 }
1417 } catch (IOException ioe) {
1418 LOG.warn("Error flipping balance switch", ioe);
1419 }
1420 return oldValue;
1421 }
1422
1423 public boolean synchronousBalanceSwitch(final boolean b) throws IOException {
1424 return switchBalancer(b, BalanceSwitchMode.SYNC);
1425 }
1426
1427 public boolean balanceSwitch(final boolean b) throws IOException {
1428 return switchBalancer(b, BalanceSwitchMode.ASYNC);
1429 }
1430
1431 @Override
1432 public SetBalancerRunningResponse setBalancerRunning(
1433 RpcController controller, SetBalancerRunningRequest req) throws ServiceException {
1434 try {
1435 boolean prevValue = (req.getSynchronous())?
1436 synchronousBalanceSwitch(req.getOn()):balanceSwitch(req.getOn());
1437 return SetBalancerRunningResponse.newBuilder().setPrevBalanceValue(prevValue).build();
1438 } catch (IOException ioe) {
1439 throw new ServiceException(ioe);
1440 }
1441 }
1442
1443
1444
1445
1446
1447
1448
1449 public void setCatalogJanitorEnabled(final boolean b) {
1450 this.catalogJanitorChore.setEnabled(b);
1451 }
1452
1453 @Override
1454 public DispatchMergingRegionsResponse dispatchMergingRegions(
1455 RpcController controller, DispatchMergingRegionsRequest request)
1456 throws ServiceException {
1457 final byte[] encodedNameOfRegionA = request.getRegionA().getValue()
1458 .toByteArray();
1459 final byte[] encodedNameOfRegionB = request.getRegionB().getValue()
1460 .toByteArray();
1461 final boolean forcible = request.getForcible();
1462 if (request.getRegionA().getType() != RegionSpecifierType.ENCODED_REGION_NAME
1463 || request.getRegionB().getType() != RegionSpecifierType.ENCODED_REGION_NAME) {
1464 LOG.warn("mergeRegions specifier type: expected: "
1465 + RegionSpecifierType.ENCODED_REGION_NAME + " actual: region_a="
1466 + request.getRegionA().getType() + ", region_b="
1467 + request.getRegionB().getType());
1468 }
1469 RegionState regionStateA = assignmentManager.getRegionStates()
1470 .getRegionState(Bytes.toString(encodedNameOfRegionA));
1471 RegionState regionStateB = assignmentManager.getRegionStates()
1472 .getRegionState(Bytes.toString(encodedNameOfRegionB));
1473 if (regionStateA == null || regionStateB == null) {
1474 throw new ServiceException(new UnknownRegionException(
1475 Bytes.toStringBinary(regionStateA == null ? encodedNameOfRegionA
1476 : encodedNameOfRegionB)));
1477 }
1478
1479 if (!forcible && !HRegionInfo.areAdjacent(regionStateA.getRegion(),
1480 regionStateB.getRegion())) {
1481 throw new ServiceException("Unable to merge not adjacent regions "
1482 + regionStateA.getRegion().getRegionNameAsString() + ", "
1483 + regionStateB.getRegion().getRegionNameAsString()
1484 + " where forcible = " + forcible);
1485 }
1486
1487 try {
1488 dispatchMergingRegions(regionStateA.getRegion(), regionStateB.getRegion(), forcible);
1489 } catch (IOException ioe) {
1490 throw new ServiceException(ioe);
1491 }
1492
1493 return DispatchMergingRegionsResponse.newBuilder().build();
1494 }
1495
1496 @Override
1497 public void dispatchMergingRegions(final HRegionInfo region_a,
1498 final HRegionInfo region_b, final boolean forcible) throws IOException {
1499 checkInitialized();
1500 this.executorService.submit(new DispatchMergingRegionHandler(this,
1501 this.catalogJanitorChore, region_a, region_b, forcible));
1502 }
1503
1504 @Override
1505 public MoveRegionResponse moveRegion(RpcController controller, MoveRegionRequest req)
1506 throws ServiceException {
1507 final byte [] encodedRegionName = req.getRegion().getValue().toByteArray();
1508 RegionSpecifierType type = req.getRegion().getType();
1509 final byte [] destServerName = (req.hasDestServerName())?
1510 Bytes.toBytes(ProtobufUtil.toServerName(req.getDestServerName()).getServerName()):null;
1511 MoveRegionResponse mrr = MoveRegionResponse.newBuilder().build();
1512
1513 if (type != RegionSpecifierType.ENCODED_REGION_NAME) {
1514 LOG.warn("moveRegion specifier type: expected: " + RegionSpecifierType.ENCODED_REGION_NAME
1515 + " actual: " + type);
1516 }
1517
1518 try {
1519 move(encodedRegionName, destServerName);
1520 } catch (HBaseIOException ioe) {
1521 throw new ServiceException(ioe);
1522 }
1523 return mrr;
1524 }
1525
1526 void move(final byte[] encodedRegionName,
1527 final byte[] destServerName) throws HBaseIOException {
1528 RegionState regionState = assignmentManager.getRegionStates().
1529 getRegionState(Bytes.toString(encodedRegionName));
1530 if (regionState == null) {
1531 throw new UnknownRegionException(Bytes.toStringBinary(encodedRegionName));
1532 }
1533
1534 HRegionInfo hri = regionState.getRegion();
1535 ServerName dest;
1536 if (destServerName == null || destServerName.length == 0) {
1537 LOG.info("Passed destination servername is null/empty so " +
1538 "choosing a server at random");
1539 final List<ServerName> destServers = this.serverManager.createDestinationServersList(
1540 regionState.getServerName());
1541 dest = balancer.randomAssignment(hri, destServers);
1542 } else {
1543 dest = new ServerName(Bytes.toString(destServerName));
1544 if (dest.equals(regionState.getServerName())) {
1545 LOG.debug("Skipping move of region " + hri.getRegionNameAsString()
1546 + " because region already assigned to the same server " + dest + ".");
1547 return;
1548 }
1549 }
1550
1551
1552 RegionPlan rp = new RegionPlan(hri, regionState.getServerName(), dest);
1553
1554 try {
1555 checkInitialized();
1556 if (this.cpHost != null) {
1557 if (this.cpHost.preMove(hri, rp.getSource(), rp.getDestination())) {
1558 return;
1559 }
1560 }
1561 LOG.info("Added move plan " + rp + ", running balancer");
1562 this.assignmentManager.balance(rp);
1563 if (this.cpHost != null) {
1564 this.cpHost.postMove(hri, rp.getSource(), rp.getDestination());
1565 }
1566 } catch (IOException ioe) {
1567 if (ioe instanceof HBaseIOException) {
1568 throw (HBaseIOException)ioe;
1569 }
1570 throw new HBaseIOException(ioe);
1571 }
1572 }
1573
1574 @Override
1575 public void createTable(HTableDescriptor hTableDescriptor,
1576 byte [][] splitKeys)
1577 throws IOException {
1578 if (!isMasterRunning()) {
1579 throw new MasterNotRunningException();
1580 }
1581
1582 HRegionInfo [] newRegions = getHRegionInfos(hTableDescriptor, splitKeys);
1583 checkInitialized();
1584 checkCompression(hTableDescriptor);
1585 if (cpHost != null) {
1586 cpHost.preCreateTable(hTableDescriptor, newRegions);
1587 }
1588
1589 this.executorService.submit(new CreateTableHandler(this,
1590 this.fileSystemManager, hTableDescriptor, conf,
1591 newRegions, this).prepare());
1592 if (cpHost != null) {
1593 cpHost.postCreateTable(hTableDescriptor, newRegions);
1594 }
1595
1596 }
1597
1598 private void checkCompression(final HTableDescriptor htd)
1599 throws IOException {
1600 if (!this.masterCheckCompression) return;
1601 for (HColumnDescriptor hcd : htd.getColumnFamilies()) {
1602 checkCompression(hcd);
1603 }
1604 }
1605
1606 private void checkCompression(final HColumnDescriptor hcd)
1607 throws IOException {
1608 if (!this.masterCheckCompression) return;
1609 CompressionTest.testCompression(hcd.getCompression());
1610 CompressionTest.testCompression(hcd.getCompactionCompression());
1611 }
1612
1613 @Override
1614 public CreateTableResponse createTable(RpcController controller, CreateTableRequest req)
1615 throws ServiceException {
1616 HTableDescriptor hTableDescriptor = HTableDescriptor.convert(req.getTableSchema());
1617 byte [][] splitKeys = ProtobufUtil.getSplitKeysArray(req);
1618 try {
1619 createTable(hTableDescriptor,splitKeys);
1620 } catch (IOException ioe) {
1621 throw new ServiceException(ioe);
1622 }
1623 return CreateTableResponse.newBuilder().build();
1624 }
1625
1626 private HRegionInfo[] getHRegionInfos(HTableDescriptor hTableDescriptor,
1627 byte[][] splitKeys) {
1628 HRegionInfo[] hRegionInfos = null;
1629 if (splitKeys == null || splitKeys.length == 0) {
1630 hRegionInfos = new HRegionInfo[]{
1631 new HRegionInfo(hTableDescriptor.getName(), null, null)};
1632 } else {
1633 int numRegions = splitKeys.length + 1;
1634 hRegionInfos = new HRegionInfo[numRegions];
1635 byte[] startKey = null;
1636 byte[] endKey = null;
1637 for (int i = 0; i < numRegions; i++) {
1638 endKey = (i == splitKeys.length) ? null : splitKeys[i];
1639 hRegionInfos[i] =
1640 new HRegionInfo(hTableDescriptor.getName(), startKey, endKey);
1641 startKey = endKey;
1642 }
1643 }
1644 return hRegionInfos;
1645 }
1646
1647 private static boolean isCatalogTable(final byte [] tableName) {
1648 return Bytes.equals(tableName, HConstants.META_TABLE_NAME);
1649 }
1650
1651 @Override
1652 public void deleteTable(final byte[] tableName) throws IOException {
1653 checkInitialized();
1654 if (cpHost != null) {
1655 cpHost.preDeleteTable(tableName);
1656 }
1657 this.executorService.submit(new DeleteTableHandler(tableName, this, this).prepare());
1658 if (cpHost != null) {
1659 cpHost.postDeleteTable(tableName);
1660 }
1661 }
1662
1663 @Override
1664 public DeleteTableResponse deleteTable(RpcController controller, DeleteTableRequest request)
1665 throws ServiceException {
1666 try {
1667 deleteTable(request.getTableName().toByteArray());
1668 } catch (IOException ioe) {
1669 throw new ServiceException(ioe);
1670 }
1671 return DeleteTableResponse.newBuilder().build();
1672 }
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682 @Override
1683 public GetSchemaAlterStatusResponse getSchemaAlterStatus(
1684 RpcController controller, GetSchemaAlterStatusRequest req) throws ServiceException {
1685
1686
1687
1688
1689 byte [] tableName = req.getTableName().toByteArray();
1690
1691 try {
1692 Pair<Integer,Integer> pair = this.assignmentManager.getReopenStatus(tableName);
1693 GetSchemaAlterStatusResponse.Builder ret = GetSchemaAlterStatusResponse.newBuilder();
1694 ret.setYetToUpdateRegions(pair.getFirst());
1695 ret.setTotalRegions(pair.getSecond());
1696 return ret.build();
1697 } catch (IOException ioe) {
1698 throw new ServiceException(ioe);
1699 }
1700 }
1701
1702 @Override
1703 public void addColumn(final byte[] tableName, final HColumnDescriptor column)
1704 throws IOException {
1705 checkInitialized();
1706 if (cpHost != null) {
1707 if (cpHost.preAddColumn(tableName, column)) {
1708 return;
1709 }
1710 }
1711
1712 new TableAddFamilyHandler(tableName, column, this, this)
1713 .prepare().process();
1714 if (cpHost != null) {
1715 cpHost.postAddColumn(tableName, column);
1716 }
1717 }
1718
1719 @Override
1720 public AddColumnResponse addColumn(RpcController controller, AddColumnRequest req)
1721 throws ServiceException {
1722 try {
1723 addColumn(req.getTableName().toByteArray(),
1724 HColumnDescriptor.convert(req.getColumnFamilies()));
1725 } catch (IOException ioe) {
1726 throw new ServiceException(ioe);
1727 }
1728 return AddColumnResponse.newBuilder().build();
1729 }
1730
1731 @Override
1732 public void modifyColumn(byte[] tableName, HColumnDescriptor descriptor)
1733 throws IOException {
1734 checkInitialized();
1735 checkCompression(descriptor);
1736 if (cpHost != null) {
1737 if (cpHost.preModifyColumn(tableName, descriptor)) {
1738 return;
1739 }
1740 }
1741 new TableModifyFamilyHandler(tableName, descriptor, this, this)
1742 .prepare().process();
1743 if (cpHost != null) {
1744 cpHost.postModifyColumn(tableName, descriptor);
1745 }
1746 }
1747
1748 @Override
1749 public ModifyColumnResponse modifyColumn(RpcController controller, ModifyColumnRequest req)
1750 throws ServiceException {
1751 try {
1752 modifyColumn(req.getTableName().toByteArray(),
1753 HColumnDescriptor.convert(req.getColumnFamilies()));
1754 } catch (IOException ioe) {
1755 throw new ServiceException(ioe);
1756 }
1757 return ModifyColumnResponse.newBuilder().build();
1758 }
1759
1760 @Override
1761 public void deleteColumn(final byte[] tableName, final byte[] columnName)
1762 throws IOException {
1763 checkInitialized();
1764 if (cpHost != null) {
1765 if (cpHost.preDeleteColumn(tableName, columnName)) {
1766 return;
1767 }
1768 }
1769 new TableDeleteFamilyHandler(tableName, columnName, this, this).prepare().process();
1770 if (cpHost != null) {
1771 cpHost.postDeleteColumn(tableName, columnName);
1772 }
1773 }
1774
1775 @Override
1776 public DeleteColumnResponse deleteColumn(RpcController controller, DeleteColumnRequest req)
1777 throws ServiceException {
1778 try {
1779 deleteColumn(req.getTableName().toByteArray(), req.getColumnName().toByteArray());
1780 } catch (IOException ioe) {
1781 throw new ServiceException(ioe);
1782 }
1783 return DeleteColumnResponse.newBuilder().build();
1784 }
1785
1786 @Override
1787 public void enableTable(final byte[] tableName) throws IOException {
1788 checkInitialized();
1789 if (cpHost != null) {
1790 cpHost.preEnableTable(tableName);
1791 }
1792 this.executorService.submit(new EnableTableHandler(this, tableName,
1793 catalogTracker, assignmentManager, tableLockManager, false).prepare());
1794 if (cpHost != null) {
1795 cpHost.postEnableTable(tableName);
1796 }
1797 }
1798
1799 @Override
1800 public EnableTableResponse enableTable(RpcController controller, EnableTableRequest request)
1801 throws ServiceException {
1802 try {
1803 enableTable(request.getTableName().toByteArray());
1804 } catch (IOException ioe) {
1805 throw new ServiceException(ioe);
1806 }
1807 return EnableTableResponse.newBuilder().build();
1808 }
1809
1810 @Override
1811 public void disableTable(final byte[] tableName) throws IOException {
1812 checkInitialized();
1813 if (cpHost != null) {
1814 cpHost.preDisableTable(tableName);
1815 }
1816 this.executorService.submit(new DisableTableHandler(this, tableName,
1817 catalogTracker, assignmentManager, tableLockManager, false).prepare());
1818 if (cpHost != null) {
1819 cpHost.postDisableTable(tableName);
1820 }
1821 }
1822
1823 @Override
1824 public DisableTableResponse disableTable(RpcController controller, DisableTableRequest request)
1825 throws ServiceException {
1826 try {
1827 disableTable(request.getTableName().toByteArray());
1828 } catch (IOException ioe) {
1829 throw new ServiceException(ioe);
1830 }
1831 return DisableTableResponse.newBuilder().build();
1832 }
1833
1834
1835
1836
1837
1838
1839
1840 Pair<HRegionInfo, ServerName> getTableRegionForRow(
1841 final byte [] tableName, final byte [] rowKey)
1842 throws IOException {
1843 final AtomicReference<Pair<HRegionInfo, ServerName>> result =
1844 new AtomicReference<Pair<HRegionInfo, ServerName>>(null);
1845
1846 MetaScannerVisitor visitor =
1847 new MetaScannerVisitorBase() {
1848 @Override
1849 public boolean processRow(Result data) throws IOException {
1850 if (data == null || data.size() <= 0) {
1851 return true;
1852 }
1853 Pair<HRegionInfo, ServerName> pair = HRegionInfo.getHRegionInfoAndServerName(data);
1854 if (pair == null) {
1855 return false;
1856 }
1857 if (!Bytes.equals(pair.getFirst().getTableName(), tableName)) {
1858 return false;
1859 }
1860 result.set(pair);
1861 return true;
1862 }
1863 };
1864
1865 MetaScanner.metaScan(conf, visitor, tableName, rowKey, 1);
1866 return result.get();
1867 }
1868
1869 @Override
1870 public void modifyTable(final byte[] tableName, final HTableDescriptor descriptor)
1871 throws IOException {
1872 checkInitialized();
1873 checkCompression(descriptor);
1874 if (cpHost != null) {
1875 cpHost.preModifyTable(tableName, descriptor);
1876 }
1877 new ModifyTableHandler(tableName, descriptor, this, this).prepare().process();
1878 if (cpHost != null) {
1879 cpHost.postModifyTable(tableName, descriptor);
1880 }
1881 }
1882
1883 @Override
1884 public ModifyTableResponse modifyTable(RpcController controller, ModifyTableRequest req)
1885 throws ServiceException {
1886 try {
1887 modifyTable(req.getTableName().toByteArray(),
1888 HTableDescriptor.convert(req.getTableSchema()));
1889 } catch (IOException ioe) {
1890 throw new ServiceException(ioe);
1891 }
1892 return ModifyTableResponse.newBuilder().build();
1893 }
1894
1895 @Override
1896 public void checkTableModifiable(final byte [] tableName)
1897 throws IOException, TableNotFoundException, TableNotDisabledException {
1898 String tableNameStr = Bytes.toString(tableName);
1899 if (isCatalogTable(tableName)) {
1900 throw new IOException("Can't modify catalog tables");
1901 }
1902 if (!MetaReader.tableExists(getCatalogTracker(), tableNameStr)) {
1903 throw new TableNotFoundException(tableNameStr);
1904 }
1905 if (!getAssignmentManager().getZKTable().
1906 isDisabledTable(Bytes.toString(tableName))) {
1907 throw new TableNotDisabledException(tableName);
1908 }
1909 }
1910
1911 @Override
1912 public GetClusterStatusResponse getClusterStatus(RpcController controller,
1913 GetClusterStatusRequest req)
1914 throws ServiceException {
1915 GetClusterStatusResponse.Builder response = GetClusterStatusResponse.newBuilder();
1916 response.setClusterStatus(getClusterStatus().convert());
1917 return response.build();
1918 }
1919
1920
1921
1922
1923 public ClusterStatus getClusterStatus() {
1924
1925 List<String> backupMasterStrings;
1926 try {
1927 backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
1928 this.zooKeeper.backupMasterAddressesZNode);
1929 } catch (KeeperException e) {
1930 LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
1931 backupMasterStrings = new ArrayList<String>(0);
1932 }
1933 List<ServerName> backupMasters = new ArrayList<ServerName>(
1934 backupMasterStrings.size());
1935 for (String s: backupMasterStrings) {
1936 try {
1937 byte [] bytes =
1938 ZKUtil.getData(this.zooKeeper, ZKUtil.joinZNode(
1939 this.zooKeeper.backupMasterAddressesZNode, s));
1940 if (bytes != null) {
1941 ServerName sn;
1942 try {
1943 sn = ServerName.parseFrom(bytes);
1944 } catch (DeserializationException e) {
1945 LOG.warn("Failed parse, skipping registering backup server", e);
1946 continue;
1947 }
1948 backupMasters.add(sn);
1949 }
1950 } catch (KeeperException e) {
1951 LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
1952 "backup servers"), e);
1953 }
1954 }
1955 Collections.sort(backupMasters, new Comparator<ServerName>() {
1956 public int compare(ServerName s1, ServerName s2) {
1957 return s1.getServerName().compareTo(s2.getServerName());
1958 }});
1959
1960 return new ClusterStatus(VersionInfo.getVersion(),
1961 this.fileSystemManager.getClusterId().toString(),
1962 this.serverManager.getOnlineServers(),
1963 this.serverManager.getDeadServers().copyServerNames(),
1964 this.serverName,
1965 backupMasters,
1966 this.assignmentManager.getRegionStates().getRegionsInTransition(),
1967 this.getCoprocessors(), this.loadBalancerTracker.isBalancerOn());
1968 }
1969
1970 public String getClusterId() {
1971 if (fileSystemManager == null) {
1972 return "";
1973 }
1974 ClusterId id = fileSystemManager.getClusterId();
1975 if (id == null) {
1976 return "";
1977 }
1978 return id.toString();
1979 }
1980
1981
1982
1983
1984
1985
1986
1987
1988 public static String getLoadedCoprocessors() {
1989 return CoprocessorHost.getLoadedCoprocessors().toString();
1990 }
1991
1992
1993
1994
1995 public long getMasterStartTime() {
1996 return masterStartTime;
1997 }
1998
1999
2000
2001
2002 public long getMasterActiveTime() {
2003 return masterActiveTime;
2004 }
2005
2006
2007
2008
2009 public String[] getCoprocessors() {
2010 Set<String> masterCoprocessors =
2011 getCoprocessorHost().getCoprocessors();
2012 return masterCoprocessors.toArray(new String[masterCoprocessors.size()]);
2013 }
2014
2015 @Override
2016 public void abort(final String msg, final Throwable t) {
2017 if (cpHost != null) {
2018
2019 LOG.fatal("Master server abort: loaded coprocessors are: " +
2020 getLoadedCoprocessors());
2021 }
2022
2023 if (abortNow(msg, t)) {
2024 if (t != null) LOG.fatal(msg, t);
2025 else LOG.fatal(msg);
2026 this.abort = true;
2027 stop("Aborting");
2028 }
2029 }
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048 private boolean tryRecoveringExpiredZKSession() throws InterruptedException,
2049 IOException, KeeperException, ExecutionException {
2050
2051 this.zooKeeper.unregisterAllListeners();
2052 this.zooKeeper.reconnectAfterExpiration();
2053
2054 Callable<Boolean> callable = new Callable<Boolean> () {
2055 public Boolean call() throws InterruptedException,
2056 IOException, KeeperException {
2057 MonitoredTask status =
2058 TaskMonitor.get().createStatus("Recovering expired ZK session");
2059 try {
2060 if (!becomeActiveMaster(status)) {
2061 return Boolean.FALSE;
2062 }
2063 serverShutdownHandlerEnabled = false;
2064 initialized = false;
2065 finishInitialization(status, true);
2066 return !stopped;
2067 } finally {
2068 status.cleanup();
2069 }
2070 }
2071 };
2072
2073 long timeout =
2074 conf.getLong("hbase.master.zksession.recover.timeout", 300000);
2075 java.util.concurrent.ExecutorService executor =
2076 Executors.newSingleThreadExecutor();
2077 Future<Boolean> result = executor.submit(callable);
2078 executor.shutdown();
2079 if (executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)
2080 && result.isDone()) {
2081 Boolean recovered = result.get();
2082 if (recovered != null) {
2083 return recovered.booleanValue();
2084 }
2085 }
2086 executor.shutdownNow();
2087 return false;
2088 }
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098 private boolean abortNow(final String msg, final Throwable t) {
2099 if (!this.isActiveMaster) {
2100 return true;
2101 }
2102 if (t != null && t instanceof KeeperException.SessionExpiredException) {
2103 try {
2104 LOG.info("Primary Master trying to recover from ZooKeeper session " +
2105 "expiry.");
2106 return !tryRecoveringExpiredZKSession();
2107 } catch (Throwable newT) {
2108 LOG.error("Primary master encountered unexpected exception while " +
2109 "trying to recover from ZooKeeper session" +
2110 " expiry. Proceeding with server abort.", newT);
2111 }
2112 }
2113 return true;
2114 }
2115
2116 @Override
2117 public ZooKeeperWatcher getZooKeeper() {
2118 return zooKeeper;
2119 }
2120
2121 @Override
2122 public MasterCoprocessorHost getCoprocessorHost() {
2123 return cpHost;
2124 }
2125
2126 @Override
2127 public ServerName getServerName() {
2128 return this.serverName;
2129 }
2130
2131 @Override
2132 public CatalogTracker getCatalogTracker() {
2133 return catalogTracker;
2134 }
2135
2136 @Override
2137 public AssignmentManager getAssignmentManager() {
2138 return this.assignmentManager;
2139 }
2140
2141 @Override
2142 public TableLockManager getTableLockManager() {
2143 return this.tableLockManager;
2144 }
2145
2146 public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
2147 return rsFatals;
2148 }
2149
2150 public void shutdown() {
2151 if (spanReceiverHost != null) {
2152 spanReceiverHost.closeReceivers();
2153 }
2154 if (cpHost != null) {
2155 try {
2156 cpHost.preShutdown();
2157 } catch (IOException ioe) {
2158 LOG.error("Error call master coprocessor preShutdown()", ioe);
2159 }
2160 }
2161 if (mxBean != null) {
2162 MBeanUtil.unregisterMBean(mxBean);
2163 mxBean = null;
2164 }
2165 if (this.assignmentManager != null) this.assignmentManager.shutdown();
2166 if (this.serverManager != null) this.serverManager.shutdownCluster();
2167 try {
2168 if (this.clusterStatusTracker != null){
2169 this.clusterStatusTracker.setClusterDown();
2170 }
2171 } catch (KeeperException e) {
2172 LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
2173 }
2174 }
2175
2176 @Override
2177 public ShutdownResponse shutdown(RpcController controller, ShutdownRequest request)
2178 throws ServiceException {
2179 shutdown();
2180 return ShutdownResponse.newBuilder().build();
2181 }
2182
2183 public void stopMaster() {
2184 if (cpHost != null) {
2185 try {
2186 cpHost.preStopMaster();
2187 } catch (IOException ioe) {
2188 LOG.error("Error call master coprocessor preStopMaster()", ioe);
2189 }
2190 }
2191 stop("Stopped by " + Thread.currentThread().getName());
2192 }
2193
2194 @Override
2195 public StopMasterResponse stopMaster(RpcController controller, StopMasterRequest request)
2196 throws ServiceException {
2197 stopMaster();
2198 return StopMasterResponse.newBuilder().build();
2199 }
2200
2201 @Override
2202 public void stop(final String why) {
2203 LOG.info(why);
2204 this.stopped = true;
2205
2206 stopSleeper.skipSleepCycle();
2207
2208 if (this.activeMasterManager != null) {
2209 synchronized (this.activeMasterManager.clusterHasActiveMaster) {
2210 this.activeMasterManager.clusterHasActiveMaster.notifyAll();
2211 }
2212 }
2213
2214
2215 if (this.catalogTracker != null && this.serverManager.getOnlineServers().isEmpty()) {
2216 this.catalogTracker.stop();
2217 }
2218 }
2219
2220 @Override
2221 public boolean isStopped() {
2222 return this.stopped;
2223 }
2224
2225 public boolean isAborted() {
2226 return this.abort;
2227 }
2228
2229 void checkInitialized() throws PleaseHoldException {
2230 if (!this.initialized) {
2231 throw new PleaseHoldException("Master is initializing");
2232 }
2233 }
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243 public boolean isActiveMaster() {
2244 return isActiveMaster;
2245 }
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256 public boolean isInitialized() {
2257 return initialized;
2258 }
2259
2260
2261
2262
2263
2264
2265 public boolean isServerShutdownHandlerEnabled() {
2266 return this.serverShutdownHandlerEnabled;
2267 }
2268
2269
2270
2271
2272
2273 public boolean isInitializationStartsMetaRegionAssignment() {
2274 return this.initializationBeforeMetaAssignment;
2275 }
2276
2277 @Override
2278 public AssignRegionResponse assignRegion(RpcController controller, AssignRegionRequest req)
2279 throws ServiceException {
2280 try {
2281 final byte [] regionName = req.getRegion().getValue().toByteArray();
2282 RegionSpecifierType type = req.getRegion().getType();
2283 AssignRegionResponse arr = AssignRegionResponse.newBuilder().build();
2284
2285 checkInitialized();
2286 if (type != RegionSpecifierType.REGION_NAME) {
2287 LOG.warn("assignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
2288 + " actual: " + type);
2289 }
2290 HRegionInfo regionInfo = assignmentManager.getRegionStates().getRegionInfo(regionName);
2291 if (regionInfo == null) throw new UnknownRegionException(Bytes.toString(regionName));
2292 if (cpHost != null) {
2293 if (cpHost.preAssign(regionInfo)) {
2294 return arr;
2295 }
2296 }
2297 assignmentManager.assign(regionInfo, true, true);
2298 if (cpHost != null) {
2299 cpHost.postAssign(regionInfo);
2300 }
2301
2302 return arr;
2303 } catch (IOException ioe) {
2304 throw new ServiceException(ioe);
2305 }
2306 }
2307
2308 public void assignRegion(HRegionInfo hri) {
2309 assignmentManager.assign(hri, true);
2310 }
2311
2312 @Override
2313 public UnassignRegionResponse unassignRegion(RpcController controller, UnassignRegionRequest req)
2314 throws ServiceException {
2315 try {
2316 final byte [] regionName = req.getRegion().getValue().toByteArray();
2317 RegionSpecifierType type = req.getRegion().getType();
2318 final boolean force = req.getForce();
2319 UnassignRegionResponse urr = UnassignRegionResponse.newBuilder().build();
2320
2321 checkInitialized();
2322 if (type != RegionSpecifierType.REGION_NAME) {
2323 LOG.warn("unassignRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
2324 + " actual: " + type);
2325 }
2326 Pair<HRegionInfo, ServerName> pair =
2327 MetaReader.getRegion(this.catalogTracker, regionName);
2328 if (pair == null) throw new UnknownRegionException(Bytes.toString(regionName));
2329 HRegionInfo hri = pair.getFirst();
2330 if (cpHost != null) {
2331 if (cpHost.preUnassign(hri, force)) {
2332 return urr;
2333 }
2334 }
2335 LOG.debug("Close region " + hri.getRegionNameAsString()
2336 + " on current location if it is online and reassign.force=" + force);
2337 this.assignmentManager.unassign(hri, force);
2338 if (!this.assignmentManager.getRegionStates().isRegionInTransition(hri)
2339 && !this.assignmentManager.getRegionStates().isRegionAssigned(hri)) {
2340 LOG.debug("Region " + hri.getRegionNameAsString()
2341 + " is not online on any region server, reassigning it.");
2342 assignRegion(hri);
2343 }
2344 if (cpHost != null) {
2345 cpHost.postUnassign(hri, force);
2346 }
2347
2348 return urr;
2349 } catch (IOException ioe) {
2350 throw new ServiceException(ioe);
2351 }
2352 }
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362 public GetTableDescriptorsResponse getTableDescriptors(
2363 RpcController controller, GetTableDescriptorsRequest req) throws ServiceException {
2364 GetTableDescriptorsResponse.Builder builder = GetTableDescriptorsResponse.newBuilder();
2365 if (req.getTableNamesCount() == 0) {
2366
2367 Map<String, HTableDescriptor> descriptors = null;
2368 try {
2369 descriptors = this.tableDescriptors.getAll();
2370 } catch (IOException e) {
2371 LOG.warn("Failed getting all descriptors", e);
2372 }
2373 if (descriptors != null) {
2374 for (HTableDescriptor htd : descriptors.values()) {
2375 builder.addTableSchema(htd.convert());
2376 }
2377 }
2378 }
2379 else {
2380 for (String s: req.getTableNamesList()) {
2381 HTableDescriptor htd = null;
2382 try {
2383 htd = this.tableDescriptors.get(s);
2384 } catch (IOException e) {
2385 LOG.warn("Failed getting descriptor for " + s, e);
2386 }
2387 if (htd == null) continue;
2388 builder.addTableSchema(htd.convert());
2389 }
2390 }
2391 return builder.build();
2392 }
2393
2394
2395
2396
2397
2398
2399
2400 public double getAverageLoad() {
2401 if (this.assignmentManager == null) {
2402 return 0;
2403 }
2404
2405 RegionStates regionStates = this.assignmentManager.getRegionStates();
2406 if (regionStates == null) {
2407 return 0;
2408 }
2409 return regionStates.getAverageLoad();
2410 }
2411
2412
2413
2414
2415
2416
2417
2418
2419 @Override
2420 public OfflineRegionResponse offlineRegion(RpcController controller, OfflineRegionRequest request)
2421 throws ServiceException {
2422 final byte [] regionName = request.getRegion().getValue().toByteArray();
2423 RegionSpecifierType type = request.getRegion().getType();
2424 if (type != RegionSpecifierType.REGION_NAME) {
2425 LOG.warn("moveRegion specifier type: expected: " + RegionSpecifierType.REGION_NAME
2426 + " actual: " + type);
2427 }
2428
2429 try {
2430 Pair<HRegionInfo, ServerName> pair =
2431 MetaReader.getRegion(this.catalogTracker, regionName);
2432 if (pair == null) throw new UnknownRegionException(Bytes.toStringBinary(regionName));
2433 HRegionInfo hri = pair.getFirst();
2434 if (cpHost != null) {
2435 cpHost.preRegionOffline(hri);
2436 }
2437 this.assignmentManager.regionOffline(hri);
2438 if (cpHost != null) {
2439 cpHost.postRegionOffline(hri);
2440 }
2441 } catch (IOException ioe) {
2442 throw new ServiceException(ioe);
2443 }
2444 return OfflineRegionResponse.newBuilder().build();
2445 }
2446
2447 @Override
2448 public boolean registerService(Service instance) {
2449
2450
2451
2452 Descriptors.ServiceDescriptor serviceDesc = instance.getDescriptorForType();
2453 if (coprocessorServiceHandlers.containsKey(serviceDesc.getFullName())) {
2454 LOG.error("Coprocessor service "+serviceDesc.getFullName()+
2455 " already registered, rejecting request from "+instance
2456 );
2457 return false;
2458 }
2459
2460 coprocessorServiceHandlers.put(serviceDesc.getFullName(), instance);
2461 if (LOG.isDebugEnabled()) {
2462 LOG.debug("Registered master coprocessor service: service="+serviceDesc.getFullName());
2463 }
2464 return true;
2465 }
2466
2467 @Override
2468 public ClientProtos.CoprocessorServiceResponse execMasterService(final RpcController controller,
2469 final ClientProtos.CoprocessorServiceRequest request) throws ServiceException {
2470 try {
2471 ServerRpcController execController = new ServerRpcController();
2472
2473 ClientProtos.CoprocessorServiceCall call = request.getCall();
2474 String serviceName = call.getServiceName();
2475 String methodName = call.getMethodName();
2476 if (!coprocessorServiceHandlers.containsKey(serviceName)) {
2477 throw new UnknownProtocolException(null,
2478 "No registered master coprocessor service found for name "+serviceName);
2479 }
2480
2481 Service service = coprocessorServiceHandlers.get(serviceName);
2482 Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
2483 Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
2484 if (methodDesc == null) {
2485 throw new UnknownProtocolException(service.getClass(),
2486 "Unknown method "+methodName+" called on master service "+serviceName);
2487 }
2488
2489
2490 Message execRequest = service.getRequestPrototype(methodDesc).newBuilderForType()
2491 .mergeFrom(call.getRequest()).build();
2492 final Message.Builder responseBuilder =
2493 service.getResponsePrototype(methodDesc).newBuilderForType();
2494 service.callMethod(methodDesc, execController, execRequest, new RpcCallback<Message>() {
2495 @Override
2496 public void run(Message message) {
2497 if (message != null) {
2498 responseBuilder.mergeFrom(message);
2499 }
2500 }
2501 });
2502 Message execResult = responseBuilder.build();
2503
2504 if (execController.getFailedOn() != null) {
2505 throw execController.getFailedOn();
2506 }
2507 ClientProtos.CoprocessorServiceResponse.Builder builder =
2508 ClientProtos.CoprocessorServiceResponse.newBuilder();
2509 builder.setRegion(RequestConverter.buildRegionSpecifier(
2510 RegionSpecifierType.REGION_NAME, HConstants.EMPTY_BYTE_ARRAY));
2511 builder.setValue(
2512 builder.getValueBuilder().setName(execResult.getClass().getName())
2513 .setValue(execResult.toByteString()));
2514 return builder.build();
2515 } catch (IOException ie) {
2516 throw new ServiceException(ie);
2517 }
2518 }
2519
2520
2521
2522
2523
2524
2525
2526 public static HMaster constructMaster(Class<? extends HMaster> masterClass,
2527 final Configuration conf) {
2528 try {
2529 Constructor<? extends HMaster> c =
2530 masterClass.getConstructor(Configuration.class);
2531 return c.newInstance(conf);
2532 } catch (InvocationTargetException ite) {
2533 Throwable target = ite.getTargetException() != null?
2534 ite.getTargetException(): ite;
2535 if (target.getCause() != null) target = target.getCause();
2536 throw new RuntimeException("Failed construction of Master: " +
2537 masterClass.toString(), target);
2538 } catch (Exception e) {
2539 throw new RuntimeException("Failed construction of Master: " +
2540 masterClass.toString() + ((e.getCause() != null)?
2541 e.getCause().getMessage(): ""), e);
2542 }
2543 }
2544
2545
2546
2547
2548 public static void main(String [] args) {
2549 VersionInfo.logVersion();
2550 new HMasterCommandLine(HMaster.class).doMain(args);
2551 }
2552
2553 public HFileCleaner getHFileCleaner() {
2554 return this.hfileCleaner;
2555 }
2556
2557
2558
2559
2560
2561 public SnapshotManager getSnapshotManagerForTesting() {
2562 return this.snapshotManager;
2563 }
2564
2565
2566
2567
2568
2569 @Override
2570 public TakeSnapshotResponse snapshot(RpcController controller, TakeSnapshotRequest request)
2571 throws ServiceException {
2572 try {
2573 this.snapshotManager.checkSnapshotSupport();
2574 } catch (UnsupportedOperationException e) {
2575 throw new ServiceException(e);
2576 }
2577
2578 LOG.debug("Submitting snapshot request for:" +
2579 ClientSnapshotDescriptionUtils.toString(request.getSnapshot()));
2580
2581 SnapshotDescription snapshot = SnapshotDescriptionUtils.validate(request.getSnapshot(),
2582 this.conf);
2583 try {
2584 snapshotManager.takeSnapshot(snapshot);
2585 } catch (IOException e) {
2586 throw new ServiceException(e);
2587 }
2588
2589
2590 long waitTime = SnapshotDescriptionUtils.getMaxMasterTimeout(conf, snapshot.getType(),
2591 SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME);
2592 return TakeSnapshotResponse.newBuilder().setExpectedTimeout(waitTime).build();
2593 }
2594
2595
2596
2597
2598 @Override
2599 public ListSnapshotResponse getCompletedSnapshots(RpcController controller,
2600 ListSnapshotRequest request) throws ServiceException {
2601 try {
2602 ListSnapshotResponse.Builder builder = ListSnapshotResponse.newBuilder();
2603 List<SnapshotDescription> snapshots = snapshotManager.getCompletedSnapshots();
2604
2605
2606 for (SnapshotDescription snapshot : snapshots) {
2607 builder.addSnapshots(snapshot);
2608 }
2609 return builder.build();
2610 } catch (IOException e) {
2611 throw new ServiceException(e);
2612 }
2613 }
2614
2615
2616
2617
2618
2619
2620
2621
2622 @Override
2623 public DeleteSnapshotResponse deleteSnapshot(RpcController controller,
2624 DeleteSnapshotRequest request) throws ServiceException {
2625 try {
2626 this.snapshotManager.checkSnapshotSupport();
2627 } catch (UnsupportedOperationException e) {
2628 throw new ServiceException(e);
2629 }
2630
2631 try {
2632 snapshotManager.deleteSnapshot(request.getSnapshot());
2633 return DeleteSnapshotResponse.newBuilder().build();
2634 } catch (IOException e) {
2635 throw new ServiceException(e);
2636 }
2637 }
2638
2639
2640
2641
2642
2643
2644
2645
2646 @Override
2647 public IsSnapshotDoneResponse isSnapshotDone(RpcController controller,
2648 IsSnapshotDoneRequest request) throws ServiceException {
2649 LOG.debug("Checking to see if snapshot from request:" +
2650 ClientSnapshotDescriptionUtils.toString(request.getSnapshot()) + " is done");
2651 try {
2652 IsSnapshotDoneResponse.Builder builder = IsSnapshotDoneResponse.newBuilder();
2653 boolean done = snapshotManager.isSnapshotDone(request.getSnapshot());
2654 builder.setDone(done);
2655 return builder.build();
2656 } catch (IOException e) {
2657 throw new ServiceException(e);
2658 }
2659 }
2660
2661
2662
2663
2664
2665
2666
2667
2668
2669
2670
2671
2672
2673
2674 @Override
2675 public RestoreSnapshotResponse restoreSnapshot(RpcController controller,
2676 RestoreSnapshotRequest request) throws ServiceException {
2677 try {
2678 this.snapshotManager.checkSnapshotSupport();
2679 } catch (UnsupportedOperationException e) {
2680 throw new ServiceException(e);
2681 }
2682
2683 try {
2684 SnapshotDescription reqSnapshot = request.getSnapshot();
2685 snapshotManager.restoreSnapshot(reqSnapshot);
2686 return RestoreSnapshotResponse.newBuilder().build();
2687 } catch (IOException e) {
2688 throw new ServiceException(e);
2689 }
2690 }
2691
2692
2693
2694
2695
2696
2697
2698
2699
2700
2701
2702 @Override
2703 public IsRestoreSnapshotDoneResponse isRestoreSnapshotDone(RpcController controller,
2704 IsRestoreSnapshotDoneRequest request) throws ServiceException {
2705 try {
2706 SnapshotDescription snapshot = request.getSnapshot();
2707 IsRestoreSnapshotDoneResponse.Builder builder = IsRestoreSnapshotDoneResponse.newBuilder();
2708 boolean done = snapshotManager.isRestoreDone(snapshot);
2709 builder.setDone(done);
2710 return builder.build();
2711 } catch (IOException e) {
2712 throw new ServiceException(e);
2713 }
2714 }
2715
2716 private boolean isHealthCheckerConfigured() {
2717 String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
2718 return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
2719 }
2720
2721 }