1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import java.io.IOException;
22 import java.net.InetAddress;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Map.Entry;
31 import java.util.Set;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.ConcurrentNavigableMap;
34 import java.util.concurrent.ConcurrentSkipListMap;
35 import java.util.concurrent.CopyOnWriteArrayList;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.ClockOutOfSyncException;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.HRegionInfo;
43 import org.apache.hadoop.hbase.NotServingRegionException;
44 import org.apache.hadoop.hbase.RegionLoad;
45 import org.apache.hadoop.hbase.Server;
46 import org.apache.hadoop.hbase.ServerLoad;
47 import org.apache.hadoop.hbase.ServerName;
48 import org.apache.hadoop.hbase.YouAreDeadException;
49 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
50 import org.apache.hadoop.hbase.classification.InterfaceAudience;
51 import org.apache.hadoop.hbase.client.ClusterConnection;
52 import org.apache.hadoop.hbase.client.ConnectionFactory;
53 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
54 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
55 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
56 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
57 import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
58 import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
59 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
60 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
61 import org.apache.hadoop.hbase.protobuf.RequestConverter;
62 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
63 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
64 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
65 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
66 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
67 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
68 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
69 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
70 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
71 import org.apache.hadoop.hbase.regionserver.HRegionServer;
72 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
73 import org.apache.hadoop.hbase.util.Bytes;
74 import org.apache.hadoop.hbase.util.Triple;
75 import org.apache.hadoop.hbase.util.RetryCounter;
76 import org.apache.hadoop.hbase.util.RetryCounterFactory;
77 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
78 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
79 import org.apache.zookeeper.KeeperException;
80
81 import com.google.common.annotations.VisibleForTesting;
82 import com.google.protobuf.ByteString;
83 import com.google.protobuf.ServiceException;
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/**
 * Manages region-server lifecycle from the master's point of view: tracks
 * which servers are online (via their startup calls and periodic reports),
 * which are dead or draining, records last-flushed sequence ids, and drives
 * expiry/shutdown processing for servers that disappear.
 */
@InterfaceAudience.Private
public class ServerManager {
  /** Config key: maximum region servers to wait for at master startup. */
  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
      "hbase.master.wait.on.regionservers.maxtostart";

  /** Config key: minimum region servers to wait for at master startup. */
  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
      "hbase.master.wait.on.regionservers.mintostart";

  /** Config key: how long (ms) to wait for region servers at master startup. */
  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
      "hbase.master.wait.on.regionservers.timeout";

  /** Config key: settle interval (ms) used while waiting for region servers. */
  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
      "hbase.master.wait.on.regionservers.interval";

  private static final Log LOG = LogFactory.getLog(ServerManager.class);

  // Set when a cluster shutdown has been requested; checked in expireServer().
  private volatile boolean clusterShutdown = false;

  // Last flushed sequence id per encoded region name, advanced from server reports.
  private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
    new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

  // Per-region map of column family -> last flushed sequence id for that store.
  private final ConcurrentNavigableMap<byte[], ConcurrentNavigableMap<byte[], Long>>
    storeFlushedSequenceIdsByRegion =
    new ConcurrentSkipListMap<byte[], ConcurrentNavigableMap<byte[], Long>>(Bytes.BYTES_COMPARATOR);

  /** Map of registered servers to their current load. */
  private final ConcurrentHashMap<ServerName, ServerLoad> onlineServers =
    new ConcurrentHashMap<ServerName, ServerLoad>();

  // Cached admin RPC stubs, one per server; populated lazily by getRsAdmin().
  // NOTE(review): plain HashMap mutated from several methods — confirm the
  // mutation paths are adequately synchronized.
  private final Map<ServerName, AdminService.BlockingInterface> rsAdmins =
    new HashMap<ServerName, AdminService.BlockingInterface>();

  // Servers an operator has asked to drain of regions.
  private final ArrayList<ServerName> drainingServers =
    new ArrayList<ServerName>();

  private final Server master;
  private final MasterServices services;
  private final ClusterConnection connection;

  private final DeadServer deadservers = new DeadServer();

  // Clock-skew limits (ms): reject a server beyond maxSkew, warn beyond warningSkew.
  private final long maxSkew;
  private final long warningSkew;

  private final RetryCounterFactory pingRetryCounterFactory;
  private final RpcControllerFactory rpcControllerFactory;

  // Servers that died before the ServerShutdownHandler was enabled; their
  // expiry is deferred until processQueuedDeadServers() runs.
  private Set<ServerName> queuedDeadServers = new HashSet<ServerName>();

  // Dead servers whose processing must wait for assignment-manager failover
  // cleanup; the value is the shouldSplitWal flag to use when reprocessed.
  private Map<ServerName, Boolean> requeuedDeadServers
    = new ConcurrentHashMap<ServerName, Boolean>();

  /** Listeners notified when servers are added or removed. */
  private List<ServerListener> listeners = new CopyOnWriteArrayList<ServerListener>();
207
208 public ServerManager(final Server master, final MasterServices services)
209 throws IOException {
210 this(master, services, true);
211 }
212
213 ServerManager(final Server master, final MasterServices services,
214 final boolean connect) throws IOException {
215 this.master = master;
216 this.services = services;
217 Configuration c = master.getConfiguration();
218 maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
219 warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
220 this.connection = connect ? (ClusterConnection)ConnectionFactory.createConnection(c) : null;
221 int pingMaxAttempts = Math.max(1, master.getConfiguration().getInt(
222 "hbase.master.maximum.ping.server.attempts", 10));
223 int pingSleepInterval = Math.max(1, master.getConfiguration().getInt(
224 "hbase.master.ping.server.retry.sleep.interval", 100));
225 this.pingRetryCounterFactory = new RetryCounterFactory(pingMaxAttempts, pingSleepInterval);
226 this.rpcControllerFactory = this.connection == null
227 ? null
228 : connection.getRpcControllerFactory();
229 }
230
231
232
233
234
235 public void registerListener(final ServerListener listener) {
236 this.listeners.add(listener);
237 }
238
239
240
241
242
243 public boolean unregisterListener(final ServerListener listener) {
244 return this.listeners.remove(listener);
245 }
246
247
248
249
250
251
252
253
254 ServerName regionServerStartup(RegionServerStartupRequest request, InetAddress ia)
255 throws IOException {
256
257
258
259
260
261
262
263
264 final String hostname = request.hasUseThisHostnameInstead() ?
265 request.getUseThisHostnameInstead() :ia.getHostName();
266 ServerName sn = ServerName.valueOf(hostname, request.getPort(),
267 request.getServerStartCode());
268 checkClockSkew(sn, request.getServerCurrentTime());
269 checkIsDead(sn, "STARTUP");
270 if (!checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
271 LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup"
272 + " could not record the server: " + sn);
273 }
274 return sn;
275 }
276
277 private ConcurrentNavigableMap<byte[], Long> getOrCreateStoreFlushedSequenceId(
278 byte[] regionName) {
279 ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
280 storeFlushedSequenceIdsByRegion.get(regionName);
281 if (storeFlushedSequenceId != null) {
282 return storeFlushedSequenceId;
283 }
284 storeFlushedSequenceId = new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
285 ConcurrentNavigableMap<byte[], Long> alreadyPut =
286 storeFlushedSequenceIdsByRegion.putIfAbsent(regionName, storeFlushedSequenceId);
287 return alreadyPut == null ? storeFlushedSequenceId : alreadyPut;
288 }
289
290
291
292
293
  /**
   * Record the latest flushed sequence ids reported by a region server,
   * keeping both a region-level value and a per-store (column family) value.
   * Ids only ever move forward; a smaller reported id is logged and ignored.
   * @param sn the reporting region server
   * @param hsl the load report carrying per-region sequence ids
   */
  private void updateLastFlushedSequenceIds(ServerName sn, ServerLoad hsl) {
    Map<byte[], RegionLoad> regionsLoad = hsl.getRegionsLoad();
    for (Entry<byte[], RegionLoad> entry : regionsLoad.entrySet()) {
      byte[] encodedRegionName = Bytes.toBytes(HRegionInfo.encodeRegionName(entry.getKey()));
      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
      long l = entry.getValue().getCompleteSequenceId();
      // Advance the region-level id only when the report carries a real
      // sequence number (not NO_SEQNUM) that is newer than what we have.
      if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
        flushedSequenceIdByRegion.put(encodedRegionName, l);
      } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id ("
            + l + ") that is less than the previous last flushed sequence id ("
            + existingValue + ") for region " + Bytes.toString(entry.getKey()) + " Ignoring.");
      }
      ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
          getOrCreateStoreFlushedSequenceId(encodedRegionName);
      // Same monotonic-advance rule, applied per column family.
      for (StoreSequenceId storeSeqId : entry.getValue().getStoreCompleteSequenceId()) {
        byte[] family = storeSeqId.getFamilyName().toByteArray();
        existingValue = storeFlushedSequenceId.get(family);
        l = storeSeqId.getSequenceId();
        if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
          storeFlushedSequenceId.put(family, l);
        }
      }
    }
  }
321
322 void regionServerReport(ServerName sn,
323 ServerLoad sl) throws YouAreDeadException {
324 checkIsDead(sn, "REPORT");
325 if (null == this.onlineServers.replace(sn, sl)) {
326
327
328
329
330
331
332 if (!checkAndRecordNewServer(sn, sl)) {
333 LOG.info("RegionServerReport ignored, could not record the server: " + sn);
334 return;
335 }
336 }
337 updateLastFlushedSequenceIds(sn, sl);
338 }
339
340
341
342
343
344
345
346
347
  /**
   * Check whether a server with the same host and port already exists; if not,
   * or if the existing one has a smaller start code, record the new server.
   * A stale existing entry (older start code) is expired after registration.
   * @param serverName the server to check and record
   * @param sl the server load reported by the server
   * @return true if the server was recorded, false if it was rejected as stale
   */
  boolean checkAndRecordNewServer(
      final ServerName serverName, final ServerLoad sl) {
    ServerName existingServer = null;
    synchronized (this.onlineServers) {
      existingServer = findServerWithSameHostnamePortWithLock(serverName);
      // A newer start code on the existing entry means the incoming
      // registration is stale; keep what we have.
      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
        LOG.info("Server serverName=" + serverName + " rejected; we already have "
            + existingServer.toString() + " registered with same hostname and port");
        return false;
      }
      recordNewServerWithLock(serverName, sl);
    }

    // Notify listeners outside the onlineServers lock.
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverAdded(serverName);
      }
    }

    // If the pre-existing entry has an older start code, that instance is
    // stale — trigger its recovery.  (Equal start codes: same server, no-op.)
    if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
      LOG.info("Triggering server recovery; existingServer " +
          existingServer + " looks stale, new server:" + serverName);
      expireServer(existingServer);
    }
    return true;
  }
377
378
379
380
381
382
383
384
385
386 private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
387 throws ClockOutOfSyncException {
388 long skew = Math.abs(System.currentTimeMillis() - serverCurrentTime);
389 if (skew > maxSkew) {
390 String message = "Server " + serverName + " has been " +
391 "rejected; Reported time is too far out of sync with master. " +
392 "Time difference of " + skew + "ms > max allowed of " + maxSkew + "ms";
393 LOG.warn(message);
394 throw new ClockOutOfSyncException(message);
395 } else if (skew > warningSkew){
396 String message = "Reported time for server " + serverName + " is out of sync with master " +
397 "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " +
398 "error threshold is " + maxSkew + "ms)";
399 LOG.warn(message);
400 }
401 }
402
403
404
405
406
407
408
409
410
  /**
   * Reject the server with a YouAreDeadException if it is on the dead list.
   * If a previous instance of the server (same host/port, different start
   * code) is on the dead list and the master is initialized, clean it out.
   * @param serverName the server to check
   * @param what "STARTUP" or "REPORT", used in the log/exception message
   * @throws YouAreDeadException if the server is currently a dead server
   */
  private void checkIsDead(final ServerName serverName, final String what)
      throws YouAreDeadException {
    if (this.deadservers.isDeadServer(serverName)) {
      // Exact match (host, port and start code): this server really is dead.
      String message = "Server " + what + " rejected; currently processing " +
          serverName + " as dead server";
      LOG.debug(message);
      throw new YouAreDeadException(message);
    }
    // Only clean previous instances once the master is initialized (a null
    // services reference — e.g. in tests — is treated as initialized).
    if ((this.services == null || ((HMaster) this.services).isInitialized())
        && this.deadservers.cleanPreviousInstance(serverName)) {
      // The host came back up with a new start code; its old entry has been
      // removed from the dead servers list.
      LOG.debug(what + ":" + " Server " + serverName + " came back up," +
          " removed it from the dead servers list");
    }
  }
431
432
433
434
435
436 private ServerName findServerWithSameHostnamePortWithLock(
437 final ServerName serverName) {
438 for (ServerName sn: this.onlineServers.keySet()) {
439 if (ServerName.isSameHostnameAndPort(serverName, sn)) return sn;
440 }
441 return null;
442 }
443
444
445
446
447
448
449
450 @VisibleForTesting
451 void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) {
452 LOG.info("Registering server=" + serverName);
453 this.onlineServers.put(serverName, sl);
454 this.rsAdmins.remove(serverName);
455 }
456
457 public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
458 RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
459 Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
460 builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
461 Map<byte[], Long> storeFlushedSequenceId =
462 storeFlushedSequenceIdsByRegion.get(encodedRegionName);
463 if (storeFlushedSequenceId != null) {
464 for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
465 builder.addStoreSequenceId(StoreSequenceId.newBuilder()
466 .setFamilyName(ByteString.copyFrom(entry.getKey()))
467 .setSequenceId(entry.getValue().longValue()).build());
468 }
469 }
470 return builder.build();
471 }
472
473
474
475
476
477 public ServerLoad getLoad(final ServerName serverName) {
478 return this.onlineServers.get(serverName);
479 }
480
481
482
483
484
485
486
487 public double getAverageLoad() {
488 int totalLoad = 0;
489 int numServers = 0;
490 for (ServerLoad sl: this.onlineServers.values()) {
491 numServers++;
492 totalLoad += sl.getNumberOfRegions();
493 }
494 return numServers == 0 ? 0 :
495 (double)totalLoad / (double)numServers;
496 }
497
498
499 public int countOfRegionServers() {
500
501 return this.onlineServers.size();
502 }
503
504
505
506
  /**
   * @return read-only view of the live map of online servers to their load
   */
  public Map<ServerName, ServerLoad> getOnlineServers() {
    // NOTE(review): the map is a ConcurrentHashMap; this synchronized block
    // mirrors the locking used by writers such as expireServer() rather than
    // protecting the read itself.  The returned view tracks later changes.
    synchronized (this.onlineServers) {
      return Collections.unmodifiableMap(this.onlineServers);
    }
  }

  /** @return the tracker of dead and in-process-of-expiry servers */
  public DeadServer getDeadServers() {
    return this.deadservers;
  }

  /**
   * @return true if dead-server processing is still in progress
   */
  public boolean areDeadServersInProgress() {
    return this.deadservers.areDeadServersInProgress();
  }
526
  /**
   * Wait for region servers to check out during an orderly cluster shutdown.
   * Returns when only this master remains online (per our map or per
   * zookeeper) or when zookeeper can no longer be consulted.
   */
  void letRegionServersShutdown() {
    long previousLogTime = 0;
    ServerName sn = master.getServerName();
    ZooKeeperWatcher zkw = master.getZooKeeper();
    int onlineServersCt;
    while ((onlineServersCt = onlineServers.size()) > 0){
      // Log progress at most once per second.
      if (System.currentTimeMillis() > (previousLogTime + 1000)) {
        Set<ServerName> remainingServers = onlineServers.keySet();
        synchronized (onlineServers) {
          // Only the master itself left: nothing more to wait for.
          if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
            return;
          }
        }
        StringBuilder sb = new StringBuilder();
        // Unsynchronized iteration is fine here — merely building a log line.
        for (ServerName key : remainingServers) {
          if (sb.length() > 0) {
            sb.append(", ");
          }
          sb.append(key);
        }
        LOG.info("Waiting on regionserver(s) to go down " + sb.toString());
        previousLogTime = System.currentTimeMillis();
      }

      try {
        // Cross-check with zookeeper: if only the master's znode remains we
        // can stop waiting regardless of our in-memory view.
        List<String> servers = ZKUtil.listChildrenNoWatch(zkw, zkw.rsZNode);
        if (servers == null || servers.size() == 0 || (servers.size() == 1
            && servers.contains(sn.toString()))) {
          LOG.info("ZK shows there is only the master self online, exiting now");
          break;
        }
      } catch (KeeperException ke) {
        // ZK is unavailable; don't hang the shutdown on it.
        LOG.warn("Failed to list regionservers", ke);
        break;
      }
      synchronized (onlineServers) {
        try {
          // Briefly wait for expireServer() to notify us of a change; skip
          // the wait if the count already moved since we sampled it.
          if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
        } catch (InterruptedException ignored) {
          // Deliberately swallowed: keep waiting for servers to go down.
        }
      }
    }
  }
576
577
578
579
580
  /**
   * Expire the passed server: move it from the online map to the dead list
   * and submit a shutdown handler for it.  Special cases: the master's own
   * name stops the master instead; if shutdown handling is not yet enabled
   * the server is queued for later; an already-dead server is a no-op; and
   * during cluster shutdown no handler is submitted.
   * @param serverName the server to expire
   */
  public synchronized void expireServer(final ServerName serverName) {
    if (serverName.equals(master.getServerName())) {
      // Our own name coming through expiry means our registration vanished.
      if (!(master.isAborted() || master.isStopped())) {
        master.stop("We lost our znode?");
      }
      return;
    }
    if (!services.isServerShutdownHandlerEnabled()) {
      LOG.info("Master doesn't enable ServerShutdownHandler during initialization, "
        + "delay expiring server " + serverName);
      // Picked up later by processQueuedDeadServers().
      this.queuedDeadServers.add(serverName);
      return;
    }
    if (this.deadservers.isDeadServer(serverName)) {
      // Shutdown handling was already started for this server elsewhere.
      LOG.warn("Expiration of " + serverName +
        " but server shutdown already in progress");
      return;
    }
    synchronized (onlineServers) {
      if (!this.onlineServers.containsKey(serverName)) {
        LOG.warn("Expiration of " + serverName + " but server not online");
      }
      // Add to deadservers BEFORE removing from onlineServers so the server
      // is never absent from both views at once.
      this.deadservers.add(serverName);
      this.onlineServers.remove(serverName);
      // Wake letRegionServersShutdown(), which waits on this monitor.
      onlineServers.notifyAll();
    }
    this.rsAdmins.remove(serverName);

    // During cluster shutdown servers are expected to expire; don't process
    // them as failures, just stop the master once the last one is gone.
    if (this.clusterShutdown) {
      LOG.info("Cluster shutdown set; " + serverName +
        " expired; onlineServers=" + this.onlineServers.size());
      if (this.onlineServers.isEmpty()) {
        master.stop("Cluster shutdown set; onlineServer=0");
      }
      return;
    }

    // A server hosting meta gets the dedicated meta shutdown handler.
    boolean carryingMeta = services.getAssignmentManager().isCarryingMeta(serverName) ==
        AssignmentManager.ServerHostRegion.HOSTING_REGION;
    if (carryingMeta) {
      this.services.getExecutorService().submit(new MetaServerShutdownHandler(this.master,
        this.services, this.deadservers, serverName));
    } else {
      this.services.getExecutorService().submit(new ServerShutdownHandler(this.master,
        this.services, this.deadservers, serverName, true));
    }
    LOG.debug("Added=" + serverName +
      " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);

    // Tell our listeners that a server was removed.
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverRemoved(serverName);
      }
    }
  }
642
  /**
   * Queue shutdown processing for a server already known to be dead, without
   * WAL splitting.
   */
  public synchronized void processDeadServer(final ServerName serverName) {
    this.processDeadServer(serverName, false);
  }

  /**
   * Queue shutdown processing for a dead server.  If assignment-manager
   * failover cleanup has not finished, the server (with its WAL-split flag)
   * is parked in requeuedDeadServers and handled later by
   * {@link #processQueuedDeadServers()}.
   * @param serverName the dead server
   * @param shouldSplitWal whether the shutdown handler should split its WALs
   */
  public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitWal) {
    // Postpone handling until the assignment manager has rebuilt its region
    // states; the flag is remembered so the eventual handler behaves the same.
    if (!services.getAssignmentManager().isFailoverCleanupDone()) {
      requeuedDeadServers.put(serverName, shouldSplitWal);
      return;
    }

    this.deadservers.add(serverName);
    this.services.getExecutorService().submit(
      new ServerShutdownHandler(this.master, this.services, this.deadservers, serverName,
          shouldSplitWal));
  }
666
667
668
669
670
671 synchronized void processQueuedDeadServers() {
672 if (!services.isServerShutdownHandlerEnabled()) {
673 LOG.info("Master hasn't enabled ServerShutdownHandler");
674 }
675 Iterator<ServerName> serverIterator = queuedDeadServers.iterator();
676 while (serverIterator.hasNext()) {
677 ServerName tmpServerName = serverIterator.next();
678 expireServer(tmpServerName);
679 serverIterator.remove();
680 requeuedDeadServers.remove(tmpServerName);
681 }
682
683 if (!services.getAssignmentManager().isFailoverCleanupDone()) {
684 LOG.info("AssignmentManager hasn't finished failover cleanup; waiting");
685 }
686
687 for(ServerName tmpServerName : requeuedDeadServers.keySet()){
688 processDeadServer(tmpServerName, requeuedDeadServers.get(tmpServerName));
689 }
690 requeuedDeadServers.clear();
691 }
692
693
694
695
696 public boolean removeServerFromDrainList(final ServerName sn) {
697
698
699
700 if (!this.isServerOnline(sn)) {
701 LOG.warn("Server " + sn + " is not currently online. " +
702 "Removing from draining list anyway, as requested.");
703 }
704
705 return this.drainingServers.remove(sn);
706 }
707
708
709
710
711 public boolean addServerToDrainList(final ServerName sn) {
712
713
714
715 if (!this.isServerOnline(sn)) {
716 LOG.warn("Server " + sn + " is not currently online. " +
717 "Ignoring request to add it to draining list.");
718 return false;
719 }
720
721
722 if (this.drainingServers.contains(sn)) {
723 LOG.warn("Server " + sn + " is already in the draining server list." +
724 "Ignoring request to add it again.");
725 return false;
726 }
727 return this.drainingServers.add(sn);
728 }
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743 public RegionOpeningState sendRegionOpen(final ServerName server,
744 HRegionInfo region, int versionOfOfflineNode, List<ServerName> favoredNodes)
745 throws IOException {
746 AdminService.BlockingInterface admin = getRsAdmin(server);
747 if (admin == null) {
748 LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
749 " failed because no RPC connection found to this server");
750 return RegionOpeningState.FAILED_OPENING;
751 }
752 OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server,
753 region, versionOfOfflineNode, favoredNodes,
754 (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
755 try {
756 OpenRegionResponse response = admin.openRegion(null, request);
757 return ResponseConverter.getRegionOpeningState(response);
758 } catch (ServiceException se) {
759 throw ProtobufUtil.getRemoteException(se);
760 }
761 }
762
763
764
765
766
767
768
769
770
771
  /**
   * Send a batched OPEN RPC to the given server for multiple regions.
   * @param server server to open the regions on
   * @param regionOpenInfos per region: the region, its offline znode version,
   *   and its favored nodes
   * @return the per-region opening states, or null when no RPC connection to
   *   the server could be found (callers must handle null)
   * @throws IOException if the remote call fails
   */
  public List<RegionOpeningState> sendRegionOpen(ServerName server,
      List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos)
      throws IOException {
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
        " failed because no RPC connection found to this server");
      return null;
    }

    OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, regionOpenInfos,
      (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
    try {
      OpenRegionResponse response = admin.openRegion(null, request);
      return ResponseConverter.getRegionOpeningStateList(response);
    } catch (ServiceException se) {
      // Unwrap to the underlying IOException for callers.
      throw ProtobufUtil.getRemoteException(se);
    }
  }
791
792 private PayloadCarryingRpcController newRpcController() {
793 return rpcControllerFactory == null ? null : rpcControllerFactory.newController();
794 }
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810 public boolean sendRegionClose(ServerName server, HRegionInfo region,
811 int versionOfClosingNode, ServerName dest, boolean transitionInZK) throws IOException {
812 if (server == null) throw new NullPointerException("Passed server is null");
813 AdminService.BlockingInterface admin = getRsAdmin(server);
814 if (admin == null) {
815 throw new IOException("Attempting to send CLOSE RPC to server " +
816 server.toString() + " for region " +
817 region.getRegionNameAsString() +
818 " failed because no RPC connection found to this server");
819 }
820 PayloadCarryingRpcController controller = newRpcController();
821 return ProtobufUtil.closeRegion(controller, admin, server, region.getRegionName(),
822 versionOfClosingNode, dest, transitionInZK);
823 }
824
825 public boolean sendRegionClose(ServerName server,
826 HRegionInfo region, int versionOfClosingNode) throws IOException {
827 return sendRegionClose(server, region, versionOfClosingNode, null, true);
828 }
829
830
831
832
833
834
835
836
837
838 public void sendRegionWarmup(ServerName server,
839 HRegionInfo region) {
840 if (server == null) return;
841 try {
842 AdminService.BlockingInterface admin = getRsAdmin(server);
843 PayloadCarryingRpcController controller = newRpcController();
844 ProtobufUtil.warmupRegion(controller, admin, region);
845 } catch (IOException e) {
846 LOG.error("Received exception in RPC for warmup server:" +
847 server + "region: " + region +
848 "exception: " + e);
849 }
850 }
851
852
853
854
855
  /**
   * Contact a region server directly (static utility; uses the supplied
   * connection, not this manager's state), ask it to close the region, then
   * poll until the region is gone or the timeout expires.  Close failures
   * are only logged.
   * @param timeout how long (ms) to wait for the region to disappear
   * @throws IOException if the region is still present after the timeout
   * @throws InterruptedException if interrupted while sleeping between polls
   */
  public static void closeRegionSilentlyAndWait(ClusterConnection connection,
      ServerName server, HRegionInfo region, long timeout) throws IOException, InterruptedException {
    AdminService.BlockingInterface rs = connection.getAdmin(server);
    PayloadCarryingRpcController controller = connection.getRpcControllerFactory().newController();
    try {
      ProtobufUtil.closeRegion(controller, rs, server, region.getRegionName(), false);
    } catch (IOException e) {
      LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
    }
    long expiration = timeout + System.currentTimeMillis();
    while (System.currentTimeMillis() < expiration) {
      try {
        HRegionInfo rsRegion =
          ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName());
        // The server no longer reports the region: close completed.
        if (rsRegion == null) return;
      } catch (IOException ioe) {
        if (ioe instanceof NotServingRegionException)
          // Also means the region is gone from the server.
          return;
        LOG.warn("Exception when retrieving regioninfo from: "
          + region.getRegionNameAsString(), ioe);
      }
      Thread.sleep(1000);
    }
    throw new IOException("Region " + region + " failed to close within"
        + " timeout " + timeout);
  }
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896 public void sendRegionsMerge(ServerName server, HRegionInfo region_a,
897 HRegionInfo region_b, boolean forcible) throws IOException {
898 if (server == null)
899 throw new NullPointerException("Passed server is null");
900 if (region_a == null || region_b == null)
901 throw new NullPointerException("Passed region is null");
902 AdminService.BlockingInterface admin = getRsAdmin(server);
903 if (admin == null) {
904 throw new IOException("Attempting to send MERGE REGIONS RPC to server "
905 + server.toString() + " for region "
906 + region_a.getRegionNameAsString() + ","
907 + region_b.getRegionNameAsString()
908 + " failed because no RPC connection found to this server");
909 }
910 PayloadCarryingRpcController controller = newRpcController();
911 ProtobufUtil.mergeRegions(controller, admin, region_a, region_b, forcible);
912 }
913
914
915
916
  /**
   * Ping the region server to check whether it is reachable and still the
   * same instance (matching start code), retrying per the ping retry counter
   * settings.
   * @param server the server to ping; must not be null
   * @return true if the server responds and reports the same start code;
   *   false if it is a known dead server, is unreachable after all retries,
   *   or the pinging thread is interrupted
   */
  public boolean isServerReachable(ServerName server) {
    if (server == null) throw new NullPointerException("Passed server is null");

    RetryCounter retryCounter = pingRetryCounterFactory.create();
    while (retryCounter.shouldRetry()) {
      synchronized (this.onlineServers) {
        // Don't bother pinging a server we are already processing as dead.
        if (this.deadservers.isDeadServer(server)) {
          return false;
        }
      }
      try {
        PayloadCarryingRpcController controller = newRpcController();
        AdminService.BlockingInterface admin = getRsAdmin(server);
        if (admin != null) {
          ServerInfo info = ProtobufUtil.getServerInfo(controller, admin);
          // Start code must match too: a restarted server on the same
          // host/port is not the instance we were asked about.
          return info != null && info.hasServerName()
            && server.getStartcode() == info.getServerName().getStartCode();
        }
      } catch (IOException ioe) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Couldn't reach " + server + ", try=" + retryCounter.getAttemptTimes() + " of "
              + retryCounter.getMaxAttempts(), ioe);
        }
        try {
          retryCounter.sleepUntilNextRetry();
        } catch(InterruptedException ie) {
          // Restore the interrupt flag and stop retrying.
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
    return false;
  }
951
952
953
954
955
956
957
  /**
   * @param sn the server to get an admin stub for
   * @return the AdminProtocol stub used to talk to the given server, cached
   *   in rsAdmins after the first creation
   * @throws IOException if the connection cannot be established
   */
  private AdminService.BlockingInterface getRsAdmin(final ServerName sn)
      throws IOException {
    AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
    if (admin == null) {
      LOG.debug("New admin connection to " + sn.toString());
      if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
        // The master is itself a regionserver: use its local RPC services
        // directly instead of opening a connection to itself.
        admin = ((HRegionServer)master).getRSRpcServices();
      } else {
        admin = this.connection.getAdmin(sn);
      }
      this.rsAdmins.put(sn, admin);
    }
    return admin;
  }
973
974
975
976
977
978
979
980
981
982
983
984
985
986
  /**
   * Wait at master startup for region servers to check in.  Blocks until one
   * of these holds:
   * <ol>
   *   <li>the master is stopped,</li>
   *   <li>maxToStart servers are online, or</li>
   *   <li>at least minToStart servers are online AND the timeout has elapsed
   *       AND no new server has checked in within the last interval ms.</li>
   * </ol>
   * @param status monitored task updated with progress messages
   * @throws InterruptedException if the waiting thread is interrupted
   */
  public void waitForRegionServers(MonitoredTask status)
      throws InterruptedException {
    final long interval = this.master.getConfiguration().
        getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
    final long timeout = this.master.getConfiguration().
        getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
    int defaultMinToStart = 1;
    if (BaseLoadBalancer.tablesOnMaster(master.getConfiguration())) {
      // When regions may be assigned to the active master, require a second
      // server by default so not everything lands on the master.
      defaultMinToStart = 2;
    }
    int minToStart = this.master.getConfiguration().
        getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, defaultMinToStart);
    if (minToStart < 1) {
      LOG.warn(String.format(
        "The value of '%s' (%d) can not be less than 1, ignoring.",
        WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
      minToStart = 1;
    }
    int maxToStart = this.master.getConfiguration().
        getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
    if (maxToStart < minToStart) {
      // Inconsistent configuration: ignore the max rather than the min.
      LOG.warn(String.format(
        "The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
        WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart,
        WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
      maxToStart = Integer.MAX_VALUE;
    }

    long now = System.currentTimeMillis();
    final long startTime = now;
    long slept = 0;
    long lastLogTime = 0;
    long lastCountChange = startTime;
    int count = countOfRegionServers();
    int oldCount = 0;
    while (!this.master.isStopped() && count < maxToStart
        && (lastCountChange+interval > now || timeout > slept || count < minToStart)) {
      // Log when the count changes, or at most once per interval.
      if (oldCount != count || lastLogTime+interval < now){
        lastLogTime = now;
        String msg =
          "Waiting for region servers count to settle; currently"+
            " checked in " + count + ", slept for " + slept + " ms," +
            " expecting minimum of " + minToStart + ", maximum of "+ maxToStart+
            ", timeout of "+timeout+" ms, interval of "+interval+" ms.";
        LOG.info(msg);
        status.setStatus(msg);
      }

      // Short fixed sleep between polls of the online-server count.
      final long sleepTime = 50;
      Thread.sleep(sleepTime);
      now = System.currentTimeMillis();
      slept = now - startTime;

      oldCount = count;
      count = countOfRegionServers();
      if (count != oldCount) {
        lastCountChange = now;
      }
    }

    LOG.info("Finished waiting for region servers count to settle;" +
      " checked in " + count + ", slept for " + slept + " ms," +
      " expecting minimum of " + minToStart + ", maximum of "+ maxToStart+","+
      " master is "+ (this.master.isStopped() ? "stopped.": "running")
    );
  }
1059
1060
1061
1062
1063 public List<ServerName> getOnlineServersList() {
1064
1065
1066 return new ArrayList<ServerName>(this.onlineServers.keySet());
1067 }
1068
1069
1070
1071
1072 public List<ServerName> getDrainingServersList() {
1073 return new ArrayList<ServerName>(this.drainingServers);
1074 }
1075
1076
1077
1078
1079 Set<ServerName> getDeadNotExpiredServers() {
1080 return new HashSet<ServerName>(this.queuedDeadServers);
1081 }
1082
1083
1084
1085
1086
1087
1088 void removeRequeuedDeadServers() {
1089 requeuedDeadServers.clear();
1090 }
1091
1092
1093
1094
1095
1096 Map<ServerName, Boolean> getRequeuedDeadServers() {
1097 return Collections.unmodifiableMap(this.requeuedDeadServers);
1098 }
1099
1100 public boolean isServerOnline(ServerName serverName) {
1101 return serverName != null && onlineServers.containsKey(serverName);
1102 }
1103
1104
1105
1106
1107
1108
1109
1110 public synchronized boolean isServerDead(ServerName serverName) {
1111 return serverName == null || deadservers.isDeadServer(serverName)
1112 || queuedDeadServers.contains(serverName)
1113 || requeuedDeadServers.containsKey(serverName);
1114 }
1115
1116 public void shutdownCluster() {
1117 this.clusterShutdown = true;
1118 this.master.stop("Cluster shutdown requested");
1119 }
1120
1121 public boolean isClusterShutdown() {
1122 return this.clusterShutdown;
1123 }
1124
1125
1126
1127
1128 public void stop() {
1129 if (connection != null) {
1130 try {
1131 connection.close();
1132 } catch (IOException e) {
1133 LOG.error("Attempt to close connection to master failed", e);
1134 }
1135 }
1136 }
1137
1138
1139
1140
1141
1142
1143 public List<ServerName> createDestinationServersList(final ServerName serverToExclude){
1144 final List<ServerName> destServers = getOnlineServersList();
1145
1146 if (serverToExclude != null){
1147 destServers.remove(serverToExclude);
1148 }
1149
1150
1151 final List<ServerName> drainingServersCopy = getDrainingServersList();
1152 if (!drainingServersCopy.isEmpty()) {
1153 for (final ServerName server: drainingServersCopy) {
1154 destServers.remove(server);
1155 }
1156 }
1157
1158
1159 removeDeadNotExpiredServers(destServers);
1160 return destServers;
1161 }
1162
1163
1164
1165
1166 public List<ServerName> createDestinationServersList(){
1167 return createDestinationServersList(null);
1168 }
1169
1170
1171
1172
1173
1174
1175
1176 void removeDeadNotExpiredServers(List<ServerName> servers) {
1177 Set<ServerName> deadNotExpiredServersCopy = this.getDeadNotExpiredServers();
1178 if (!deadNotExpiredServersCopy.isEmpty()) {
1179 for (ServerName server : deadNotExpiredServersCopy) {
1180 LOG.debug("Removing dead but not expired server: " + server
1181 + " from eligible server pool.");
1182 servers.remove(server);
1183 }
1184 }
1185 }
1186
1187
1188
1189
1190 void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
1191 for (ServerName serverName : getOnlineServersList()) {
1192 deadservers.cleanAllPreviousInstances(serverName);
1193 }
1194 }
1195
1196
1197
1198
1199 public void removeRegion(final HRegionInfo regionInfo) {
1200 final byte[] encodedName = regionInfo.getEncodedNameAsBytes();
1201 storeFlushedSequenceIdsByRegion.remove(encodedName);
1202 flushedSequenceIdByRegion.remove(encodedName);
1203 }
1204
1205
1206
1207
1208 public void removeRegions(final List<HRegionInfo> regions) {
1209 for (HRegionInfo hri: regions) {
1210 removeRegion(hri);
1211 }
1212 }
1213 }