1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Arrays;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.NavigableMap;
31 import java.util.Set;
32 import java.util.TreeMap;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.ConcurrentSkipListSet;
35 import java.util.concurrent.ThreadFactory;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.atomic.AtomicBoolean;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.locks.Lock;
40 import java.util.concurrent.locks.ReentrantLock;
41
42 import org.apache.commons.logging.Log;
43 import org.apache.commons.logging.LogFactory;
44 import org.apache.hadoop.classification.InterfaceAudience;
45 import org.apache.hadoop.conf.Configuration;
46 import org.apache.hadoop.hbase.Chore;
47 import org.apache.hadoop.hbase.HConstants;
48 import org.apache.hadoop.hbase.HRegionInfo;
49 import org.apache.hadoop.hbase.RegionTransition;
50 import org.apache.hadoop.hbase.Server;
51 import org.apache.hadoop.hbase.ServerName;
52 import org.apache.hadoop.hbase.Stoppable;
53 import org.apache.hadoop.hbase.catalog.CatalogTracker;
54 import org.apache.hadoop.hbase.catalog.MetaReader;
55 import org.apache.hadoop.hbase.client.Result;
56 import org.apache.hadoop.hbase.exceptions.DeserializationException;
57 import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
58 import org.apache.hadoop.hbase.exceptions.RegionAlreadyInTransitionException;
59 import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
60 import org.apache.hadoop.hbase.exceptions.ServerNotRunningYetException;
61 import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
62 import org.apache.hadoop.hbase.executor.EventHandler;
63 import org.apache.hadoop.hbase.executor.EventType;
64 import org.apache.hadoop.hbase.executor.ExecutorService;
65 import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
66 import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
67 import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
68 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
69 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
70 import org.apache.hadoop.hbase.master.handler.MergedRegionHandler;
71 import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
72 import org.apache.hadoop.hbase.master.handler.SplitRegionHandler;
73 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
74 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
75 import org.apache.hadoop.hbase.util.KeyLocker;
76 import org.apache.hadoop.hbase.util.Pair;
77 import org.apache.hadoop.hbase.util.Threads;
78 import org.apache.hadoop.hbase.util.Triple;
79 import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
80 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
81 import org.apache.hadoop.hbase.zookeeper.ZKTable;
82 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
83 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
84 import org.apache.hadoop.ipc.RemoteException;
85 import org.apache.zookeeper.AsyncCallback;
86 import org.apache.zookeeper.KeeperException;
87 import org.apache.zookeeper.KeeperException.NoNodeException;
88 import org.apache.zookeeper.KeeperException.NodeExistsException;
89 import org.apache.zookeeper.data.Stat;
90
91 import com.google.common.base.Preconditions;
92 import com.google.common.collect.LinkedHashMultimap;
93
94
95
96
97
98
99
100
101 @InterfaceAudience.Private
102 public class AssignmentManager extends ZooKeeperListener {
103 private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
104
105 public static final ServerName HBCK_CODE_SERVERNAME = new ServerName(HConstants.HBCK_CODE_NAME,
106 -1, -1L);
107
108 protected final Server server;
109
110 private ServerManager serverManager;
111
112 private boolean shouldAssignRegionsWithFavoredNodes;
113
114 private CatalogTracker catalogTracker;
115
116 protected final TimeoutMonitor timeoutMonitor;
117
118 private final TimerUpdater timerUpdater;
119
120 private LoadBalancer balancer;
121
122 private final TableLockManager tableLockManager;
123
124 final private KeyLocker<String> locker = new KeyLocker<String>();
125
126
127
128
129
130 private final Map <String, HRegionInfo> regionsToReopen;
131
132
133
134
135
136 private final int maximumAttempts;
137
138
139
140
141
142 final NavigableMap<String, RegionPlan> regionPlans =
143 new TreeMap<String, RegionPlan>();
144
145 private final ZKTable zkTable;
146
147
148
149
150
151 private final ConcurrentSkipListSet<ServerName> serversInUpdatingTimer;
152
153 private final ExecutorService executorService;
154
155
156 private java.util.concurrent.ExecutorService threadPoolExecutorService;
157
158
159 private final java.util.concurrent.ExecutorService zkEventWorkers;
160
161 private List<EventType> ignoreStatesRSOffline = Arrays.asList(
162 EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);
163
164
165 MetricsMaster metricsMaster;
166
167 private final RegionStates regionStates;
168
169
170
171
172
173 private final int bulkAssignThresholdRegions;
174 private final int bulkAssignThresholdServers;
175
176
177
178
179 private final boolean bulkAssignWaitTillAllAssigned;
180
181
182
183
184
185
186
187
188
189 protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
190
191
192 private final boolean tomActivated;
193
194
195
196
197
198
199
200
201 private final ConcurrentHashMap<String, AtomicInteger>
202 failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
203
204
205
206
207
208
209
210
211
212
213
214 public AssignmentManager(Server server, ServerManager serverManager,
215 CatalogTracker catalogTracker, final LoadBalancer balancer,
216 final ExecutorService service, MetricsMaster metricsMaster,
217 final TableLockManager tableLockManager) throws KeeperException, IOException {
218 super(server.getZooKeeper());
219 this.server = server;
220 this.serverManager = serverManager;
221 this.catalogTracker = catalogTracker;
222 this.executorService = service;
223 this.regionsToReopen = Collections.synchronizedMap
224 (new HashMap<String, HRegionInfo> ());
225 Configuration conf = server.getConfiguration();
226
227 this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
228 HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
229 FavoredNodeLoadBalancer.class);
230 this.tomActivated = conf.getBoolean("hbase.assignment.timeout.management", false);
231 if (tomActivated){
232 this.serversInUpdatingTimer = new ConcurrentSkipListSet<ServerName>();
233 this.timeoutMonitor = new TimeoutMonitor(
234 conf.getInt("hbase.master.assignment.timeoutmonitor.period", 30000),
235 server, serverManager,
236 conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 600000));
237 this.timerUpdater = new TimerUpdater(conf.getInt(
238 "hbase.master.assignment.timerupdater.period", 10000), server);
239 Threads.setDaemonThreadRunning(timerUpdater.getThread(),
240 server.getServerName() + ".timerUpdater");
241 } else {
242 this.serversInUpdatingTimer = null;
243 this.timeoutMonitor = null;
244 this.timerUpdater = null;
245 }
246 this.zkTable = new ZKTable(this.watcher);
247 this.maximumAttempts =
248 this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
249 this.balancer = balancer;
250 int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
251 this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
252 maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("hbase-am"));
253 this.metricsMaster = metricsMaster;
254 this.regionStates = new RegionStates(server, serverManager);
255
256 this.bulkAssignWaitTillAllAssigned =
257 conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
258 this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
259 this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
260
261 int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
262 ThreadFactory threadFactory = Threads.newDaemonThreadFactory("hbase-am-zkevent-worker");
263 zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
264 TimeUnit.SECONDS, threadFactory);
265 this.tableLockManager = tableLockManager;
266 }
267
268 void startTimeOutMonitor() {
269 if (tomActivated) {
270 Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), server.getServerName()
271 + ".timeoutMonitor");
272 }
273 }
274
275
276
277
278 public ZKTable getZKTable() {
279
280
281 return this.zkTable;
282 }
283
284
285
286
287
288
289
290 public RegionStates getRegionStates() {
291 return regionStates;
292 }
293
294 public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
295 return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
296 }
297
298
299
300
301
302
303 public void addPlan(String encodedName, RegionPlan plan) {
304 synchronized (regionPlans) {
305 regionPlans.put(encodedName, plan);
306 }
307 }
308
309
310
311
312 public void addPlans(Map<String, RegionPlan> plans) {
313 synchronized (regionPlans) {
314 regionPlans.putAll(plans);
315 }
316 }
317
318
319
320
321
322
323
324
325 public void setRegionsToReopen(List <HRegionInfo> regions) {
326 for(HRegionInfo hri : regions) {
327 regionsToReopen.put(hri.getEncodedName(), hri);
328 }
329 }
330
331
332
333
334
335
336
337
338 public Pair<Integer, Integer> getReopenStatus(byte[] tableName)
339 throws IOException {
340 List <HRegionInfo> hris =
341 MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName, true);
342 Integer pending = 0;
343 for (HRegionInfo hri : hris) {
344 String name = hri.getEncodedName();
345
346 if (regionsToReopen.containsKey(name)
347 || regionStates.isRegionInTransition(name)) {
348 pending++;
349 }
350 }
351 return new Pair<Integer, Integer>(pending, hris.size());
352 }
353
354
355
356
357
358
359 public boolean isFailoverCleanupDone() {
360 return failoverCleanupDone.get();
361 }
362
363
364
365
366
367 void failoverCleanupDone() {
368 failoverCleanupDone.set(true);
369 serverManager.processQueuedDeadServers();
370 }
371
372
373
374
375
376
377
378
379 void joinCluster() throws IOException,
380 KeeperException, InterruptedException {
381
382
383
384
385
386
387
388
389
390
391 Map<ServerName, List<HRegionInfo>> deadServers = rebuildUserRegions();
392
393
394
395
396 processDeadServersAndRegionsInTransition(deadServers);
397
398 recoverTableInDisablingState();
399 recoverTableInEnablingState();
400 }
401
402
403
404
405
406
407
408
409
410
411
412
413 void processDeadServersAndRegionsInTransition(
414 final Map<ServerName, List<HRegionInfo>> deadServers)
415 throws KeeperException, IOException, InterruptedException {
416 List<String> nodes = ZKUtil.listChildrenNoWatch(watcher,
417 watcher.assignmentZNode);
418
419 if (nodes == null) {
420 String errorMessage = "Failed to get the children from ZK";
421 server.abort(errorMessage, new IOException(errorMessage));
422 return;
423 }
424
425 boolean failover = (!serverManager.getDeadServers().isEmpty() || !serverManager
426 .getRequeuedDeadServers().isEmpty());
427
428 if (!failover) {
429
430
431 Map<HRegionInfo, ServerName> regions = regionStates.getRegionAssignments();
432 for (Map.Entry<HRegionInfo, ServerName> e: regions.entrySet()) {
433 if (!e.getKey().isMetaTable() && e.getValue() != null) {
434 LOG.debug("Found " + e + " out on cluster");
435 failover = true;
436 break;
437 }
438 if (nodes.contains(e.getKey().getEncodedName())) {
439 LOG.debug("Found " + e.getKey().getRegionNameAsString() + " in RITs");
440
441 failover = true;
442 break;
443 }
444 }
445 }
446
447
448 if (failover) {
449 LOG.info("Found regions out on cluster or in RIT; failover");
450
451
452 processDeadServersAndRecoverLostRegions(deadServers);
453 } else {
454
455 LOG.info("Clean cluster startup. Assigning userregions");
456 assignAllUserRegions();
457 }
458 }
459
460
461
462
463
464
465
466
467
468
469
470
471 boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
472 throws InterruptedException, KeeperException, IOException {
473 boolean intransistion = processRegionInTransition(hri.getEncodedName(), hri);
474 if (!intransistion) return intransistion;
475 LOG.debug("Waiting on " + HRegionInfo.prettyPrint(hri.getEncodedName()));
476 while (!this.server.isStopped() &&
477 this.regionStates.isRegionInTransition(hri.getEncodedName())) {
478
479
480 this.regionStates.waitForUpdate(100);
481 }
482 return intransistion;
483 }
484
485
486
487
488
489
490
491
492
493
494 boolean processRegionInTransition(final String encodedRegionName,
495 final HRegionInfo regionInfo) throws KeeperException, IOException {
496
497
498
499
500 Lock lock = locker.acquireLock(encodedRegionName);
501 try {
502 Stat stat = new Stat();
503 byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
504 if (data == null) return false;
505 RegionTransition rt;
506 try {
507 rt = RegionTransition.parseFrom(data);
508 } catch (DeserializationException e) {
509 LOG.warn("Failed parse znode data", e);
510 return false;
511 }
512 HRegionInfo hri = regionInfo;
513 if (hri == null) {
514 hri = regionStates.getRegionInfo(rt.getRegionName());
515 if (hri == null) return false;
516 }
517 processRegionsInTransition(rt, hri, stat.getVersion());
518 return true;
519 } finally {
520 lock.unlock();
521 }
522 }
523
524
525
526
527
528
529
530
531 void processRegionsInTransition(
532 final RegionTransition rt, final HRegionInfo regionInfo,
533 final int expectedVersion) throws KeeperException {
534 EventType et = rt.getEventType();
535
536 final ServerName sn = rt.getServerName();
537 String encodedRegionName = regionInfo.getEncodedName();
538 LOG.info("Processing region " + regionInfo.getRegionNameAsString() + " in state " + et);
539
540
541 if (regionStates.isRegionInTransition(encodedRegionName)) {
542
543 return;
544 }
545 switch (et) {
546 case M_ZK_REGION_CLOSING:
547
548
549 if (!serverManager.isServerOnline(sn)) {
550
551
552 forceOffline(regionInfo, rt);
553 } else {
554
555
556 regionStates.updateRegionState(rt, RegionState.State.CLOSING);
557 final RegionState rs = regionStates.getRegionState(regionInfo);
558 this.executorService.submit(
559 new EventHandler(server, EventType.M_MASTER_RECOVERY) {
560 @Override
561 public void process() throws IOException {
562 ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
563 try {
564 unassign(regionInfo, rs, expectedVersion, null, true, null);
565 } finally {
566 lock.unlock();
567 }
568 }
569 });
570 }
571 break;
572
573 case RS_ZK_REGION_CLOSED:
574 case RS_ZK_REGION_FAILED_OPEN:
575
576 addToRITandCallClose(regionInfo, RegionState.State.CLOSED, rt);
577 break;
578
579 case M_ZK_REGION_OFFLINE:
580
581
582 if (!serverManager.isServerOnline(sn)) {
583
584 addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
585 } else {
586
587 regionStates.updateRegionState(rt, RegionState.State.PENDING_OPEN);
588 final RegionState rs = regionStates.getRegionState(regionInfo);
589 this.executorService.submit(
590 new EventHandler(server, EventType.M_MASTER_RECOVERY) {
591 @Override
592 public void process() throws IOException {
593 ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
594 try {
595 assign(rs, false, false);
596 } finally {
597 lock.unlock();
598 }
599 }
600 });
601 }
602 break;
603
604 case RS_ZK_REGION_OPENING:
605 if (!serverManager.isServerOnline(sn)) {
606 forceOffline(regionInfo, rt);
607 } else {
608 regionStates.updateRegionState(rt, RegionState.State.OPENING);
609 }
610 break;
611
612 case RS_ZK_REGION_OPENED:
613 if (!serverManager.isServerOnline(sn)) {
614 forceOffline(regionInfo, rt);
615 } else {
616
617
618
619 regionStates.updateRegionState(rt, RegionState.State.OPEN);
620 new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process();
621 }
622 break;
623 case RS_ZK_REGION_SPLITTING:
624 if (!serverManager.isServerOnline(sn)) {
625
626
627
628 LOG.warn("Processed region " + regionInfo.getEncodedName() + " in state : " + et +
629 " on a dead regionserver: " + sn + " doing nothing");
630 } else {
631 LOG.info("Processed region " + regionInfo.getEncodedName() + " in state : " +
632 et + " nothing to do.");
633
634
635 }
636 break;
637 case RS_ZK_REGION_SPLIT:
638 if (!serverManager.isServerOnline(sn)) {
639 forceOffline(regionInfo, rt);
640 } else {
641 LOG.info("Processed region " + regionInfo.getEncodedName() + " in state : " +
642 et + " nothing to do.");
643
644
645 }
646 break;
647 case RS_ZK_REGION_MERGING:
648
649 LOG.info("Processed region " + regionInfo.getEncodedName()
650 + " in state : " + et + " nothing to do.");
651 break;
652 case RS_ZK_REGION_MERGE:
653 if (!serverManager.isServerOnline(sn)) {
654
655 LOG.warn("Processed region " + regionInfo.getEncodedName()
656 + " in state : " + et + " on a dead regionserver: " + sn
657 + " doing nothing");
658 } else {
659 LOG.info("Processed region " + regionInfo.getEncodedName() + " in state : " +
660 et + " nothing to do.");
661
662
663 }
664 break;
665 default:
666 throw new IllegalStateException("Received region in state :" + et + " is not valid.");
667 }
668 }
669
670
671
672
673
674
675
676
677
678
679 private void forceOffline(final HRegionInfo hri, final RegionTransition oldRt)
680 throws KeeperException {
681
682
683 LOG.debug("RIT " + hri.getEncodedName() + " in state=" + oldRt.getEventType() +
684 " was on deadserver; forcing offline");
685 ZKAssign.createOrForceNodeOffline(this.watcher, hri, oldRt.getServerName());
686 addToRITandCallClose(hri, RegionState.State.OFFLINE, oldRt);
687 }
688
689
690
691
692
693
694
695
696 private void addToRITandCallClose(final HRegionInfo hri,
697 final RegionState.State state, final RegionTransition oldData) {
698 regionStates.updateRegionState(oldData, state);
699 new ClosedRegionHandler(this.server, this, hri).process();
700 }
701
702
703
704
705
706 public void removeClosedRegion(HRegionInfo hri) {
707 if (regionsToReopen.remove(hri.getEncodedName()) != null) {
708 LOG.debug("Removed region from reopening regions because it was closed");
709 }
710 }
711
712
713
714
715
716
717
718
719
720
721
722 private void handleRegion(final RegionTransition rt, int expectedVersion) {
723 if (rt == null) {
724 LOG.warn("Unexpected NULL input for RegionTransition rt");
725 return;
726 }
727 final ServerName sn = rt.getServerName();
728
729 if (sn.equals(HBCK_CODE_SERVERNAME)) {
730 handleHBCK(rt);
731 return;
732 }
733 final long createTime = rt.getCreateTime();
734 final byte[] regionName = rt.getRegionName();
735 String encodedName = HRegionInfo.encodeRegionName(regionName);
736 String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
737
738 if (!serverManager.isServerOnline(sn)
739 && !ignoreStatesRSOffline.contains(rt.getEventType())) {
740 LOG.warn("Attempted to handle region transition for server but " +
741 "server is not online: " + prettyPrintedRegionName);
742 return;
743 }
744
745 RegionState regionState =
746 regionStates.getRegionTransitionState(encodedName);
747 long startTime = System.currentTimeMillis();
748 if (LOG.isDebugEnabled()) {
749 boolean lateEvent = createTime < (startTime - 15000);
750 LOG.debug("Handling transition=" + rt.getEventType() +
751 ", server=" + sn + ", region=" +
752 (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
753 (lateEvent ? ", which is more than 15 seconds late" : "") +
754 ", current state from region state map =" + regionState);
755 }
756
757
758 if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
759 return;
760 }
761
762
763 Lock lock = locker.acquireLock(encodedName);
764 try {
765 RegionState latestState =
766 regionStates.getRegionTransitionState(encodedName);
767 if ((regionState == null && latestState != null)
768 || (regionState != null && latestState == null)
769 || (regionState != null && latestState != null
770 && latestState.getState() != regionState.getState())) {
771 LOG.warn("Region state changed from " + regionState + " to "
772 + latestState + ", while acquiring lock");
773 }
774 long waitedTime = System.currentTimeMillis() - startTime;
775 if (waitedTime > 5000) {
776 LOG.warn("Took " + waitedTime + "ms to acquire the lock");
777 }
778 regionState = latestState;
779 switch (rt.getEventType()) {
780 case RS_ZK_REGION_SPLITTING:
781 if (!isInStateForSplitting(regionState)) break;
782 regionStates.updateRegionState(rt, RegionState.State.SPLITTING);
783 break;
784
785 case RS_ZK_REGION_SPLIT:
786
787 if (!isInStateForSplitting(regionState)) break;
788
789 if (regionState == null) {
790 regionState = regionStates.updateRegionState(rt,
791 RegionState.State.SPLITTING);
792
793 String message = "Received SPLIT for region " + prettyPrintedRegionName +
794 " from server " + sn;
795
796 if (regionState == null) {
797 LOG.warn(message + " but it doesn't exist anymore," +
798 " probably already processed its split");
799 break;
800 }
801 LOG.info(message +
802 " but region was not first in SPLITTING state; continuing");
803 }
804
805 byte [] payload = rt.getPayload();
806 List<HRegionInfo> daughters;
807 try {
808 daughters = HRegionInfo.parseDelimitedFrom(payload, 0, payload.length);
809 } catch (IOException e) {
810 LOG.error("Dropped split! Failed reading split payload for " +
811 prettyPrintedRegionName);
812 break;
813 }
814 assert daughters.size() == 2;
815
816 if (!this.serverManager.isServerOnline(sn)) {
817 LOG.error("Dropped split! ServerName=" + sn + " unknown.");
818 break;
819 }
820
821 this.executorService.submit(new SplitRegionHandler(server, this,
822 regionState.getRegion(), sn, daughters));
823 break;
824
825 case RS_ZK_REGION_MERGING:
826
827
828 break;
829
830 case RS_ZK_REGION_MERGE:
831
832 if (!this.serverManager.isServerOnline(sn)) {
833 LOG.error("Dropped merge! ServerName=" + sn + " unknown.");
834 break;
835 }
836
837 byte[] payloadOfMerge = rt.getPayload();
838 List<HRegionInfo> mergeRegions;
839 try {
840 mergeRegions = HRegionInfo.parseDelimitedFrom(payloadOfMerge, 0,
841 payloadOfMerge.length);
842 } catch (IOException e) {
843 LOG.error("Dropped merge! Failed reading merge payload for " +
844 prettyPrintedRegionName);
845 break;
846 }
847 assert mergeRegions.size() == 3;
848
849 this.executorService.submit(new MergedRegionHandler(server, this, sn,
850 mergeRegions));
851 break;
852
853 case M_ZK_REGION_CLOSING:
854
855
856 if (regionState != null
857 && !regionState.isPendingCloseOrClosingOnServer(sn)) {
858 LOG.warn("Received CLOSING for region " + prettyPrintedRegionName
859 + " from server " + sn + " but region was in the state " + regionState
860 + " and not in expected PENDING_CLOSE or CLOSING states,"
861 + " or not on the expected server");
862 return;
863 }
864
865 regionStates.updateRegionState(rt, RegionState.State.CLOSING);
866 break;
867
868 case RS_ZK_REGION_CLOSED:
869
870 if (regionState != null
871 && !regionState.isPendingCloseOrClosingOnServer(sn)) {
872 LOG.warn("Received CLOSED for region " + prettyPrintedRegionName
873 + " from server " + sn + " but region was in the state " + regionState
874 + " and not in expected PENDING_CLOSE or CLOSING states,"
875 + " or not on the expected server");
876 return;
877 }
878
879
880
881 regionState = regionStates.updateRegionState(rt, RegionState.State.CLOSED);
882 if (regionState != null) {
883 removeClosedRegion(regionState.getRegion());
884 this.executorService.submit(new ClosedRegionHandler(server,
885 this, regionState.getRegion()));
886 }
887 break;
888
889 case RS_ZK_REGION_FAILED_OPEN:
890 if (regionState != null
891 && !regionState.isPendingOpenOrOpeningOnServer(sn)) {
892 LOG.warn("Received FAILED_OPEN for region " + prettyPrintedRegionName
893 + " from server " + sn + " but region was in the state " + regionState
894 + " and not in expected PENDING_OPEN or OPENING states,"
895 + " or not on the expected server");
896 return;
897 }
898
899 regionState = regionStates.updateRegionState(rt, RegionState.State.CLOSED);
900
901
902 if (regionState != null) {
903 AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
904 if (failedOpenCount == null) {
905 failedOpenCount = new AtomicInteger();
906
907
908
909 failedOpenTracker.put(encodedName, failedOpenCount);
910 }
911 if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
912 regionStates.updateRegionState(
913 regionState.getRegion(), RegionState.State.FAILED_OPEN);
914
915
916 failedOpenTracker.remove(encodedName);
917 } else {
918 getRegionPlan(regionState.getRegion(), sn, true);
919 this.executorService.submit(new ClosedRegionHandler(server,
920 this, regionState.getRegion()));
921 }
922 }
923 break;
924
925 case RS_ZK_REGION_OPENING:
926
927
928 if (regionState != null
929 && !regionState.isPendingOpenOrOpeningOnServer(sn)) {
930 LOG.warn("Received OPENING for region " + prettyPrintedRegionName
931 + " from server " + sn + " but region was in the state " + regionState
932 + " and not in expected PENDING_OPEN or OPENING states,"
933 + " or not on the expected server");
934 return;
935 }
936
937 regionStates.updateRegionState(rt, RegionState.State.OPENING);
938 break;
939
940 case RS_ZK_REGION_OPENED:
941
942 if (regionState != null
943 && !regionState.isPendingOpenOrOpeningOnServer(sn)) {
944 LOG.warn("Received OPENED for region " + prettyPrintedRegionName
945 + " from server " + sn + " but region was in the state " + regionState
946 + " and not in expected PENDING_OPEN or OPENING states,"
947 + " or not on the expected server");
948
949
950
951 unassign(regionState.getRegion(), null, -1, null, false, sn);
952 return;
953 }
954
955 regionState = regionStates.updateRegionState(rt, RegionState.State.OPEN);
956 if (regionState != null) {
957 failedOpenTracker.remove(encodedName);
958 this.executorService.submit(new OpenedRegionHandler(
959 server, this, regionState.getRegion(), sn, expectedVersion));
960 }
961 break;
962
963 default:
964 throw new IllegalStateException("Received event is not valid.");
965 }
966 } finally {
967 lock.unlock();
968 }
969 }
970
971
972
973
974
975
976 private boolean isInStateForSplitting(final RegionState rs) {
977 if (rs == null) return true;
978 if (rs.isSplitting()) return true;
979 if (convertPendingCloseToSplitting(rs)) return true;
980 LOG.warn("Dropped region split! Not in state good for SPLITTING; rs=" + rs);
981 return false;
982 }
983
984
985
986
987
988
989 void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
990 if (!shouldAssignRegionsWithFavoredNodes) return;
991
992
993 Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
994 new HashMap<HRegionInfo, List<ServerName>>();
995 for (HRegionInfo region : regions) {
996 regionToFavoredNodes.put(region,
997 ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
998 }
999 FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes, catalogTracker);
1000 }
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011 private boolean convertPendingCloseToSplitting(final RegionState rs) {
1012 if (!rs.isPendingClose()) return false;
1013 LOG.debug("Converting PENDING_CLOSE to SPLITTING; rs=" + rs);
1014 regionStates.updateRegionState(
1015 rs.getRegion(), RegionState.State.SPLITTING);
1016
1017
1018 clearRegionPlan(rs.getRegion());
1019 return true;
1020 }
1021
1022
1023
1024
1025
1026
1027
1028 private void handleHBCK(RegionTransition rt) {
1029 String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
1030 LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
1031 ", server=" + rt.getServerName() + ", region=" +
1032 HRegionInfo.prettyPrint(encodedName));
1033 RegionState regionState = regionStates.getRegionTransitionState(encodedName);
1034 switch (rt.getEventType()) {
1035 case M_ZK_REGION_OFFLINE:
1036 HRegionInfo regionInfo;
1037 if (regionState != null) {
1038 regionInfo = regionState.getRegion();
1039 } else {
1040 try {
1041 byte [] name = rt.getRegionName();
1042 Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(catalogTracker, name);
1043 regionInfo = p.getFirst();
1044 } catch (IOException e) {
1045 LOG.info("Exception reading META doing HBCK repair operation", e);
1046 return;
1047 }
1048 }
1049 LOG.info("HBCK repair is triggering assignment of region=" +
1050 regionInfo.getRegionNameAsString());
1051
1052 assign(regionInfo, false);
1053 break;
1054
1055 default:
1056 LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
1057 break;
1058 }
1059
1060 }
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076 @Override
1077 public void nodeCreated(String path) {
1078 handleAssignmentEvent(path);
1079 }
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093 @Override
1094 public void nodeDataChanged(String path) {
1095 handleAssignmentEvent(path);
1096 }
1097
1098
1099
1100
1101
1102 private final Set<String> regionsInProgress = new HashSet<String>();
1103
1104
1105 private final LinkedHashMultimap <String, RegionRunnable>
1106 zkEventWorkerWaitingList = LinkedHashMultimap.create();
1107
1108
1109
1110
1111 private static interface RegionRunnable extends Runnable{
1112
1113
1114
1115 public String getRegionName();
1116 }
1117
1118
1119
1120
1121
1122 protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {
1123
1124 synchronized (regionsInProgress) {
1125
1126
1127 if (regionsInProgress.contains(regRunnable.getRegionName())) {
1128 synchronized (zkEventWorkerWaitingList){
1129 zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
1130 }
1131 return;
1132 }
1133
1134
1135 regionsInProgress.add(regRunnable.getRegionName());
1136 zkEventWorkers.submit(new Runnable() {
1137 @Override
1138 public void run() {
1139 try {
1140 regRunnable.run();
1141 } finally {
1142
1143
1144 synchronized (regionsInProgress) {
1145 regionsInProgress.remove(regRunnable.getRegionName());
1146 synchronized (zkEventWorkerWaitingList) {
1147 java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
1148 regRunnable.getRegionName());
1149 if (!waiting.isEmpty()) {
1150
1151 RegionRunnable toSubmit = waiting.iterator().next();
1152 zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
1153 zkEventWorkersSubmit(toSubmit);
1154 }
1155 }
1156 }
1157 }
1158 }
1159 });
1160 }
1161 }
1162
1163 @Override
1164 public void nodeDeleted(final String path) {
1165 if (path.startsWith(watcher.assignmentZNode)) {
1166 final String regionName = ZKAssign.getRegionName(watcher, path);
1167 zkEventWorkersSubmit(new RegionRunnable() {
1168 @Override
1169 public String getRegionName() {
1170 return regionName;
1171 }
1172
1173 @Override
1174 public void run() {
1175 Lock lock = locker.acquireLock(regionName);
1176 try {
1177 RegionState rs = regionStates.getRegionTransitionState(regionName);
1178 if (rs == null) return;
1179
1180 HRegionInfo regionInfo = rs.getRegion();
1181 if (rs.isSplit()) {
1182 LOG.debug("Ephemeral node deleted, regionserver crashed?, " +
1183 "clearing from RIT; rs=" + rs);
1184 regionOffline(rs.getRegion());
1185 } else {
1186 String regionNameStr = regionInfo.getRegionNameAsString();
1187 LOG.debug("The znode of region " + regionNameStr
1188 + " has been deleted.");
1189 if (rs.isOpened()) {
1190 ServerName serverName = rs.getServerName();
1191 regionOnline(regionInfo, serverName);
1192 LOG.info("The master has opened the region "
1193 + regionNameStr + " that was online on " + serverName);
1194 boolean disabled = getZKTable().isDisablingOrDisabledTable(
1195 regionInfo.getTableNameAsString());
1196 if (!serverManager.isServerOnline(serverName) && !disabled) {
1197 LOG.info("Opened region " + regionNameStr
1198 + "but the region server is offline, reassign the region");
1199 assign(regionInfo, true);
1200 } else if (disabled) {
1201
1202 LOG.info("Opened region " + regionNameStr
1203 + "but this table is disabled, triggering close of region");
1204 unassign(regionInfo);
1205 }
1206 }
1207 }
1208 } finally {
1209 lock.unlock();
1210 }
1211 }
1212 });
1213 }
1214 }
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228 @Override
1229 public void nodeChildrenChanged(String path) {
1230 if (path.equals(watcher.assignmentZNode)) {
1231 zkEventWorkers.submit(new Runnable() {
1232 @Override
1233 public void run() {
1234 try {
1235
1236 List<String> children =
1237 ZKUtil.listChildrenAndWatchForNewChildren(
1238 watcher, watcher.assignmentZNode);
1239 if (children != null) {
1240 Stat stat = new Stat();
1241 for (String child : children) {
1242
1243
1244
1245 if (!regionStates.isRegionInTransition(child)) {
1246 stat.setVersion(0);
1247 byte[] data = ZKAssign.getDataAndWatch(watcher,
1248 ZKUtil.joinZNode(watcher.assignmentZNode, child), stat);
1249 if (data != null && stat.getVersion() > 0) {
1250 try {
1251 RegionTransition rt = RegionTransition.parseFrom(data);
1252
1253
1254 if (rt.getEventType() == EventType.RS_ZK_REGION_SPLITTING) {
1255 handleRegion(rt, stat.getVersion());
1256 }
1257 } catch (DeserializationException de) {
1258 LOG.error("error getting data for " + child, de);
1259 }
1260 }
1261 }
1262 }
1263 }
1264 } catch (KeeperException e) {
1265 server.abort("Unexpected ZK exception reading unassigned children", e);
1266 }
1267 }
1268 });
1269 }
1270 }
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280 void regionOnline(HRegionInfo regionInfo, ServerName sn) {
1281 if (!serverManager.isServerOnline(sn)) {
1282 LOG.warn("A region was opened on a dead server, ServerName=" +
1283 sn + ", region=" + regionInfo.getEncodedName());
1284 }
1285
1286 regionStates.regionOnline(regionInfo, sn);
1287
1288
1289 clearRegionPlan(regionInfo);
1290
1291 addToServersInUpdatingTimer(sn);
1292 }
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302 private void handleAssignmentEvent(final String path) {
1303 if (path.startsWith(watcher.assignmentZNode)) {
1304 final String regionName = ZKAssign.getRegionName(watcher, path);
1305
1306 zkEventWorkersSubmit(new RegionRunnable() {
1307 @Override
1308 public String getRegionName() {
1309 return regionName;
1310 }
1311
1312 @Override
1313 public void run() {
1314 try {
1315 Stat stat = new Stat();
1316 byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
1317 if (data == null) return;
1318
1319 RegionTransition rt = RegionTransition.parseFrom(data);
1320 handleRegion(rt, stat.getVersion());
1321 } catch (KeeperException e) {
1322 server.abort("Unexpected ZK exception reading unassigned node data", e);
1323 } catch (DeserializationException e) {
1324 server.abort("Unexpected exception deserializing node data", e);
1325 }
1326 }
1327 });
1328 }
1329 }
1330
1331
1332
1333
1334
1335
1336 private void addToServersInUpdatingTimer(final ServerName sn) {
1337 if (tomActivated){
1338 this.serversInUpdatingTimer.add(sn);
1339 }
1340 }
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355 private void updateTimers(final ServerName sn) {
1356 Preconditions.checkState(tomActivated);
1357 if (sn == null) return;
1358
1359
1360
1361
1362
1363 List<Map.Entry<String, RegionPlan>> rps;
1364 synchronized(this.regionPlans) {
1365 rps = new ArrayList<Map.Entry<String, RegionPlan>>(regionPlans.entrySet());
1366 }
1367
1368 for (Map.Entry<String, RegionPlan> e : rps) {
1369 if (e.getValue() != null && e.getKey() != null && sn.equals(e.getValue().getDestination())) {
1370 RegionState regionState = regionStates.getRegionTransitionState(e.getKey());
1371 if (regionState != null) {
1372 regionState.updateTimestampToNow();
1373 }
1374 }
1375 }
1376 }
1377
1378
1379
1380
1381
1382
1383
1384
1385 public void regionOffline(final HRegionInfo regionInfo) {
1386 regionStates.regionOffline(regionInfo);
1387 removeClosedRegion(regionInfo);
1388
1389 clearRegionPlan(regionInfo);
1390 }
1391
1392 public void offlineDisabledRegion(HRegionInfo regionInfo) {
1393
1394 LOG.debug("Table being disabled so deleting ZK node and removing from " +
1395 "regions in transition, skipping assignment of region " +
1396 regionInfo.getRegionNameAsString());
1397 try {
1398 if (!ZKAssign.deleteClosedNode(watcher, regionInfo.getEncodedName())) {
1399
1400 ZKAssign.deleteOfflineNode(watcher, regionInfo.getEncodedName());
1401 }
1402 } catch (KeeperException.NoNodeException nne) {
1403 LOG.debug("Tried to delete closed node for " + regionInfo + " but it " +
1404 "does not exist so just offlining");
1405 } catch (KeeperException e) {
1406 this.server.abort("Error deleting CLOSED node in ZK", e);
1407 }
1408 regionOffline(regionInfo);
1409 }
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431 public void assign(HRegionInfo region, boolean setOfflineInZK) {
1432 assign(region, setOfflineInZK, false);
1433 }
1434
1435
1436
1437
1438 public void assign(HRegionInfo region,
1439 boolean setOfflineInZK, boolean forceNewPlan) {
1440 if (!setOfflineInZK && isDisabledorDisablingRegionInRIT(region)) {
1441 return;
1442 }
1443 if (this.serverManager.isClusterShutdown()) {
1444 LOG.info("Cluster shutdown is set; skipping assign of " +
1445 region.getRegionNameAsString());
1446 return;
1447 }
1448 String encodedName = region.getEncodedName();
1449 Lock lock = locker.acquireLock(encodedName);
1450 try {
1451 RegionState state = forceRegionStateToOffline(region, forceNewPlan);
1452 if (state != null) {
1453 assign(state, setOfflineInZK, forceNewPlan);
1454 }
1455 } finally {
1456 lock.unlock();
1457 }
1458 }
1459
1460
1461
1462
1463
1464
1465
1466 boolean assign(final ServerName destination,
1467 final List<HRegionInfo> regions) {
1468 int regionCount = regions.size();
1469 if (regionCount == 0) {
1470 return true;
1471 }
1472 LOG.debug("Bulk assigning " + regionCount + " region(s) to " +
1473 destination.toString());
1474
1475 Set<String> encodedNames = new HashSet<String>(regionCount);
1476 for (HRegionInfo region : regions) {
1477 encodedNames.add(region.getEncodedName());
1478 }
1479
1480 List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
1481 Map<String, Lock> locks = locker.acquireLocks(encodedNames);
1482 try {
1483 AtomicInteger counter = new AtomicInteger(0);
1484 Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
1485 OfflineCallback cb = new OfflineCallback(
1486 watcher, destination, counter, offlineNodesVersions);
1487 Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
1488 List<RegionState> states = new ArrayList<RegionState>(regions.size());
1489 for (HRegionInfo region : regions) {
1490 String encodedRegionName = region.getEncodedName();
1491 RegionState state = forceRegionStateToOffline(region, true);
1492 if (state != null && asyncSetOfflineInZooKeeper(state, cb, destination)) {
1493 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
1494 plans.put(encodedRegionName, plan);
1495 states.add(state);
1496 } else {
1497 LOG.warn("failed to force region state to offline or "
1498 + "failed to set it offline in ZK, will reassign later: " + region);
1499 failedToOpenRegions.add(region);
1500 Lock lock = locks.remove(encodedRegionName);
1501 lock.unlock();
1502 }
1503 }
1504
1505
1506 int total = states.size();
1507 for (int oldCounter = 0; !server.isStopped();) {
1508 int count = counter.get();
1509 if (oldCounter != count) {
1510 LOG.info(destination.toString() + " unassigned znodes=" + count +
1511 " of total=" + total);
1512 oldCounter = count;
1513 }
1514 if (count >= total) break;
1515 Threads.sleep(5);
1516 }
1517
1518 if (server.isStopped()) {
1519 return false;
1520 }
1521
1522
1523
1524 this.addPlans(plans);
1525
1526 List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
1527 new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
1528 for (RegionState state: states) {
1529 HRegionInfo region = state.getRegion();
1530 String encodedRegionName = region.getEncodedName();
1531 Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
1532 if (nodeVersion == null || nodeVersion == -1) {
1533 LOG.warn("failed to offline in zookeeper: " + region);
1534 failedToOpenRegions.add(region);
1535 Lock lock = locks.remove(encodedRegionName);
1536 lock.unlock();
1537 } else {
1538 regionStates.updateRegionState(region,
1539 RegionState.State.PENDING_OPEN, destination);
1540 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1541 if (this.shouldAssignRegionsWithFavoredNodes) {
1542 favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1543 }
1544 regionOpenInfos.add(new Triple<HRegionInfo, Integer, List<ServerName>>(
1545 region, nodeVersion, favoredNodes));
1546 }
1547 }
1548
1549
1550 try {
1551
1552
1553 long maxWaitTime = System.currentTimeMillis() +
1554 this.server.getConfiguration().
1555 getLong("hbase.regionserver.rpc.startup.waittime", 60000);
1556 for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
1557 try {
1558 List<RegionOpeningState> regionOpeningStateList = serverManager
1559 .sendRegionOpen(destination, regionOpenInfos);
1560 if (regionOpeningStateList == null) {
1561
1562 return false;
1563 }
1564 for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
1565 RegionOpeningState openingState = regionOpeningStateList.get(k);
1566 if (openingState != RegionOpeningState.OPENED) {
1567 HRegionInfo region = regionOpenInfos.get(k).getFirst();
1568 if (openingState == RegionOpeningState.ALREADY_OPENED) {
1569 processAlreadyOpenedRegion(region, destination);
1570 } else if (openingState == RegionOpeningState.FAILED_OPENING) {
1571
1572 failedToOpenRegions.add(region);
1573 } else {
1574 LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
1575 + openingState + " in assigning region " + region);
1576 }
1577 }
1578 }
1579 break;
1580 } catch (IOException e) {
1581 if (e instanceof RemoteException) {
1582 e = ((RemoteException)e).unwrapRemoteException();
1583 }
1584 if (e instanceof RegionServerStoppedException) {
1585 LOG.warn("The region server was shut down, ", e);
1586
1587 return false;
1588 } else if (e instanceof ServerNotRunningYetException) {
1589 long now = System.currentTimeMillis();
1590 if (now < maxWaitTime) {
1591 LOG.debug("Server is not yet up; waiting up to " +
1592 (maxWaitTime - now) + "ms", e);
1593 Thread.sleep(100);
1594 i--;
1595 continue;
1596 }
1597 } else if (e instanceof java.net.SocketTimeoutException
1598 && this.serverManager.isServerOnline(destination)) {
1599
1600
1601
1602
1603 if (LOG.isDebugEnabled()) {
1604 LOG.debug("Bulk assigner openRegion() to " + destination
1605 + " has timed out, but the regions might"
1606 + " already be opened on it.", e);
1607 }
1608 continue;
1609 }
1610 throw e;
1611 }
1612 }
1613 } catch (IOException e) {
1614
1615 LOG.info("Unable to communicate with the region server in order" +
1616 " to assign regions", e);
1617 return false;
1618 } catch (InterruptedException e) {
1619 throw new RuntimeException(e);
1620 }
1621 } finally {
1622 for (Lock lock : locks.values()) {
1623 lock.unlock();
1624 }
1625 }
1626
1627 if (!failedToOpenRegions.isEmpty()) {
1628 for (HRegionInfo region : failedToOpenRegions) {
1629 invokeAssign(region);
1630 }
1631 }
1632 LOG.debug("Bulk assigning done for " + destination.toString());
1633 return true;
1634 }
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646 private void unassign(final HRegionInfo region,
1647 final RegionState state, final int versionOfClosingNode,
1648 final ServerName dest, final boolean transitionInZK,
1649 final ServerName src) {
1650 ServerName server = src;
1651 if (state != null) {
1652 server = state.getServerName();
1653 }
1654 for (int i = 1; i <= this.maximumAttempts; i++) {
1655
1656 if (!serverManager.isServerOnline(server)) {
1657 if (transitionInZK) {
1658
1659 deleteClosingOrClosedNode(region);
1660 }
1661 if (state != null) {
1662 regionOffline(region);
1663 }
1664 return;
1665 }
1666 try {
1667
1668 if (serverManager.sendRegionClose(server, region,
1669 versionOfClosingNode, dest, transitionInZK)) {
1670 LOG.debug("Sent CLOSE to " + server + " for region " +
1671 region.getRegionNameAsString());
1672 return;
1673 }
1674
1675
1676 LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
1677 region.getRegionNameAsString());
1678 } catch (Throwable t) {
1679 if (t instanceof RemoteException) {
1680 t = ((RemoteException)t).unwrapRemoteException();
1681 }
1682 if (t instanceof NotServingRegionException
1683 || t instanceof RegionServerStoppedException) {
1684 if (transitionInZK) {
1685 deleteClosingOrClosedNode(region);
1686 }
1687 if (state != null) {
1688 regionOffline(region);
1689 }
1690 return;
1691 } else if (state != null
1692 && t instanceof RegionAlreadyInTransitionException) {
1693
1694 LOG.debug("update " + state + " the timestamp.");
1695 state.updateTimestampToNow();
1696 }
1697 LOG.info("Server " + server + " returned " + t + " for "
1698 + region.getRegionNameAsString() + ", try=" + i
1699 + " of " + this.maximumAttempts, t);
1700
1701 }
1702 }
1703
1704 if (!tomActivated && state != null) {
1705 regionStates.updateRegionState(region, RegionState.State.FAILED_CLOSE);
1706 }
1707 }
1708
1709
1710
1711
1712 private RegionState forceRegionStateToOffline(
1713 final HRegionInfo region, final boolean forceNewPlan) {
1714 RegionState state = regionStates.getRegionState(region);
1715 if (state == null) {
1716 LOG.warn("Assigning a region not in region states: " + region);
1717 state = regionStates.createRegionState(region);
1718 } else {
1719 switch (state.getState()) {
1720 case OPEN:
1721 case OPENING:
1722 case PENDING_OPEN:
1723 if (!forceNewPlan) {
1724 LOG.debug("Attempting to assign region " +
1725 region + " but it is already in transition: " + state);
1726 return null;
1727 }
1728 case CLOSING:
1729 case PENDING_CLOSE:
1730 case FAILED_CLOSE:
1731 unassign(region, state, -1, null, false, null);
1732 state = regionStates.getRegionState(region);
1733 if (state.isOffline()) break;
1734 case FAILED_OPEN:
1735 case CLOSED:
1736 LOG.debug("Forcing OFFLINE; was=" + state);
1737 state = regionStates.updateRegionState(
1738 region, RegionState.State.OFFLINE);
1739 case OFFLINE:
1740 break;
1741 default:
1742 LOG.error("Trying to assign region " + region
1743 + ", which is in state " + state);
1744 return null;
1745 }
1746 }
1747 return state;
1748 }
1749
1750
1751
1752
1753
1754
1755
1756 private void assign(RegionState state,
1757 final boolean setOfflineInZK, final boolean forceNewPlan) {
1758 RegionState currentState = state;
1759 int versionOfOfflineNode = -1;
1760 RegionPlan plan = null;
1761 long maxRegionServerStartupWaitTime = -1;
1762 HRegionInfo region = state.getRegion();
1763 RegionOpeningState regionOpenState;
1764 for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
1765 if (plan == null) {
1766 plan = getRegionPlan(region, forceNewPlan);
1767 }
1768 if (plan == null) {
1769 LOG.warn("Unable to determine a plan to assign " + region);
1770 if (tomActivated){
1771 this.timeoutMonitor.setAllRegionServersOffline(true);
1772 } else {
1773 regionStates.updateRegionState(region, RegionState.State.FAILED_OPEN);
1774 }
1775 return;
1776 }
1777 if (setOfflineInZK && versionOfOfflineNode == -1) {
1778
1779
1780 versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
1781 if (versionOfOfflineNode != -1) {
1782 if (isDisabledorDisablingRegionInRIT(region)) {
1783 return;
1784 }
1785
1786
1787
1788
1789
1790
1791 String tableName = region.getTableNameAsString();
1792 if (!zkTable.isEnablingTable(tableName) && !zkTable.isEnabledTable(tableName)) {
1793 LOG.debug("Setting table " + tableName + " to ENABLED state.");
1794 setEnabledTable(tableName);
1795 }
1796 }
1797 }
1798 if (setOfflineInZK && versionOfOfflineNode == -1) {
1799 LOG.info("Unable to set offline in ZooKeeper to assign " + region);
1800
1801
1802
1803
1804 if (!server.isAborted()) {
1805 continue;
1806 }
1807 }
1808 if (this.server.isStopped() || this.server.isAborted()) {
1809 LOG.debug("Server stopped/aborted; skipping assign of " + region);
1810 return;
1811 }
1812 LOG.info("Assigning region " + region.getRegionNameAsString() +
1813 " to " + plan.getDestination().toString());
1814
1815 currentState = regionStates.updateRegionState(region,
1816 RegionState.State.PENDING_OPEN, plan.getDestination());
1817
1818 boolean needNewPlan;
1819 final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
1820 " to " + plan.getDestination();
1821 try {
1822 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1823 if (this.shouldAssignRegionsWithFavoredNodes) {
1824 favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1825 }
1826 regionOpenState = serverManager.sendRegionOpen(
1827 plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
1828
1829 if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
1830
1831 needNewPlan = true;
1832 LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
1833 " trying to assign elsewhere instead; " +
1834 "try=" + i + " of " + this.maximumAttempts);
1835 } else {
1836
1837 if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
1838 processAlreadyOpenedRegion(region, plan.getDestination());
1839 }
1840 return;
1841 }
1842
1843 } catch (Throwable t) {
1844 if (t instanceof RemoteException) {
1845 t = ((RemoteException) t).unwrapRemoteException();
1846 }
1847
1848
1849
1850
1851 boolean hold = (t instanceof RegionAlreadyInTransitionException ||
1852 t instanceof ServerNotRunningYetException);
1853
1854
1855
1856
1857
1858
1859 boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
1860 && this.serverManager.isServerOnline(plan.getDestination()));
1861
1862
1863 if (hold) {
1864 LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
1865 "try=" + i + " of " + this.maximumAttempts, t);
1866
1867 if (maxRegionServerStartupWaitTime < 0) {
1868 maxRegionServerStartupWaitTime = EnvironmentEdgeManager.currentTimeMillis() +
1869 this.server.getConfiguration().
1870 getLong("hbase.regionserver.rpc.startup.waittime", 60000);
1871 }
1872 try {
1873 long now = EnvironmentEdgeManager.currentTimeMillis();
1874 if (now < maxRegionServerStartupWaitTime) {
1875 LOG.debug("Server is not yet up; waiting up to " +
1876 (maxRegionServerStartupWaitTime - now) + "ms", t);
1877 Thread.sleep(100);
1878 i--;
1879 needNewPlan = false;
1880 } else {
1881 LOG.debug("Server is not up for a while; try a new one", t);
1882 needNewPlan = true;
1883 }
1884 } catch (InterruptedException ie) {
1885 LOG.warn("Failed to assign "
1886 + region.getRegionNameAsString() + " since interrupted", ie);
1887 Thread.currentThread().interrupt();
1888 if (!tomActivated) {
1889 regionStates.updateRegionState(region, RegionState.State.FAILED_OPEN);
1890 }
1891 return;
1892 }
1893 } else if (retry) {
1894 needNewPlan = false;
1895 LOG.warn(assignMsg + ", trying to assign to the same region server " +
1896 "try=" + i + " of " + this.maximumAttempts, t);
1897 } else {
1898 needNewPlan = true;
1899 LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
1900 " try=" + i + " of " + this.maximumAttempts, t);
1901 }
1902 }
1903
1904 if (i == this.maximumAttempts) {
1905
1906
1907 continue;
1908 }
1909
1910
1911
1912
1913 if (needNewPlan) {
1914
1915
1916
1917
1918 RegionPlan newPlan = getRegionPlan(region, true);
1919
1920 if (newPlan == null) {
1921 if (tomActivated) {
1922 this.timeoutMonitor.setAllRegionServersOffline(true);
1923 } else {
1924 regionStates.updateRegionState(region, RegionState.State.FAILED_OPEN);
1925 }
1926 LOG.warn("Unable to find a viable location to assign region " +
1927 region.getRegionNameAsString());
1928 return;
1929 }
1930
1931 if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
1932
1933
1934
1935 currentState = regionStates.updateRegionState(region, RegionState.State.OFFLINE);
1936 versionOfOfflineNode = -1;
1937 plan = newPlan;
1938 }
1939 }
1940 }
1941
1942 if (!tomActivated) {
1943 regionStates.updateRegionState(region, RegionState.State.FAILED_OPEN);
1944 }
1945 }
1946
1947 private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
1948
1949
1950
1951 LOG.debug("ALREADY_OPENED region " + region.getRegionNameAsString()
1952 + " to " + sn);
1953 String encodedRegionName = region.getEncodedName();
1954 try {
1955 ZKAssign.deleteOfflineNode(watcher, encodedRegionName);
1956 } catch (KeeperException.NoNodeException e) {
1957 if (LOG.isDebugEnabled()) {
1958 LOG.debug("The unassigned node " + encodedRegionName
1959 + " does not exist.");
1960 }
1961 } catch (KeeperException e) {
1962 server.abort(
1963 "Error deleting OFFLINED node in ZK for transition ZK node ("
1964 + encodedRegionName + ")", e);
1965 }
1966
1967 regionStates.regionOnline(region, sn);
1968 }
1969
1970 private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
1971 String tableName = region.getTableNameAsString();
1972 boolean disabled = this.zkTable.isDisabledTable(tableName);
1973 if (disabled || this.zkTable.isDisablingTable(tableName)) {
1974 LOG.info("Table " + tableName + (disabled ? " disabled;" : " disabling;") +
1975 " skipping assign of " + region.getRegionNameAsString());
1976 offlineDisabledRegion(region);
1977 return true;
1978 }
1979 return false;
1980 }
1981
1982
1983
1984
1985
1986
1987
1988
1989 private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
1990 if (!state.isClosed() && !state.isOffline()) {
1991 String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE.";
1992 this.server.abort(msg, new IllegalStateException(msg));
1993 return -1;
1994 }
1995 regionStates.updateRegionState(state.getRegion(),
1996 RegionState.State.OFFLINE);
1997 int versionOfOfflineNode;
1998 try {
1999
2000 versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
2001 state.getRegion(), destination);
2002 if (versionOfOfflineNode == -1) {
2003 LOG.warn("Attempted to create/force node into OFFLINE state before "
2004 + "completing assignment but failed to do so for " + state);
2005 return -1;
2006 }
2007 } catch (KeeperException e) {
2008 server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
2009 return -1;
2010 }
2011 return versionOfOfflineNode;
2012 }
2013
2014
2015
2016
2017
2018
2019 private RegionPlan getRegionPlan(final HRegionInfo region,
2020 final boolean forceNewPlan) {
2021 return getRegionPlan(region, null, forceNewPlan);
2022 }
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033 private RegionPlan getRegionPlan(final HRegionInfo region,
2034 final ServerName serverToExclude, final boolean forceNewPlan) {
2035
2036 final String encodedName = region.getEncodedName();
2037 final List<ServerName> destServers =
2038 serverManager.createDestinationServersList(serverToExclude);
2039
2040 if (destServers.isEmpty()){
2041 LOG.warn("Can't move the region " + encodedName +
2042 ", there is no destination server available.");
2043 return null;
2044 }
2045
2046 RegionPlan randomPlan = null;
2047 boolean newPlan = false;
2048 RegionPlan existingPlan;
2049
2050 synchronized (this.regionPlans) {
2051 existingPlan = this.regionPlans.get(encodedName);
2052
2053 if (existingPlan != null && existingPlan.getDestination() != null) {
2054 LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
2055 + " destination server is " + existingPlan.getDestination() +
2056 " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
2057 }
2058
2059 if (forceNewPlan
2060 || existingPlan == null
2061 || existingPlan.getDestination() == null
2062 || !destServers.contains(existingPlan.getDestination())) {
2063 newPlan = true;
2064 randomPlan = new RegionPlan(region, null,
2065 balancer.randomAssignment(region, destServers));
2066 if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
2067 List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
2068 regions.add(region);
2069 try {
2070 processFavoredNodes(regions);
2071 } catch (IOException ie) {
2072 LOG.warn("Ignoring exception in processFavoredNodes " + ie);
2073 }
2074 }
2075 this.regionPlans.put(encodedName, randomPlan);
2076 }
2077 }
2078
2079 if (newPlan) {
2080 if (randomPlan.getDestination() == null) {
2081 LOG.warn("Can't find a destination for region" + encodedName);
2082 return null;
2083 }
2084 LOG.debug("No previous transition plan was found (or we are ignoring " +
2085 "an existing plan) for " + region.getRegionNameAsString() +
2086 " so generated a random one; " + randomPlan + "; " +
2087 serverManager.countOfRegionServers() +
2088 " (online=" + serverManager.getOnlineServers().size() +
2089 ", available=" + destServers.size() + ") available servers" +
2090 ", forceNewPlan=" + forceNewPlan);
2091 return randomPlan;
2092 }
2093 LOG.debug("Using pre-existing plan for region " +
2094 region.getRegionNameAsString() + "; plan=" + existingPlan);
2095 return existingPlan;
2096 }
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106 public void unassign(List<HRegionInfo> regions) {
2107 int waitTime = this.server.getConfiguration().getInt(
2108 "hbase.bulk.waitbetween.reopen", 0);
2109 for (HRegionInfo region : regions) {
2110 if (regionStates.isRegionInTransition(region))
2111 continue;
2112 unassign(region, false);
2113 while (regionStates.isRegionInTransition(region)) {
2114 try {
2115 Thread.sleep(10);
2116 } catch (InterruptedException e) {
2117
2118 }
2119 }
2120 if (waitTime > 0)
2121 try {
2122 Thread.sleep(waitTime);
2123 } catch (InterruptedException e) {
2124
2125 }
2126 }
2127 }
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142 public void unassign(HRegionInfo region) {
2143 unassign(region, false);
2144 }
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161 public void unassign(HRegionInfo region, boolean force, ServerName dest) {
2162
2163 LOG.debug("Starting unassignment of region " +
2164 region.getRegionNameAsString() + " (offlining)");
2165
2166 String encodedName = region.getEncodedName();
2167
2168 int versionOfClosingNode = -1;
2169
2170
2171 ReentrantLock lock = locker.acquireLock(encodedName);
2172 RegionState state = regionStates.getRegionTransitionState(encodedName);
2173 try {
2174 if (state == null) {
2175
2176 try {
2177 state = regionStates.getRegionState(region);
2178 if (state == null || state.getServerName() == null) {
2179
2180
2181 regionOffline(region);
2182 return;
2183 }
2184 versionOfClosingNode = ZKAssign.createNodeClosing(
2185 watcher, region, state.getServerName());
2186 if (versionOfClosingNode == -1) {
2187 LOG.debug("Attempting to unassign region " +
2188 region.getRegionNameAsString() + " but ZK closing node "
2189 + "can't be created.");
2190 return;
2191 }
2192 } catch (KeeperException e) {
2193 if (e instanceof NodeExistsException) {
2194
2195
2196
2197
2198 NodeExistsException nee = (NodeExistsException)e;
2199 String path = nee.getPath();
2200 try {
2201 if (isSplitOrSplittingOrMergeOrMerging(path)) {
2202 LOG.debug(path + " is SPLIT or SPLITTING or MERGE or MERGING; " +
2203 "skipping unassign because region no longer exists -- its split or merge");
2204 return;
2205 }
2206 } catch (KeeperException.NoNodeException ke) {
2207 LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
2208 "; presuming split and that the region to unassign, " +
2209 encodedName + ", no longer exists -- confirm", ke);
2210 return;
2211 } catch (KeeperException ke) {
2212 LOG.error("Unexpected zk state", ke);
2213 } catch (DeserializationException de) {
2214 LOG.error("Failed parse", de);
2215 }
2216 }
2217
2218 server.abort("Unexpected ZK exception creating node CLOSING", e);
2219 return;
2220 }
2221 state = regionStates.updateRegionState(region, RegionState.State.PENDING_CLOSE);
2222 } else if (state.isFailedOpen()) {
2223
2224 regionOffline(region);
2225 return;
2226 } else if (force && (state.isPendingClose()
2227 || state.isClosing() || state.isFailedClose())) {
2228 LOG.debug("Attempting to unassign region " + region.getRegionNameAsString() +
2229 " which is already " + state.getState() +
2230 " but forcing to send a CLOSE RPC again ");
2231 if (state.isFailedClose()) {
2232 state = regionStates.updateRegionState(region, RegionState.State.PENDING_CLOSE);
2233 }
2234 state.updateTimestampToNow();
2235 } else {
2236 LOG.debug("Attempting to unassign region " +
2237 region.getRegionNameAsString() + " but it is " +
2238 "already in transition (" + state.getState() + ", force=" + force + ")");
2239 return;
2240 }
2241
2242 unassign(region, state, versionOfClosingNode, dest, true, null);
2243 } finally {
2244 lock.unlock();
2245 }
2246 }
2247
2248 public void unassign(HRegionInfo region, boolean force){
2249 unassign(region, force, null);
2250 }
2251
2252
2253
2254
2255 public void deleteClosingOrClosedNode(HRegionInfo region) {
2256 String encodedName = region.getEncodedName();
2257 try {
2258 if (!ZKAssign.deleteNode(watcher, encodedName,
2259 EventType.M_ZK_REGION_CLOSING)) {
2260 boolean deleteNode = ZKAssign.deleteNode(watcher,
2261 encodedName, EventType.RS_ZK_REGION_CLOSED);
2262
2263
2264 if (!deleteNode) {
2265 LOG.error("The deletion of the CLOSED node for the region "
2266 + encodedName + " returned " + deleteNode);
2267 }
2268 }
2269 } catch (NoNodeException e) {
2270 LOG.debug("CLOSING/CLOSED node for the region " + encodedName
2271 + " already deleted");
2272 } catch (KeeperException ke) {
2273 server.abort(
2274 "Unexpected ZK exception deleting node CLOSING/CLOSED for the region "
2275 + encodedName, ke);
2276 }
2277 }
2278
2279
2280
2281
2282
2283
2284
2285 private boolean isSplitOrSplittingOrMergeOrMerging(final String path)
2286 throws KeeperException, DeserializationException {
2287 boolean result = false;
2288
2289
2290 byte [] data = ZKAssign.getData(watcher, path);
2291 if (data == null) return false;
2292 RegionTransition rt = RegionTransition.parseFrom(data);
2293 switch (rt.getEventType()) {
2294 case RS_ZK_REGION_SPLIT:
2295 case RS_ZK_REGION_SPLITTING:
2296 case RS_ZK_REGION_MERGE:
2297 case RS_ZK_REGION_MERGING:
2298 result = true;
2299 break;
2300 default:
2301 break;
2302 }
2303 return result;
2304 }
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314 public boolean waitForAssignment(HRegionInfo regionInfo)
2315 throws InterruptedException {
2316 while (!regionStates.isRegionAssigned(regionInfo)) {
2317 if (regionStates.isRegionFailedToOpen(regionInfo)
2318 || this.server.isStopped()) {
2319 return false;
2320 }
2321
2322
2323
2324
2325 regionStates.waitForUpdate(100);
2326 }
2327 return true;
2328 }
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340 public void assignMeta() throws KeeperException {
2341 MetaRegionTracker.deleteMetaLocation(this.watcher);
2342 assign(HRegionInfo.FIRST_META_REGIONINFO, true);
2343 }
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353 public void assign(Map<HRegionInfo, ServerName> regions)
2354 throws IOException, InterruptedException {
2355 if (regions == null || regions.isEmpty()) {
2356 return;
2357 }
2358 List<ServerName> servers = serverManager.createDestinationServersList();
2359 if (servers == null || servers.isEmpty()) {
2360 throw new IOException("Found no destination server to assign region(s)");
2361 }
2362
2363
2364 Map<ServerName, List<HRegionInfo>> bulkPlan =
2365 balancer.retainAssignment(regions, servers);
2366
2367 assign(regions.size(), servers.size(),
2368 "retainAssignment=true", bulkPlan);
2369 }
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379 public void assign(List<HRegionInfo> regions)
2380 throws IOException, InterruptedException {
2381 if (regions == null || regions.isEmpty()) {
2382 return;
2383 }
2384
2385 List<ServerName> servers = serverManager.createDestinationServersList();
2386 if (servers == null || servers.isEmpty()) {
2387 throw new IOException("Found no destination server to assign region(s)");
2388 }
2389
2390
2391 Map<ServerName, List<HRegionInfo>> bulkPlan
2392 = balancer.roundRobinAssignment(regions, servers);
2393 processFavoredNodes(regions);
2394
2395 assign(regions.size(), servers.size(),
2396 "round-robin=true", bulkPlan);
2397 }
2398
2399 private void assign(int regions, int totalServers,
2400 String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
2401 throws InterruptedException, IOException {
2402
2403 int servers = bulkPlan.size();
2404 if (servers == 1 || (regions < bulkAssignThresholdRegions
2405 && servers < bulkAssignThresholdServers)) {
2406
2407
2408
2409 LOG.info("Not use bulk assigning since we are assigning only "
2410 + regions + " region(s) to " + servers + " server(s)");
2411
2412 for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
2413 assign(plan.getKey(), plan.getValue());
2414 }
2415 } else {
2416 LOG.info("Bulk assigning " + regions + " region(s) across "
2417 + totalServers + " server(s), " + message);
2418
2419
2420 BulkAssigner ba = new GeneralBulkAssigner(
2421 this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
2422 ba.bulkAssign();
2423 LOG.info("Bulk assigning done");
2424 }
2425 }
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437 private void assignAllUserRegions()
2438 throws IOException, InterruptedException, KeeperException {
2439
2440 ZKAssign.deleteAllNodes(watcher);
2441 ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
2442 this.watcher.assignmentZNode);
2443 failoverCleanupDone();
2444
2445
2446
2447
2448 Set<String> disabledOrDisablingOrEnabling = ZKTable.getDisabledOrDisablingTables(watcher);
2449 disabledOrDisablingOrEnabling.addAll(ZKTable.getEnablingTables(watcher));
2450
2451 Map<HRegionInfo, ServerName> allRegions = null;
2452 if (this.shouldAssignRegionsWithFavoredNodes) {
2453 allRegions = FavoredNodeAssignmentHelper.fullScan(
2454 catalogTracker, disabledOrDisablingOrEnabling, true, (FavoredNodeLoadBalancer)balancer);
2455 } else {
2456 allRegions = MetaReader.fullScan(
2457 catalogTracker, disabledOrDisablingOrEnabling, true);
2458 }
2459 if (allRegions == null || allRegions.isEmpty()) return;
2460
2461
2462 boolean retainAssignment = server.getConfiguration().
2463 getBoolean("hbase.master.startup.retainassign", true);
2464
2465 if (retainAssignment) {
2466 assign(allRegions);
2467 } else {
2468 List<HRegionInfo> regions = new ArrayList<HRegionInfo>(allRegions.keySet());
2469 assign(regions);
2470 }
2471
2472 for (HRegionInfo hri : allRegions.keySet()) {
2473 String tableName = hri.getTableNameAsString();
2474 if (!zkTable.isEnabledTable(tableName)) {
2475 setEnabledTable(tableName);
2476 }
2477 }
2478 }
2479
2480
2481
2482
2483
2484
2485
2486 boolean waitUntilNoRegionsInTransition(final long timeout)
2487 throws InterruptedException {
2488
2489
2490
2491
2492
2493
2494 final long endTime = System.currentTimeMillis() + timeout;
2495
2496 while (!this.server.isStopped() && regionStates.isRegionsInTransition()
2497 && endTime > System.currentTimeMillis()) {
2498 regionStates.waitForUpdate(100);
2499 }
2500
2501 return !regionStates.isRegionsInTransition();
2502 }
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
/**
 * Scans the meta table and rebuilds the in-memory region states from it.
 * Every region found gets a region state created. A region whose recorded
 * server is in the online-server set is marked online, unless its table is
 * disabled or enabling. A region whose recorded server is not online is
 * collected in the returned map.
 *
 * @return regions grouped by the server recorded for them in meta, restricted
 *         to servers that are not in the current online-server set
 * @throws IOException propagated from the meta scan
 * @throws KeeperException propagated from the ZooKeeper reads
 */
Map<ServerName, List<HRegionInfo>> rebuildUserRegions() throws IOException, KeeperException {
  // Build two cumulative table sets from ZooKeeper:
  //   disabledOrEnablingTables      = DISABLED + ENABLING
  //   disabledOrDisablingOrEnabling = DISABLING + DISABLED + ENABLING
  Set<String> enablingTables = ZKTable.getEnablingTables(watcher);
  Set<String> disabledOrEnablingTables = ZKTable.getDisabledTables(watcher);
  disabledOrEnablingTables.addAll(enablingTables);
  Set<String> disabledOrDisablingOrEnabling = ZKTable.getDisablingTables(watcher);
  disabledOrDisablingOrEnabling.addAll(disabledOrEnablingTables);

  // Full scan of the meta table.
  List<Result> results = MetaReader.fullScan(this.catalogTracker);
  // Servers currently known to the server manager as online.
  Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
  // Regions recorded on servers that are not online, keyed by that server.
  Map<ServerName, List<HRegionInfo>> offlineServers =
      new TreeMap<ServerName, List<HRegionInfo>>();
  // Iterate over every meta row.
  for (Result result : results) {
    Pair<HRegionInfo, ServerName> region = HRegionInfo.getHRegionInfoAndServerName(result);
    if (region == null) continue;
    HRegionInfo regionInfo = region.getFirst();
    ServerName regionLocation = region.getSecond();
    if (regionInfo == null) continue;
    regionStates.createRegionState(regionInfo);
    String tableName = regionInfo.getTableNameAsString();
    if (regionLocation == null) {
      // No server recorded in meta. Nothing is done for the region here; a
      // warning is logged only when the table is not in ENABLING state.
      if (!enablingTables.contains(tableName)) {
        LOG.warn("Region " + regionInfo.getEncodedName() +
            " has null regionLocation." + " But its table " + tableName +
            " isn't in ENABLING state.");
      }
    } else if (!onlineServers.contains(regionLocation)) {
      // Recorded server is not online: remember the region under that server.
      List<HRegionInfo> offlineRegions = offlineServers.get(regionLocation);
      if (offlineRegions == null) {
        offlineRegions = new ArrayList<HRegionInfo>(1);
        offlineServers.put(regionLocation, offlineRegions);
      }
      offlineRegions.add(regionInfo);
      // A table that is in none of the disabling/disabled/enabling sets and
      // is not yet flagged enabled gets flagged enabled.
      if (!disabledOrDisablingOrEnabling.contains(tableName)
          && !getZKTable().isEnabledTable(tableName)) {
        setEnabledTable(tableName);
      }
    } else {
      // Recorded server is online.
      // An offline split parent with no data at its assignment znode is
      // treated as a completed split and skipped.
      if (regionInfo.isOffline() && regionInfo.isSplit()) {
        String node = ZKAssign.getNodeName(this.watcher, regionInfo
            .getEncodedName());
        Stat stat = new Stat();
        byte[] data = ZKUtil.getDataNoWatch(this.watcher, node, stat);
        // A null read means there is no data for the region's assignment znode.
        if (data == null) {
          LOG.debug("Region " + regionInfo.getRegionNameAsString()
              + " split is completed. Hence need not add to regions list");
          continue;
        }
      }
      // Mark the region online unless its table is disabled or enabling.
      // Note that DISABLING tables are not excluded here.
      if (!disabledOrEnablingTables.contains(tableName)) {
        regionStates.regionOnline(regionInfo, regionLocation);
      }
      // Same enabled-flag fix-up as in the offline-server branch.
      if (!disabledOrDisablingOrEnabling.contains(tableName)
          && !getZKTable().isEnabledTable(tableName)) {
        setEnabledTable(tableName);
      }
    }
  }
  return offlineServers;
}
2595
2596
2597
2598
2599
2600
2601
2602
2603
2604 private void recoverTableInDisablingState()
2605 throws KeeperException, TableNotFoundException, IOException {
2606 Set<String> disablingTables = ZKTable.getDisablingTables(watcher);
2607 if (disablingTables.size() != 0) {
2608 for (String tableName : disablingTables) {
2609
2610 LOG.info("The table " + tableName
2611 + " is in DISABLING state. Hence recovering by moving the table"
2612 + " to DISABLED state.");
2613 new DisableTableHandler(this.server, tableName.getBytes(), catalogTracker,
2614 this, tableLockManager, true).prepare().process();
2615 }
2616 }
2617 }
2618
2619
2620
2621
2622
2623
2624
2625
2626
2627 private void recoverTableInEnablingState()
2628 throws KeeperException, TableNotFoundException, IOException {
2629 Set<String> enablingTables = ZKTable.getEnablingTables(watcher);
2630 if (enablingTables.size() != 0) {
2631 for (String tableName : enablingTables) {
2632
2633 LOG.info("The table " + tableName
2634 + " is in ENABLING state. Hence recovering by moving the table"
2635 + " to ENABLED state.");
2636
2637
2638 new EnableTableHandler(this.server, tableName.getBytes(),
2639 catalogTracker, this, tableLockManager, true).prepare().process();
2640 }
2641 }
2642 }
2643
2644
2645
2646
2647
2648
2649
2650
2651
2652
2653
2654
2655
2656
2657
2658
2659 private void processDeadServersAndRecoverLostRegions(
2660 Map<ServerName, List<HRegionInfo>> deadServers)
2661 throws IOException, KeeperException {
2662 if (deadServers != null) {
2663 for (Map.Entry<ServerName, List<HRegionInfo>> server: deadServers.entrySet()) {
2664 ServerName serverName = server.getKey();
2665 if (!serverManager.isServerDead(serverName)) {
2666 serverManager.expireServer(serverName);
2667 }
2668 }
2669 }
2670 List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(
2671 this.watcher, this.watcher.assignmentZNode);
2672 if (!nodes.isEmpty()) {
2673 for (String encodedRegionName : nodes) {
2674 processRegionInTransition(encodedRegionName, null);
2675 }
2676 }
2677
2678
2679
2680
2681
2682 failoverCleanupDone();
2683 }
2684
2685
2686
2687
2688
2689
2690
2691
2692 public void updateRegionsInTransitionMetrics() {
2693 long currentTime = System.currentTimeMillis();
2694 int totalRITs = 0;
2695 int totalRITsOverThreshold = 0;
2696 long oldestRITTime = 0;
2697 int ritThreshold = this.server.getConfiguration().
2698 getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
2699 for (RegionState state: regionStates.getRegionsInTransition().values()) {
2700 totalRITs++;
2701 long ritTime = currentTime - state.getStamp();
2702 if (ritTime > ritThreshold) {
2703 totalRITsOverThreshold++;
2704 }
2705 if (oldestRITTime < ritTime) {
2706 oldestRITTime = ritTime;
2707 }
2708 }
2709 if (this.metricsMaster != null) {
2710 this.metricsMaster.updateRITOldestAge(oldestRITTime);
2711 this.metricsMaster.updateRITCount(totalRITs);
2712 this.metricsMaster.updateRITCountOverThreshold(totalRITsOverThreshold);
2713 }
2714 }
2715
2716
2717
2718
2719 void clearRegionPlan(final HRegionInfo region) {
2720 synchronized (this.regionPlans) {
2721 this.regionPlans.remove(region.getEncodedName());
2722 }
2723 }
2724
2725
2726
2727
2728
2729
2730 public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
2731 throws IOException, InterruptedException {
2732 waitOnRegionToClearRegionsInTransition(hri, -1L);
2733 }
2734
2735
2736
2737
2738
2739
2740
2741
2742
2743 public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
2744 throws IOException, InterruptedException {
2745 if (!regionStates.isRegionInTransition(hri)) return true;
2746 RegionState rs = null;
2747 long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTimeMillis()
2748 + timeOut;
2749
2750
2751 LOG.info("Waiting on " + rs + " to clear regions-in-transition");
2752 while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
2753 regionStates.waitForUpdate(100);
2754 if (EnvironmentEdgeManager.currentTimeMillis() > end) {
2755 LOG.info("Timed out on waiting for region:" + hri.getEncodedName() + " to be assigned.");
2756 return false;
2757 }
2758 }
2759 if (this.server.isStopped()) {
2760 LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
2761 return false;
2762 }
2763 return true;
2764 }
2765
2766
2767
2768
2769
2770 public class TimerUpdater extends Chore {
2771
2772 public TimerUpdater(final int period, final Stoppable stopper) {
2773 super("AssignmentTimerUpdater", period, stopper);
2774 }
2775
2776 @Override
2777 protected void chore() {
2778 Preconditions.checkState(tomActivated);
2779 ServerName serverToUpdateTimer = null;
2780 while (!serversInUpdatingTimer.isEmpty() && !stopper.isStopped()) {
2781 if (serverToUpdateTimer == null) {
2782 serverToUpdateTimer = serversInUpdatingTimer.first();
2783 } else {
2784 serverToUpdateTimer = serversInUpdatingTimer
2785 .higher(serverToUpdateTimer);
2786 }
2787 if (serverToUpdateTimer == null) {
2788 break;
2789 }
2790 updateTimers(serverToUpdateTimer);
2791 serversInUpdatingTimer.remove(serverToUpdateTimer);
2792 }
2793 }
2794 }
2795
2796
2797
2798
/**
 * Chore that walks the regions in transition and acts on those whose state
 * timestamp is older than the configured timeout. It also acts on regions
 * whose plan is missing or points at a server that is not online, on the
 * first run after destination servers become available again.
 */
public class TimeoutMonitor extends Chore {
  // Whether the previous run of chore() found no destination server.
  private boolean allRegionServersOffline = false;
  private ServerManager serverManager;
  // Added to a region state's stamp to decide whether it has timed out.
  private final int timeout;

  /**
   * @param period passed to the Chore constructor as its period
   * @param stopper passed to the Chore constructor
   * @param serverManager used to list destination servers and check liveness
   * @param timeout how long a region may stay in one transition state before
   *        this monitor acts on it
   */
  public TimeoutMonitor(final int period, final Stoppable stopper,
      ServerManager serverManager,
      final int timeout) {
    super("AssignmentTimeoutMonitor", period, stopper);
    this.timeout = timeout;
    this.serverManager = serverManager;
  }

  private synchronized void setAllRegionServersOffline(
      boolean allRegionServersOffline) {
    this.allRegionServersOffline = allRegionServersOffline;
  }

  @Override
  protected void chore() {
    Preconditions.checkState(tomActivated);
    boolean noRSAvailable = this.serverManager.createDestinationServersList().isEmpty();

    // One timestamp is used for every region checked in this run.
    long now = System.currentTimeMillis();

    for (String regionName : regionStates.getRegionsInTransition().keySet()) {
      RegionState regionState = regionStates.getRegionTransitionState(regionName);
      // The region may have left transition since the key set was obtained.
      if (regionState == null) continue;

      if (regionState.getStamp() + timeout <= now) {
        // The region has been in its current state for at least 'timeout'.
        actOnTimeOut(regionState);
      } else if (this.allRegionServersOffline && !noRSAvailable) {
        // The previous run saw no destination server and this run sees some.
        RegionPlan existingPlan = regionPlans.get(regionName);
        if (existingPlan == null
            || !this.serverManager.isServerOnline(existingPlan
                .getDestination())) {
          // No plan, or a plan whose destination is not online: treat the
          // region as timed out even though its stamp is still recent.
          actOnTimeOut(regionState);
        }
      }
    }
    // Remember this run's availability for the next run.
    setAllRegionServersOffline(noRSAvailable);
  }

  /**
   * Takes the action that matches the state the region is currently in.
   *
   * @param regionState the transition state that was judged to have timed out
   * @throws IllegalStateException for a state not listed in the switch
   */
  private void actOnTimeOut(RegionState regionState) {
    HRegionInfo regionInfo = regionState.getRegion();
    LOG.info("Regions in transition timed out: " + regionState);

    switch (regionState.getState()) {
    case CLOSED:
      LOG.info("Region " + regionInfo.getEncodedName()
          + " has been CLOSED for too long, waiting on queued "
          + "ClosedRegionHandler to run or server shutdown");
      // Only refresh the stamp so the timeout period starts over.
      regionState.updateTimestampToNow();
      break;
    case OFFLINE:
      LOG.info("Region has been OFFLINE for too long, " + "reassigning "
          + regionInfo.getRegionNameAsString() + " to a random server");
      invokeAssign(regionInfo);
      break;
    case PENDING_OPEN:
      LOG.info("Region has been PENDING_OPEN for too "
          + "long, reassigning region=" + regionInfo.getRegionNameAsString());
      invokeAssign(regionInfo);
      break;
    case OPENING:
      processOpeningState(regionInfo);
      break;
    case OPEN:
      LOG.error("Region has been OPEN for too long, " +
          "we don't know where region was opened so can't do anything");
      regionState.updateTimestampToNow();
      break;

    case PENDING_CLOSE:
      LOG.info("Region has been PENDING_CLOSE for too "
          + "long, running forced unassign again on region="
          + regionInfo.getRegionNameAsString());
      invokeUnassign(regionInfo);
      break;
    case CLOSING:
      LOG.info("Region has been CLOSING for too " +
          "long, this should eventually complete or the server will " +
          "expire, send RPC again");
      invokeUnassign(regionInfo);
      break;

    // No action is taken for these states.
    case SPLIT:
    case SPLITTING:
    case FAILED_OPEN:
    case FAILED_CLOSE:
      break;

    default:
      throw new IllegalStateException("Received event is not valid.");
    }
  }
}
2911
2912 private void processOpeningState(HRegionInfo regionInfo) {
2913 LOG.info("Region has been OPENING for too long, reassigning region="
2914 + regionInfo.getRegionNameAsString());
2915
2916 try {
2917 String node = ZKAssign.getNodeName(watcher, regionInfo.getEncodedName());
2918 Stat stat = new Stat();
2919 byte [] data = ZKAssign.getDataNoWatch(watcher, node, stat);
2920 if (data == null) {
2921 LOG.warn("Data is null, node " + node + " no longer exists");
2922 return;
2923 }
2924 RegionTransition rt = RegionTransition.parseFrom(data);
2925 EventType et = rt.getEventType();
2926 if (et == EventType.RS_ZK_REGION_OPENED) {
2927 LOG.debug("Region has transitioned to OPENED, allowing "
2928 + "watched event handlers to process");
2929 return;
2930 } else if (et != EventType.RS_ZK_REGION_OPENING && et != EventType.RS_ZK_REGION_FAILED_OPEN ) {
2931 LOG.warn("While timing out a region, found ZK node in unexpected state: " + et);
2932 return;
2933 }
2934 invokeAssign(regionInfo);
2935 } catch (KeeperException ke) {
2936 LOG.error("Unexpected ZK exception timing out CLOSING region", ke);
2937 } catch (DeserializationException e) {
2938 LOG.error("Unexpected exception parsing CLOSING region", e);
2939 }
2940 }
2941
2942 void invokeAssign(HRegionInfo regionInfo) {
2943 threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
2944 }
2945
2946 private void invokeUnassign(HRegionInfo regionInfo) {
2947 threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
2948 }
2949
2950 public boolean isCarryingMeta(ServerName serverName) {
2951 return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
2952 }
2953
2954
2955
2956
2957
2958
2959
2960
2961
2962
2963
2964 private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
2965 RegionTransition rt = null;
2966 try {
2967 byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
2968
2969 rt = data == null? null: RegionTransition.parseFrom(data);
2970 } catch (KeeperException e) {
2971 server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
2972 } catch (DeserializationException e) {
2973 server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
2974 }
2975
2976 ServerName addressFromZK = rt != null? rt.getServerName(): null;
2977 if (addressFromZK != null) {
2978
2979 boolean matchZK = addressFromZK.equals(serverName);
2980 LOG.debug("based on ZK, current region=" + hri.getRegionNameAsString() +
2981 " is on server=" + addressFromZK +
2982 " server being checked=: " + serverName);
2983 return matchZK;
2984 }
2985
2986 ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
2987 boolean matchAM = (addressFromAM != null &&
2988 addressFromAM.equals(serverName));
2989 LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
2990 " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
2991 " server being checked: " + serverName);
2992
2993 return matchAM;
2994 }
2995
2996
2997
2998
2999
3000
/**
 * Cleans up assignment state for a server that went away. Region plans whose
 * destination is that server are removed. The regions returned by
 * {@code regionStates.serverOffline(sn)} are then filtered down to those that
 * are still pending-open or opening on the server and whose table is not
 * disabling or disabled.
 *
 * @param sn the server that went offline
 * @return the regions that survive the filtering; each one has had its
 *         assignment znode deleted and its state updated to CLOSED
 */
public List<HRegionInfo> processServerShutdown(final ServerName sn) {
  // Drop every plan that targets the offline server.
  synchronized (this.regionPlans) {
    for (Iterator <Map.Entry<String, RegionPlan>> i =
        this.regionPlans.entrySet().iterator(); i.hasNext();) {
      Map.Entry<String, RegionPlan> e = i.next();
      ServerName otherSn = e.getValue().getDestination();
      // A plan may have no destination, hence the null check.
      if (otherSn != null && otherSn.equals(sn)) {
        // Remove through the iterator so the map can be modified mid-loop.
        i.remove();
      }
    }
  }
  List<HRegionInfo> regions = regionStates.serverOffline(sn);
  for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
    HRegionInfo hri = it.next();
    String encodedName = hri.getEncodedName();

    // Hold the per-region lock while the region's state is inspected and
    // changed; it is released in the finally block on every path.
    Lock lock = locker.acquireLock(encodedName);
    try {
      RegionState regionState =
          regionStates.getRegionTransitionState(encodedName);
      if (regionState == null
          || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
        LOG.info("Skip region " + hri
            + " since it is not opening on the dead server any more: " + sn);
        it.remove();
      } else {
        try{
          // Delete the region's assignment znode.
          ZKAssign.deleteNodeFailSilent(watcher, hri);
        } catch (KeeperException ke) {
          server.abort("Unexpected ZK exception deleting node " + hri, ke);
        }
        // Regions of a disabling or disabled table are marked offline and
        // left out of the returned list.
        if (zkTable.isDisablingOrDisabledTable(hri.getTableNameAsString())) {
          it.remove();
          regionStates.regionOffline(hri);
          continue;
        }
        // The region stays in the returned list, with its state set to CLOSED.
        regionStates.updateRegionState(hri, RegionState.State.CLOSED);
      }
    } finally {
      lock.unlock();
    }
  }
  return regions;
}
3051
3052
3053
3054
3055
3056
3057
3058
3059 public void handleSplitReport(final ServerName sn, final HRegionInfo parent,
3060 final HRegionInfo a, final HRegionInfo b) {
3061 regionOffline(parent);
3062 regionOnline(a, sn);
3063 regionOnline(b, sn);
3064
3065
3066
3067
3068
3069 if (this.zkTable.isDisablingOrDisabledTable(
3070 parent.getTableNameAsString())) {
3071 unassign(a);
3072 unassign(b);
3073 }
3074 }
3075
3076
3077
3078
3079
3080
3081
3082
3083 public void handleRegionsMergeReport(final ServerName sn,
3084 final HRegionInfo merged, final HRegionInfo a, final HRegionInfo b) {
3085 regionOffline(a);
3086 regionOffline(b);
3087 regionOnline(merged, sn);
3088
3089
3090
3091
3092
3093 if (this.zkTable.isDisablingOrDisabledTable(merged.getTableNameAsString())) {
3094 unassign(merged);
3095 }
3096 }
3097
3098
3099
3100
3101 public void balance(final RegionPlan plan) {
3102 synchronized (this.regionPlans) {
3103 this.regionPlans.put(plan.getRegionName(), plan);
3104 }
3105 unassign(plan.getRegionInfo(), false, plan.getDestination());
3106 }
3107
3108 public void stop() {
3109 if (tomActivated){
3110 this.timeoutMonitor.interrupt();
3111 this.timerUpdater.interrupt();
3112 }
3113 }
3114
3115
3116
3117
/**
 * Clears the list of waiting ZooKeeper event workers and shuts down both
 * executor services with {@code shutdownNow()}.
 */
public void shutdown() {
  // Clear the waiting list under its own monitor before stopping the pools.
  synchronized (zkEventWorkerWaitingList){
    zkEventWorkerWaitingList.clear();
  }
  threadPoolExecutorService.shutdownNow();
  zkEventWorkers.shutdownNow();
}
3126
3127 protected void setEnabledTable(String tableName) {
3128 try {
3129 this.zkTable.setEnabledTable(tableName);
3130 } catch (KeeperException e) {
3131
3132 String errorMsg = "Unable to ensure that the table " + tableName
3133 + " will be" + " enabled because of a ZooKeeper issue";
3134 LOG.error(errorMsg);
3135 this.server.abort(errorMsg, e);
3136 }
3137 }
3138
3139
3140
3141
3142
3143
3144
3145 private boolean asyncSetOfflineInZooKeeper(final RegionState state,
3146 final AsyncCallback.StringCallback cb, final ServerName destination) {
3147 if (!state.isClosed() && !state.isOffline()) {
3148 this.server.abort("Unexpected state trying to OFFLINE; " + state,
3149 new IllegalStateException());
3150 return false;
3151 }
3152 regionStates.updateRegionState(
3153 state.getRegion(), RegionState.State.OFFLINE);
3154 try {
3155 ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
3156 destination, cb, state);
3157 } catch (KeeperException e) {
3158 if (e instanceof NodeExistsException) {
3159 LOG.warn("Node for " + state.getRegion() + " already exists");
3160 } else {
3161 server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
3162 }
3163 return false;
3164 }
3165 return true;
3166 }
3167 }