1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import java.io.IOException;
22 import java.net.InetAddress;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Map.Entry;
31 import java.util.Set;
32 import java.util.concurrent.ConcurrentHashMap;
33 import java.util.concurrent.ConcurrentNavigableMap;
34 import java.util.concurrent.ConcurrentSkipListMap;
35 import java.util.concurrent.CopyOnWriteArrayList;
36
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.hadoop.conf.Configuration;
40 import org.apache.hadoop.hbase.ClockOutOfSyncException;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.HRegionInfo;
43 import org.apache.hadoop.hbase.NotServingRegionException;
44 import org.apache.hadoop.hbase.RegionLoad;
45 import org.apache.hadoop.hbase.Server;
46 import org.apache.hadoop.hbase.ServerLoad;
47 import org.apache.hadoop.hbase.ServerName;
48 import org.apache.hadoop.hbase.YouAreDeadException;
49 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
50 import org.apache.hadoop.hbase.classification.InterfaceAudience;
51 import org.apache.hadoop.hbase.client.ClusterConnection;
52 import org.apache.hadoop.hbase.client.ConnectionFactory;
53 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
54 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
55 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
56 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
57 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
58 import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
59 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
60 import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
61 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
62 import org.apache.hadoop.hbase.protobuf.RequestConverter;
63 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
64 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
65 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
66 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
67 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ServerInfo;
68 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
69 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
70 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
71 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
72 import org.apache.hadoop.hbase.regionserver.HRegionServer;
73 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
74 import org.apache.hadoop.hbase.util.Bytes;
75 import org.apache.hadoop.hbase.util.Triple;
76 import org.apache.hadoop.hbase.util.RetryCounter;
77 import org.apache.hadoop.hbase.util.RetryCounterFactory;
78 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
79 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
80 import org.apache.zookeeper.KeeperException;
81
82 import com.google.common.annotations.VisibleForTesting;
83 import com.google.protobuf.ByteString;
84 import com.google.protobuf.ServiceException;
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
@InterfaceAudience.Private
public class ServerManager {
  /** Config key: maximum number of region servers the master waits for at startup. */
  public static final String WAIT_ON_REGIONSERVERS_MAXTOSTART =
      "hbase.master.wait.on.regionservers.maxtostart";

  /** Config key: minimum number of region servers the master waits for at startup. */
  public static final String WAIT_ON_REGIONSERVERS_MINTOSTART =
      "hbase.master.wait.on.regionservers.mintostart";

  /** Config key: how long (ms) to wait for region servers to check in at startup. */
  public static final String WAIT_ON_REGIONSERVERS_TIMEOUT =
      "hbase.master.wait.on.regionservers.timeout";

  /** Config key: settle interval (ms) used while waiting for region servers to check in. */
  public static final String WAIT_ON_REGIONSERVERS_INTERVAL =
      "hbase.master.wait.on.regionservers.interval";

  private static final Log LOG = LogFactory.getLog(ServerManager.class);

  // Set by shutdownCluster(); read by expireServer() to decide whether to stop the master.
  private volatile boolean clusterShutdown = false;

  /**
   * Last flushed sequence id per region, keyed by the encoded region name.
   * Updated from region server load reports; values only move forward.
   */
  private final ConcurrentNavigableMap<byte[], Long> flushedSequenceIdByRegion =
      new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);

  /**
   * Per-store (column family) flushed sequence ids, keyed by encoded region name,
   * then by family name. Populated alongside flushedSequenceIdByRegion.
   */
  private final ConcurrentNavigableMap<byte[], ConcurrentNavigableMap<byte[], Long>>
      storeFlushedSequenceIdsByRegion =
      new ConcurrentSkipListMap<byte[], ConcurrentNavigableMap<byte[], Long>>(
          Bytes.BYTES_COMPARATOR);

  /** Map of currently online region servers to their most recent load report. */
  private final ConcurrentHashMap<ServerName, ServerLoad> onlineServers =
      new ConcurrentHashMap<ServerName, ServerLoad>();

  /**
   * Cache of admin RPC stubs per region server; populated lazily in getRsAdmin()
   * and invalidated when a server is (re)registered or expired.
   */
  private final Map<ServerName, AdminService.BlockingInterface> rsAdmins =
      new HashMap<ServerName, AdminService.BlockingInterface>();

  /** Servers being drained: still online but excluded from destination-server lists. */
  private final ArrayList<ServerName> drainingServers =
      new ArrayList<ServerName>();

  private final Server master;
  private final MasterServices services;
  // Cluster connection used to obtain admin stubs; null when constructed with connect=false.
  private final ClusterConnection connection;

  private final DeadServer deadservers = new DeadServer();

  // Clock-skew thresholds (ms) applied to region server check-ins; see checkClockSkew().
  private final long maxSkew;
  private final long warningSkew;

  // Retry policy for isServerReachable() pings.
  private final RetryCounterFactory pingRetryCounterFactory;
  private final RpcControllerFactory rpcControllerFactory;

  /**
   * Servers that died while server-crash processing was disabled (master still
   * initializing); their expiration is replayed by processQueuedDeadServers().
   * Mutated only while holding the ServerManager monitor.
   */
  private Set<ServerName> queuedDeadServers = new HashSet<ServerName>();

  /**
   * Dead servers whose processing was deferred because assignment-manager failover
   * cleanup had not finished; value is the shouldSplitWal flag to use when the
   * server is finally processed. See processDeadServer()/processQueuedDeadServers().
   */
  private Map<ServerName, Boolean> requeuedDeadServers
    = new ConcurrentHashMap<ServerName, Boolean>();

  /** Listeners notified when servers are added to or removed from the online set. */
  private List<ServerListener> listeners = new CopyOnWriteArrayList<ServerListener>();
208
209
210
211
212
213
214
  /**
   * Constructor that also creates a cluster connection for RPCs to region servers.
   * @param master the master server hosting this manager
   * @param services master services used to reach other master subsystems
   * @throws IOException if the cluster connection cannot be created
   */
  public ServerManager(final Server master, final MasterServices services)
      throws IOException {
    this(master, services, true);
  }
219
220 ServerManager(final Server master, final MasterServices services,
221 final boolean connect) throws IOException {
222 this.master = master;
223 this.services = services;
224 Configuration c = master.getConfiguration();
225 maxSkew = c.getLong("hbase.master.maxclockskew", 30000);
226 warningSkew = c.getLong("hbase.master.warningclockskew", 10000);
227 this.connection = connect ? (ClusterConnection)ConnectionFactory.createConnection(c) : null;
228 int pingMaxAttempts = Math.max(1, master.getConfiguration().getInt(
229 "hbase.master.maximum.ping.server.attempts", 10));
230 int pingSleepInterval = Math.max(1, master.getConfiguration().getInt(
231 "hbase.master.ping.server.retry.sleep.interval", 100));
232 this.pingRetryCounterFactory = new RetryCounterFactory(pingMaxAttempts, pingSleepInterval);
233 this.rpcControllerFactory = this.connection == null
234 ? null
235 : connection.getRpcControllerFactory();
236 }
237
238
239
240
241
  /**
   * Add the listener to the notification list for server add/remove events.
   * @param listener the ServerListener to register
   */
  public void registerListener(final ServerListener listener) {
    this.listeners.add(listener);
  }
245
246
247
248
249
  /**
   * Remove the listener from the notification list.
   * @param listener the ServerListener to deregister
   * @return true if the listener was registered and has been removed
   */
  public boolean unregisterListener(final ServerListener listener) {
    return this.listeners.remove(listener);
  }
253
254
255
256
257
258
259
260
  /**
   * Let the server manager know a new regionserver has come online.
   * Checks clock skew and dead-server status, then records the server as online.
   * @param request the startup request sent by the region server
   * @param ia the address the region server connected from (used to resolve hostname)
   * @return the ServerName assembled from the request
   * @throws IOException including ClockOutOfSyncException if the server clock is too
   *         far off, or YouAreDeadException if the server is on the dead list
   */
  ServerName regionServerStartup(RegionServerStartupRequest request, InetAddress ia)
      throws IOException {
    // Prefer the hostname the region server asked us to use (e.g. multihomed hosts);
    // otherwise use the hostname resolved from the connecting address.
    final String hostname = request.hasUseThisHostnameInstead() ?
        request.getUseThisHostnameInstead() : ia.getHostName();
    ServerName sn = ServerName.valueOf(hostname, request.getPort(),
        request.getServerStartCode());
    checkClockSkew(sn, request.getServerCurrentTime());
    checkIsDead(sn, "STARTUP");
    if (!checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
      // Should be impossible at STARTUP: a rejection means an online server with the
      // same host:port has a newer start code than this freshly started one.
      LOG.warn("THIS SHOULD NOT HAPPEN, RegionServerStartup"
        + " could not record the server: " + sn);
    }
    return sn;
  }
283
284 private ConcurrentNavigableMap<byte[], Long> getOrCreateStoreFlushedSequenceId(
285 byte[] regionName) {
286 ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
287 storeFlushedSequenceIdsByRegion.get(regionName);
288 if (storeFlushedSequenceId != null) {
289 return storeFlushedSequenceId;
290 }
291 storeFlushedSequenceId = new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
292 ConcurrentNavigableMap<byte[], Long> alreadyPut =
293 storeFlushedSequenceIdsByRegion.putIfAbsent(regionName, storeFlushedSequenceId);
294 return alreadyPut == null ? storeFlushedSequenceId : alreadyPut;
295 }
296
297
298
299
300
  /**
   * Fold the per-region and per-store "complete sequence id" numbers from a server's
   * load report into the master-side maps that back getLastFlushedSequenceId().
   * Values only ever move forward; a report smaller than the known value is logged
   * and ignored.
   * @param sn the reporting region server
   * @param hsl the server load report carrying per-region loads
   */
  private void updateLastFlushedSequenceIds(ServerName sn, ServerLoad hsl) {
    Map<byte[], RegionLoad> regionsLoad = hsl.getRegionsLoad();
    for (Entry<byte[], RegionLoad> entry : regionsLoad.entrySet()) {
      // Both maps are keyed by the encoded region name, not the full region name.
      byte[] encodedRegionName = Bytes.toBytes(HRegionInfo.encodeRegionName(entry.getKey()));
      Long existingValue = flushedSequenceIdByRegion.get(encodedRegionName);
      long l = entry.getValue().getCompleteSequenceId();

      if (LOG.isTraceEnabled()) {
        LOG.trace(Bytes.toString(encodedRegionName) + ", existingValue=" + existingValue +
          ", completeSequenceId=" + l);
      }
      // Advance only; NO_SEQNUM reports never overwrite a known value.
      if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue)) {
        flushedSequenceIdByRegion.put(encodedRegionName, l);
      } else if (l != HConstants.NO_SEQNUM && l < existingValue) {
        LOG.warn("RegionServer " + sn + " indicates a last flushed sequence id ("
            + l + ") that is less than the previous last flushed sequence id ("
            + existingValue + ") for region " + Bytes.toString(entry.getKey()) + " Ignoring.");
      }
      ConcurrentNavigableMap<byte[], Long> storeFlushedSequenceId =
          getOrCreateStoreFlushedSequenceId(encodedRegionName);
      for (StoreSequenceId storeSeqId : entry.getValue().getStoreCompleteSequenceId()) {
        byte[] family = storeSeqId.getFamilyName().toByteArray();
        existingValue = storeFlushedSequenceId.get(family);
        l = storeSeqId.getSequenceId();
        if (LOG.isTraceEnabled()) {
          LOG.trace(Bytes.toString(encodedRegionName) + ", family=" + Bytes.toString(family) +
            ", existingValue=" + existingValue + ", completeSequenceId=" + l);
        }
        // Per-family values advance only, mirroring the per-region handling above.
        if (existingValue == null || (l != HConstants.NO_SEQNUM && l > existingValue.longValue())) {
          storeFlushedSequenceId.put(family, l);
        }
      }
    }
  }
336
  /**
   * Process a periodic load report from a region server.
   * If the server is unknown (e.g. master restarted, or the report raced startup),
   * try to register it on the fly; otherwise just refresh its load and sequence ids.
   * @param sn the reporting region server
   * @param sl its current load
   * @throws YouAreDeadException if the server is on the dead list
   */
  void regionServerReport(ServerName sn,
      ServerLoad sl) throws YouAreDeadException {
    checkIsDead(sn, "REPORT");
    // replace() only succeeds for servers already online; null means unknown server.
    if (null == this.onlineServers.replace(sn, sl)) {
      if (!checkAndRecordNewServer(sn, sl)) {
        // Rejected: an online server with the same host:port has a newer start code.
        LOG.info("RegionServerReport ignored, could not record the server: " + sn);
        return;
      }
    }
    updateLastFlushedSequenceIds(sn, sl);
  }
354
355
356
357
358
359
360
361
362
  /**
   * Check whether a server with the same host:port is already online and, based on
   * start codes, either reject the newcomer or register it (expiring the stale
   * instance when the newcomer is newer).
   * @param serverName the server attempting to register
   * @param sl the server's reported load
   * @return false if rejected because an online server with the same host:port has
   *         a newer start code; true if the server was recorded as online
   */
  boolean checkAndRecordNewServer(
      final ServerName serverName, final ServerLoad sl) {
    ServerName existingServer = null;
    synchronized (this.onlineServers) {
      existingServer = findServerWithSameHostnamePortWithLock(serverName);
      if (existingServer != null && (existingServer.getStartcode() > serverName.getStartcode())) {
        // The online entry is a NEWER incarnation; the caller is stale.
        LOG.info("Server serverName=" + serverName + " rejected; we already have "
            + existingServer.toString() + " registered with same hostname and port");
        return false;
      }
      recordNewServerWithLock(serverName, sl);
    }

    // Notify listeners outside the onlineServers lock.
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverAdded(serverName);
      }
    }

    // If an OLDER incarnation of the same host:port was online, it must be dead:
    // trigger its recovery now that the new instance is registered.
    if (existingServer != null && (existingServer.getStartcode() < serverName.getStartcode())) {
      LOG.info("Triggering server recovery; existingServer " +
        existingServer + " looks stale, new server:" + serverName);
      expireServer(existingServer);
    }
    return true;
  }
392
393
394
395
396
397
398
399
400
401 private void checkClockSkew(final ServerName serverName, final long serverCurrentTime)
402 throws ClockOutOfSyncException {
403 long skew = Math.abs(System.currentTimeMillis() - serverCurrentTime);
404 if (skew > maxSkew) {
405 String message = "Server " + serverName + " has been " +
406 "rejected; Reported time is too far out of sync with master. " +
407 "Time difference of " + skew + "ms > max allowed of " + maxSkew + "ms";
408 LOG.warn(message);
409 throw new ClockOutOfSyncException(message);
410 } else if (skew > warningSkew){
411 String message = "Reported time for server " + serverName + " is out of sync with master " +
412 "by " + skew + "ms. (Warning threshold is " + warningSkew + "ms; " +
413 "error threshold is " + maxSkew + "ms)";
414 LOG.warn(message);
415 }
416 }
417
418
419
420
421
422
423
424
425
  /**
   * Reject the caller if it is currently listed as a dead server; otherwise, once
   * the master is initialized, clear any stale dead-list entry for a previous
   * incarnation of the same host:port.
   * @param serverName the server checking in
   * @param what context string for messages ("STARTUP" or "REPORT")
   * @throws YouAreDeadException if the server is on the dead-server list
   */
  private void checkIsDead(final ServerName serverName, final String what)
      throws YouAreDeadException {
    if (this.deadservers.isDeadServer(serverName)) {
      String message = "Server " + what + " rejected; currently processing " +
          serverName + " as dead server";
      LOG.debug(message);
      throw new YouAreDeadException(message);
    }

    // Only clean previous instances after master init; the cast assumes services
    // is an HMaster at runtime (null check covers some test setups).
    if ((this.services == null || ((HMaster) this.services).isInitialized())
        && this.deadservers.cleanPreviousInstance(serverName)) {
      // A new instance of a previously dead host:port came back up.
      LOG.debug(what + ":" + " Server " + serverName + " came back up," +
          " removed it from the dead servers list");
    }
  }
446
447
448
449
450
451 private ServerName findServerWithSameHostnamePortWithLock(
452 final ServerName serverName) {
453 for (ServerName sn: this.onlineServers.keySet()) {
454 if (ServerName.isSameHostnameAndPort(serverName, sn)) return sn;
455 }
456 return null;
457 }
458
459
460
461
462
463
464
  /**
   * Put the server into the online map and drop any cached admin stub for it
   * (a re-registering server needs a fresh connection). Caller must hold the
   * onlineServers lock, hence the WithLock suffix.
   */
  @VisibleForTesting
  void recordNewServerWithLock(final ServerName serverName, final ServerLoad sl) {
    LOG.info("Registering server=" + serverName);
    this.onlineServers.put(serverName, sl);
    this.rsAdmins.remove(serverName);
  }
471
  /**
   * Build the flushed-sequence-id snapshot for a region: the region-level id plus
   * one entry per known store (column family).
   * @param encodedRegionName encoded name keying the sequence-id maps
   * @return protobuf message with the region id (NO_SEQNUM if unknown) and any
   *         per-store ids recorded so far
   */
  public RegionStoreSequenceIds getLastFlushedSequenceId(byte[] encodedRegionName) {
    RegionStoreSequenceIds.Builder builder = RegionStoreSequenceIds.newBuilder();
    Long seqId = flushedSequenceIdByRegion.get(encodedRegionName);
    builder.setLastFlushedSequenceId(seqId != null ? seqId.longValue() : HConstants.NO_SEQNUM);
    Map<byte[], Long> storeFlushedSequenceId =
        storeFlushedSequenceIdsByRegion.get(encodedRegionName);
    if (storeFlushedSequenceId != null) {
      for (Map.Entry<byte[], Long> entry : storeFlushedSequenceId.entrySet()) {
        builder.addStoreSequenceId(StoreSequenceId.newBuilder()
            .setFamilyName(ByteString.copyFrom(entry.getKey()))
            .setSequenceId(entry.getValue().longValue()).build());
      }
    }
    return builder.build();
  }
487
488
489
490
491
  /**
   * @param serverName the server to look up
   * @return the server's last reported load, or null if it is not online
   */
  public ServerLoad getLoad(final ServerName serverName) {
    return this.onlineServers.get(serverName);
  }
495
496
497
498
499
500
501
502 public double getAverageLoad() {
503 int totalLoad = 0;
504 int numServers = 0;
505 for (ServerLoad sl: this.onlineServers.values()) {
506 numServers++;
507 totalLoad += sl.getNumberOfRegions();
508 }
509 return numServers == 0 ? 0 :
510 (double)totalLoad / (double)numServers;
511 }
512
513
  /** @return the count of currently online region servers */
  public int countOfRegionServers() {
    return this.onlineServers.size();
  }
518
519
520
521
  /**
   * @return a read-only view (not a copy) of the online server → load map;
   *         it reflects later membership changes
   */
  public Map<ServerName, ServerLoad> getOnlineServers() {
    synchronized (this.onlineServers) {
      return Collections.unmodifiableMap(this.onlineServers);
    }
  }
528
529
  /** @return the dead-server tracker used by this manager */
  public DeadServer getDeadServers() {
    return this.deadservers;
  }
533
534
535
536
537
  /**
   * @return true if dead servers are still being processed (crash handling not done)
   */
  public boolean areDeadServersInProgress() {
    return this.deadservers.areDeadServersInProgress();
  }
541
  /**
   * Block during cluster shutdown until all region servers have gone offline,
   * cross-checking the online map against the region server znodes in ZooKeeper.
   * Returns early when the only remaining "server" is the master itself (it can
   * carry regions), or when ZK listing fails.
   */
  void letRegionServersShutdown() {
    long previousLogTime = 0;
    ServerName sn = master.getServerName();
    ZooKeeperWatcher zkw = master.getZooKeeper();
    int onlineServersCt;
    while ((onlineServersCt = onlineServers.size()) > 0){
      // Log the remaining servers at most once per second.
      if (System.currentTimeMillis() > (previousLogTime + 1000)) {
        Set<ServerName> remainingServers = onlineServers.keySet();
        synchronized (onlineServers) {
          // Only the master itself is left; nothing more to wait for.
          if (remainingServers.size() == 1 && remainingServers.contains(sn)) {
            return;
          }
        }
        StringBuilder sb = new StringBuilder();
        for (ServerName key : remainingServers) {
          if (sb.length() > 0) {
            sb.append(", ");
          }
          sb.append(key);
        }
        LOG.info("Waiting on regionserver(s) to go down " + sb.toString());
        previousLogTime = System.currentTimeMillis();
      }

      try {
        // Trust ZooKeeper as the authority: if no region server znodes remain
        // (besides possibly the master's own), stop waiting even if our in-memory
        // map has not caught up.
        List<String> servers = ZKUtil.listChildrenNoWatch(zkw, zkw.rsZNode);
        if (servers == null || servers.size() == 0 || (servers.size() == 1
            && servers.contains(sn.toString()))) {
          LOG.info("ZK shows there is only the master self online, exiting now");
          break;
        }
      } catch (KeeperException ke) {
        LOG.warn("Failed to list regionservers", ke);
        break;
      }
      synchronized (onlineServers) {
        try {
          // Wait only if nothing changed since we sampled the count; notifyAll()
          // in moveFromOnelineToDeadServers wakes us when a server goes down.
          if (onlineServersCt == onlineServers.size()) onlineServers.wait(100);
        } catch (InterruptedException ignored) {
          // NOTE(review): interrupt status is swallowed here, not restored —
          // intentional best-effort wait during shutdown, presumably; confirm.
        }
      }
    }
  }
591
592
593
594
595
  /**
   * Expire a region server: move it to the dead list and submit a
   * ServerCrashProcedure to recover its regions and logs.
   * Special cases, in order: the master expiring itself stops the master; if crash
   * processing is not yet enabled the expiration is queued for later replay; an
   * already-dead server is ignored; during cluster shutdown no recovery is started.
   * @param serverName the server to expire
   */
  public synchronized void expireServer(final ServerName serverName) {
    if (serverName.equals(master.getServerName())) {
      // Our own znode went away; stop the master unless already stopping.
      if (!(master.isAborted() || master.isStopped())) {
        master.stop("We lost our znode?");
      }
      return;
    }
    if (!services.isServerCrashProcessingEnabled()) {
      // Master still initializing: remember the server and replay the expiration
      // from processQueuedDeadServers() once crash processing is enabled.
      LOG.info("Master doesn't enable ServerShutdownHandler during initialization, "
        + "delay expiring server " + serverName);
      this.queuedDeadServers.add(serverName);
      return;
    }
    if (this.deadservers.isDeadServer(serverName)) {
      LOG.warn("Expiration of " + serverName +
          " but server shutdown already in progress");
      return;
    }
    moveFromOnelineToDeadServers(serverName);

    // During cluster shutdown no recovery is needed; just stop the master once the
    // last region server is gone.
    if (this.clusterShutdown) {
      LOG.info("Cluster shutdown set; " + serverName +
        " expired; onlineServers=" + this.onlineServers.size());
      if (this.onlineServers.isEmpty()) {
        master.stop("Cluster shutdown set; onlineServer=0");
      }
      return;
    }

    boolean carryingMeta = services.getAssignmentManager().isCarryingMeta(serverName) ==
        AssignmentManager.ServerHostRegion.HOSTING_REGION;
    ProcedureExecutor<MasterProcedureEnv> procExec = this.services.getMasterProcedureExecutor();
    procExec.submitProcedure(new ServerCrashProcedure(
      procExec.getEnvironment(), serverName, true, carryingMeta));
    LOG.debug("Added=" + serverName +
      " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);

    // Notify listeners after the procedure is queued.
    if (!this.listeners.isEmpty()) {
      for (ServerListener listener : this.listeners) {
        listener.serverRemoved(serverName);
      }
    }
  }
643
  /**
   * Move the server from the online map to the dead-server list and wake anyone
   * waiting on onlineServers (see letRegionServersShutdown()). Also drops the
   * cached admin stub. ("Oneline" is a long-standing typo kept for API
   * compatibility — callers reference this public name.)
   * @param sn the server to move
   */
  @VisibleForTesting
  public void moveFromOnelineToDeadServers(final ServerName sn) {
    synchronized (onlineServers) {
      if (!this.onlineServers.containsKey(sn)) {
        // Still added to deadservers below: expiration may race the registration.
        LOG.warn("Expiration of " + sn + " but server not online");
      }
      this.deadservers.add(sn);
      this.onlineServers.remove(sn);
      onlineServers.notifyAll();
    }
    this.rsAdmins.remove(sn);
  }
659
  /**
   * Process a server already known to be dead (e.g. discovered during master
   * failover rather than via znode expiration). If assignment-manager failover
   * cleanup has not finished, the server is parked in requeuedDeadServers and
   * handled later by processQueuedDeadServers(); otherwise a ServerCrashProcedure
   * is submitted directly.
   * @param serverName the dead server
   * @param shouldSplitWal whether the crash procedure should split the server's WALs
   */
  public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitWal) {
    if (!services.getAssignmentManager().isFailoverCleanupDone()) {
      // Defer: region states are not trustworthy until failover cleanup completes.
      requeuedDeadServers.put(serverName, shouldSplitWal);
      return;
    }

    this.deadservers.add(serverName);
    ProcedureExecutor<MasterProcedureEnv> procExec = this.services.getMasterProcedureExecutor();
    procExec.submitProcedure(new ServerCrashProcedure(
      procExec.getEnvironment(), serverName, shouldSplitWal, false));
  }
679
680
681
682
683
  /**
   * Replay deferred dead-server handling: first the servers queued while crash
   * processing was disabled (via expireServer), then the servers requeued while
   * failover cleanup was pending (via processDeadServer).
   */
  synchronized void processQueuedDeadServers() {
    if (!services.isServerCrashProcessingEnabled()) {
      LOG.info("Master hasn't enabled ServerShutdownHandler");
    }
    Iterator<ServerName> serverIterator = queuedDeadServers.iterator();
    while (serverIterator.hasNext()) {
      ServerName tmpServerName = serverIterator.next();
      expireServer(tmpServerName);
      serverIterator.remove();
      requeuedDeadServers.remove(tmpServerName);
    }

    if (!services.getAssignmentManager().isFailoverCleanupDone()) {
      LOG.info("AssignmentManager hasn't finished failover cleanup; waiting");
    }
    // NOTE(review): if failover cleanup is still not done, processDeadServer()
    // re-puts each entry into requeuedDeadServers and the clear() below discards
    // them — presumably callers re-invoke this method afterwards; confirm.
    for (Map.Entry<ServerName, Boolean> entry : requeuedDeadServers.entrySet()) {
      processDeadServer(entry.getKey(), entry.getValue());
    }
    requeuedDeadServers.clear();
  }
704
705
706
707
708 public boolean removeServerFromDrainList(final ServerName sn) {
709
710
711
712 if (!this.isServerOnline(sn)) {
713 LOG.warn("Server " + sn + " is not currently online. " +
714 "Removing from draining list anyway, as requested.");
715 }
716
717 return this.drainingServers.remove(sn);
718 }
719
720
721
722
723 public boolean addServerToDrainList(final ServerName sn) {
724
725
726
727 if (!this.isServerOnline(sn)) {
728 LOG.warn("Server " + sn + " is not currently online. " +
729 "Ignoring request to add it to draining list.");
730 return false;
731 }
732
733
734 if (this.drainingServers.contains(sn)) {
735 LOG.warn("Server " + sn + " is already in the draining server list." +
736 "Ignoring request to add it again.");
737 return false;
738 }
739 return this.drainingServers.add(sn);
740 }
741
742
743
744
745
746
747
748
749
750
751
752
753
754
  /**
   * Send an OPEN RPC for one region to the given server.
   * @param server destination region server
   * @param region region to open
   * @param versionOfOfflineNode ZK version of the region's offline node, forwarded
   *        in the request
   * @param favoredNodes favored-node hints for the region, may be null
   * @return the opening state reported by the server, or FAILED_OPENING when no
   *         admin connection could be obtained
   * @throws IOException on RPC failure (remote exception unwrapped)
   */
  public RegionOpeningState sendRegionOpen(final ServerName server,
      HRegionInfo region, int versionOfOfflineNode, List<ServerName> favoredNodes)
  throws IOException {
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
        " failed because no RPC connection found to this server");
      return RegionOpeningState.FAILED_OPENING;
    }
    // The request carries whether the cluster is in LOG_REPLAY recovery mode.
    OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server,
      region, versionOfOfflineNode, favoredNodes,
      (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
    try {
      OpenRegionResponse response = admin.openRegion(null, request);
      return ResponseConverter.getRegionOpeningState(response);
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
  }
774
775
776
777
778
779
780
781
782
783
  /**
   * Send a single OPEN RPC for a batch of regions to the given server.
   * @param server destination region server
   * @param regionOpenInfos per-region (info, offline-node ZK version, favored nodes)
   *        triples
   * @return per-region opening states, or null when no admin connection could be
   *         obtained
   * @throws IOException on RPC failure (remote exception unwrapped)
   */
  public List<RegionOpeningState> sendRegionOpen(ServerName server,
      List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos)
  throws IOException {
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
        " failed because no RPC connection found to this server");
      return null;
    }

    // The request carries whether the cluster is in LOG_REPLAY recovery mode.
    OpenRegionRequest request = RequestConverter.buildOpenRegionRequest(server, regionOpenInfos,
      (RecoveryMode.LOG_REPLAY == this.services.getMasterFileSystem().getLogRecoveryMode()));
    try {
      OpenRegionResponse response = admin.openRegion(null, request);
      return ResponseConverter.getRegionOpeningStateList(response);
    } catch (ServiceException se) {
      throw ProtobufUtil.getRemoteException(se);
    }
  }
803
804 private PayloadCarryingRpcController newRpcController() {
805 return rpcControllerFactory == null ? null : rpcControllerFactory.newController();
806 }
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
  /**
   * Send a CLOSE RPC for a region to the given server.
   * @param server destination region server, must not be null
   * @param region region to close
   * @param versionOfClosingNode ZK version of the closing node, forwarded to the RPC
   * @param dest server the region moves to afterwards, may be null
   * @param transitionInZK whether the region server should transition the ZK node
   * @return result of ProtobufUtil.closeRegion for this request
   * @throws IOException if no admin connection exists or the RPC fails
   */
  public boolean sendRegionClose(ServerName server, HRegionInfo region,
      int versionOfClosingNode, ServerName dest, boolean transitionInZK) throws IOException {
    if (server == null) throw new NullPointerException("Passed server is null");
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      throw new IOException("Attempting to send CLOSE RPC to server " +
        server.toString() + " for region " +
        region.getRegionNameAsString() +
        " failed because no RPC connection found to this server");
    }
    PayloadCarryingRpcController controller = newRpcController();
    return ProtobufUtil.closeRegion(controller, admin, server, region.getRegionName(),
      versionOfClosingNode, dest, transitionInZK);
  }
836
  /**
   * Convenience overload of {@link #sendRegionClose(ServerName, HRegionInfo, int,
   * ServerName, boolean)} with no destination server and ZK transition enabled.
   */
  public boolean sendRegionClose(ServerName server,
      HRegionInfo region, int versionOfClosingNode) throws IOException {
    return sendRegionClose(server, region, versionOfClosingNode, null, true);
  }
841
842
843
844
845
846
847
848
849
850 public void sendRegionWarmup(ServerName server,
851 HRegionInfo region) {
852 if (server == null) return;
853 try {
854 AdminService.BlockingInterface admin = getRsAdmin(server);
855 PayloadCarryingRpcController controller = newRpcController();
856 ProtobufUtil.warmupRegion(controller, admin, region);
857 } catch (IOException e) {
858 LOG.error("Received exception in RPC for warmup server:" +
859 server + "region: " + region +
860 "exception: " + e);
861 }
862 }
863
864
865
866
867
  /**
   * Issue a CLOSE for the region (ignoring close failures) and then poll the
   * server once a second until it no longer serves the region or the timeout
   * elapses.
   * @param connection connection used to reach the server's admin interface
   * @param server server currently hosting the region
   * @param region region to close
   * @param timeout how long (ms) to wait for the region to go offline
   * @throws IOException if the region is still being served when the timeout expires
   * @throws InterruptedException if interrupted while sleeping between polls
   */
  public static void closeRegionSilentlyAndWait(ClusterConnection connection,
    ServerName server, HRegionInfo region, long timeout) throws IOException, InterruptedException {
    AdminService.BlockingInterface rs = connection.getAdmin(server);
    PayloadCarryingRpcController controller = connection.getRpcControllerFactory().newController();
    try {
      // Close without ZK transition; a failed close is only logged because the
      // polling loop below is the real check.
      ProtobufUtil.closeRegion(controller, rs, server, region.getRegionName(), false);
    } catch (IOException e) {
      LOG.warn("Exception when closing region: " + region.getRegionNameAsString(), e);
    }
    long expiration = timeout + System.currentTimeMillis();
    while (System.currentTimeMillis() < expiration) {
      try {
        HRegionInfo rsRegion =
          ProtobufUtil.getRegionInfo(controller, rs, region.getRegionName());
        // Region no longer known to the server: close completed.
        if (rsRegion == null) return;
      } catch (IOException ioe) {
        if (ioe instanceof NotServingRegionException)
          return;
        LOG.warn("Exception when retrieving regioninfo from: "
          + region.getRegionNameAsString(), ioe);
      }
      Thread.sleep(1000);
    }
    throw new IOException("Region " + region + " failed to close within"
        + " timeout " + timeout);
  }
894
895
896
897
898
899
900
901
902
903
904
905
906
907
  /**
   * Send a MERGE REGIONS RPC to the server hosting the two regions.
   * @param server destination region server, must not be null
   * @param region_a first region to merge
   * @param region_b second region to merge
   * @param forcible true to merge even non-adjacent regions
   * @throws IOException if no admin connection exists or the RPC fails
   */
  public void sendRegionsMerge(ServerName server, HRegionInfo region_a,
      HRegionInfo region_b, boolean forcible) throws IOException {
    if (server == null)
      throw new NullPointerException("Passed server is null");
    if (region_a == null || region_b == null)
      throw new NullPointerException("Passed region is null");
    AdminService.BlockingInterface admin = getRsAdmin(server);
    if (admin == null) {
      throw new IOException("Attempting to send MERGE REGIONS RPC to server "
          + server.toString() + " for region "
          + region_a.getRegionNameAsString() + ","
          + region_b.getRegionNameAsString()
          + " failed because no RPC connection found to this server");
    }
    PayloadCarryingRpcController controller = newRpcController();
    ProtobufUtil.mergeRegions(controller, admin, region_a, region_b, forcible);
  }
925
926
927
928
  /**
   * Ping the server by asking for its server info, retrying per the configured
   * ping retry policy. A server that has meanwhile been marked dead is reported
   * unreachable without further pinging.
   * @param server the server to ping, must not be null
   * @return true only if the server answers AND its reported start code matches,
   *         i.e. it is the same incarnation we are asking about
   */
  public boolean isServerReachable(ServerName server) {
    if (server == null) throw new NullPointerException("Passed server is null");

    RetryCounter retryCounter = pingRetryCounterFactory.create();
    while (retryCounter.shouldRetry()) {
      synchronized (this.onlineServers) {
        // Re-check the dead list each attempt: the server may be expired mid-loop.
        if (this.deadservers.isDeadServer(server)) {
          return false;
        }
      }
      try {
        PayloadCarryingRpcController controller = newRpcController();
        AdminService.BlockingInterface admin = getRsAdmin(server);
        if (admin != null) {
          ServerInfo info = ProtobufUtil.getServerInfo(controller, admin);
          // Match on start code so a restarted instance at the same host:port
          // does not count as the server we were probing.
          return info != null && info.hasServerName()
            && server.getStartcode() == info.getServerName().getStartCode();
        }
      } catch (IOException ioe) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Couldn't reach " + server + ", try=" + retryCounter.getAttemptTimes() + " of "
              + retryCounter.getMaxAttempts(), ioe);
        }
        try {
          retryCounter.sleepUntilNextRetry();
        } catch(InterruptedException ie) {
          // Restore interrupt status and give up retrying.
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
    return false;
  }
963
964
965
966
967
968
969
  /**
   * Get (and cache) the admin RPC stub for the given server. When the target is
   * the master itself and the master is a region server, short-circuit to the
   * in-process RPC services instead of a network connection.
   * NOTE(review): rsAdmins is a plain HashMap read/written here without
   * synchronization while other methods remove entries — presumably callers
   * serialize access; confirm before relying on thread-safety.
   * @param sn the server to talk to
   * @return the admin interface for the server
   * @throws IOException if a new connection cannot be established
   */
  private AdminService.BlockingInterface getRsAdmin(final ServerName sn)
  throws IOException {
    AdminService.BlockingInterface admin = this.rsAdmins.get(sn);
    if (admin == null) {
      LOG.debug("New admin connection to " + sn.toString());
      if (sn.equals(master.getServerName()) && master instanceof HRegionServer) {
        // In-process short-circuit: no RPC needed to talk to ourselves.
        admin = ((HRegionServer)master).getRSRpcServices();
      } else {
        admin = this.connection.getAdmin(sn);
      }
      this.rsAdmins.put(sn, admin);
    }
    return admin;
  }
985
986
987
988
989
990
991
992
993
994
995
996
997
998
  /**
   * Wait at master startup for the region server count to "settle": loop until
   * the master is stopped, or maxToStart servers have checked in, or all of these
   * hold — at least minToStart checked in, the timeout has elapsed, and the count
   * has been stable for one interval.
   * @param status monitored task updated with progress messages
   * @throws InterruptedException if interrupted while sleeping
   */
  public void waitForRegionServers(MonitoredTask status)
  throws InterruptedException {
    final long interval = this.master.getConfiguration().
      getLong(WAIT_ON_REGIONSERVERS_INTERVAL, 1500);
    final long timeout = this.master.getConfiguration().
      getLong(WAIT_ON_REGIONSERVERS_TIMEOUT, 4500);
    int defaultMinToStart = 1;
    if (BaseLoadBalancer.tablesOnMaster(master.getConfiguration())) {
      // The master itself counts as a region server when it carries tables, so
      // require one additional real region server by default.
      defaultMinToStart = 2;
    }
    int minToStart = this.master.getConfiguration().
      getInt(WAIT_ON_REGIONSERVERS_MINTOSTART, defaultMinToStart);
    if (minToStart < 1) {
      LOG.warn(String.format(
        "The value of '%s' (%d) can not be less than 1, ignoring.",
        WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
      minToStart = 1;
    }
    int maxToStart = this.master.getConfiguration().
      getInt(WAIT_ON_REGIONSERVERS_MAXTOSTART, Integer.MAX_VALUE);
    if (maxToStart < minToStart) {
      LOG.warn(String.format(
        "The value of '%s' (%d) is set less than '%s' (%d), ignoring.",
        WAIT_ON_REGIONSERVERS_MAXTOSTART, maxToStart,
        WAIT_ON_REGIONSERVERS_MINTOSTART, minToStart));
      maxToStart = Integer.MAX_VALUE;
    }

    long now = System.currentTimeMillis();
    final long startTime = now;
    long slept = 0;
    long lastLogTime = 0;
    long lastCountChange = startTime;
    int count = countOfRegionServers();
    int oldCount = 0;
    // Keep waiting while under the hard cap AND any settle condition is unmet:
    // count changed within the last interval, timeout not yet reached, or the
    // minimum has not checked in.
    while (!this.master.isStopped() && count < maxToStart
        && (lastCountChange+interval > now || timeout > slept || count < minToStart)) {
      // Log on every count change, and otherwise at most once per interval.
      if (oldCount != count || lastLogTime+interval < now){
        lastLogTime = now;
        String msg =
          "Waiting for region servers count to settle; currently"+
          " checked in " + count + ", slept for " + slept + " ms," +
          " expecting minimum of " + minToStart + ", maximum of "+ maxToStart+
          ", timeout of "+timeout+" ms, interval of "+interval+" ms.";
        LOG.info(msg);
        status.setStatus(msg);
      }

      // Short poll so we notice new check-ins promptly.
      final long sleepTime = 50;
      Thread.sleep(sleepTime);
      now = System.currentTimeMillis();
      slept = now - startTime;

      oldCount = count;
      count = countOfRegionServers();
      if (count != oldCount) {
        lastCountChange = now;
      }
    }

    LOG.info("Finished waiting for region servers count to settle;" +
      " checked in " + count + ", slept for " + slept + " ms," +
      " expecting minimum of " + minToStart + ", maximum of "+ maxToStart+","+
      " master is "+ (this.master.isStopped() ? "stopped.": "running")
    );
  }
1071
1072
1073
1074
  /** @return a fresh, mutable snapshot of the currently online server names */
  public List<ServerName> getOnlineServersList() {
    return new ArrayList<ServerName>(this.onlineServers.keySet());
  }
1080
1081
1082
1083
  /** @return a fresh, mutable snapshot of the servers currently draining */
  public List<ServerName> getDrainingServersList() {
    return new ArrayList<ServerName>(this.drainingServers);
  }
1087
1088
1089
1090
1091 Set<ServerName> getDeadNotExpiredServers() {
1092 return new HashSet<ServerName>(this.queuedDeadServers);
1093 }
1094
1095
1096
1097
1098
1099
  /** Discard all dead servers whose processing was deferred pending failover cleanup. */
  void removeRequeuedDeadServers() {
    requeuedDeadServers.clear();
  }
1103
1104
1105
1106
1107
  /**
   * @return read-only view of the deferred dead servers; values are the
   *         shouldSplitWal flags recorded by processDeadServer()
   */
  Map<ServerName, Boolean> getRequeuedDeadServers() {
    return Collections.unmodifiableMap(this.requeuedDeadServers);
  }
1111
1112 public boolean isServerOnline(ServerName serverName) {
1113 return serverName != null && onlineServers.containsKey(serverName);
1114 }
1115
1116
1117
1118
1119
  /**
   * @return true if some online server shares the given server's hostname and
   *         port, regardless of start code
   */
  public boolean isServerWithSameHostnamePortOnline(final ServerName serverName) {
    return findServerWithSameHostnamePortWithLock(serverName) != null;
  }
1123
1124
1125
1126
1127
1128
1129
  /**
   * Check whether a server is dead in ANY sense: on the dead list, queued for
   * delayed expiration, or requeued pending failover cleanup. A null server is
   * treated as dead.
   */
  public synchronized boolean isServerDead(ServerName serverName) {
    return serverName == null || deadservers.isDeadServer(serverName)
      || queuedDeadServers.contains(serverName)
      || requeuedDeadServers.containsKey(serverName);
  }
1135
  /** Mark the cluster as shutting down and stop the master. */
  public void shutdownCluster() {
    this.clusterShutdown = true;
    this.master.stop("Cluster shutdown requested");
  }
1140
  /** @return true once shutdownCluster() has been called */
  public boolean isClusterShutdown() {
    return this.clusterShutdown;
  }
1144
1145
1146
1147
  /** Release resources: close the cluster connection if this manager created one. */
  public void stop() {
    if (connection != null) {
      try {
        connection.close();
      } catch (IOException e) {
        LOG.error("Attempt to close connection to master failed", e);
      }
    }
  }
1157
1158
1159
1160
1161
1162
1163 public List<ServerName> createDestinationServersList(final ServerName serverToExclude){
1164 final List<ServerName> destServers = getOnlineServersList();
1165
1166 if (serverToExclude != null){
1167 destServers.remove(serverToExclude);
1168 }
1169
1170
1171 final List<ServerName> drainingServersCopy = getDrainingServersList();
1172 if (!drainingServersCopy.isEmpty()) {
1173 for (final ServerName server: drainingServersCopy) {
1174 destServers.remove(server);
1175 }
1176 }
1177
1178
1179 removeDeadNotExpiredServers(destServers);
1180 return destServers;
1181 }
1182
1183
1184
1185
  /** Convenience overload of createDestinationServersList with no exclusion. */
  public List<ServerName> createDestinationServersList(){
    return createDestinationServersList(null);
  }
1189
1190
1191
1192
1193
1194
1195
1196 void removeDeadNotExpiredServers(List<ServerName> servers) {
1197 Set<ServerName> deadNotExpiredServersCopy = this.getDeadNotExpiredServers();
1198 if (!deadNotExpiredServersCopy.isEmpty()) {
1199 for (ServerName server : deadNotExpiredServersCopy) {
1200 LOG.debug("Removing dead but not expired server: " + server
1201 + " from eligible server pool.");
1202 servers.remove(server);
1203 }
1204 }
1205 }
1206
1207
1208
1209
  /**
   * Drop from the dead list every entry that shares a hostname and port with some
   * currently online server (those entries are stale previous incarnations).
   */
  void clearDeadServersWithSameHostNameAndPortOfOnlineServer() {
    for (ServerName serverName : getOnlineServersList()) {
      deadservers.cleanAllPreviousInstances(serverName);
    }
  }
1215
1216
1217
1218
1219 public void removeRegion(final HRegionInfo regionInfo) {
1220 final byte[] encodedName = regionInfo.getEncodedNameAsBytes();
1221 storeFlushedSequenceIdsByRegion.remove(encodedName);
1222 flushedSequenceIdByRegion.remove(encodedName);
1223 }
1224
1225 @VisibleForTesting
1226 public boolean isRegionInServerManagerStates(final HRegionInfo hri) {
1227 final byte[] encodedName = hri.getEncodedNameAsBytes();
1228 return (storeFlushedSequenceIdsByRegion.containsKey(encodedName)
1229 || flushedSequenceIdByRegion.containsKey(encodedName));
1230 }
1231
1232
1233
1234
  /**
   * Batch form of {@link #removeRegion(HRegionInfo)}.
   * @param regions regions to purge from the sequence-id maps
   */
  public void removeRegions(final List<HRegionInfo> regions) {
    for (HRegionInfo hri: regions) {
      removeRegion(hri);
    }
  }
1240 }