1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.master;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.HashSet;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.NavigableMap;
33 import java.util.Random;
34 import java.util.Set;
35 import java.util.TreeMap;
36 import java.util.concurrent.Callable;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.CopyOnWriteArrayList;
39 import java.util.concurrent.ThreadFactory;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicBoolean;
42 import java.util.concurrent.atomic.AtomicInteger;
43 import java.util.concurrent.locks.Lock;
44 import java.util.concurrent.locks.ReentrantLock;
45
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48 import org.apache.hadoop.conf.Configuration;
49 import org.apache.hadoop.fs.FileSystem;
50 import org.apache.hadoop.fs.Path;
51 import org.apache.hadoop.hbase.CoordinatedStateException;
52 import org.apache.hadoop.hbase.HBaseIOException;
53 import org.apache.hadoop.hbase.HConstants;
54 import org.apache.hadoop.hbase.HRegionInfo;
55 import org.apache.hadoop.hbase.HRegionLocation;
56 import org.apache.hadoop.hbase.HTableDescriptor;
57 import org.apache.hadoop.hbase.MetaTableAccessor;
58 import org.apache.hadoop.hbase.NotServingRegionException;
59 import org.apache.hadoop.hbase.RegionLocations;
60 import org.apache.hadoop.hbase.RegionTransition;
61 import org.apache.hadoop.hbase.Server;
62 import org.apache.hadoop.hbase.ServerName;
63 import org.apache.hadoop.hbase.TableName;
64 import org.apache.hadoop.hbase.TableNotFoundException;
65 import org.apache.hadoop.hbase.TableStateManager;
66 import org.apache.hadoop.hbase.classification.InterfaceAudience;
67 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
68 import org.apache.hadoop.hbase.client.Result;
69 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
70 import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
71 import org.apache.hadoop.hbase.coordination.RegionMergeCoordination;
72 import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
73 import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
74 import org.apache.hadoop.hbase.coordination.ZkRegionMergeCoordination;
75 import org.apache.hadoop.hbase.exceptions.DeserializationException;
76 import org.apache.hadoop.hbase.executor.EventHandler;
77 import org.apache.hadoop.hbase.executor.EventType;
78 import org.apache.hadoop.hbase.executor.ExecutorService;
79 import org.apache.hadoop.hbase.ipc.FailedServerException;
80 import org.apache.hadoop.hbase.ipc.RpcClient;
81 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
82 import org.apache.hadoop.hbase.master.RegionState.State;
83 import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
84 import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
85 import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
86 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
87 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
88 import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
89 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
90 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
91 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
92 import org.apache.hadoop.hbase.quotas.RegionStateListener;
93 import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
94 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
95 import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
96 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
97 import org.apache.hadoop.hbase.util.ConfigUtil;
98 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
99 import org.apache.hadoop.hbase.util.FSUtils;
100 import org.apache.hadoop.hbase.util.KeyLocker;
101 import org.apache.hadoop.hbase.util.Pair;
102 import org.apache.hadoop.hbase.util.PairOfSameType;
103 import org.apache.hadoop.hbase.util.Threads;
104 import org.apache.hadoop.hbase.util.Triple;
105 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
106 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
107 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
108 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
109 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
110 import org.apache.hadoop.ipc.RemoteException;
111 import org.apache.hadoop.util.StringUtils;
112 import org.apache.zookeeper.AsyncCallback;
113 import org.apache.zookeeper.KeeperException;
114 import org.apache.zookeeper.KeeperException.NoNodeException;
115 import org.apache.zookeeper.KeeperException.NodeExistsException;
116 import org.apache.zookeeper.data.Stat;
117
118 import com.google.common.annotations.VisibleForTesting;
119 import com.google.common.collect.LinkedHashMultimap;
120
121
122
123
124
125
126
127
128 @InterfaceAudience.Private
129 public class AssignmentManager extends ZooKeeperListener {
130 private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
131
132 public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(HConstants.HBCK_CODE_NAME,
133 -1, -1L);
134
135 static final String ALREADY_IN_TRANSITION_WAITTIME
136 = "hbase.assignment.already.intransition.waittime";
137 static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000;
138
139 protected final Server server;
140
141 private ServerManager serverManager;
142
143 private boolean shouldAssignRegionsWithFavoredNodes;
144
145 private LoadBalancer balancer;
146
147 private final MetricsAssignmentManager metricsAssignmentManager;
148
149 private final TableLockManager tableLockManager;
150
151 private AtomicInteger numRegionsOpened = new AtomicInteger(0);
152
153 final private KeyLocker<String> locker = new KeyLocker<String>();
154
155 Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
156
157
158
159
160
161 private final Map <String, HRegionInfo> regionsToReopen;
162
163
164
165
166
167 private final int maximumAttempts;
168
169
170
171
172 private final Map<String, PairOfSameType<HRegionInfo>> mergingRegions
173 = new HashMap<String, PairOfSameType<HRegionInfo>>();
174
175 private final Map<HRegionInfo, PairOfSameType<HRegionInfo>> splitRegions
176 = new HashMap<HRegionInfo, PairOfSameType<HRegionInfo>>();
177
178
179
180
181
182 private final long sleepTimeBeforeRetryingMetaAssignment;
183
184
185
186
187
188 final NavigableMap<String, RegionPlan> regionPlans =
189 new TreeMap<String, RegionPlan>();
190
191 private final TableStateManager tableStateManager;
192
193 private final ExecutorService executorService;
194
195
196 private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = null;
197
198
199 private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null;
200
201
202 private java.util.concurrent.ExecutorService threadPoolExecutorService;
203
204
205 private final java.util.concurrent.ExecutorService zkEventWorkers;
206
207 private List<EventType> ignoreStatesRSOffline = Arrays.asList(
208 EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);
209
210 private final RegionStates regionStates;
211
212
213
214
215
216 private final int bulkAssignThresholdRegions;
217 private final int bulkAssignThresholdServers;
218 private final int bulkPerRegionOpenTimeGuesstimate;
219
220
221
222
223 private final boolean bulkAssignWaitTillAllAssigned;
224
225
226
227
228
229
230
231
232
233 protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
234
235
236
237
238
239
240
241
242 private final ConcurrentHashMap<String, AtomicInteger>
243 failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
244
245
246 private final boolean useZKForAssignment;
247
248
249
250 private final RegionStateStore regionStateStore;
251
252
253
254
255 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
256 public static boolean TEST_SKIP_SPLIT_HANDLING = false;
257
258
259 private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
260
261 private RegionStateListener regionStateListener;
262
/**
 * Three-valued answer describing whether a server hosts a region.
 * NOTE(review): the meaning of each constant is inferred from its name only;
 * confirm against the call sites that produce these values.
 */
public enum ServerHostRegion {
  NOT_HOSTING_REGION,
  HOSTING_REGION,
  UNKNOWN
}
266
267
268
269
270
271
272
273
274
275
276
277
278
279 public AssignmentManager(Server server, ServerManager serverManager,
280 final LoadBalancer balancer,
281 final ExecutorService service, MetricsMaster metricsMaster,
282 final TableLockManager tableLockManager) throws KeeperException,
283 IOException, CoordinatedStateException {
284 super(server.getZooKeeper());
285 this.server = server;
286 this.serverManager = serverManager;
287 this.executorService = service;
288 this.regionStateStore = new RegionStateStore(server);
289 this.regionsToReopen = Collections.synchronizedMap
290 (new HashMap<String, HRegionInfo> ());
291 Configuration conf = server.getConfiguration();
292
293 this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
294 HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
295 FavoredNodeLoadBalancer.class);
296 try {
297 if (server.getCoordinatedStateManager() != null) {
298 this.tableStateManager = server.getCoordinatedStateManager().getTableStateManager();
299 } else {
300 this.tableStateManager = null;
301 }
302 } catch (InterruptedException e) {
303 throw new InterruptedIOException();
304 }
305
306 this.maximumAttempts = Math.max(1,
307 this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
308 this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
309 "hbase.meta.assignment.retry.sleeptime", 1000l);
310 this.balancer = balancer;
311 int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
312 this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
313 maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
314 this.regionStates = new RegionStates(
315 server, tableStateManager, serverManager, regionStateStore);
316
317 this.bulkAssignWaitTillAllAssigned =
318 conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
319 this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
320 this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
321 this.bulkPerRegionOpenTimeGuesstimate =
322 conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
323
324 int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
325 ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
326 zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
327 TimeUnit.SECONDS, threadFactory);
328 this.tableLockManager = tableLockManager;
329
330 this.metricsAssignmentManager = new MetricsAssignmentManager();
331 useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
332 }
333
334
335
336
337
338 public void registerListener(final AssignmentListener listener) {
339 this.listeners.add(listener);
340 }
341
342
343
344
345
346 public boolean unregisterListener(final AssignmentListener listener) {
347 return this.listeners.remove(listener);
348 }
349
350
351
352
353 public TableStateManager getTableStateManager() {
354
355
356 return this.tableStateManager;
357 }
358
359
360
361
362
363
364
365 public RegionStates getRegionStates() {
366 return regionStates;
367 }
368
369
370
371
372 @VisibleForTesting
373 RegionStateStore getRegionStateStore() {
374 return regionStateStore;
375 }
376
377 public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
378 return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
379 }
380
381
382
383
384
385
386 public void addPlan(String encodedName, RegionPlan plan) {
387 synchronized (regionPlans) {
388 regionPlans.put(encodedName, plan);
389 }
390 }
391
392
393
394
395 public void addPlans(Map<String, RegionPlan> plans) {
396 synchronized (regionPlans) {
397 regionPlans.putAll(plans);
398 }
399 }
400
401
402
403
404
405
406
407
408 public void setRegionsToReopen(List <HRegionInfo> regions) {
409 for(HRegionInfo hri : regions) {
410 regionsToReopen.put(hri.getEncodedName(), hri);
411 }
412 }
413
414
415
416
417
418
419
420
421 public Pair<Integer, Integer> getReopenStatus(TableName tableName)
422 throws IOException {
423 List<HRegionInfo> hris;
424 if (TableName.META_TABLE_NAME.equals(tableName)) {
425 hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
426 } else {
427 hris = MetaTableAccessor.getTableRegions(server.getZooKeeper(),
428 server.getConnection(), tableName, true);
429 }
430
431 Integer pending = 0;
432 for (HRegionInfo hri : hris) {
433 String name = hri.getEncodedName();
434
435 if (regionsToReopen.containsKey(name)
436 || regionStates.isRegionInTransition(name)) {
437 pending++;
438 }
439 }
440 return new Pair<Integer, Integer>(pending, hris.size());
441 }
442
443
444
445
446
447
448 public boolean isFailoverCleanupDone() {
449 return failoverCleanupDone.get();
450 }
451
452
453
454
455
456 public Lock acquireRegionLock(final String encodedName) {
457 return locker.acquireLock(encodedName);
458 }
459
460
461
462
463
464 void failoverCleanupDone() {
465 failoverCleanupDone.set(true);
466 serverManager.processQueuedDeadServers();
467 }
468
469
470
471
472
473
474
475
476
477 void joinCluster() throws IOException,
478 KeeperException, InterruptedException, CoordinatedStateException {
479 long startTime = System.currentTimeMillis();
480
481
482
483
484
485
486
487
488
489
490
491 Set<ServerName> deadServers = rebuildUserRegions();
492
493
494
495
496 boolean failover = processDeadServersAndRegionsInTransition(deadServers);
497
498 if (!useZKForAssignment) {
499
500 ZKUtil.deleteNodeRecursively(watcher, watcher.assignmentZNode);
501 }
502 recoverTableInDisablingState();
503 recoverTableInEnablingState();
504 LOG.info("Joined the cluster in " + (System.currentTimeMillis()
505 - startTime) + "ms, failover=" + failover);
506 }
507
508
509
510
511
512
513
514
515
516
517
518
519 boolean processDeadServersAndRegionsInTransition(
520 final Set<ServerName> deadServers) throws KeeperException,
521 IOException, InterruptedException, CoordinatedStateException {
522 List<String> nodes = ZKUtil.listChildrenNoWatch(watcher,
523 watcher.assignmentZNode);
524
525 if (useZKForAssignment && nodes == null) {
526 String errorMessage = "Failed to get the children from ZK";
527 server.abort(errorMessage, new IOException(errorMessage));
528 return true;
529 }
530
531 boolean failover = !serverManager.getDeadServers().isEmpty();
532 if (failover) {
533
534 if (LOG.isDebugEnabled()) {
535 LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
536 }
537 } else {
538
539 Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
540 for (Map.Entry<HRegionInfo, ServerName> en:
541 regionStates.getRegionAssignments().entrySet()) {
542 HRegionInfo hri = en.getKey();
543 if (!hri.isMetaTable()
544 && onlineServers.contains(en.getValue())) {
545 LOG.debug("Found " + hri + " out on cluster");
546 failover = true;
547 break;
548 }
549 }
550 if (!failover && nodes != null) {
551
552 for (String encodedName: nodes) {
553 RegionState regionState = regionStates.getRegionState(encodedName);
554 if (regionState != null && !regionState.getRegion().isMetaRegion()) {
555 LOG.debug("Found " + regionState + " in RITs");
556 failover = true;
557 break;
558 }
559 }
560 }
561 }
562 if (!failover && !useZKForAssignment) {
563
564 Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
565 if (!regionsInTransition.isEmpty()) {
566 Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
567 for (RegionState regionState: regionsInTransition.values()) {
568 ServerName serverName = regionState.getServerName();
569 if (!regionState.getRegion().isMetaRegion()
570 && serverName != null && onlineServers.contains(serverName)) {
571 LOG.debug("Found " + regionState + " in RITs");
572 failover = true;
573 break;
574 }
575 }
576 }
577 }
578 if (!failover) {
579
580
581
582
583 Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
584 if (!queuedDeadServers.isEmpty()) {
585 Configuration conf = server.getConfiguration();
586 Path rootdir = FSUtils.getRootDir(conf);
587 FileSystem fs = rootdir.getFileSystem(conf);
588 for (ServerName serverName: queuedDeadServers) {
589
590
591 Path logDir = new Path(rootdir,
592 DefaultWALProvider.getWALDirectoryName(serverName.toString()));
593 Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
594 if (fs.exists(logDir) || fs.exists(splitDir)) {
595 LOG.debug("Found queued dead server " + serverName);
596 failover = true;
597 break;
598 }
599 }
600 if (!failover) {
601
602
603 LOG.info("AM figured that it's not a failover and cleaned up "
604 + queuedDeadServers.size() + " queued dead servers");
605 serverManager.removeRequeuedDeadServers();
606 }
607 }
608 }
609
610 Set<TableName> disabledOrDisablingOrEnabling = null;
611 Map<HRegionInfo, ServerName> allRegions = null;
612
613 if (!failover) {
614 disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
615 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING,
616 ZooKeeperProtos.Table.State.ENABLING);
617
618
619 allRegions = regionStates.closeAllUserRegions(
620 disabledOrDisablingOrEnabling);
621 }
622
623
624 regionStateStore.start();
625
626
627 if (failover) {
628 LOG.info("Found regions out on cluster or in RIT; presuming failover");
629
630
631 processDeadServersAndRecoverLostRegions(deadServers);
632 }
633
634 if (!failover && useZKForAssignment) {
635
636 ZKAssign.deleteAllNodes(watcher);
637 ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
638 this.watcher.assignmentZNode);
639 }
640
641
642
643
644
645 failoverCleanupDone();
646 if (!failover) {
647
648 LOG.info("Clean cluster startup. Assigning user regions");
649 assignAllUserRegions(allRegions);
650 }
651
652
653
654 for (HRegionInfo h : replicasToClose) {
655 unassign(h);
656 }
657 replicasToClose.clear();
658 return failover;
659 }
660
661
662
663
664
665
666
667
668
669
670
671
672 boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
673 throws InterruptedException, KeeperException, IOException {
674 String encodedRegionName = hri.getEncodedName();
675 if (!processRegionInTransition(encodedRegionName, hri)) {
676 return false;
677 }
678 LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedRegionName));
679 while (!this.server.isStopped() &&
680 this.regionStates.isRegionInTransition(encodedRegionName)) {
681 RegionState state = this.regionStates.getRegionTransitionState(encodedRegionName);
682 if (state == null || !serverManager.isServerOnline(state.getServerName())) {
683
684
685
686 break;
687 }
688 this.regionStates.waitForUpdate(100);
689 }
690 return true;
691 }
692
693
694
695
696
697
698
699
700
701
702 boolean processRegionInTransition(final String encodedRegionName,
703 final HRegionInfo regionInfo) throws KeeperException, IOException {
704
705
706
707
708 Lock lock = locker.acquireLock(encodedRegionName);
709 try {
710 Stat stat = new Stat();
711 byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
712 if (data == null) return false;
713 RegionTransition rt;
714 try {
715 rt = RegionTransition.parseFrom(data);
716 } catch (DeserializationException e) {
717 LOG.warn("Failed parse znode data", e);
718 return false;
719 }
720 HRegionInfo hri = regionInfo;
721 if (hri == null) {
722
723
724
725
726
727 hri = regionStates.getRegionInfo(rt.getRegionName());
728 EventType et = rt.getEventType();
729 if (hri == null && et != EventType.RS_ZK_REGION_MERGING
730 && et != EventType.RS_ZK_REQUEST_REGION_MERGE) {
731 LOG.warn("Couldn't find the region in recovering " + rt);
732 return false;
733 }
734 }
735
736
737
738 BaseCoordinatedStateManager cp =
739 (BaseCoordinatedStateManager) this.server.getCoordinatedStateManager();
740 OpenRegionCoordination openRegionCoordination = cp.getOpenRegionCoordination();
741
742 ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
743 new ZkOpenRegionCoordination.ZkOpenRegionDetails();
744 zkOrd.setVersion(stat.getVersion());
745 zkOrd.setServerName(cp.getServer().getServerName());
746
747 return processRegionsInTransition(
748 rt, hri, openRegionCoordination, zkOrd);
749 } finally {
750 lock.unlock();
751 }
752 }
753
754
755
756
757
758
759
760
761
762 boolean processRegionsInTransition(
763 final RegionTransition rt, final HRegionInfo regionInfo,
764 OpenRegionCoordination coordination,
765 final OpenRegionCoordination.OpenRegionDetails ord) throws KeeperException {
766 EventType et = rt.getEventType();
767
768 final ServerName sn = rt.getServerName();
769 final byte[] regionName = rt.getRegionName();
770 final String encodedName = HRegionInfo.encodeRegionName(regionName);
771 final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
772 LOG.info("Processing " + prettyPrintedRegionName + " in state: " + et);
773
774 if (regionStates.isRegionInTransition(encodedName)
775 && (regionInfo.isMetaRegion() || !useZKForAssignment)) {
776 LOG.info("Processed region " + prettyPrintedRegionName + " in state: "
777 + et + ", does nothing since the region is already in transition "
778 + regionStates.getRegionTransitionState(encodedName));
779
780 return true;
781 }
782 if (!serverManager.isServerOnline(sn)) {
783
784
785
786 LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() +
787 " was on deadserver; forcing offline");
788 if (regionStates.isRegionOnline(regionInfo)) {
789
790
791
792 regionStates.regionOffline(regionInfo);
793 sendRegionClosedNotification(regionInfo);
794 }
795
796 regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);
797
798 if (regionInfo.isMetaRegion()) {
799
800
801 MetaTableLocator.setMetaLocation(watcher, sn, State.OPEN);
802 } else {
803
804
805 regionStates.setLastRegionServerOfRegion(sn, encodedName);
806
807 if (!serverManager.isServerDead(sn)) {
808 serverManager.expireServer(sn);
809 }
810 }
811 return false;
812 }
813 switch (et) {
814 case M_ZK_REGION_CLOSING:
815
816
817 final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING);
818 this.executorService.submit(
819 new EventHandler(server, EventType.M_MASTER_RECOVERY) {
820 @Override
821 public void process() throws IOException {
822 ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
823 try {
824 final int expectedVersion = ((ZkOpenRegionCoordination.ZkOpenRegionDetails) ord)
825 .getVersion();
826 unassign(regionInfo, rsClosing, expectedVersion, null, useZKForAssignment, null);
827 if (regionStates.isRegionOffline(regionInfo)) {
828 assign(regionInfo, true);
829 }
830 } finally {
831 lock.unlock();
832 }
833 }
834 });
835 break;
836
837 case RS_ZK_REGION_CLOSED:
838 case RS_ZK_REGION_FAILED_OPEN:
839
840 regionStates.setLastRegionServerOfRegion(sn, encodedName);
841 regionStates.updateRegionState(regionInfo, State.CLOSED, sn);
842 if (!replicasToClose.contains(regionInfo)) {
843 invokeAssign(regionInfo);
844 } else {
845 offlineDisabledRegion(regionInfo);
846 }
847 break;
848
849 case M_ZK_REGION_OFFLINE:
850
851 regionStates.updateRegionState(rt, State.PENDING_OPEN);
852 final RegionState rsOffline = regionStates.getRegionState(regionInfo);
853 this.executorService.submit(
854 new EventHandler(server, EventType.M_MASTER_RECOVERY) {
855 @Override
856 public void process() throws IOException {
857 ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
858 try {
859 RegionPlan plan = new RegionPlan(regionInfo, null, sn);
860 addPlan(encodedName, plan);
861 assign(rsOffline, false, false);
862 } finally {
863 lock.unlock();
864 }
865 }
866 });
867 break;
868
869 case RS_ZK_REGION_OPENING:
870 regionStates.updateRegionState(rt, State.OPENING);
871 break;
872
873 case RS_ZK_REGION_OPENED:
874
875
876
877 regionStates.updateRegionState(rt, State.OPEN);
878 new OpenedRegionHandler(server, this, regionInfo, coordination, ord).process();
879 break;
880 case RS_ZK_REQUEST_REGION_SPLIT:
881 case RS_ZK_REGION_SPLITTING:
882 case RS_ZK_REGION_SPLIT:
883
884
885
886 regionStates.regionOnline(regionInfo, sn);
887 regionStates.updateRegionState(rt, State.SPLITTING);
888 if (!handleRegionSplitting(
889 rt, encodedName, prettyPrintedRegionName, sn)) {
890 deleteSplittingNode(encodedName, sn);
891 }
892 break;
893 case RS_ZK_REQUEST_REGION_MERGE:
894 case RS_ZK_REGION_MERGING:
895 case RS_ZK_REGION_MERGED:
896 if (!handleRegionMerging(
897 rt, encodedName, prettyPrintedRegionName, sn)) {
898 deleteMergingNode(encodedName, sn);
899 }
900 break;
901 default:
902 throw new IllegalStateException("Received region in state:" + et + " is not valid.");
903 }
904 LOG.info("Processed region " + prettyPrintedRegionName + " in state "
905 + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ")
906 + "server: " + sn);
907 return true;
908 }
909
910
911
912
913
914 public void removeClosedRegion(HRegionInfo hri) {
915 if (regionsToReopen.remove(hri.getEncodedName()) != null) {
916 LOG.debug("Removed region from reopening regions because it was closed");
917 }
918 }
919
920
921
922
923
924
925
926
927
928
929
930
931 void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination,
932 OpenRegionCoordination.OpenRegionDetails ord) {
933 if (rt == null) {
934 LOG.warn("Unexpected NULL input for RegionTransition rt");
935 return;
936 }
937 final ServerName sn = rt.getServerName();
938
939 if (sn.equals(HBCK_CODE_SERVERNAME)) {
940 handleHBCK(rt);
941 return;
942 }
943 final long createTime = rt.getCreateTime();
944 final byte[] regionName = rt.getRegionName();
945 String encodedName = HRegionInfo.encodeRegionName(regionName);
946 String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
947
948 if (!serverManager.isServerOnline(sn)
949 && !ignoreStatesRSOffline.contains(rt.getEventType())) {
950 LOG.warn("Attempted to handle region transition for server but " +
951 "it is not online: " + prettyPrintedRegionName + ", " + rt);
952 return;
953 }
954
955 RegionState regionState =
956 regionStates.getRegionState(encodedName);
957 long startTime = System.currentTimeMillis();
958 if (LOG.isDebugEnabled()) {
959 boolean lateEvent = createTime < (startTime - 15000);
960 LOG.debug("Handling " + rt.getEventType() +
961 ", server=" + sn + ", region=" +
962 (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
963 (lateEvent ? ", which is more than 15 seconds late" : "") +
964 ", current_state=" + regionState);
965 }
966
967
968 if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
969 return;
970 }
971
972
973 Lock lock = locker.acquireLock(encodedName);
974 try {
975 RegionState latestState =
976 regionStates.getRegionState(encodedName);
977 if ((regionState == null && latestState != null)
978 || (regionState != null && latestState == null)
979 || (regionState != null && latestState != null
980 && latestState.getState() != regionState.getState())) {
981 LOG.warn("Region state changed from " + regionState + " to "
982 + latestState + ", while acquiring lock");
983 }
984 long waitedTime = System.currentTimeMillis() - startTime;
985 if (waitedTime > 5000) {
986 LOG.warn("Took " + waitedTime + "ms to acquire the lock");
987 }
988 regionState = latestState;
989 switch (rt.getEventType()) {
990 case RS_ZK_REQUEST_REGION_SPLIT:
991 case RS_ZK_REGION_SPLITTING:
992 case RS_ZK_REGION_SPLIT:
993 if (!handleRegionSplitting(
994 rt, encodedName, prettyPrintedRegionName, sn)) {
995 deleteSplittingNode(encodedName, sn);
996 }
997 break;
998
999 case RS_ZK_REQUEST_REGION_MERGE:
1000 case RS_ZK_REGION_MERGING:
1001 case RS_ZK_REGION_MERGED:
1002
1003
1004 if (!handleRegionMerging(
1005 rt, encodedName, prettyPrintedRegionName, sn)) {
1006 deleteMergingNode(encodedName, sn);
1007 }
1008 break;
1009
1010 case M_ZK_REGION_CLOSING:
1011
1012
1013 if (regionState == null
1014 || !regionState.isPendingCloseOrClosingOnServer(sn)) {
1015 LOG.warn("Received CLOSING for " + prettyPrintedRegionName
1016 + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
1017 + regionStates.getRegionState(encodedName));
1018 return;
1019 }
1020
1021 regionStates.updateRegionState(rt, State.CLOSING);
1022 break;
1023
1024 case RS_ZK_REGION_CLOSED:
1025
1026 if (regionState == null
1027 || !regionState.isPendingCloseOrClosingOnServer(sn)) {
1028 LOG.warn("Received CLOSED for " + prettyPrintedRegionName
1029 + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
1030 + regionStates.getRegionState(encodedName));
1031 return;
1032 }
1033
1034
1035
1036 new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1037 updateClosedRegionHandlerTracker(regionState.getRegion());
1038 break;
1039
1040 case RS_ZK_REGION_FAILED_OPEN:
1041 if (regionState == null
1042 || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1043 LOG.warn("Received FAILED_OPEN for " + prettyPrintedRegionName
1044 + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1045 + regionStates.getRegionState(encodedName));
1046 return;
1047 }
1048 AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
1049 if (failedOpenCount == null) {
1050 failedOpenCount = new AtomicInteger();
1051
1052
1053
1054 failedOpenTracker.put(encodedName, failedOpenCount);
1055 }
1056 if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
1057 regionStates.updateRegionState(rt, State.FAILED_OPEN);
1058
1059
1060 failedOpenTracker.remove(encodedName);
1061 } else {
1062
1063 regionState = regionStates.updateRegionState(rt, State.CLOSED);
1064 if (regionState != null) {
1065
1066
1067 try {
1068 getRegionPlan(regionState.getRegion(), sn, true);
1069 new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1070 } catch (HBaseIOException e) {
1071 LOG.warn("Failed to get region plan", e);
1072 }
1073 }
1074 }
1075 break;
1076
1077 case RS_ZK_REGION_OPENING:
1078
1079
1080 if (regionState == null
1081 || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1082 LOG.warn("Received OPENING for " + prettyPrintedRegionName
1083 + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1084 + regionStates.getRegionState(encodedName));
1085 return;
1086 }
1087
1088 regionStates.updateRegionState(rt, State.OPENING);
1089 break;
1090
1091 case RS_ZK_REGION_OPENED:
1092
1093 if (regionState == null
1094 || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1095 LOG.warn("Received OPENED for " + prettyPrintedRegionName
1096 + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1097 + regionStates.getRegionState(encodedName));
1098
1099 if (regionState != null) {
1100
1101
1102
1103 unassign(regionState.getRegion(), null, -1, null, false, sn);
1104 }
1105 return;
1106 }
1107
1108 regionState =
1109 regionStates.transitionOpenFromPendingOpenOrOpeningOnServer(rt,regionState, sn);
1110 if (regionState != null) {
1111 failedOpenTracker.remove(encodedName);
1112 new OpenedRegionHandler(
1113 server, this, regionState.getRegion(), coordination, ord).process();
1114 updateOpenedRegionHandlerTracker(regionState.getRegion());
1115 }
1116 break;
1117
1118 default:
1119 throw new IllegalStateException("Received event is not valid.");
1120 }
1121 } finally {
1122 lock.unlock();
1123 }
1124 }
1125
1126
1127 boolean wasClosedHandlerCalled(HRegionInfo hri) {
1128 AtomicBoolean b = closedRegionHandlerCalled.get(hri);
1129
1130
1131
1132 return b == null ? false : b.compareAndSet(true, false);
1133 }
1134
1135
1136 boolean wasOpenedHandlerCalled(HRegionInfo hri) {
1137 AtomicBoolean b = openedRegionHandlerCalled.get(hri);
1138
1139
1140
1141 return b == null ? false : b.compareAndSet(true, false);
1142 }
1143
1144
1145 void initializeHandlerTrackers() {
1146 closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1147 openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1148 }
1149
1150 void updateClosedRegionHandlerTracker(HRegionInfo hri) {
1151 if (closedRegionHandlerCalled != null) {
1152 closedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1153 }
1154 }
1155
1156 void updateOpenedRegionHandlerTracker(HRegionInfo hri) {
1157 if (openedRegionHandlerCalled != null) {
1158 openedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1159 }
1160 }
1161
1162
1163
1164
1165
1166
1167 void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
1168 if (!shouldAssignRegionsWithFavoredNodes) return;
1169
1170
1171 Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
1172 new HashMap<HRegionInfo, List<ServerName>>();
1173 for (HRegionInfo region : regions) {
1174 regionToFavoredNodes.put(region,
1175 ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
1176 }
1177 FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
1178 this.server.getConnection());
1179 }
1180
1181
1182
1183
1184
1185
1186
1187 @SuppressWarnings("deprecation")
1188 private void handleHBCK(RegionTransition rt) {
1189 String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
1190 LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
1191 ", server=" + rt.getServerName() + ", region=" +
1192 HRegionInfo.prettyPrint(encodedName));
1193 RegionState regionState = regionStates.getRegionTransitionState(encodedName);
1194 switch (rt.getEventType()) {
1195 case M_ZK_REGION_OFFLINE:
1196 HRegionInfo regionInfo;
1197 if (regionState != null) {
1198 regionInfo = regionState.getRegion();
1199 } else {
1200 try {
1201 byte [] name = rt.getRegionName();
1202 Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
1203 this.server.getConnection(), name);
1204 regionInfo = p.getFirst();
1205 } catch (IOException e) {
1206 LOG.info("Exception reading hbase:meta doing HBCK repair operation", e);
1207 return;
1208 }
1209 }
1210 LOG.info("HBCK repair is triggering assignment of region=" +
1211 regionInfo.getRegionNameAsString());
1212
1213 assign(regionInfo, false);
1214 break;
1215
1216 default:
1217 LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
1218 break;
1219 }
1220
1221 }
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
/**
 * ZooKeeper callback for a newly created node.
 * Delegates to handleAssignmentEvent, which ignores paths outside the
 * assignment znode.
 *
 * @param path full path of the created node
 */
1237 @Override
1238 public void nodeCreated(String path) {
1239 handleAssignmentEvent(path);
1240 }
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
/**
 * ZooKeeper callback for a node whose data changed.
 * Delegates to handleAssignmentEvent, which ignores paths outside the
 * assignment znode.
 *
 * @param path full path of the updated node
 */
1254 @Override
1255 public void nodeDataChanged(String path) {
1256 handleAssignmentEvent(path);
1257 }
1258
1259
1260
1261
1262
// Names of the regions that currently have a ZK event task handed to zkEventWorkers.
// Also used as the monitor that serializes zkEventWorkersSubmit.
1263 private final Set<String> regionsInProgress = new HashSet<String>();
1264
1265
// Events that arrived while another event for the same region was still in progress,
// keyed by region name. zkEventWorkersSubmit re-submits them one at a time.
1266 private final LinkedHashMultimap <String, RegionRunnable>
1267 zkEventWorkerWaitingList = LinkedHashMultimap.create();
1268
1269
1270
1271
/**
 * A Runnable tied to one region, so that zkEventWorkersSubmit can group and
 * serialize tasks by region name.
 */
1272 private interface RegionRunnable extends Runnable{
1273
1274
1275
/** @return the name of the region this task works on; used as the grouping key. */
1276 String getRegionName();
1277 }
1278
1279
1280
1281
1282
1283 protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {
1284
1285 synchronized (regionsInProgress) {
1286
1287
1288 if (regionsInProgress.contains(regRunnable.getRegionName())) {
1289 synchronized (zkEventWorkerWaitingList){
1290 zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
1291 }
1292 return;
1293 }
1294
1295
1296 regionsInProgress.add(regRunnable.getRegionName());
1297 zkEventWorkers.submit(new Runnable() {
1298 @Override
1299 public void run() {
1300 try {
1301 regRunnable.run();
1302 } finally {
1303
1304
1305 synchronized (regionsInProgress) {
1306 regionsInProgress.remove(regRunnable.getRegionName());
1307 synchronized (zkEventWorkerWaitingList) {
1308 java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
1309 regRunnable.getRegionName());
1310 if (!waiting.isEmpty()) {
1311
1312 RegionRunnable toSubmit = waiting.iterator().next();
1313 zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
1314 zkEventWorkersSubmit(toSubmit);
1315 }
1316 }
1317 }
1318 }
1319 }
1320 });
1321 }
1322 }
1323
1324 @Override
1325 public void nodeDeleted(final String path) {
1326 if (path.startsWith(watcher.assignmentZNode)) {
1327 final String regionName = ZKAssign.getRegionName(watcher, path);
1328 zkEventWorkersSubmit(new RegionRunnable() {
1329 @Override
1330 public String getRegionName() {
1331 return regionName;
1332 }
1333
1334 @Override
1335 public void run() {
1336 Lock lock = locker.acquireLock(regionName);
1337 try {
1338 RegionState rs = regionStates.getRegionTransitionState(regionName);
1339 if (rs == null) {
1340 rs = regionStates.getRegionState(regionName);
1341 if (rs == null || !rs.isMergingNew()) {
1342
1343 return;
1344 }
1345 }
1346
1347 HRegionInfo regionInfo = rs.getRegion();
1348 String regionNameStr = regionInfo.getRegionNameAsString();
1349 LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);
1350
1351 boolean disabled = getTableStateManager().isTableState(regionInfo.getTable(),
1352 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
1353
1354 ServerName serverName = rs.getServerName();
1355 if (serverManager.isServerOnline(serverName)) {
1356 if (rs.isOnServer(serverName) && (rs.isOpened() || rs.isSplitting())) {
1357 synchronized (regionStates) {
1358 regionOnline(regionInfo, serverName);
1359 if (rs.isSplitting() && splitRegions.containsKey(regionInfo)) {
1360
1361
1362 HRegionInfo hri_a = splitRegions.get(regionInfo).getFirst();
1363 HRegionInfo hri_b = splitRegions.get(regionInfo).getSecond();
1364 if (!regionStates.isRegionInTransition(hri_a.getEncodedName())) {
1365 LOG.warn("Split daughter region not in transition " + hri_a);
1366 }
1367 if (!regionStates.isRegionInTransition(hri_b.getEncodedName())) {
1368 LOG.warn("Split daughter region not in transition" + hri_b);
1369 }
1370 regionOffline(hri_a);
1371 regionOffline(hri_b);
1372 splitRegions.remove(regionInfo);
1373 }
1374 if (disabled) {
1375
1376 LOG.info("Opened " + regionNameStr
1377 + "but this table is disabled, triggering close of region");
1378 unassign(regionInfo);
1379 }
1380 }
1381 } else if (rs.isMergingNew()) {
1382 synchronized (regionStates) {
1383 String p = regionInfo.getEncodedName();
1384 PairOfSameType<HRegionInfo> regions = mergingRegions.get(p);
1385 if (regions != null) {
1386 onlineMergingRegion(disabled, regions.getFirst(), serverName);
1387 onlineMergingRegion(disabled, regions.getSecond(), serverName);
1388 }
1389 }
1390 }
1391 }
1392 } finally {
1393 lock.unlock();
1394 }
1395 }
1396
1397 private void onlineMergingRegion(boolean disabled,
1398 final HRegionInfo hri, final ServerName serverName) {
1399 RegionState regionState = regionStates.getRegionState(hri);
1400 if (regionState != null && regionState.isMerging()
1401 && regionState.isOnServer(serverName)) {
1402 regionOnline(regionState.getRegion(), serverName);
1403 if (disabled) {
1404 unassign(hri);
1405 }
1406 }
1407 }
1408 });
1409 }
1410 }
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424 @Override
1425 public void nodeChildrenChanged(String path) {
1426 if (path.equals(watcher.assignmentZNode)) {
1427 zkEventWorkers.submit(new Runnable() {
1428 @Override
1429 public void run() {
1430 try {
1431
1432 List<String> children =
1433 ZKUtil.listChildrenAndWatchForNewChildren(
1434 watcher, watcher.assignmentZNode);
1435 if (children != null) {
1436 Stat stat = new Stat();
1437 for (String child : children) {
1438
1439
1440
1441 if (!regionStates.isRegionInTransition(child)) {
1442 ZKAssign.getDataAndWatch(watcher, child, stat);
1443 }
1444 }
1445 }
1446 } catch (KeeperException e) {
1447 server.abort("Unexpected ZK exception reading unassigned children", e);
1448 }
1449 }
1450 });
1451 }
1452 }
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
/**
 * Marks the region as online on the given server without a known open sequence
 * number (passes HConstants.NO_SEQNUM to the three-argument overload).
 *
 * @param regionInfo region that came online
 * @param sn server now hosting the region
 */
1463 void regionOnline(HRegionInfo regionInfo, ServerName sn) {
1464 regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
1465 }
1466
/**
 * Marks the region as online on the given server and performs the follow-up
 * bookkeeping: bumps the opened-region counter, updates regionStates, drops any
 * pending region plan, informs the balancer and sends the region-opened
 * notification.
 *
 * @param regionInfo region that came online
 * @param sn server now hosting the region
 * @param openSeqNum sequence number handed to regionStates.regionOnline
 */
1467 void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
1468 numRegionsOpened.incrementAndGet();
1469 regionStates.regionOnline(regionInfo, sn, openSeqNum);
1470
1471
// The region is placed, so its assignment plan is no longer needed.
1472 clearRegionPlan(regionInfo);
1473 balancer.regionOnline(regionInfo, sn);
1474
1475
// Tell listeners that the region is now open on sn.
1476 sendRegionOpenedNotification(regionInfo, sn);
1477 }
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487 private void handleAssignmentEvent(final String path) {
1488 if (path.startsWith(watcher.assignmentZNode)) {
1489 final String regionName = ZKAssign.getRegionName(watcher, path);
1490
1491 zkEventWorkersSubmit(new RegionRunnable() {
1492 @Override
1493 public String getRegionName() {
1494 return regionName;
1495 }
1496
1497 @Override
1498 public void run() {
1499 try {
1500 Stat stat = new Stat();
1501 byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
1502 if (data == null) return;
1503
1504 RegionTransition rt = RegionTransition.parseFrom(data);
1505
1506
1507
1508 BaseCoordinatedStateManager csm =
1509 (BaseCoordinatedStateManager) server.getCoordinatedStateManager();
1510 OpenRegionCoordination openRegionCoordination = csm.getOpenRegionCoordination();
1511
1512 ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
1513 new ZkOpenRegionCoordination.ZkOpenRegionDetails();
1514 zkOrd.setVersion(stat.getVersion());
1515 zkOrd.setServerName(csm.getServer().getServerName());
1516
1517 handleRegion(rt, openRegionCoordination, zkOrd);
1518 } catch (KeeperException e) {
1519 server.abort("Unexpected ZK exception reading unassigned node data", e);
1520 } catch (DeserializationException e) {
1521 server.abort("Unexpected exception deserializing node data", e);
1522 }
1523 }
1524 });
1525 }
1526 }
1527
1528
1529
1530
1531
1532
1533
1534
/**
 * Marks the region as offline by delegating to the two-argument overload with a
 * null second argument.
 *
 * @param regionInfo region to mark offline
 */
1535 public void regionOffline(final HRegionInfo regionInfo) {
1536 regionOffline(regionInfo, null);
1537 }
1538
1539 public void offlineDisabledRegion(HRegionInfo regionInfo) {
1540 if (useZKForAssignment) {
1541
1542 LOG.debug("Table being disabled so deleting ZK node and removing from " +
1543 "regions in transition, skipping assignment of region " +
1544 regionInfo.getRegionNameAsString());
1545 String encodedName = regionInfo.getEncodedName();
1546 deleteNodeInStates(encodedName, "closed", null,
1547 EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
1548 }
1549 replicasToClose.remove(regionInfo);
1550 regionOffline(regionInfo);
1551 }
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
/**
 * Assigns the region without forcing a new plan (forceNewPlan = false).
 *
 * @param region region to assign
 * @param setOfflineInZK whether the OFFLINE znode should be set before assigning
 */
1573 public void assign(HRegionInfo region, boolean setOfflineInZK) {
1574 assign(region, setOfflineInZK, false);
1575 }
1576
1577
1578
1579
1580 public void assign(HRegionInfo region,
1581 boolean setOfflineInZK, boolean forceNewPlan) {
1582 if (isDisabledorDisablingRegionInRIT(region)) {
1583 return;
1584 }
1585 String encodedName = region.getEncodedName();
1586 Lock lock = locker.acquireLock(encodedName);
1587 try {
1588 RegionState state = forceRegionStateToOffline(region, forceNewPlan);
1589 if (state != null) {
1590 if (regionStates.wasRegionOnDeadServer(encodedName)) {
1591 LOG.info("Skip assigning " + region.getRegionNameAsString()
1592 + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
1593 + " is dead but not processed yet");
1594 return;
1595 }
1596 assign(state, setOfflineInZK && useZKForAssignment, forceNewPlan);
1597 }
1598 } finally {
1599 lock.unlock();
1600 }
1601 }
1602
1603
1604
1605
1606
1607
1608
/**
 * Bulk-assigns a list of regions to a single destination server.
 * <p>
 * Outline: take the per-region locks; force each region to OFFLINE (and, with
 * ZK-based assignment, asynchronously create its OFFLINE znode); wait for the
 * znode callbacks; register the plans; mark regions PENDING_OPEN; send one open
 * RPC for the batch with retries; hand regions that could not be included or
 * failed to open to invokeAssign; finally wait for the user regions.
 * The elapsed time is always reported to the bulk-assign metric.
 *
 * @param destination server that should open the regions
 * @param regions regions to assign
 * @return true when the list is empty or the batch was processed; false when the
 *         master is stopped, the open RPC returned null, the region server
 *         reported it was stopped, or an IOException prevented talking to it
 * @throws InterruptedException if interrupted while sleeping between polls/retries
 */
1609 boolean assign(final ServerName destination, final List<HRegionInfo> regions)
1610 throws InterruptedException {
1611 long startTime = EnvironmentEdgeManager.currentTime();
1612 try {
1613 int regionCount = regions.size();
1614 if (regionCount == 0) {
1615 return true;
1616 }
1617 LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
1618 Set<String> encodedNames = new HashSet<String>(regionCount);
1619 for (HRegionInfo region : regions) {
1620 encodedNames.add(region.getEncodedName());
1621 }
1622
1623 List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
// Lock every region of the batch; locks not released early are released in the
// finally block below.
1624 Map<String, Lock> locks = locker.acquireLocks(encodedNames);
1625 try {
1626 AtomicInteger counter = new AtomicInteger(0);
1627 Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
1628 OfflineCallback cb = new OfflineCallback(
1629 watcher, destination, counter, offlineNodesVersions);
1630 Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
1631 List<RegionState> states = new ArrayList<RegionState>(regions.size());
1632 for (HRegionInfo region : regions) {
1633 String encodedName = region.getEncodedName();
1634 if (!isDisabledorDisablingRegionInRIT(region)) {
1635 RegionState state = forceRegionStateToOffline(region, false);
1636 boolean onDeadServer = false;
1637 if (state != null) {
1638 if (regionStates.wasRegionOnDeadServer(encodedName)) {
1639 LOG.info("Skip assigning " + region.getRegionNameAsString()
1640 + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
1641 + " is dead but not processed yet");
1642 onDeadServer = true;
1643 } else if (!useZKForAssignment
1644 || asyncSetOfflineInZooKeeper(state, cb, destination)) {
// Region joins the batch and keeps its lock: skip the early unlock below.
1645 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
1646 plans.put(encodedName, plan);
1647 states.add(state);
1648 continue;
1649 }
1650 }
1651
// Regions on an unprocessed dead server are skipped without being queued for retry.
1652 if (!onDeadServer) {
1653 LOG.info("failed to force region state to offline or "
1654 + "failed to set it offline in ZK, will reassign later: " + region);
1655 failedToOpenRegions.add(region);
1656 }
1657 }
1658
1659
// This region is not part of the batch: release its lock right away.
1660 Lock lock = locks.remove(encodedName);
1661 lock.unlock();
1662 }
1663
1664 if (useZKForAssignment) {
1665
// Poll until the callback counter reaches the number of batched regions, or the
// master stops.
1666 int total = states.size();
1667 for (int oldCounter = 0; !server.isStopped();) {
1668 int count = counter.get();
1669 if (oldCounter != count) {
1670 LOG.debug(destination.toString() + " unassigned znodes=" + count +
1671 " of total=" + total + "; oldCounter=" + oldCounter);
1672 oldCounter = count;
1673 }
1674 if (count >= total) break;
1675 Thread.sleep(5);
1676 }
1677 }
1678
1679 if (server.isStopped()) {
1680 return false;
1681 }
1682
1683
1684
// Register the plans before the open RPC is sent.
1685 this.addPlans(plans);
1686
1687 List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
1688 new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
1689 for (RegionState state: states) {
1690 HRegionInfo region = state.getRegion();
1691 String encodedRegionName = region.getEncodedName();
1692 Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
// With ZK-based assignment a missing or -1 version means the OFFLINE znode was
// not set; such regions are retried individually later.
1693 if (useZKForAssignment && (nodeVersion == null || nodeVersion == -1)) {
1694 LOG.warn("failed to offline in zookeeper: " + region);
1695 failedToOpenRegions.add(region);
1696 Lock lock = locks.remove(encodedRegionName);
1697 lock.unlock();
1698 } else {
1699 regionStates.updateRegionState(
1700 region, State.PENDING_OPEN, destination);
1701 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1702 if (this.shouldAssignRegionsWithFavoredNodes) {
1703 favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1704 }
1705 regionOpenInfos.add(new Triple<HRegionInfo, Integer, List<ServerName>>(
1706 region, nodeVersion, favoredNodes));
1707 }
1708 }
1709
1710
1711 try {
1712
1713
// Deadline for retrying while the destination reports ServerNotRunningYetException.
1714 long maxWaitTime = System.currentTimeMillis() +
1715 this.server.getConfiguration().
1716 getLong("hbase.regionserver.rpc.startup.waittime", 60000);
1717 for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
1718 try {
1719
1720 if (regionOpenInfos.isEmpty()) {
1721 break;
1722 }
1723 List<RegionOpeningState> regionOpeningStateList = serverManager
1724 .sendRegionOpen(destination, regionOpenInfos);
1725 if (regionOpeningStateList == null) {
1726
1727 return false;
1728 }
// Responses are positional: entry k answers regionOpenInfos.get(k).
1729 for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
1730 RegionOpeningState openingState = regionOpeningStateList.get(k);
1731 if (openingState != RegionOpeningState.OPENED) {
1732 HRegionInfo region = regionOpenInfos.get(k).getFirst();
1733 if (openingState == RegionOpeningState.ALREADY_OPENED) {
1734 processAlreadyOpenedRegion(region, destination);
1735 } else if (openingState == RegionOpeningState.FAILED_OPENING) {
1736
1737 failedToOpenRegions.add(region);
1738 } else {
1739 LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
1740 + openingState + " in assigning region " + region);
1741 }
1742 }
1743 }
1744 break;
1745 } catch (IOException e) {
1746 if (e instanceof RemoteException) {
1747 e = ((RemoteException)e).unwrapRemoteException();
1748 }
1749 if (e instanceof RegionServerStoppedException) {
1750 LOG.warn("The region server was shut down, ", e);
1751
1752 return false;
1753 } else if (e instanceof ServerNotRunningYetException) {
1754 long now = System.currentTimeMillis();
1755 if (now < maxWaitTime) {
1756 LOG.debug("Server is not yet up; waiting up to " +
1757 (maxWaitTime - now) + "ms", e);
1758 Thread.sleep(100);
// Undo the loop increment so this wait does not consume an attempt.
1759 i--;
1760 continue;
1761 }
1762 } else if (e instanceof java.net.SocketTimeoutException
1763 && this.serverManager.isServerOnline(destination)) {
1764
1765
1766
1767
1768 if (LOG.isDebugEnabled()) {
1769 LOG.debug("Bulk assigner openRegion() to " + destination
1770 + " has timed out, but the regions might"
1771 + " already be opened on it.", e);
1772 }
1773
// Timed out against a server that is still online: retry on the same server
// without consuming an attempt.
1774 Thread.sleep(100);
1775 i--;
1776 continue;
1777 }
// Anything else is handled by the outer catch, which returns false.
1778 throw e;
1779 }
1780 }
1781 } catch (IOException e) {
1782
1783 LOG.info("Unable to communicate with " + destination
1784 + " in order to assign regions, ", e);
1785 return false;
1786 }
1787 } finally {
// Release whatever locks were not already released early.
1788 for (Lock lock : locks.values()) {
1789 lock.unlock();
1790 }
1791 }
1792
// Regions that could not be batched or failed to open are assigned individually,
// unless they turned out to be online already.
1793 if (!failedToOpenRegions.isEmpty()) {
1794 for (HRegionInfo region : failedToOpenRegions) {
1795 if (!regionStates.isRegionOnline(region)) {
1796 invokeAssign(region);
1797 }
1798 }
1799 }
1800
1801
// Only user regions are waited on; system-table regions are excluded.
1802 ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
1803 for (HRegionInfo region: regions) {
1804 if (!region.getTable().isSystemTable()) {
1805 userRegionSet.add(region);
1806 }
1807 }
1808 if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
1809 System.currentTimeMillis())) {
1810 LOG.debug("some user regions are still in transition: " + userRegionSet);
1811 }
1812 LOG.debug("Bulk assigning done for " + destination);
1813 return true;
1814 } finally {
1815 metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
1816 }
1817 }
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
/**
 * Sends a CLOSE request for the region to the server hosting it, retrying up to
 * maximumAttempts times.
 * <p>
 * The target server is taken from {@code state} when it is non-null, otherwise
 * from {@code src}. If every attempt fails and {@code state} is non-null, the
 * region is moved to FAILED_CLOSE.
 *
 * @param region region to close
 * @param state current region state, or null when the caller has none
 * @param versionOfClosingNode znode version passed through to the close RPC
 * @param dest destination server passed through to the close RPC; may be null
 * @param transitionInZK passed through to the close RPC; also controls whether the
 *          CLOSING/CLOSED znode is deleted when the region turns out not to be
 *          served any more
 * @param src server to contact when {@code state} is null
 */
1829 private void unassign(final HRegionInfo region,
1830 final RegionState state, final int versionOfClosingNode,
1831 final ServerName dest, final boolean transitionInZK,
1832 final ServerName src) {
// Note: this local shadows the this.server field; the field is accessed through
// "this.server" below.
1833 ServerName server = src;
1834 if (state != null) {
1835 server = state.getServerName();
1836 }
1837 long maxWaitTime = -1;
1838 for (int i = 1; i <= this.maximumAttempts; i++) {
1839 if (this.server.isStopped() || this.server.isAborted()) {
1840 LOG.debug("Server stopped/aborted; skipping unassign of " + region);
1841 return;
1842 }
1843
// The target is not an online server: there is nothing to close remotely, so
// only clean up local/ZK state.
1844 if (!serverManager.isServerOnline(server)) {
1845 LOG.debug("Offline " + region.getRegionNameAsString()
1846 + ", no need to unassign since it's on a dead server: " + server);
1847 if (transitionInZK) {
1848
1849 deleteClosingOrClosedNode(region, server);
1850 }
1851 if (state != null) {
1852 regionOffline(region);
1853 }
1854 return;
1855 }
1856 try {
1857
1858 if (serverManager.sendRegionClose(server, region,
1859 versionOfClosingNode, dest, transitionInZK)) {
1860 LOG.debug("Sent CLOSE to " + server + " for region " +
1861 region.getRegionNameAsString());
1862 if (useZKForAssignment && !transitionInZK && state != null) {
1863
1864
// Issues the same close again with identical arguments.
// NOTE(review): presumably to confirm the region really closed; the nested call
// ends through one of the return paths (e.g. NotServingRegionException) - confirm.
1865 unassign(region, state, versionOfClosingNode,
1866 dest, transitionInZK, src);
1867 }
1868 return;
1869 }
1870
1871
1872 LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
1873 region.getRegionNameAsString());
1874 } catch (Throwable t) {
1875 long sleepTime = 0;
1876 Configuration conf = this.server.getConfiguration();
1877 if (t instanceof RemoteException) {
1878 t = ((RemoteException)t).unwrapRemoteException();
1879 }
1880 boolean logRetries = true;
1881 if (t instanceof RegionServerAbortedException
1882 || t instanceof RegionServerStoppedException
1883 || t instanceof ServerNotRunningYetException) {
1884
1885
// Sleep for the configured failed-server expiry plus 1 ms before the next attempt.
1886 sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1887 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1888
1889 } else if (t instanceof NotServingRegionException) {
// The server no longer hosts the region: treat it as closed.
1890 LOG.debug("Offline " + region.getRegionNameAsString()
1891 + ", it's not any more on " + server, t);
1892 if (transitionInZK) {
1893 deleteClosingOrClosedNode(region, server);
1894 }
1895 if (state != null) {
1896 regionOffline(region);
1897 }
1898 return;
1899 } else if ((t instanceof FailedServerException) || (state != null &&
1900 t instanceof RegionAlreadyInTransitionException)) {
1901 if(t instanceof FailedServerException) {
1902 sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1903 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1904 } else {
1905
// RegionAlreadyInTransitionException: refresh the state timestamp and keep
// retrying, without consuming attempts, until the wait deadline passes.
1906 LOG.debug("update " + state + " the timestamp.");
1907 state.updateTimestampToNow();
1908 if (maxWaitTime < 0) {
1909 maxWaitTime =
1910 EnvironmentEdgeManager.currentTime()
1911 + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
1912 DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
1913 }
1914 long now = EnvironmentEdgeManager.currentTime();
1915 if (now < maxWaitTime) {
1916 LOG.debug("Region is already in transition; "
1917 + "waiting up to " + (maxWaitTime - now) + "ms", t);
1918 sleepTime = 100;
// Undo the loop increment so this wait does not count as an attempt.
1919 i--;
1920 logRetries = false;
1921 }
1922 }
1923 }
1924
1925 try {
1926 if (sleepTime > 0) {
1927 Thread.sleep(sleepTime);
1928 }
1929 } catch (InterruptedException ie) {
1930 LOG.warn("Failed to unassign "
1931 + region.getRegionNameAsString() + " since interrupted", ie);
// Restore the interrupt flag for callers before giving up.
1932 Thread.currentThread().interrupt();
1933 if (state != null) {
1934 regionStates.updateRegionState(region, State.FAILED_CLOSE);
1935 }
1936 return;
1937 }
1938
1939 if (logRetries) {
1940 LOG.info("Server " + server + " returned " + t + " for "
1941 + region.getRegionNameAsString() + ", try=" + i
1942 + " of " + this.maximumAttempts, t);
1943
1944 }
1945 }
1946 }
1947
// All attempts were used up.
1948 if (state != null) {
1949 regionStates.updateRegionState(region, State.FAILED_CLOSE);
1950 }
1951 }
1952
1953
1954
1955
/**
 * Brings a region into a state from which it can be assigned.
 * <p>
 * The switch below relies on deliberate fall-through: states that are open or
 * in flight are (when forced) unassigned first, then checked for a dead,
 * unprocessed hosting server, and finally accepted.
 *
 * @param region region about to be assigned
 * @param forceNewPlan whether a region that is already open/opening/closing may
 *          be unassigned in order to be reassigned
 * @return the region state to assign from, or null when the assignment must be
 *         skipped
 */
1956 private RegionState forceRegionStateToOffline(
1957 final HRegionInfo region, final boolean forceNewPlan) {
1958 RegionState state = regionStates.getRegionState(region);
1959 if (state == null) {
1960 LOG.warn("Assigning but not in region states: " + region);
1961 state = regionStates.createRegionState(region);
1962 }
1963
1964 ServerName sn = state.getServerName();
1965 if (forceNewPlan && LOG.isDebugEnabled()) {
1966 LOG.debug("Force region state offline " + state);
1967 }
1968
1969 switch (state.getState()) {
1970 case OPEN:
1971 case OPENING:
1972 case PENDING_OPEN:
1973 case CLOSING:
1974 case PENDING_CLOSE:
1975 if (!forceNewPlan) {
1976 LOG.debug("Skip assigning " +
1977 region + ", it is already " + state);
1978 return null;
1979 }
// Intentional fall-through: a forced assignment unassigns the region first.
1980 case FAILED_CLOSE:
1981 case FAILED_OPEN:
1982 unassign(region, state, -1, null, false, null);
// Re-read the state, since unassign may have changed it.
// NOTE(review): the re-read result is dereferenced without a null check - confirm
// getRegionState cannot return null here.
1983 state = regionStates.getRegionState(region);
1984 if (state.isFailedClose()) {
1985
1986
1987 LOG.info("Skip assigning " +
1988 region + ", we couldn't close it: " + state);
1989 return null;
1990 }
// Intentional fall-through into the dead-server check.
1991 case OFFLINE:
1992
1993
1994
1995
1996
1997
1998
1999
// With ZK-based assignment, a region whose server (sn, captured before any
// unassign) is dead but not yet processed is not assigned here.
2000 if (useZKForAssignment
2001 && regionStates.isServerDeadAndNotProcessed(sn)
2002 && wasRegionOnDeadServerByMeta(region, sn)) {
2003 if (!regionStates.isRegionInTransition(region)) {
2004 LOG.info("Updating the state to " + State.OFFLINE + " to allow to be reassigned by SSH");
2005 regionStates.updateRegionState(region, State.OFFLINE);
2006 }
2007 LOG.info("Skip assigning " + region.getRegionNameAsString()
2008 + ", it is on a dead but not processed yet server: " + sn);
2009 return null;
2010 }
// Intentional fall-through: the region can be assigned.
2011 case CLOSED:
2012 break;
2013 default:
2014 LOG.error("Trying to assign region " + region
2015 + ", which is " + state);
2016 return null;
2017 }
2018 return state;
2019 }
2020
2021 @SuppressWarnings("deprecation")
2022 protected boolean wasRegionOnDeadServerByMeta(
2023 final HRegionInfo region, final ServerName sn) {
2024 try {
2025 if (region.isMetaRegion()) {
2026 ServerName server = this.server.getMetaTableLocator().
2027 getMetaRegionLocation(this.server.getZooKeeper());
2028 return regionStates.isServerDeadAndNotProcessed(server);
2029 }
2030 while (!server.isStopped()) {
2031 try {
2032 this.server.getMetaTableLocator().waitMetaRegionLocation(server.getZooKeeper());
2033 Result r = MetaTableAccessor.getRegionResult(server.getConnection(),
2034 region.getRegionName());
2035 if (r == null || r.isEmpty()) return false;
2036 ServerName server = HRegionInfo.getServerName(r);
2037 return regionStates.isServerDeadAndNotProcessed(server);
2038 } catch (IOException ioe) {
2039 LOG.info("Received exception accessing hbase:meta during force assign "
2040 + region.getRegionNameAsString() + ", retrying", ioe);
2041 }
2042 }
2043 } catch (InterruptedException e) {
2044 Thread.currentThread().interrupt();
2045 LOG.info("Interrupted accessing hbase:meta", e);
2046 }
2047
2048 return regionStates.isServerDeadAndNotProcessed(sn);
2049 }
2050
2051
2052
2053
2054
2055
2056
/**
 * Assigns one region, retrying up to maximumAttempts times.
 * <p>
 * Each attempt obtains a plan (if none is held), optionally sets the OFFLINE
 * znode, moves the region to PENDING_OPEN and sends the open RPC to the planned
 * destination. Depending on the failure, the next attempt reuses the same server
 * (possibly without consuming an attempt) or asks for a new plan. hbase:meta
 * regions never give up: their attempt counter is reset when it reaches the
 * limit. When the attempts are exhausted the region is moved to FAILED_OPEN.
 * The elapsed time is always reported to the assignment metric.
 *
 * @param state state of the region to assign
 * @param setOfflineInZK whether the OFFLINE znode must be set before the open RPC
 * @param forceNewPlan passed to getRegionPlan when the first plan is obtained
 */
2057 private void assign(RegionState state,
2058 boolean setOfflineInZK, final boolean forceNewPlan) {
2059 long startTime = EnvironmentEdgeManager.currentTime();
2060 try {
2061 Configuration conf = server.getConfiguration();
2062 RegionState currentState = state;
2063 int versionOfOfflineNode = -1;
2064 RegionPlan plan = null;
2065 long maxWaitTime = -1;
2066 HRegionInfo region = state.getRegion();
2067 RegionOpeningState regionOpenState;
2068 Throwable previousException = null;
2069 for (int i = 1; i <= maximumAttempts; i++) {
2070 if (server.isStopped() || server.isAborted()) {
2071 LOG.info("Skip assigning " + region.getRegionNameAsString()
2072 + ", the server is stopped/aborted");
2073 return;
2074 }
2075
2076 if (plan == null) {
2077 try {
2078 plan = getRegionPlan(region, forceNewPlan);
2079 } catch (HBaseIOException e) {
2080 LOG.warn("Failed to get region plan", e);
2081 }
2082 }
2083
2084 if (plan == null) {
2085 LOG.warn("Unable to determine a plan to assign " + region);
2086
2087
// hbase:meta keeps retrying: reset the counter at the limit, wait, try again.
2088 if (region.isMetaRegion()) {
2089 if (i == maximumAttempts) {
2090 i = 0;
2091
2092 LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
2093 " after maximumAttempts (" + this.maximumAttempts +
2094 "). Reset attempts count and continue retrying.");
2095 }
2096 waitForRetryingMetaAssignment();
2097 continue;
2098 }
2099
// Any other region without a plan fails immediately.
2100 regionStates.updateRegionState(region, State.FAILED_OPEN);
2101 return;
2102 }
2103 if (setOfflineInZK && versionOfOfflineNode == -1) {
2104 LOG.info("Setting node as OFFLINED in ZooKeeper for region " + region);
2105
2106
2107 versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
2108 if (versionOfOfflineNode != -1) {
2109 if (isDisabledorDisablingRegionInRIT(region)) {
2110 return;
2111 }
2112
2113
2114
2115
2116
2117
// The znode is set and the table is not being disabled: make sure the table is
// recorded as ENABLED unless it is already ENABLED or ENABLING.
2118 TableName tableName = region.getTable();
2119 if (!tableStateManager.isTableState(tableName,
2120 ZooKeeperProtos.Table.State.ENABLED, ZooKeeperProtos.Table.State.ENABLING)) {
2121 LOG.debug("Setting table " + tableName + " to ENABLED state.");
2122 setEnabledTable(tableName);
2123 }
2124 }
2125 }
2126 if (setOfflineInZK && versionOfOfflineNode == -1) {
2127 LOG.info("Unable to set offline in ZooKeeper to assign " + region);
2128
2129
2130
2131
// Retry from the top unless the server has aborted; this consumes an attempt.
2132 if (!server.isAborted()) {
2133 continue;
2134 }
2135 }
2136 LOG.info("Assigning " + region.getRegionNameAsString() +
2137 " to " + plan.getDestination().toString());
2138
2139 currentState = regionStates.updateRegionState(region,
2140 State.PENDING_OPEN, plan.getDestination());
2141
2142 boolean needNewPlan;
2143 final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
2144 " to " + plan.getDestination();
2145 try {
2146 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
2147 if (this.shouldAssignRegionsWithFavoredNodes) {
2148 favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
2149 }
2150 regionOpenState = serverManager.sendRegionOpen(
2151 plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
2152
2153 if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
2154
2155 needNewPlan = true;
2156 LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
2157 " trying to assign elsewhere instead; " +
2158 "try=" + i + " of " + this.maximumAttempts);
2159 } else {
2160
// The RPC was accepted (or the region was already open): this method is done.
2161 if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
2162 processAlreadyOpenedRegion(region, plan.getDestination());
2163 }
2164 return;
2165 }
2166
2167 } catch (Throwable t) {
2168 if (t instanceof RemoteException) {
2169 t = ((RemoteException) t).unwrapRemoteException();
2170 }
2171 previousException = t;
2172
2173
2174
2175
// "hold": wait and retry on the same server (region already in transition there,
// or the server is still starting).
2176 boolean hold = (t instanceof RegionAlreadyInTransitionException ||
2177 t instanceof ServerNotRunningYetException);
2178
2179
2180
2181
2182
2183
// "retry": the RPC timed out but the destination is still online, so the same
// server is tried again.
2184 boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
2185 && this.serverManager.isServerOnline(plan.getDestination()));
2186
2187
2188 if (hold) {
2189 LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
2190 "try=" + i + " of " + this.maximumAttempts, t);
2191
// The deadline is fixed by the first hold-type failure; its length depends on
// which of the two exceptions was seen.
2192 if (maxWaitTime < 0) {
2193 if (t instanceof RegionAlreadyInTransitionException) {
2194 maxWaitTime = EnvironmentEdgeManager.currentTime()
2195 + this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
2196 DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
2197 } else {
2198 maxWaitTime = EnvironmentEdgeManager.currentTime()
2199 + this.server.getConfiguration().getLong(
2200 "hbase.regionserver.rpc.startup.waittime", 60000);
2201 }
2202 }
2203 try {
2204 needNewPlan = false;
2205 long now = EnvironmentEdgeManager.currentTime();
2206 if (now < maxWaitTime) {
2207 LOG.debug("Server is not yet up or region is already in transition; "
2208 + "waiting up to " + (maxWaitTime - now) + "ms", t);
2209 Thread.sleep(100);
// Undo the loop increment so waiting does not consume an attempt.
2210 i--;
2211 } else if (!(t instanceof RegionAlreadyInTransitionException)) {
2212 LOG.debug("Server is not up for a while; try a new one", t);
2213 needNewPlan = true;
2214 }
2215 } catch (InterruptedException ie) {
2216 LOG.warn("Failed to assign "
2217 + region.getRegionNameAsString() + " since interrupted", ie);
2218 regionStates.updateRegionState(region, State.FAILED_OPEN);
2219 Thread.currentThread().interrupt();
2220 return;
2221 }
2222 } else if (retry) {
2223 needNewPlan = false;
2224 i--;
2225 LOG.warn(assignMsg + ", trying to assign to the same region server due ", t);
2226 } else {
2227 needNewPlan = true;
2228 LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
2229 " try=" + i + " of " + this.maximumAttempts, t);
2230 }
2231 }
2232
2233 if (i == this.maximumAttempts) {
2234
2235 if (region.isMetaRegion()) {
2236 i = 0;
2237 LOG.warn(assignMsg +
2238 ", trying to assign a hbase:meta region reached to maximumAttempts (" +
2239 this.maximumAttempts + "). Reset attempt counts and continue retrying.");
2240 waitForRetryingMetaAssignment();
2241 }
2242 else {
2243
2244
// Last attempt used: skip re-planning. The loop condition then ends the loop
// and the region is marked FAILED_OPEN below.
2245 continue;
2246 }
2247 }
2248
2249
2250
2251
2252 if (needNewPlan) {
2253
2254
2255
2256
2257 RegionPlan newPlan = null;
2258 try {
2259 newPlan = getRegionPlan(region, true);
2260 } catch (HBaseIOException e) {
2261 LOG.warn("Failed to get region plan", e);
2262 }
2263 if (newPlan == null) {
2264 regionStates.updateRegionState(region, State.FAILED_OPEN);
2265 LOG.warn("Unable to find a viable location to assign region " +
2266 region.getRegionNameAsString());
2267 return;
2268 }
2269
2270 if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
2271
2272
2273
// Different destination: go back to OFFLINE and, with ZK-based assignment, set
// the OFFLINE znode again on the next attempt.
2274 LOG.info("Region assignment plan changed from " + plan.getDestination() + " to "
2275 + newPlan.getDestination() + " server.");
2276 currentState = regionStates.updateRegionState(region, State.OFFLINE);
2277 versionOfOfflineNode = -1;
2278 if (useZKForAssignment) {
2279 setOfflineInZK = true;
2280 }
2281 plan = newPlan;
2282 } else if(plan.getDestination().equals(newPlan.getDestination()) &&
2283 previousException instanceof FailedServerException) {
// Same server and the previous failure was a FailedServerException: sleep for
// the configured failed-server expiry plus 1 ms before retrying.
2284 try {
2285 LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
2286 " to the same failed server.");
2287 Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
2288 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
2289 } catch (InterruptedException ie) {
2290 LOG.warn("Failed to assign "
2291 + region.getRegionNameAsString() + " since interrupted", ie);
2292 regionStates.updateRegionState(region, State.FAILED_OPEN);
2293 Thread.currentThread().interrupt();
2294 return;
2295 }
2296 }
2297 }
2298 }
2299
// All attempts were used up.
2300 regionStates.updateRegionState(region, State.FAILED_OPEN);
2301 } finally {
2302 metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
2303 }
2304 }
2305
2306 private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
2307
2308
2309
2310 LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2311 + " to " + sn);
2312 String encodedName = region.getEncodedName();
2313
2314
2315
2316 if(useZKForAssignment){
2317 String node = ZKAssign.getNodeName(watcher, encodedName);
2318 Stat stat = new Stat();
2319 try {
2320 byte[] existingBytes = ZKUtil.getDataNoWatch(watcher, node, stat);
2321 if(existingBytes!=null){
2322 RegionTransition rt= RegionTransition.parseFrom(existingBytes);
2323 EventType et = rt.getEventType();
2324 if (et.equals(EventType.RS_ZK_REGION_OPENED)) {
2325 LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2326 + " and node in "+et+" state");
2327 return;
2328 }
2329 }
2330 } catch (KeeperException ke) {
2331 LOG.warn("Unexpected ZK exception getData " + node
2332 + " node for the region " + encodedName, ke);
2333 } catch (DeserializationException e) {
2334 LOG.warn("Get RegionTransition from zk deserialization failed! ", e);
2335 }
2336
2337 deleteNodeInStates(encodedName, "offline", sn, EventType.M_ZK_REGION_OFFLINE);
2338 }
2339
2340 regionStates.regionOnline(region, sn);
2341 }
2342
2343 private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
2344 if (this.tableStateManager.isTableState(region.getTable(),
2345 ZooKeeperProtos.Table.State.DISABLED,
2346 ZooKeeperProtos.Table.State.DISABLING) || replicasToClose.contains(region)) {
2347 LOG.info("Table " + region.getTable() + " is disabled or disabling;"
2348 + " skipping assign of " + region.getRegionNameAsString());
2349 offlineDisabledRegion(region);
2350 return true;
2351 }
2352 return false;
2353 }
2354
2355
2356
2357
2358
2359
2360
2361
2362 private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
2363 if (!state.isClosed() && !state.isOffline()) {
2364 String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE.";
2365 this.server.abort(msg, new IllegalStateException(msg));
2366 return -1;
2367 }
2368 regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
2369 int versionOfOfflineNode;
2370 try {
2371
2372 versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
2373 state.getRegion(), destination);
2374 if (versionOfOfflineNode == -1) {
2375 LOG.warn("Attempted to create/force node into OFFLINE state before "
2376 + "completing assignment but failed to do so for " + state);
2377 return -1;
2378 }
2379 } catch (KeeperException e) {
2380 server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
2381 return -1;
2382 }
2383 return versionOfOfflineNode;
2384 }
2385
2386
2387
2388
2389
2390
2391 private RegionPlan getRegionPlan(final HRegionInfo region,
2392 final boolean forceNewPlan) throws HBaseIOException {
2393 return getRegionPlan(region, null, forceNewPlan);
2394 }
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405 private RegionPlan getRegionPlan(final HRegionInfo region,
2406 final ServerName serverToExclude, final boolean forceNewPlan) throws HBaseIOException {
2407
2408 final String encodedName = region.getEncodedName();
2409 final List<ServerName> destServers =
2410 serverManager.createDestinationServersList(serverToExclude);
2411
2412 if (destServers.isEmpty()){
2413 LOG.warn("Can't move " + encodedName +
2414 ", there is no destination server available.");
2415 return null;
2416 }
2417
2418 RegionPlan randomPlan = null;
2419 boolean newPlan = false;
2420 RegionPlan existingPlan;
2421
2422 synchronized (this.regionPlans) {
2423 existingPlan = this.regionPlans.get(encodedName);
2424
2425 if (existingPlan != null && existingPlan.getDestination() != null) {
2426 LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
2427 + " destination server is " + existingPlan.getDestination() +
2428 " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
2429 }
2430
2431 if (forceNewPlan
2432 || existingPlan == null
2433 || existingPlan.getDestination() == null
2434 || !destServers.contains(existingPlan.getDestination())) {
2435 newPlan = true;
2436 }
2437 }
2438
2439 if (newPlan) {
2440 ServerName destination = balancer.randomAssignment(region, destServers);
2441 if (destination == null) {
2442 LOG.warn("Can't find a destination for " + encodedName);
2443 return null;
2444 }
2445 synchronized (this.regionPlans) {
2446 randomPlan = new RegionPlan(region, null, destination);
2447 if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
2448 List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
2449 regions.add(region);
2450 try {
2451 processFavoredNodes(regions);
2452 } catch (IOException ie) {
2453 LOG.warn("Ignoring exception in processFavoredNodes " + ie);
2454 }
2455 }
2456 this.regionPlans.put(encodedName, randomPlan);
2457 }
2458 LOG.debug("No previous transition plan found (or ignoring " + "an existing plan) for "
2459 + region.getRegionNameAsString() + "; generated random plan=" + randomPlan + "; "
2460 + destServers.size() + " (online=" + serverManager.getOnlineServers().size()
2461 + ") available servers, forceNewPlan=" + forceNewPlan);
2462 return randomPlan;
2463 }
2464 LOG.debug("Using pre-existing plan for " +
2465 region.getRegionNameAsString() + "; plan=" + existingPlan);
2466 return existingPlan;
2467 }
2468
2469
2470
2471
2472 private void waitForRetryingMetaAssignment() {
2473 try {
2474 Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
2475 } catch (InterruptedException e) {
2476 LOG.error("Got exception while waiting for hbase:meta assignment");
2477 Thread.currentThread().interrupt();
2478 }
2479 }
2480
2481
2482
2483
2484
2485
2486
2487
2488
2489
2490
2491
2492
2493
2494 public void unassign(HRegionInfo region) {
2495 unassign(region, false);
2496 }
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
/**
 * Unassigns the given region while holding the per-region lock.
 *
 * <p>If the region is not in transition, a ZK CLOSING node is created first (only when
 * ZK-based assignment is in use), the region is moved to PENDING_CLOSE, and the work is handed
 * to the lower-level {@code unassign} overload. A region already in transition is only
 * processed when it is FAILED_OPEN (it is simply offlined) or when {@code force} is set and
 * it is pending-close/closing (the close request is sent again).
 *
 * <p>After the lock is released, the region is assigned again if it is offline, is not in
 * {@code replicasToClose}, and no code path cleared the {@code reassign} flag.
 *
 * @param region the region to unassign
 * @param force when {@code true}, re-send the close for a region that is already
 *          pending-close/closing
 * @param dest forwarded unchanged to the lower-level {@code unassign} call
 */
public void unassign(HRegionInfo region, boolean force, ServerName dest) {

  LOG.debug("Starting unassign of " + region.getRegionNameAsString()
    + " (offlining), current state: " + regionStates.getRegionState(region));

  String encodedName = region.getEncodedName();

  // -1 means "no CLOSING znode version"; it stays -1 unless a node is created below.
  int versionOfClosingNode = -1;


  // Serialize with other operations on the same region.
  ReentrantLock lock = locker.acquireLock(encodedName);
  RegionState state = regionStates.getRegionTransitionState(encodedName);
  // Cleared on paths where the region must not be assigned again in the finally block.
  boolean reassign = true;
  try {
    if (state == null) {
      // Not in transition: fall back to the region's regular state.


      state = regionStates.getRegionState(encodedName);
      if (state != null && state.isUnassignable()) {
        LOG.info("Attempting to unassign " + state + ", ignored");

        return;
      }

      try {
        if (state == null || state.getServerName() == null) {
          // Unknown region, or no hosting server recorded: just mark it offline.


          LOG.warn("Attempting to unassign a region not in RegionStates "
            + region.getRegionNameAsString() + ", offlined");
          regionOffline(region);
          return;
        }
        if (useZKForAssignment) {
          versionOfClosingNode = ZKAssign.createNodeClosing(
            watcher, region, state.getServerName());
          if (versionOfClosingNode == -1) {
            LOG.info("Attempting to unassign " +
              region.getRegionNameAsString() + " but ZK closing node "
              + "can't be created.");
            reassign = false;
            return;
          }
        }
      } catch (KeeperException e) {
        if (e instanceof NodeExistsException) {
          // A znode already exists for this region; check whether it reflects a
          // split/merge, in which case the unassign is skipped.



          NodeExistsException nee = (NodeExistsException)e;
          String path = nee.getPath();
          try {
            if (isSplitOrSplittingOrMergedOrMerging(path)) {
              LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
                "skipping unassign because region no longer exists -- its split or merge");
              reassign = false;
              return;
            }
          } catch (KeeperException.NoNodeException ke) {
            // Note: reassign is left true here, so the finally block may still re-assign.
            LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
              "; presuming split and that the region to unassign, " +
              encodedName + ", no longer exists -- confirm", ke);
            return;
          } catch (KeeperException ke) {
            LOG.error("Unexpected zk state", ke);
          } catch (DeserializationException de) {
            LOG.error("Failed parse", de);
          }
        }

        // Reached for any KeeperException not resolved above (including a NodeExists
        // that was not a split/merge, or whose inspection failed): abort the master.
        server.abort("Unexpected ZK exception creating node CLOSING", e);
        reassign = false;
        return;
      }
      state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
    } else if (state.isFailedOpen()) {
      // Region never opened; nothing to close, just offline it.

      regionOffline(region);
      return;
    } else if (force && state.isPendingCloseOrClosing()) {
      LOG.debug("Attempting to unassign " + region.getRegionNameAsString() +
        " which is already " + state.getState() +
        " but forcing to send a CLOSE RPC again ");
      // NOTE(review): whether isFailedClose() can hold inside this branch depends on
      // RegionState.isPendingCloseOrClosing(), which is not visible here.
      if (state.isFailedClose()) {
        state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
      }
      state.updateTimestampToNow();
    } else {
      LOG.debug("Attempting to unassign " +
        region.getRegionNameAsString() + " but it is " +
        "already in transition (" + state.getState() + ", force=" + force + ")");
      return;
    }

    unassign(region, state, versionOfClosingNode, dest, useZKForAssignment, null);
  } finally {
    lock.unlock();

    // Runs on every exit path above, after the lock is released.
    if (!replicasToClose.contains(region) && reassign && regionStates.isRegionOffline(region)) {
      assign(region, true);
    }
  }
}
2618
2619 public void unassign(HRegionInfo region, boolean force){
2620 unassign(region, force, null);
2621 }
2622
2623
2624
2625
2626 public void deleteClosingOrClosedNode(HRegionInfo region, ServerName sn) {
2627 String encodedName = region.getEncodedName();
2628 deleteNodeInStates(encodedName, "closing", sn, EventType.M_ZK_REGION_CLOSING,
2629 EventType.RS_ZK_REGION_CLOSED);
2630 }
2631
2632
2633
2634
2635
2636
2637
2638 private boolean isSplitOrSplittingOrMergedOrMerging(final String path)
2639 throws KeeperException, DeserializationException {
2640 boolean result = false;
2641
2642
2643 byte [] data = ZKAssign.getData(watcher, path);
2644 if (data == null) {
2645 LOG.info("Node " + path + " is gone");
2646 return false;
2647 }
2648 RegionTransition rt = RegionTransition.parseFrom(data);
2649 switch (rt.getEventType()) {
2650 case RS_ZK_REQUEST_REGION_SPLIT:
2651 case RS_ZK_REGION_SPLIT:
2652 case RS_ZK_REGION_SPLITTING:
2653 case RS_ZK_REQUEST_REGION_MERGE:
2654 case RS_ZK_REGION_MERGED:
2655 case RS_ZK_REGION_MERGING:
2656 result = true;
2657 break;
2658 default:
2659 LOG.info("Node " + path + " is in " + rt.getEventType());
2660 break;
2661 }
2662 return result;
2663 }
2664
2665
2666
2667
2668
2669
2670 public int getNumRegionsOpened() {
2671 return numRegionsOpened.get();
2672 }
2673
2674
2675
2676
2677
2678
2679
2680
2681
2682
2683 public boolean waitForAssignment(HRegionInfo regionInfo)
2684 throws InterruptedException {
2685 ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
2686 regionSet.add(regionInfo);
2687 return waitForAssignment(regionSet, true, Long.MAX_VALUE);
2688 }
2689
2690
2691
2692
2693 protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
2694 final boolean waitTillAllAssigned, final int reassigningRegions,
2695 final long minEndTime) throws InterruptedException {
2696 long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
2697 if (deadline < 0) {
2698 deadline = Long.MAX_VALUE;
2699 }
2700 return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
2701 }
2702
2703
2704
2705
2706
2707
2708
2709
2710
2711 protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
2712 final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
2713
2714 while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
2715 int failedOpenCount = 0;
2716 Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
2717 while (regionInfoIterator.hasNext()) {
2718 HRegionInfo hri = regionInfoIterator.next();
2719 if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
2720 State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
2721 regionInfoIterator.remove();
2722 } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
2723 failedOpenCount++;
2724 }
2725 }
2726 if (!waitTillAllAssigned) {
2727
2728 break;
2729 }
2730 if (!regionSet.isEmpty()) {
2731 if (failedOpenCount == regionSet.size()) {
2732
2733 break;
2734 }
2735 regionStates.waitForUpdate(100);
2736 }
2737 }
2738 return regionSet.isEmpty();
2739 }
2740
2741
2742
2743
2744
2745
2746
2747
2748
2749
2750
2751
2752 public void assignMeta(HRegionInfo hri) throws KeeperException {
2753 this.server.getMetaTableLocator().deleteMetaLocation(this.watcher, hri.getReplicaId());
2754 assign(hri, true);
2755 }
2756
2757
2758
2759
2760
2761
2762
2763
2764
2765 public void assign(Map<HRegionInfo, ServerName> regions)
2766 throws IOException, InterruptedException {
2767 if (regions == null || regions.isEmpty()) {
2768 return;
2769 }
2770 List<ServerName> servers = serverManager.createDestinationServersList();
2771 if (servers == null || servers.isEmpty()) {
2772 throw new IOException("Found no destination server to assign region(s)");
2773 }
2774
2775
2776 Map<ServerName, List<HRegionInfo>> bulkPlan =
2777 balancer.retainAssignment(regions, servers);
2778 if (bulkPlan == null) {
2779 throw new IOException("Unable to determine a plan to assign region(s)");
2780 }
2781
2782 assign(regions.size(), servers.size(),
2783 "retainAssignment=true", bulkPlan);
2784 }
2785
2786
2787
2788
2789
2790
2791
2792
2793
2794 public void assign(List<HRegionInfo> regions)
2795 throws IOException, InterruptedException {
2796 if (regions == null || regions.isEmpty()) {
2797 return;
2798 }
2799
2800 List<ServerName> servers = serverManager.createDestinationServersList();
2801 if (servers == null || servers.isEmpty()) {
2802 throw new IOException("Found no destination server to assign region(s)");
2803 }
2804
2805
2806 Map<ServerName, List<HRegionInfo>> bulkPlan
2807 = balancer.roundRobinAssignment(regions, servers);
2808 if (bulkPlan == null) {
2809 throw new IOException("Unable to determine a plan to assign region(s)");
2810 }
2811
2812 processFavoredNodes(regions);
2813 assign(regions.size(), servers.size(),
2814 "round-robin=true", bulkPlan);
2815 }
2816
2817 private void assign(int regions, int totalServers,
2818 String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
2819 throws InterruptedException, IOException {
2820
2821 int servers = bulkPlan.size();
2822 if (servers == 1 || (regions < bulkAssignThresholdRegions
2823 && servers < bulkAssignThresholdServers)) {
2824
2825
2826
2827 if (LOG.isTraceEnabled()) {
2828 LOG.trace("Not using bulk assignment since we are assigning only " + regions +
2829 " region(s) to " + servers + " server(s)");
2830 }
2831
2832
2833 ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
2834 for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
2835 if (!assign(plan.getKey(), plan.getValue())) {
2836 for (HRegionInfo region: plan.getValue()) {
2837 if (!regionStates.isRegionOnline(region)) {
2838 invokeAssign(region);
2839 if (!region.getTable().isSystemTable()) {
2840 userRegionSet.add(region);
2841 }
2842 }
2843 }
2844 }
2845 }
2846
2847
2848 if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
2849 System.currentTimeMillis())) {
2850 LOG.debug("some user regions are still in transition: " + userRegionSet);
2851 }
2852 } else {
2853 LOG.info("Bulk assigning " + regions + " region(s) across "
2854 + totalServers + " server(s), " + message);
2855
2856
2857 BulkAssigner ba = new GeneralBulkAssigner(
2858 this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
2859 ba.bulkAssign();
2860 LOG.info("Bulk assigning done");
2861 }
2862 }
2863
2864
2865
2866
2867
2868
2869
2870
2871
2872
2873 private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
2874 throws IOException, InterruptedException {
2875 if (allRegions == null || allRegions.isEmpty()) return;
2876
2877
2878 boolean retainAssignment = server.getConfiguration().
2879 getBoolean("hbase.master.startup.retainassign", true);
2880
2881 Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
2882 if (retainAssignment) {
2883 assign(allRegions);
2884 } else {
2885 List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
2886 assign(regions);
2887 }
2888
2889 for (HRegionInfo hri : regionsFromMetaScan) {
2890 TableName tableName = hri.getTable();
2891 if (!tableStateManager.isTableState(tableName,
2892 ZooKeeperProtos.Table.State.ENABLED)) {
2893 setEnabledTable(tableName);
2894 }
2895 }
2896
2897 assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server));
2898 }
2899
2900
2901
2902
2903
2904
2905
2906
2907
2908
2909
2910
2911 public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
2912 Set<HRegionInfo> regionsRecordedInMeta, MasterServices master)throws IOException {
2913 List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
2914 for (HRegionInfo hri : regionsRecordedInMeta) {
2915 TableName table = hri.getTable();
2916 HTableDescriptor htd = master.getTableDescriptors().get(table);
2917
2918 int desiredRegionReplication = htd.getRegionReplication();
2919 for (int i = 0; i < desiredRegionReplication; i++) {
2920 HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
2921 if (regionsRecordedInMeta.contains(replica)) continue;
2922 regionsNotRecordedInMeta.add(replica);
2923 }
2924 }
2925 return regionsNotRecordedInMeta;
2926 }
2927
2928
2929
2930
2931
2932
2933
2934 boolean waitUntilNoRegionsInTransition(final long timeout)
2935 throws InterruptedException {
2936
2937
2938
2939
2940
2941
2942 final long endTime = System.currentTimeMillis() + timeout;
2943
2944 while (!this.server.isStopped() && regionStates.isRegionsInTransition()
2945 && endTime > System.currentTimeMillis()) {
2946 regionStates.waitForUpdate(100);
2947 }
2948
2949 return !regionStates.isRegionsInTransition();
2950 }
2951
2952
2953
2954
2955
2956
2957
2958
2959
2960 Set<ServerName> rebuildUserRegions() throws
2961 IOException, KeeperException, CoordinatedStateException {
2962 Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
2963 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.ENABLING);
2964
2965 Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
2966 ZooKeeperProtos.Table.State.DISABLED,
2967 ZooKeeperProtos.Table.State.DISABLING,
2968 ZooKeeperProtos.Table.State.ENABLING);
2969
2970
2971 List<Result> results = MetaTableAccessor.fullScanOfMeta(server.getConnection());
2972
2973 Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
2974
2975 Set<ServerName> offlineServers = new HashSet<ServerName>();
2976
2977 for (Result result : results) {
2978 if (result == null && LOG.isDebugEnabled()){
2979 LOG.debug("null result from meta - ignoring but this is strange.");
2980 continue;
2981 }
2982
2983
2984
2985 PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
2986 if (p.getFirst() != null && p.getSecond() != null) {
2987 int numReplicas = ((MasterServices)server).getTableDescriptors().get(p.getFirst().
2988 getTable()).getRegionReplication();
2989 for (HRegionInfo merge : p) {
2990 for (int i = 1; i < numReplicas; i++) {
2991 replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
2992 }
2993 }
2994 }
2995 RegionLocations rl = MetaTableAccessor.getRegionLocations(result);
2996 if (rl == null) continue;
2997 HRegionLocation[] locations = rl.getRegionLocations();
2998 if (locations == null) continue;
2999 for (HRegionLocation hrl : locations) {
3000 if (hrl == null) continue;
3001 HRegionInfo regionInfo = hrl.getRegionInfo();
3002 if (regionInfo == null) continue;
3003 int replicaId = regionInfo.getReplicaId();
3004 State state = RegionStateStore.getRegionState(result, replicaId);
3005
3006
3007
3008 if (replicaId == 0 && state.equals(State.SPLIT)) {
3009 for (HRegionLocation h : locations) {
3010 replicasToClose.add(h.getRegionInfo());
3011 }
3012 }
3013 ServerName lastHost = hrl.getServerName();
3014 ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
3015 if (tableStateManager.isTableState(regionInfo.getTable(),
3016 ZooKeeperProtos.Table.State.DISABLED)) {
3017
3018
3019 lastHost = null;
3020 regionLocation = null;
3021 }
3022 regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
3023 if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
3024
3025 continue;
3026 }
3027 TableName tableName = regionInfo.getTable();
3028 if (!onlineServers.contains(regionLocation)) {
3029
3030 offlineServers.add(regionLocation);
3031 if (useZKForAssignment) {
3032 regionStates.regionOffline(regionInfo);
3033 }
3034 } else if (!disabledOrEnablingTables.contains(tableName)) {
3035
3036
3037 regionStates.regionOnline(regionInfo, regionLocation);
3038 balancer.regionOnline(regionInfo, regionLocation);
3039 } else if (useZKForAssignment) {
3040 regionStates.regionOffline(regionInfo);
3041 }
3042
3043
3044 if (!disabledOrDisablingOrEnabling.contains(tableName)
3045 && !getTableStateManager().isTableState(tableName,
3046 ZooKeeperProtos.Table.State.ENABLED)) {
3047 setEnabledTable(tableName);
3048 }
3049 }
3050 }
3051 return offlineServers;
3052 }
3053
3054
3055
3056
3057
3058
3059
3060
3061
3062 private void recoverTableInDisablingState()
3063 throws KeeperException, IOException, CoordinatedStateException {
3064 Set<TableName> disablingTables =
3065 tableStateManager.getTablesInStates(ZooKeeperProtos.Table.State.DISABLING);
3066 if (disablingTables.size() != 0) {
3067 for (TableName tableName : disablingTables) {
3068
3069 LOG.info("The table " + tableName
3070 + " is in DISABLING state. Hence recovering by moving the table"
3071 + " to DISABLED state.");
3072 new DisableTableHandler(this.server, tableName,
3073 this, tableLockManager, true).prepare().process();
3074 }
3075 }
3076 }
3077
3078
3079
3080
3081
3082
3083
3084
3085
3086 private void recoverTableInEnablingState()
3087 throws KeeperException, IOException, CoordinatedStateException {
3088 Set<TableName> enablingTables = tableStateManager.
3089 getTablesInStates(ZooKeeperProtos.Table.State.ENABLING);
3090 if (enablingTables.size() != 0) {
3091 for (TableName tableName : enablingTables) {
3092
3093 LOG.info("The table " + tableName
3094 + " is in ENABLING state. Hence recovering by moving the table"
3095 + " to ENABLED state.");
3096
3097
3098 EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
3099 this, tableLockManager, true);
3100 try {
3101 eth.prepare();
3102 } catch (TableNotFoundException e) {
3103 LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
3104 continue;
3105 }
3106 eth.process();
3107 }
3108 }
3109 }
3110
3111
3112
3113
3114
3115
3116
3117
3118
3119
3120
3121
3122
3123
3124
3125
3126 private void processDeadServersAndRecoverLostRegions(
3127 Set<ServerName> deadServers) throws IOException, KeeperException {
3128 if (deadServers != null && !deadServers.isEmpty()) {
3129 for (ServerName serverName: deadServers) {
3130 if (!serverManager.isServerDead(serverName)) {
3131 serverManager.expireServer(serverName);
3132 }
3133 }
3134 }
3135
3136 List<String> nodes = useZKForAssignment ?
3137 ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode)
3138 : ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
3139 if (nodes != null && !nodes.isEmpty()) {
3140 for (String encodedRegionName : nodes) {
3141 processRegionInTransition(encodedRegionName, null);
3142 }
3143 } else if (!useZKForAssignment) {
3144 processRegionInTransitionZkLess();
3145 }
3146 }
3147
3148 void processRegionInTransitionZkLess() {
3149
3150
3151
3152
3153
3154 Map<String, RegionState> rits = regionStates.getRegionsInTransition();
3155 for (RegionState regionState : rits.values()) {
3156 LOG.info("Processing " + regionState);
3157 ServerName serverName = regionState.getServerName();
3158
3159
3160 if (serverName != null
3161 && !serverManager.getOnlineServers().containsKey(serverName)) {
3162 LOG.info("Server " + serverName + " isn't online. SSH will handle this");
3163 continue;
3164 }
3165 HRegionInfo regionInfo = regionState.getRegion();
3166 State state = regionState.getState();
3167
3168 switch (state) {
3169 case CLOSED:
3170 invokeAssign(regionInfo);
3171 break;
3172 case PENDING_OPEN:
3173 retrySendRegionOpen(regionState);
3174 break;
3175 case PENDING_CLOSE:
3176 retrySendRegionClose(regionState);
3177 break;
3178 case FAILED_CLOSE:
3179 case FAILED_OPEN:
3180 invokeUnAssign(regionInfo);
3181 break;
3182 default:
3183
3184 }
3185 }
3186 }
3187
3188
3189
3190
3191
/**
 * Submits a recovery task that re-sends the open request for a region to the server recorded
 * in {@code regionState}.
 *
 * <p>The task runs under the per-region lock and makes up to {@code maximumAttempts} tries.
 * It gives up silently if the server is no longer online, the master is stopped or aborted,
 * or the region's current state no longer equals {@code regionState}.
 *
 * @param regionState the state captured for the region whose open should be retried
 */
private void retrySendRegionOpen(final RegionState regionState) {
  this.executorService.submit(
    new EventHandler(server, EventType.M_MASTER_RECOVERY) {
      @Override
      public void process() throws IOException {
        HRegionInfo hri = regionState.getRegion();
        ServerName serverName = regionState.getServerName();
        ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
        try {
          for (int i = 1; i <= maximumAttempts; i++) {
            // Stop retrying once the target server or the master is going away.
            if (!serverManager.isServerOnline(serverName)
                || server.isStopped() || server.isAborted()) {
              return;
            }
            try {
              // The region moved on since this task was queued; nothing to retry.
              if (!regionState.equals(regionStates.getRegionState(hri))) {
                return;
              }
              List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
              if (shouldAssignRegionsWithFavoredNodes) {
                favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
              }
              RegionOpeningState regionOpenState = serverManager.sendRegionOpen(
                serverName, hri, -1, favoredNodes);

              if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
                // The server refused the open: schedule a fresh assignment.


                LOG.debug("Got failed_opening in retry sendRegionOpen for "
                  + regionState + ", re-assign it");
                invokeAssign(hri, true);
              }
              // The request was delivered (whatever the outcome); no further attempts.
              return;
            } catch (Throwable t) {
              if (t instanceof RemoteException) {
                t = ((RemoteException) t).unwrapRemoteException();
              }

              // Timeouts and failed-server errors: pause briefly, then try again.
              if (t instanceof java.net.SocketTimeoutException
                  || t instanceof FailedServerException) {
                Threads.sleep(100);
                continue;
              }

              // Any other failure: hand the region back to the regular assign path.
              LOG.debug("Got exception in retry sendRegionOpen for "
                + regionState + ", re-assign it", t);
              invokeAssign(hri);
              return;
            }
          }
        } finally {
          lock.unlock();
        }
      }
    });
}
3248
3249
3250
3251
3252
/**
 * Submits a recovery task that re-sends the close request for a region to the server
 * recorded in {@code regionState}.
 *
 * <p>The task runs under the per-region lock and makes up to {@code maximumAttempts} tries.
 * It gives up silently if the server is no longer online, the master is stopped or aborted,
 * or the region's current state no longer equals {@code regionState}.
 *
 * @param regionState the state captured for the region whose close should be retried
 */
private void retrySendRegionClose(final RegionState regionState) {
  this.executorService.submit(
    new EventHandler(server, EventType.M_MASTER_RECOVERY) {
      @Override
      public void process() throws IOException {
        HRegionInfo hri = regionState.getRegion();
        ServerName serverName = regionState.getServerName();
        ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
        try {
          for (int i = 1; i <= maximumAttempts; i++) {
            // Stop retrying once the target server or the master is going away.
            if (!serverManager.isServerOnline(serverName)
                || server.isStopped() || server.isAborted()) {
              return;
            }
            try {
              // The region moved on since this task was queued; nothing to retry.
              if (!regionState.equals(regionStates.getRegionState(hri))) {
                return;
              }
              if (!serverManager.sendRegionClose(serverName, hri, -1, null, false)) {
                // The close request was not accepted: go through unassign again.

                LOG.debug("Got false in retry sendRegionClose for "
                  + regionState + ", re-close it");
                invokeUnAssign(hri);
              }
              return;
            } catch (Throwable t) {
              if (t instanceof RemoteException) {
                t = ((RemoteException) t).unwrapRemoteException();
              }

              // Timeouts and failed-server errors: pause briefly, then try again.
              if (t instanceof java.net.SocketTimeoutException
                  || t instanceof FailedServerException) {
                Threads.sleep(100);
                continue;
              }
              // NotServingRegion / RegionAlreadyInTransition are not retried here;
              // every other failure goes through unassign again.
              if (!(t instanceof NotServingRegionException
                  || t instanceof RegionAlreadyInTransitionException)) {



                LOG.debug("Got exception in retry sendRegionClose for "
                  + regionState + ", re-close it", t);
                invokeUnAssign(hri);
              }
              return;
            }
          }
        } finally {
          lock.unlock();
        }
      }
    });
}
3306
3307
3308
3309
3310
3311
3312
3313
3314 public void updateRegionsInTransitionMetrics() {
3315 long currentTime = System.currentTimeMillis();
3316 int totalRITs = 0;
3317 int totalRITsOverThreshold = 0;
3318 long oldestRITTime = 0;
3319 int ritThreshold = this.server.getConfiguration().
3320 getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
3321 for (RegionState state: regionStates.getRegionsInTransition().values()) {
3322 totalRITs++;
3323 long ritTime = currentTime - state.getStamp();
3324 if (ritTime > ritThreshold) {
3325 totalRITsOverThreshold++;
3326 }
3327 if (oldestRITTime < ritTime) {
3328 oldestRITTime = ritTime;
3329 }
3330 }
3331 if (this.metricsAssignmentManager != null) {
3332 this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
3333 this.metricsAssignmentManager.updateRITCount(totalRITs);
3334 this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
3335 }
3336 }
3337
3338
3339
3340
3341 void clearRegionPlan(final HRegionInfo region) {
3342 synchronized (this.regionPlans) {
3343 this.regionPlans.remove(region.getEncodedName());
3344 }
3345 }
3346
3347
3348
3349
3350
3351
3352 public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
3353 throws IOException, InterruptedException {
3354 waitOnRegionToClearRegionsInTransition(hri, -1L);
3355 }
3356
3357
3358
3359
3360
3361
3362
3363
3364 public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
3365 throws InterruptedException {
3366 if (!regionStates.isRegionInTransition(hri)) return true;
3367 long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
3368 + timeOut;
3369
3370
3371 LOG.info("Waiting for " + hri.getEncodedName() +
3372 " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
3373 while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
3374 regionStates.waitForUpdate(100);
3375 if (EnvironmentEdgeManager.currentTime() > end) {
3376 LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
3377 return false;
3378 }
3379 }
3380 if (this.server.isStopped()) {
3381 LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
3382 return false;
3383 }
3384 return true;
3385 }
3386
3387 void invokeAssign(HRegionInfo regionInfo) {
3388 invokeAssign(regionInfo, true);
3389 }
3390
3391 void invokeAssign(HRegionInfo regionInfo, boolean newPlan) {
3392 threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, newPlan));
3393 }
3394
3395 void invokeUnAssign(HRegionInfo regionInfo) {
3396 threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
3397 }
3398
3399 public ServerHostRegion isCarryingMeta(ServerName serverName) {
3400 return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
3401 }
3402
3403 public ServerHostRegion isCarryingMetaReplica(ServerName serverName, int replicaId) {
3404 return isCarryingRegion(serverName,
3405 RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId));
3406 }
3407
3408 public ServerHostRegion isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) {
3409 return isCarryingRegion(serverName, metaHri);
3410 }
3411
3412
3413
3414
3415
3416
3417
3418
3419
3420
3421
3422 private ServerHostRegion isCarryingRegion(ServerName serverName, HRegionInfo hri) {
3423 RegionTransition rt = null;
3424 try {
3425 byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
3426
3427 rt = data == null? null: RegionTransition.parseFrom(data);
3428 } catch (KeeperException e) {
3429 server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
3430 } catch (DeserializationException e) {
3431 server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
3432 }
3433
3434 ServerName addressFromZK = rt != null? rt.getServerName(): null;
3435 if (addressFromZK != null) {
3436
3437 boolean matchZK = addressFromZK.equals(serverName);
3438 LOG.debug("Checking region=" + hri.getRegionNameAsString() + ", zk server=" + addressFromZK +
3439 " current=" + serverName + ", matches=" + matchZK);
3440 return matchZK ? ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3441 }
3442
3443 ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
3444 if (addressFromAM != null) {
3445 boolean matchAM = addressFromAM.equals(serverName);
3446 LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
3447 " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
3448 " server being checked: " + serverName);
3449 return matchAM ? ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3450 }
3451
3452 if (hri.isMetaRegion() && RegionReplicaUtil.isDefaultReplica(hri)) {
3453
3454 final ServerName serverNameInZK =
3455 server.getMetaTableLocator().getMetaRegionLocation(this.server.getZooKeeper());
3456 LOG.debug("Based on MetaTableLocator, the META region is on server="
3457 + (serverNameInZK == null ? "null" : serverNameInZK)
3458 + " server being checked: " + serverName);
3459 if (serverNameInZK != null) {
3460 return serverNameInZK.equals(serverName) ?
3461 ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3462 }
3463 }
3464
3465
3466 return ServerHostRegion.UNKNOWN;
3467 }
3468
3469
3470
3471
3472
3473
3474 public List<HRegionInfo> processServerShutdown(final ServerName sn) {
3475
3476 synchronized (this.regionPlans) {
3477 for (Iterator <Map.Entry<String, RegionPlan>> i =
3478 this.regionPlans.entrySet().iterator(); i.hasNext();) {
3479 Map.Entry<String, RegionPlan> e = i.next();
3480 ServerName otherSn = e.getValue().getDestination();
3481
3482 if (otherSn != null && otherSn.equals(sn)) {
3483
3484 i.remove();
3485 }
3486 }
3487 }
3488 List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn);
3489 for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
3490 HRegionInfo hri = it.next();
3491 String encodedName = hri.getEncodedName();
3492
3493
3494 Lock lock = locker.acquireLock(encodedName);
3495 try {
3496 RegionState regionState =
3497 regionStates.getRegionTransitionState(encodedName);
3498 if (regionState == null
3499 || (regionState.getServerName() != null && !regionState.isOnServer(sn))
3500 || !(regionState.isFailedClose() || regionState.isOffline()
3501 || regionState.isPendingOpenOrOpening())) {
3502 LOG.info("Skip " + regionState + " since it is not opening/failed_close"
3503 + " on the dead server any more: " + sn);
3504 it.remove();
3505 } else {
3506 try {
3507
3508 ZKAssign.deleteNodeFailSilent(watcher, hri);
3509 } catch (KeeperException ke) {
3510 server.abort("Unexpected ZK exception deleting node " + hri, ke);
3511 }
3512 if (tableStateManager.isTableState(hri.getTable(),
3513 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3514 regionStates.regionOffline(hri);
3515 it.remove();
3516 continue;
3517 }
3518
3519 regionStates.updateRegionState(hri, State.OFFLINE);
3520 }
3521 } finally {
3522 lock.unlock();
3523 }
3524 }
3525 return regions;
3526 }
3527
3528
3529
3530
3531 public void balance(final RegionPlan plan) {
3532
3533 HRegionInfo hri = plan.getRegionInfo();
3534 TableName tableName = hri.getTable();
3535 if (tableStateManager.isTableState(tableName,
3536 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3537 LOG.info("Ignored moving region of disabling/disabled table "
3538 + tableName);
3539 return;
3540 }
3541
3542
3543 String encodedName = hri.getEncodedName();
3544 ReentrantLock lock = locker.acquireLock(encodedName);
3545 try {
3546 if (!regionStates.isRegionOnline(hri)) {
3547 RegionState state = regionStates.getRegionState(encodedName);
3548 LOG.info("Ignored moving region not assigned: " + hri + ", "
3549 + (state == null ? "not in region states" : state));
3550 return;
3551 }
3552 synchronized (this.regionPlans) {
3553 this.regionPlans.put(plan.getRegionName(), plan);
3554 }
3555 unassign(hri, false, plan.getDestination());
3556 } finally {
3557 lock.unlock();
3558 }
3559 }
3560
3561 public void stop() {
3562 shutdown();
3563 }
3564
3565
3566
3567
3568 public void shutdown() {
3569
3570 synchronized (zkEventWorkerWaitingList){
3571 zkEventWorkerWaitingList.clear();
3572 }
3573
3574
3575 threadPoolExecutorService.shutdownNow();
3576 zkEventWorkers.shutdownNow();
3577 regionStateStore.stop();
3578 }
3579
3580 protected void setEnabledTable(TableName tableName) {
3581 try {
3582 this.tableStateManager.setTableState(tableName,
3583 ZooKeeperProtos.Table.State.ENABLED);
3584 } catch (CoordinatedStateException e) {
3585
3586 String errorMsg = "Unable to ensure that the table " + tableName
3587 + " will be" + " enabled because of a ZooKeeper issue";
3588 LOG.error(errorMsg);
3589 this.server.abort(errorMsg, e);
3590 }
3591 }
3592
3593
3594
3595
3596
3597
3598
3599 private boolean asyncSetOfflineInZooKeeper(final RegionState state,
3600 final AsyncCallback.StringCallback cb, final ServerName destination) {
3601 if (!state.isClosed() && !state.isOffline()) {
3602 this.server.abort("Unexpected state trying to OFFLINE; " + state,
3603 new IllegalStateException());
3604 return false;
3605 }
3606 regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
3607 try {
3608 ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
3609 destination, cb, state);
3610 } catch (KeeperException e) {
3611 if (e instanceof NodeExistsException) {
3612 LOG.warn("Node for " + state.getRegion() + " already exists");
3613 } else {
3614 server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
3615 }
3616 return false;
3617 }
3618 return true;
3619 }
3620
3621 private boolean deleteNodeInStates(String encodedName,
3622 String desc, ServerName sn, EventType... types) {
3623 try {
3624 for (EventType et: types) {
3625 if (ZKAssign.deleteNode(watcher, encodedName, et, sn)) {
3626 return true;
3627 }
3628 }
3629 LOG.info("Failed to delete the " + desc + " node for "
3630 + encodedName + ". The node type may not match");
3631 } catch (NoNodeException e) {
3632 if (LOG.isDebugEnabled()) {
3633 LOG.debug("The " + desc + " node for " + encodedName + " already deleted");
3634 }
3635 } catch (KeeperException ke) {
3636 server.abort("Unexpected ZK exception deleting " + desc
3637 + " node for the region " + encodedName, ke);
3638 }
3639 return false;
3640 }
3641
3642 private void deleteMergingNode(String encodedName, ServerName sn) {
3643 deleteNodeInStates(encodedName, "merging", sn, EventType.RS_ZK_REGION_MERGING,
3644 EventType.RS_ZK_REQUEST_REGION_MERGE, EventType.RS_ZK_REGION_MERGED);
3645 }
3646
3647 private void deleteSplittingNode(String encodedName, ServerName sn) {
3648 deleteNodeInStates(encodedName, "splitting", sn, EventType.RS_ZK_REGION_SPLITTING,
3649 EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT);
3650 }
3651
3652 private void onRegionFailedOpen(
3653 final HRegionInfo hri, final ServerName sn) {
3654 String encodedName = hri.getEncodedName();
3655 AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
3656 if (failedOpenCount == null) {
3657 failedOpenCount = new AtomicInteger();
3658
3659
3660
3661 failedOpenTracker.put(encodedName, failedOpenCount);
3662 }
3663 if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion()) {
3664 regionStates.updateRegionState(hri, State.FAILED_OPEN);
3665
3666
3667 failedOpenTracker.remove(encodedName);
3668 } else {
3669 if (hri.isMetaRegion() && failedOpenCount.get() >= maximumAttempts) {
3670
3671
3672 LOG.warn("Failed to open the hbase:meta region " +
3673 hri.getRegionNameAsString() + " after" +
3674 failedOpenCount.get() + " retries. Continue retrying.");
3675 }
3676
3677
3678 RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
3679 if (regionState != null) {
3680
3681
3682 if (getTableStateManager().isTableState(hri.getTable(),
3683 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
3684 replicasToClose.contains(hri)) {
3685 offlineDisabledRegion(hri);
3686 return;
3687 }
3688
3689 regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3690
3691 removeClosedRegion(hri);
3692 try {
3693 getRegionPlan(hri, sn, true);
3694 } catch (HBaseIOException e) {
3695 LOG.warn("Failed to get region plan", e);
3696 }
3697 invokeAssign(hri, false);
3698 }
3699 }
3700 }
3701
3702 private void onRegionOpen(
3703 final HRegionInfo hri, final ServerName sn, long openSeqNum) {
3704 regionOnline(hri, sn, openSeqNum);
3705 if (useZKForAssignment) {
3706 try {
3707
3708 ZKAssign.deleteNodeFailSilent(watcher, hri);
3709 } catch (KeeperException ke) {
3710 server.abort("Unexpected ZK exception deleting node " + hri, ke);
3711 }
3712 }
3713
3714
3715 failedOpenTracker.remove(hri.getEncodedName());
3716 if (getTableStateManager().isTableState(hri.getTable(),
3717 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3718 invokeUnAssign(hri);
3719 }
3720 }
3721
3722 private void onRegionClosed(final HRegionInfo hri) {
3723 if (getTableStateManager().isTableState(hri.getTable(),
3724 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
3725 replicasToClose.contains(hri)) {
3726 offlineDisabledRegion(hri);
3727 return;
3728 }
3729 regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3730 sendRegionClosedNotification(hri);
3731
3732 removeClosedRegion(hri);
3733 invokeAssign(hri, false);
3734 }
3735
3736 private String onRegionSplit(ServerName sn, TransitionCode code,
3737 final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3738 final RegionState rs_p = regionStates.getRegionState(p);
3739 RegionState rs_a = regionStates.getRegionState(a);
3740 RegionState rs_b = regionStates.getRegionState(b);
3741 if (!(rs_p.isOpenOrSplittingOnServer(sn)
3742 && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
3743 && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
3744 return "Not in state good for split";
3745 }
3746
3747 regionStates.updateRegionState(a, State.SPLITTING_NEW, sn);
3748 regionStates.updateRegionState(b, State.SPLITTING_NEW, sn);
3749 regionStates.updateRegionState(p, State.SPLITTING);
3750
3751 if (code == TransitionCode.SPLIT) {
3752 if (TEST_SKIP_SPLIT_HANDLING) {
3753 return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
3754 }
3755 regionOffline(p, State.SPLIT);
3756 regionOnline(a, sn, 1);
3757 regionOnline(b, sn, 1);
3758
3759
3760 if (getTableStateManager().isTableState(p.getTable(),
3761 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3762 invokeUnAssign(a);
3763 invokeUnAssign(b);
3764 } else {
3765 Callable<Object> splitReplicasCallable = new Callable<Object>() {
3766 @Override
3767 public Object call() {
3768 doSplittingOfReplicas(p, a, b);
3769 return null;
3770 }
3771 };
3772 threadPoolExecutorService.submit(splitReplicasCallable);
3773 }
3774 } else if (code == TransitionCode.SPLIT_PONR) {
3775 try {
3776 regionStates.splitRegion(p, a, b, sn);
3777 } catch (IOException ioe) {
3778 LOG.info("Failed to record split region " + p.getShortNameToLog());
3779 return "Failed to record the splitting in meta";
3780 }
3781 } else if (code == TransitionCode.SPLIT_REVERTED) {
3782
3783
3784 regionOnline(p, sn);
3785
3786
3787 RegionState regionStateA = regionStates.getRegionState(a);
3788 RegionState regionStateB = regionStates.getRegionState(b);
3789 if (regionStateA != null) {
3790 regionOffline(a);
3791 }
3792 if (regionStateB != null) {
3793 regionOffline(b);
3794 }
3795
3796 if (getTableStateManager().isTableState(p.getTable(),
3797 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3798 invokeUnAssign(p);
3799 }
3800 }
3801 return null;
3802 }
3803
3804 private String onRegionMerge(ServerName sn, TransitionCode code,
3805 final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3806 RegionState rs_p = regionStates.getRegionState(p);
3807 RegionState rs_a = regionStates.getRegionState(a);
3808 RegionState rs_b = regionStates.getRegionState(b);
3809 if (!(rs_a.isOpenOrMergingOnServer(sn) && rs_b.isOpenOrMergingOnServer(sn)
3810 && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3811 return "Not in state good for merge";
3812 }
3813
3814 regionStates.updateRegionState(a, State.MERGING);
3815 regionStates.updateRegionState(b, State.MERGING);
3816 regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3817
3818 String encodedName = p.getEncodedName();
3819 if (code == TransitionCode.READY_TO_MERGE) {
3820 mergingRegions.put(encodedName,
3821 new PairOfSameType<HRegionInfo>(a, b));
3822 } else if (code == TransitionCode.MERGED) {
3823 mergingRegions.remove(encodedName);
3824 regionOffline(a, State.MERGED);
3825 regionOffline(b, State.MERGED);
3826 regionOnline(p, sn, 1);
3827
3828
3829 if (getTableStateManager().isTableState(p.getTable(),
3830 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3831 invokeUnAssign(p);
3832 } else {
3833 Callable<Object> mergeReplicasCallable = new Callable<Object>() {
3834 @Override
3835 public Object call() {
3836 doMergingOfReplicas(p, a, b);
3837 return null;
3838 }
3839 };
3840 threadPoolExecutorService.submit(mergeReplicasCallable);
3841 }
3842 } else if (code == TransitionCode.MERGE_PONR) {
3843 try {
3844 regionStates.mergeRegions(p, a, b, sn);
3845 } catch (IOException ioe) {
3846 LOG.info("Failed to record merged region " + p.getShortNameToLog());
3847 return "Failed to record the merging in meta";
3848 }
3849 } else {
3850 mergingRegions.remove(encodedName);
3851 regionOnline(a, sn);
3852 regionOnline(b, sn);
3853 regionOffline(p);
3854
3855 if (getTableStateManager().isTableState(p.getTable(),
3856 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3857 invokeUnAssign(a);
3858 invokeUnAssign(b);
3859 }
3860 }
3861 return null;
3862 }
3863
3864
3865
3866
3867
3868 private boolean handleRegionMerging(final RegionTransition rt, final String encodedName,
3869 final String prettyPrintedRegionName, final ServerName sn) {
3870 if (!serverManager.isServerOnline(sn)) {
3871 LOG.warn("Dropped merging! ServerName=" + sn + " unknown.");
3872 return false;
3873 }
3874 byte [] payloadOfMerging = rt.getPayload();
3875 List<HRegionInfo> mergingRegions;
3876 try {
3877 mergingRegions = HRegionInfo.parseDelimitedFrom(
3878 payloadOfMerging, 0, payloadOfMerging.length);
3879 } catch (IOException e) {
3880 LOG.error("Dropped merging! Failed reading " + rt.getEventType()
3881 + " payload for " + prettyPrintedRegionName);
3882 return false;
3883 }
3884 assert mergingRegions.size() == 3;
3885 HRegionInfo p = mergingRegions.get(0);
3886 HRegionInfo hri_a = mergingRegions.get(1);
3887 HRegionInfo hri_b = mergingRegions.get(2);
3888
3889 RegionState rs_p = regionStates.getRegionState(p);
3890 RegionState rs_a = regionStates.getRegionState(hri_a);
3891 RegionState rs_b = regionStates.getRegionState(hri_b);
3892
3893 if (!((rs_a == null || rs_a.isOpenOrMergingOnServer(sn))
3894 && (rs_b == null || rs_b.isOpenOrMergingOnServer(sn))
3895 && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3896 LOG.warn("Dropped merging! Not in state good for MERGING; rs_p="
3897 + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
3898 return false;
3899 }
3900
3901 EventType et = rt.getEventType();
3902 if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) {
3903 try {
3904 RegionMergeCoordination.RegionMergeDetails std =
3905 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3906 .getRegionMergeCoordination().getDefaultDetails();
3907 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3908 .getRegionMergeCoordination().processRegionMergeRequest(p, hri_a, hri_b, sn, std);
3909 if (((ZkRegionMergeCoordination.ZkRegionMergeDetails) std).getZnodeVersion() == -1) {
3910 byte[] data = ZKAssign.getData(watcher, encodedName);
3911 EventType currentType = null;
3912 if (data != null) {
3913 RegionTransition newRt = RegionTransition.parseFrom(data);
3914 currentType = newRt.getEventType();
3915 }
3916 if (currentType == null || (currentType != EventType.RS_ZK_REGION_MERGED
3917 && currentType != EventType.RS_ZK_REGION_MERGING)) {
3918 LOG.warn("Failed to transition pending_merge node "
3919 + encodedName + " to merging, it's now " + currentType);
3920 return false;
3921 }
3922 }
3923 } catch (Exception e) {
3924 LOG.warn("Failed to transition pending_merge node "
3925 + encodedName + " to merging", e);
3926 return false;
3927 }
3928 }
3929
3930 synchronized (regionStates) {
3931 regionStates.updateRegionState(hri_a, State.MERGING);
3932 regionStates.updateRegionState(hri_b, State.MERGING);
3933 regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3934
3935 if (et != EventType.RS_ZK_REGION_MERGED) {
3936 this.mergingRegions.put(encodedName,
3937 new PairOfSameType<HRegionInfo>(hri_a, hri_b));
3938 } else {
3939 this.mergingRegions.remove(encodedName);
3940 regionOffline(hri_a, State.MERGED);
3941 regionOffline(hri_b, State.MERGED);
3942 regionOnline(p, sn);
3943 }
3944 }
3945
3946 if (et == EventType.RS_ZK_REGION_MERGED) {
3947 doMergingOfReplicas(p, hri_a, hri_b);
3948 LOG.debug("Handling MERGED event for " + encodedName + "; deleting node");
3949
3950 try {
3951 boolean successful = false;
3952 while (!successful) {
3953
3954
3955 successful = ZKAssign.deleteNode(watcher, encodedName,
3956 EventType.RS_ZK_REGION_MERGED, sn);
3957 }
3958 } catch (KeeperException e) {
3959 if (e instanceof NoNodeException) {
3960 String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
3961 LOG.debug("The znode " + znodePath + " does not exist. May be deleted already.");
3962 } else {
3963 server.abort("Error deleting MERGED node " + encodedName, e);
3964 }
3965 }
3966 LOG.info("Handled MERGED event; merged=" + p.getRegionNameAsString()
3967 + ", region_a=" + hri_a.getRegionNameAsString() + ", region_b="
3968 + hri_b.getRegionNameAsString() + ", on " + sn);
3969
3970
3971 if (tableStateManager.isTableState(p.getTable(),
3972 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3973 unassign(p);
3974 }
3975 }
3976 return true;
3977 }
3978
3979
3980
3981
3982 private boolean handleRegionSplitting(final RegionTransition rt, final String encodedName,
3983 final String prettyPrintedRegionName, final ServerName sn) {
3984 if (!serverManager.isServerOnline(sn)) {
3985 LOG.warn("Dropped splitting! ServerName=" + sn + " unknown.");
3986 return false;
3987 }
3988 byte [] payloadOfSplitting = rt.getPayload();
3989 List<HRegionInfo> splittingRegions;
3990 try {
3991 splittingRegions = HRegionInfo.parseDelimitedFrom(
3992 payloadOfSplitting, 0, payloadOfSplitting.length);
3993 } catch (IOException e) {
3994 LOG.error("Dropped splitting! Failed reading " + rt.getEventType()
3995 + " payload for " + prettyPrintedRegionName);
3996 return false;
3997 }
3998 assert splittingRegions.size() == 2;
3999 HRegionInfo hri_a = splittingRegions.get(0);
4000 HRegionInfo hri_b = splittingRegions.get(1);
4001
4002 RegionState rs_p = regionStates.getRegionState(encodedName);
4003 RegionState rs_a = regionStates.getRegionState(hri_a);
4004 RegionState rs_b = regionStates.getRegionState(hri_b);
4005
4006 if (!((rs_p == null || rs_p.isOpenOrSplittingOnServer(sn))
4007 && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
4008 && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
4009 LOG.warn("Dropped splitting! Not in state good for SPLITTING; rs_p="
4010 + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
4011 return false;
4012 }
4013
4014 if (rs_p == null) {
4015
4016 rs_p = regionStates.updateRegionState(rt, State.OPEN);
4017 if (rs_p == null) {
4018 LOG.warn("Received splitting for region " + prettyPrintedRegionName
4019 + " from server " + sn + " but it doesn't exist anymore,"
4020 + " probably already processed its split");
4021 return false;
4022 }
4023 regionStates.regionOnline(rs_p.getRegion(), sn);
4024 }
4025
4026 HRegionInfo p = rs_p.getRegion();
4027 EventType et = rt.getEventType();
4028 if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) {
4029 try {
4030 SplitTransactionDetails std =
4031 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
4032 .getSplitTransactionCoordination().getDefaultDetails();
4033 if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
4034 .getSplitTransactionCoordination().processTransition(p, hri_a, hri_b, sn, std) == -1) {
4035 byte[] data = ZKAssign.getData(watcher, encodedName);
4036 EventType currentType = null;
4037 if (data != null) {
4038 RegionTransition newRt = RegionTransition.parseFrom(data);
4039 currentType = newRt.getEventType();
4040 }
4041 if (currentType == null
4042 || (currentType != EventType.RS_ZK_REGION_SPLIT && currentType != EventType.RS_ZK_REGION_SPLITTING)) {
4043 LOG.warn("Failed to transition pending_split node " + encodedName
4044 + " to splitting, it's now " + currentType);
4045 return false;
4046 }
4047 }
4048 } catch (Exception e) {
4049 LOG.warn("Failed to transition pending_split node " + encodedName + " to splitting", e);
4050 return false;
4051 }
4052 }
4053
4054 synchronized (regionStates) {
4055 splitRegions.put(p, new PairOfSameType<HRegionInfo>(hri_a, hri_b));
4056 regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn);
4057 regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn);
4058 regionStates.updateRegionState(rt, State.SPLITTING);
4059
4060
4061
4062 if (TEST_SKIP_SPLIT_HANDLING) {
4063 LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set");
4064 return true;
4065 }
4066
4067 if (et == EventType.RS_ZK_REGION_SPLIT) {
4068 regionOffline(p, State.SPLIT);
4069 regionOnline(hri_a, sn);
4070 regionOnline(hri_b, sn);
4071 splitRegions.remove(p);
4072 }
4073 }
4074
4075 if (et == EventType.RS_ZK_REGION_SPLIT) {
4076
4077 doSplittingOfReplicas(rs_p.getRegion(), hri_a, hri_b);
4078 LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
4079
4080 try {
4081 boolean successful = false;
4082 while (!successful) {
4083
4084
4085 successful = ZKAssign.deleteNode(watcher, encodedName,
4086 EventType.RS_ZK_REGION_SPLIT, sn);
4087 }
4088 } catch (KeeperException e) {
4089 if (e instanceof NoNodeException) {
4090 String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
4091 LOG.debug("The znode " + znodePath + " does not exist. May be deleted already.");
4092 } else {
4093 server.abort("Error deleting SPLIT node " + encodedName, e);
4094 }
4095 }
4096 LOG.info("Handled SPLIT event; parent=" + p.getRegionNameAsString()
4097 + ", daughter a=" + hri_a.getRegionNameAsString() + ", daughter b="
4098 + hri_b.getRegionNameAsString() + ", on " + sn);
4099
4100
4101 if (tableStateManager.isTableState(p.getTable(),
4102 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
4103 unassign(hri_a);
4104 unassign(hri_b);
4105 }
4106 }
4107 return true;
4108 }
4109
4110 private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
4111 final HRegionInfo hri_b) {
4112
4113
4114 List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
4115 unmergedRegions.add(hri_a);
4116 unmergedRegions.add(hri_b);
4117 Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
4118 Collection<List<HRegionInfo>> c = map.values();
4119 for (List<HRegionInfo> l : c) {
4120 for (HRegionInfo h : l) {
4121 if (!RegionReplicaUtil.isDefaultReplica(h)) {
4122 LOG.debug("Unassigning un-merged replica " + h);
4123 unassign(h);
4124 }
4125 }
4126 }
4127 int numReplicas = 1;
4128 try {
4129 numReplicas = ((MasterServices)server).getTableDescriptors().get(mergedHri.getTable()).
4130 getRegionReplication();
4131 } catch (IOException e) {
4132 LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
4133 " due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
4134 "will not be done");
4135 }
4136 List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
4137 for (int i = 1; i < numReplicas; i++) {
4138 regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
4139 }
4140 try {
4141 assign(regions);
4142 } catch (IOException ioe) {
4143 LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
4144 ioe.getMessage());
4145 } catch (InterruptedException ie) {
4146 LOG.warn("Couldn't assign all replica(s) of region " + mergedHri+ " because of " +
4147 ie.getMessage());
4148 }
4149 }
4150
4151 private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
4152 final HRegionInfo hri_b) {
4153
4154
4155
4156 int numReplicas = 1;
4157 try {
4158 numReplicas = ((MasterServices)server).getTableDescriptors().get(parentHri.getTable()).
4159 getRegionReplication();
4160 } catch (IOException e) {
4161 LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
4162 " due to " + e.getMessage() + ". The assignment of daughter replicas " +
4163 "replicas will not be done");
4164 }
4165
4166 List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
4167 parentRegion.add(parentHri);
4168 Map<ServerName, List<HRegionInfo>> currentAssign =
4169 regionStates.getRegionAssignments(parentRegion);
4170 Collection<List<HRegionInfo>> c = currentAssign.values();
4171 for (List<HRegionInfo> l : c) {
4172 for (HRegionInfo h : l) {
4173 if (!RegionReplicaUtil.isDefaultReplica(h)) {
4174 LOG.debug("Unassigning parent's replica " + h);
4175 unassign(h);
4176 }
4177 }
4178 }
4179
4180 Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
4181 for (int i = 1; i < numReplicas; i++) {
4182 prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
4183 prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
4184 }
4185 try {
4186 assign(map);
4187 } catch (IOException e) {
4188 LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
4189 } catch (InterruptedException e) {
4190 LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
4191 }
4192 }
4193
4194 private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
4195 int replicaId, Map<HRegionInfo, ServerName> map) {
4196 HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
4197 HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
4198 replicaId);
4199 LOG.debug("Created replica region for daughter " + daughterReplica);
4200 ServerName sn;
4201 if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
4202 map.put(daughterReplica, sn);
4203 } else {
4204 List<ServerName> servers = serverManager.getOnlineServersList();
4205 sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
4206 map.put(daughterReplica, sn);
4207 }
4208 }
4209
4210 public Set<HRegionInfo> getReplicasToClose() {
4211 return replicasToClose;
4212 }
4213
4214
4215
4216
4217
4218
4219 private void regionOffline(final HRegionInfo regionInfo, final State state) {
4220 regionStates.regionOffline(regionInfo, state);
4221 removeClosedRegion(regionInfo);
4222
4223 clearRegionPlan(regionInfo);
4224 balancer.regionOffline(regionInfo);
4225
4226
4227 sendRegionClosedNotification(regionInfo);
4228
4229 if (state != null && state.equals(State.SPLIT)) {
4230 Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
4231 c.add(regionInfo);
4232 Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
4233 Collection<List<HRegionInfo>> allReplicas = map.values();
4234 for (List<HRegionInfo> list : allReplicas) {
4235 replicasToClose.addAll(list);
4236 }
4237 }
4238 else if (state != null && state.equals(State.MERGED)) {
4239 Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
4240 c.add(regionInfo);
4241 Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
4242 Collection<List<HRegionInfo>> allReplicas = map.values();
4243 for (List<HRegionInfo> list : allReplicas) {
4244 replicasToClose.addAll(list);
4245 }
4246 }
4247 }
4248
4249 private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
4250 final ServerName serverName) {
4251 if (!this.listeners.isEmpty()) {
4252 for (AssignmentListener listener : this.listeners) {
4253 listener.regionOpened(regionInfo, serverName);
4254 }
4255 }
4256 }
4257
4258 private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
4259 if (!this.listeners.isEmpty()) {
4260 for (AssignmentListener listener : this.listeners) {
4261 listener.regionClosed(regionInfo);
4262 }
4263 }
4264 }
4265
4266
4267
4268
4269
4270
4271
4272
4273
4274
4275
4276
4277
4278
4279
4280
4281
4282
4283
4284
4285
4286
4287
4288
4289
4290
4291
4292
4293
4294
4295
4296
4297
4298
4299
4300
4301
4302
4303
4304
4305
4306
4307
4308
4309
4310 protected String onRegionTransition(final ServerName serverName,
4311 final RegionStateTransition transition) {
4312 TransitionCode code = transition.getTransitionCode();
4313 HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
4314 RegionState current = regionStates.getRegionState(hri);
4315 if (LOG.isDebugEnabled()) {
4316 LOG.debug("Got transition " + code + " for "
4317 + (current != null ? current.toString() : hri.getShortNameToLog())
4318 + " from " + serverName);
4319 }
4320 String errorMsg = null;
4321 switch (code) {
4322 case OPENED:
4323 if (current != null && current.isOpened() && current.isOnServer(serverName)) {
4324 LOG.info("Region " + hri.getShortNameToLog() + " is already " + current.getState() + " on "
4325 + serverName);
4326 break;
4327 }
4328 case FAILED_OPEN:
4329 if (current == null
4330 || !current.isPendingOpenOrOpeningOnServer(serverName)) {
4331 errorMsg = hri.getShortNameToLog()
4332 + " is not pending open on " + serverName;
4333 } else if (code == TransitionCode.FAILED_OPEN) {
4334 onRegionFailedOpen(hri, serverName);
4335 } else {
4336 long openSeqNum = HConstants.NO_SEQNUM;
4337 if (transition.hasOpenSeqNum()) {
4338 openSeqNum = transition.getOpenSeqNum();
4339 }
4340 if (openSeqNum < 0) {
4341 errorMsg = "Newly opened region has invalid open seq num " + openSeqNum;
4342 } else {
4343 onRegionOpen(hri, serverName, openSeqNum);
4344 }
4345 }
4346 break;
4347
4348 case CLOSED:
4349 if (current == null
4350 || !current.isPendingCloseOrClosingOnServer(serverName)) {
4351 errorMsg = hri.getShortNameToLog()
4352 + " is not pending close on " + serverName;
4353 } else {
4354 onRegionClosed(hri);
4355 }
4356 break;
4357
4358 case READY_TO_SPLIT:
4359 try {
4360 regionStateListener.onRegionSplit(hri);
4361 } catch (IOException exp) {
4362 errorMsg = StringUtils.stringifyException(exp);
4363 }
4364 case SPLIT_PONR:
4365 case SPLIT:
4366 case SPLIT_REVERTED:
4367 errorMsg =
4368 onRegionSplit(serverName, code, hri, HRegionInfo.convert(transition.getRegionInfo(1)),
4369 HRegionInfo.convert(transition.getRegionInfo(2)));
4370 if (org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
4371 try {
4372 regionStateListener.onRegionSplitReverted(hri);
4373 } catch (IOException exp) {
4374 LOG.warn(StringUtils.stringifyException(exp));
4375 }
4376 }
4377 break;
4378 case READY_TO_MERGE:
4379 case MERGE_PONR:
4380 case MERGED:
4381 case MERGE_REVERTED:
4382 errorMsg = onRegionMerge(serverName, code, hri,
4383 HRegionInfo.convert(transition.getRegionInfo(1)),
4384 HRegionInfo.convert(transition.getRegionInfo(2)));
4385 if (code == TransitionCode.MERGED && org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
4386 try {
4387 regionStateListener.onRegionMerged(hri);
4388 } catch (IOException exp) {
4389 errorMsg = StringUtils.stringifyException(exp);
4390 }
4391 }
4392 break;
4393
4394 default:
4395 errorMsg = "Unexpected transition code " + code;
4396 }
4397 if (errorMsg != null) {
4398 LOG.error("Failed to transtion region from " + current + " to "
4399 + code + " by " + serverName + ": " + errorMsg);
4400 }
4401 return errorMsg;
4402 }
4403
4404
4405
4406
4407 public LoadBalancer getBalancer() {
4408 return this.balancer;
4409 }
4410
4411 public Map<ServerName, List<HRegionInfo>>
4412 getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
4413 return getRegionStates().getRegionAssignments(infos);
4414 }
4415
4416 void setRegionStateListener(RegionStateListener listener) {
4417 this.regionStateListener = listener;
4418 }
4419 }