/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
19 package org.apache.hadoop.hbase.master;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.Collection;
26 import java.util.Collections;
27 import java.util.HashMap;
28 import java.util.HashSet;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.NavigableMap;
33 import java.util.Random;
34 import java.util.Set;
35 import java.util.TreeMap;
36 import java.util.concurrent.Callable;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.CopyOnWriteArrayList;
39 import java.util.concurrent.ThreadFactory;
40 import java.util.concurrent.TimeUnit;
41 import java.util.concurrent.atomic.AtomicBoolean;
42 import java.util.concurrent.atomic.AtomicInteger;
43 import java.util.concurrent.locks.Lock;
44 import java.util.concurrent.locks.ReentrantLock;
45
46 import org.apache.commons.logging.Log;
47 import org.apache.commons.logging.LogFactory;
48 import org.apache.hadoop.conf.Configuration;
49 import org.apache.hadoop.fs.FileSystem;
50 import org.apache.hadoop.fs.Path;
51 import org.apache.hadoop.hbase.CoordinatedStateException;
52 import org.apache.hadoop.hbase.HBaseIOException;
53 import org.apache.hadoop.hbase.HConstants;
54 import org.apache.hadoop.hbase.HRegionInfo;
55 import org.apache.hadoop.hbase.HRegionLocation;
56 import org.apache.hadoop.hbase.HTableDescriptor;
57 import org.apache.hadoop.hbase.MetaTableAccessor;
58 import org.apache.hadoop.hbase.NotServingRegionException;
59 import org.apache.hadoop.hbase.RegionLocations;
60 import org.apache.hadoop.hbase.RegionStateListener;
61 import org.apache.hadoop.hbase.RegionTransition;
62 import org.apache.hadoop.hbase.ServerName;
63 import org.apache.hadoop.hbase.TableName;
64 import org.apache.hadoop.hbase.TableNotFoundException;
65 import org.apache.hadoop.hbase.TableStateManager;
66 import org.apache.hadoop.hbase.classification.InterfaceAudience;
67 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
68 import org.apache.hadoop.hbase.client.Result;
69 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
70 import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
71 import org.apache.hadoop.hbase.coordination.RegionMergeCoordination;
72 import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
73 import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
74 import org.apache.hadoop.hbase.coordination.ZkRegionMergeCoordination;
75 import org.apache.hadoop.hbase.exceptions.DeserializationException;
76 import org.apache.hadoop.hbase.executor.EventHandler;
77 import org.apache.hadoop.hbase.executor.EventType;
78 import org.apache.hadoop.hbase.executor.ExecutorService;
79 import org.apache.hadoop.hbase.ipc.FailedServerException;
80 import org.apache.hadoop.hbase.ipc.RpcClient;
81 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
82 import org.apache.hadoop.hbase.master.RegionState.State;
83 import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
84 import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
85 import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
86 import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
87 import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
88 import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
89 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
90 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
91 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
92 import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
93 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
94 import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
95 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
96 import org.apache.hadoop.hbase.util.ConfigUtil;
97 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
98 import org.apache.hadoop.hbase.util.FSUtils;
99 import org.apache.hadoop.hbase.util.KeyLocker;
100 import org.apache.hadoop.hbase.util.Pair;
101 import org.apache.hadoop.hbase.util.PairOfSameType;
102 import org.apache.hadoop.hbase.util.Threads;
103 import org.apache.hadoop.hbase.util.Triple;
104 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
105 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
106 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
107 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
108 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
109 import org.apache.hadoop.ipc.RemoteException;
110 import org.apache.hadoop.util.StringUtils;
111 import org.apache.zookeeper.AsyncCallback;
112 import org.apache.zookeeper.KeeperException;
113 import org.apache.zookeeper.KeeperException.NoNodeException;
114 import org.apache.zookeeper.KeeperException.NodeExistsException;
115 import org.apache.zookeeper.data.Stat;
116
117 import com.google.common.annotations.VisibleForTesting;
118 import com.google.common.collect.LinkedHashMultimap;
119
120
121
122
123
124
125
126
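/**
 * Manages and performs region assignment.
 * <p>
 * Monitors ZooKeeper for events related to regions in transition.
 * <p>
 * Handles existing regions in transition during master failover.
 */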
127 @InterfaceAudience.Private
128 public class AssignmentManager extends ZooKeeperListener {
129 private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
130
131 public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(HConstants.HBCK_CODE_NAME,
132 -1, -1L);
133
134 static final String ALREADY_IN_TRANSITION_WAITTIME
135 = "hbase.assignment.already.intransition.waittime";
136 static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000;
137
138 protected final MasterServices server;
139
140 private ServerManager serverManager;
141
142 private boolean shouldAssignRegionsWithFavoredNodes;
143
144 private LoadBalancer balancer;
145
146 private final MetricsAssignmentManager metricsAssignmentManager;
147
148 private final TableLockManager tableLockManager;
149
150 private AtomicInteger numRegionsOpened = new AtomicInteger(0);
151
152 final private KeyLocker<String> locker = new KeyLocker<String>();
153
154 Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
155
156
157
158
159
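/**
 * Regions to reopen after the schema of a table is changed. Key is the
 * encoded region name, value is the HRegionInfo.
 */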
160 private final Map <String, HRegionInfo> regionsToReopen;
161
162
163
164
165
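// Maximum number of attempts to assign or unassign a region before giving up.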
166 private final int maximumAttempts;
167
168
169
170
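// Pairs of regions being merged, keyed by the encoded name of the region created by the merge.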
171 private final Map<String, PairOfSameType<HRegionInfo>> mergingRegions
172 = new HashMap<String, PairOfSameType<HRegionInfo>>();
173
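// Pairs of daughter regions from a split, keyed by the parent region.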
174 private final Map<HRegionInfo, PairOfSameType<HRegionInfo>> splitRegions
175 = new HashMap<HRegionInfo, PairOfSameType<HRegionInfo>>();
176
177
178
179
180
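// Sleep time, in milliseconds, between retries of hbase:meta assignment when no
// valid region plan could be found.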
181 private final long sleepTimeBeforeRetryingMetaAssignment;
182
183
184
185
186
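/**
 * Plans for region movement, keyed by the encoded region name.
 * All access to this map must be synchronized on the map itself.
 */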
187 final NavigableMap<String, RegionPlan> regionPlans =
188 new TreeMap<String, RegionPlan>();
189
190 private final TableStateManager tableStateManager;
191
192 private final ExecutorService executorService;
193
194
195 private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = null;
196
197
198 private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null;
199
200
201 private java.util.concurrent.ExecutorService threadPoolExecutorService;
202
203
204 private final java.util.concurrent.ExecutorService zkEventWorkers;
205
206 private List<EventType> ignoreStatesRSOffline = Arrays.asList(
207 EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);
208
209 private final RegionStates regionStates;
210
211
212
213
214
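// Thresholds (number of regions and of servers) above which bulk assignment is
// used instead of assigning regions one by one, plus a per-region estimate of
// how long a bulk open is expected to take.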
215 private final int bulkAssignThresholdRegions;
216 private final int bulkAssignThresholdServers;
217 private final int bulkPerRegionOpenTimeGuesstimate;
218
219
220
221
222 private final boolean bulkAssignWaitTillAllAssigned;
223
224
225
226
227
228
229
230
231
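/**
 * Set to true once the in-memory region states have been rebuilt after a master
 * failover, so that ServerShutdownHandler can be fully enabled and re-assign
 * regions of dead servers based on accurate region states.
 */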
232 protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
233
234
235
236
237
238
239
240
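/**
 * Tracks how many times in a row a region has failed to open, so that after too
 * many failures it can be marked FAILED_OPEN or retried elsewhere. Cleared once
 * the region opens successfully.
 */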
241 private final ConcurrentHashMap<String, AtomicInteger>
242 failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
243
244
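// Whether ZooKeeper is used for region assignment (as opposed to ZK-less assignment).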
245 private final boolean useZKForAssignment;
246
247
248
249 private final RegionStateStore regionStateStore;
250
251
252
253
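// For testing only! Set to true to skip handling of split or merge transitions.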
254 private static boolean TEST_SKIP_SPLIT_HANDLING = false;
255 private static boolean TEST_SKIP_MERGE_HANDLING = false;
256
257
258 private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
259
260 private RegionStateListener regionStateListener;
261
262 public enum ServerHostRegion {
263 NOT_HOSTING_REGION, HOSTING_REGION, UNKNOWN,
264 }
265
266
267
268
269
270
271
272
273
274
275
276
277
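/**
 * Constructs a new assignment manager.
 *
 * @param server instance of HMaster this AM is running inside
 * @param serverManager server manager for the associated master
 * @param balancer implementation of {@link LoadBalancer}
 * @param service executor service
 * @param metricsMaster metrics manager
 * @param tableLockManager table lock manager
 * @throws KeeperException
 * @throws IOException
 * @throws CoordinatedStateException
 */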
278 public AssignmentManager(MasterServices server, ServerManager serverManager,
279 final LoadBalancer balancer,
280 final ExecutorService service, MetricsMaster metricsMaster,
281 final TableLockManager tableLockManager) throws KeeperException,
282 IOException, CoordinatedStateException {
283 super(server.getZooKeeper());
284 this.server = server;
285 this.serverManager = serverManager;
286 this.executorService = service;
287 this.regionStateStore = new RegionStateStore(server);
288 this.regionsToReopen = Collections.synchronizedMap
289 (new HashMap<String, HRegionInfo> ());
290 Configuration conf = server.getConfiguration();
291
292 this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
293 HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
294 FavoredNodeLoadBalancer.class);
295 try {
296 if (server.getCoordinatedStateManager() != null) {
297 this.tableStateManager = server.getCoordinatedStateManager().getTableStateManager();
298 } else {
299 this.tableStateManager = null;
300 }
301 } catch (InterruptedException e) {
302 throw new InterruptedIOException();
303 }
304
305 this.maximumAttempts = Math.max(1,
306 this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
307 this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
"hbase.meta.assignment.retry.sleeptime", 1000L);
309 this.balancer = balancer;
310 int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
311 this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
312 maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
313 this.regionStates = new RegionStates(
314 server, tableStateManager, serverManager, regionStateStore);
315
316 this.bulkAssignWaitTillAllAssigned =
317 conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
318 this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
319 this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
320 this.bulkPerRegionOpenTimeGuesstimate =
321 conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
322
323 int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
324 ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
325 zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
326 TimeUnit.SECONDS, threadFactory);
327 this.tableLockManager = tableLockManager;
328
329 this.metricsAssignmentManager = new MetricsAssignmentManager();
330 useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
331 }
332
333 MetricsAssignmentManager getAssignmentManagerMetrics() {
334 return this.metricsAssignmentManager;
335 }
336
337
338
339
340
341 public void registerListener(final AssignmentListener listener) {
342 this.listeners.add(listener);
343 }
344
345
346
347
348
349 public boolean unregisterListener(final AssignmentListener listener) {
350 return this.listeners.remove(listener);
351 }
352
353
354
355
356 public TableStateManager getTableStateManager() {
357
358
359 return this.tableStateManager;
360 }
361
362
363
364
365
366
367
368 public RegionStates getRegionStates() {
369 return regionStates;
370 }
371
372
373
374
375 @VisibleForTesting
376 RegionStateStore getRegionStateStore() {
377 return regionStateStore;
378 }
379
380 public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
381 return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
382 }
383
384
385
386
387
388
389 public void addPlan(String encodedName, RegionPlan plan) {
390 synchronized (regionPlans) {
391 regionPlans.put(encodedName, plan);
392 }
393 }
394
395
396
397
398 public void addPlans(Map<String, RegionPlan> plans) {
399 synchronized (regionPlans) {
400 regionPlans.putAll(plans);
401 }
402 }
403
404
405
406
407
408
409
410
411 public void setRegionsToReopen(List <HRegionInfo> regions) {
412 for(HRegionInfo hri : regions) {
413 regionsToReopen.put(hri.getEncodedName(), hri);
414 }
415 }
416
417
418
419
420
421
422
423
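/**
 * Used by the client to identify if all regions have the schema updates.
 * @return Pair with the number of regions pending reopen or in transition and
 *   the total number of regions of the table
 * @throws IOException
 */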
424 public Pair<Integer, Integer> getReopenStatus(TableName tableName)
425 throws IOException {
426 List<HRegionInfo> hris;
427 if (TableName.META_TABLE_NAME.equals(tableName)) {
428 hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
429 } else {
430 hris = MetaTableAccessor.getTableRegions(server.getZooKeeper(),
431 server.getConnection(), tableName, true);
432 }
433
434 Integer pending = 0;
435 for (HRegionInfo hri : hris) {
436 String name = hri.getEncodedName();
437
438 if (regionsToReopen.containsKey(name)
439 || regionStates.isRegionInTransition(name)) {
440 pending++;
441 }
442 }
443 return new Pair<Integer, Integer>(pending, hris.size());
444 }
445
446
447
448
449
450
451 public boolean isFailoverCleanupDone() {
452 return failoverCleanupDone.get();
453 }
454
455
456
457
458
459 public Lock acquireRegionLock(final String encodedName) {
460 return locker.acquireLock(encodedName);
461 }
462
463
464
465
466
467 void failoverCleanupDone() {
468 failoverCleanupDone.set(true);
469 serverManager.processQueuedDeadServers();
470 }
471
472
473
474
475
476
477
478
479
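/**
 * Called on startup. Figures whether this is a fresh cluster start or a master
 * failover, rebuilds the in-memory region states, and processes any regions
 * found in transition.
 */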
480 void joinCluster() throws IOException,
481 KeeperException, InterruptedException, CoordinatedStateException {
482 long startTime = System.currentTimeMillis();
483
484
485
486
487
488
489
490
491
492
493
494 Set<ServerName> deadServers = rebuildUserRegions();
495
496
497
498 boolean failover = processDeadServersAndRegionsInTransition(deadServers);
499
500 if (!useZKForAssignment) {
501
502 ZKUtil.deleteNodeRecursively(watcher, watcher.assignmentZNode);
503 }
504 recoverTableInDisablingState();
505 recoverTableInEnablingState();
506 LOG.info("Joined the cluster in " + (System.currentTimeMillis()
507 - startTime) + "ms, failover=" + failover);
508 }
509
510
511
512
513
514
515
516
517
518
519
520
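/**
 * Processes dead servers and regions in transition found on startup.
 * @param deadServers dead servers found when rebuilding region states from hbase:meta
 * @return true if this master is taking over from a failed master (failover),
 *   false for a clean cluster startup
 */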
521 boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
522 throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
523 List<String> nodes = ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
524
525 if (useZKForAssignment && nodes == null) {
526 String errorMessage = "Failed to get the children from ZK";
527 server.abort(errorMessage, new IOException(errorMessage));
528 return true;
529 }
530
531 boolean failover = !serverManager.getDeadServers().isEmpty();
532 if (failover) {
533
534 if (LOG.isDebugEnabled()) {
535 LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
536 }
537 } else {
538
539 Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
540 for (Map.Entry<HRegionInfo, ServerName> en:
541 regionStates.getRegionAssignments().entrySet()) {
542 HRegionInfo hri = en.getKey();
543 if (!hri.isMetaTable()
544 && onlineServers.contains(en.getValue())) {
545 LOG.debug("Found " + hri + " out on cluster");
546 failover = true;
547 break;
548 }
549 }
550 if (!failover && nodes != null) {
551
552 for (String encodedName: nodes) {
553 RegionState regionState = regionStates.getRegionState(encodedName);
554 if (regionState != null && !regionState.getRegion().isMetaRegion()) {
555 LOG.debug("Found " + regionState + " in RITs");
556 failover = true;
557 break;
558 }
559 }
560 }
561 }
562 if (!failover && !useZKForAssignment) {
563
564 Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
565 if (!regionsInTransition.isEmpty()) {
566 Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
567 for (RegionState regionState: regionsInTransition.values()) {
568 ServerName serverName = regionState.getServerName();
569 if (!regionState.getRegion().isMetaRegion()
570 && serverName != null && onlineServers.contains(serverName)) {
571 LOG.debug("Found " + regionState + " in RITs");
572 failover = true;
573 break;
574 }
575 }
576 }
577 }
578 if (!failover) {
579
580
581
582
583 Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
584 if (!queuedDeadServers.isEmpty()) {
585 Configuration conf = server.getConfiguration();
586 Path rootdir = FSUtils.getRootDir(conf);
587 FileSystem fs = rootdir.getFileSystem(conf);
588 for (ServerName serverName: queuedDeadServers) {
589
590
591 Path logDir = new Path(rootdir,
592 DefaultWALProvider.getWALDirectoryName(serverName.toString()));
593 Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
594 if (fs.exists(logDir) || fs.exists(splitDir)) {
595 LOG.debug("Found queued dead server " + serverName);
596 failover = true;
597 break;
598 }
599 }
600 if (!failover) {
601
602
603 LOG.info("AM figured that it's not a failover and cleaned up "
604 + queuedDeadServers.size() + " queued dead servers");
605 serverManager.removeRequeuedDeadServers();
606 }
607 }
608 }
609
610 Set<TableName> disabledOrDisablingOrEnabling = null;
611 Map<HRegionInfo, ServerName> allRegions = null;
612
613 if (!failover) {
614 disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
615 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING,
616 ZooKeeperProtos.Table.State.ENABLING);
617
618
619 allRegions = regionStates.closeAllUserRegions(
620 disabledOrDisablingOrEnabling);
621 }
622
623
624 regionStateStore.start();
625
626
627 if (failover) {
628 LOG.info("Found regions out on cluster or in RIT; presuming failover");
629
630
631 processDeadServersAndRecoverLostRegions(deadServers);
632 }
633
634 if (!failover && useZKForAssignment) {
635
636 ZKAssign.deleteAllNodes(watcher);
637 ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
638 this.watcher.assignmentZNode);
639 }
640
641
642
643
644
645 failoverCleanupDone();
646 if (!failover) {
647
648 LOG.info("Clean cluster startup. Assigning user regions");
649 assignAllUserRegions(allRegions);
650 }
651
652
653
654 for (HRegionInfo h : replicasToClose) {
655 unassign(h);
656 }
657 replicasToClose.clear();
658 return failover;
659 }
660
661
662
663
664
665
666
667
668
669
670
671
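/**
 * If the region is in transition in ZooKeeper, does the fixup and then blocks
 * until the region is assigned and out of transition.
 * @return true if the region was in transition and has been processed
 */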
672 boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
673 throws InterruptedException, KeeperException, IOException {
674 String encodedRegionName = hri.getEncodedName();
675 if (!processRegionInTransition(encodedRegionName, hri)) {
676 return false;
677 }
678 LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedRegionName));
679 while (!this.server.isStopped() &&
680 this.regionStates.isRegionInTransition(encodedRegionName)) {
681 RegionState state = this.regionStates.getRegionTransitionState(encodedRegionName);
682 if (state == null || !serverManager.isServerOnline(state.getServerName())) {
683
684
685
686 break;
687 }
688 this.regionStates.waitForUpdate(100);
689 }
690 return true;
691 }
692
693
694
695
696
697
698
699
700
701
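/**
 * Processes the failover of the given region that is in transition in ZooKeeper.
 * @return true if the region was in transition and has been handled
 */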
702 boolean processRegionInTransition(final String encodedRegionName,
703 final HRegionInfo regionInfo) throws KeeperException, IOException {
704
705
706
707
708 Lock lock = locker.acquireLock(encodedRegionName);
709 try {
710 Stat stat = new Stat();
711 byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
712 if (data == null) return false;
713 RegionTransition rt;
714 try {
715 rt = RegionTransition.parseFrom(data);
716 } catch (DeserializationException e) {
LOG.warn("Failed to parse znode data", e);
718 return false;
719 }
720 HRegionInfo hri = regionInfo;
721 if (hri == null) {
722
723
724
725
726
727 hri = regionStates.getRegionInfo(rt.getRegionName());
728 EventType et = rt.getEventType();
729 if (hri == null && et != EventType.RS_ZK_REGION_MERGING
730 && et != EventType.RS_ZK_REQUEST_REGION_MERGE) {
LOG.warn("Couldn't find the region while recovering transition " + rt);
732 return false;
733 }
734 }
735
736
737
738 BaseCoordinatedStateManager cp =
739 (BaseCoordinatedStateManager) this.server.getCoordinatedStateManager();
740 OpenRegionCoordination openRegionCoordination = cp.getOpenRegionCoordination();
741
742 ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
743 new ZkOpenRegionCoordination.ZkOpenRegionDetails();
744 zkOrd.setVersion(stat.getVersion());
745 zkOrd.setServerName(cp.getServer().getServerName());
746
747 return processRegionsInTransition(
748 rt, hri, openRegionCoordination, zkOrd);
749 } finally {
750 lock.unlock();
751 }
752 }
753
754
755
756
757
758
759
760
761
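/**
 * Handles a region already found in transition, acting on the transition event
 * type recorded in ZooKeeper. Called with the region lock held.
 */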
762 boolean processRegionsInTransition(
763 final RegionTransition rt, final HRegionInfo regionInfo,
764 OpenRegionCoordination coordination,
765 final OpenRegionCoordination.OpenRegionDetails ord) throws KeeperException {
766 EventType et = rt.getEventType();
767
768 final ServerName sn = rt.getServerName();
769 final byte[] regionName = rt.getRegionName();
770 final String encodedName = HRegionInfo.encodeRegionName(regionName);
771 final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
772 LOG.info("Processing " + prettyPrintedRegionName + " in state: " + et);
773
774 if (regionStates.isRegionInTransition(encodedName)
775 && (regionInfo.isMetaRegion() || !useZKForAssignment)) {
776 LOG.info("Processed region " + prettyPrintedRegionName + " in state: "
777 + et + ", does nothing since the region is already in transition "
778 + regionStates.getRegionTransitionState(encodedName));
779
780 return true;
781 }
782 if (!serverManager.isServerOnline(sn)) {
783
784
785
786 LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() +
" was on a dead server; forcing offline");
788 if (regionStates.isRegionOnline(regionInfo)) {
789
790
791
792 regionStates.regionOffline(regionInfo);
793 sendRegionClosedNotification(regionInfo);
794 }
795
796 regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);
797
798 if (regionInfo.isMetaRegion()) {
799
800
801 MetaTableLocator.setMetaLocation(watcher, sn, State.OPEN);
802 } else {
803
804
805 regionStates.setLastRegionServerOfRegion(sn, encodedName);
806
807 if (!serverManager.isServerDead(sn)) {
808 serverManager.expireServer(sn);
809 }
810 }
811 return false;
812 }
813 switch (et) {
814 case M_ZK_REGION_CLOSING:
815
816
817 final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING);
818 this.executorService.submit(
819 new EventHandler(server, EventType.M_MASTER_RECOVERY) {
820 @Override
821 public void process() throws IOException {
822 ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
823 try {
824 final int expectedVersion = ((ZkOpenRegionCoordination.ZkOpenRegionDetails) ord)
825 .getVersion();
826 unassign(regionInfo, rsClosing, expectedVersion, null, useZKForAssignment, null);
827 if (regionStates.isRegionOffline(regionInfo)) {
828 assign(regionInfo, true);
829 }
830 } finally {
831 lock.unlock();
832 }
833 }
834 });
835 break;
836
837 case RS_ZK_REGION_CLOSED:
838 case RS_ZK_REGION_FAILED_OPEN:
839
840 regionStates.setRegionStateTOCLOSED(regionInfo, sn);
841 if (!replicasToClose.contains(regionInfo)) {
842 invokeAssign(regionInfo);
843 } else {
844 offlineDisabledRegion(regionInfo);
845 }
846 break;
847
848 case M_ZK_REGION_OFFLINE:
849
850 regionStates.updateRegionState(rt, State.OFFLINE);
851 final RegionState rsOffline = regionStates.getRegionState(regionInfo);
852 this.executorService.submit(
853 new EventHandler(server, EventType.M_MASTER_RECOVERY) {
854 @Override
855 public void process() throws IOException {
856 ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
857 try {
858 RegionPlan plan = new RegionPlan(regionInfo, null, sn);
859 addPlan(encodedName, plan);
860 assign(rsOffline, true, false);
861 } finally {
862 lock.unlock();
863 }
864 }
865 });
866 break;
867
868 case RS_ZK_REGION_OPENING:
869 regionStates.updateRegionState(rt, State.OPENING);
870 break;
871
872 case RS_ZK_REGION_OPENED:
873
874
875
876 regionStates.updateRegionState(rt, State.OPEN);
877 new OpenedRegionHandler(server, this, regionInfo, coordination, ord).process();
878 break;
879 case RS_ZK_REQUEST_REGION_SPLIT:
880 case RS_ZK_REGION_SPLITTING:
881 case RS_ZK_REGION_SPLIT:
882
883
884
885 regionStates.regionOnline(regionInfo, sn);
886 regionStates.updateRegionState(rt, State.SPLITTING);
887 if (!handleRegionSplitting(
888 rt, encodedName, prettyPrintedRegionName, sn)) {
889 deleteSplittingNode(encodedName, sn);
890 }
891 break;
892 case RS_ZK_REQUEST_REGION_MERGE:
893 case RS_ZK_REGION_MERGING:
894 case RS_ZK_REGION_MERGED:
895 if (!handleRegionMerging(
896 rt, encodedName, prettyPrintedRegionName, sn)) {
897 deleteMergingNode(encodedName, sn);
898 }
899 break;
900 default:
throw new IllegalStateException("Received region in state " + et + ", which is not valid.");
902 }
903 LOG.info("Processed region " + prettyPrintedRegionName + " in state "
904 + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ")
905 + "server: " + sn);
906 return true;
907 }
908
909
910
911
912
913 public void removeClosedRegion(HRegionInfo hri) {
914 if (regionsToReopen.remove(hri.getEncodedName()) != null) {
915 LOG.debug("Removed region from reopening regions because it was closed");
916 }
917 }
918
919
920
921
922
923
924
925
926
927
928
929
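/**
 * Handles the various states an unassigned ZooKeeper node can be in.
 * <p>
 * Called when a state change is suspected for an unassigned node.
 */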
930 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
931 value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
932 justification="Needs work; says access to ConcurrentHashMaps not ATOMIC!!!")
933 void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination,
934 OpenRegionCoordination.OpenRegionDetails ord) {
935 if (rt == null) {
936 LOG.warn("Unexpected NULL input for RegionTransition rt");
937 return;
938 }
939 final ServerName sn = rt.getServerName();
940
941 if (sn.equals(HBCK_CODE_SERVERNAME)) {
942 handleHBCK(rt);
943 return;
944 }
945 final long createTime = rt.getCreateTime();
946 final byte[] regionName = rt.getRegionName();
947 String encodedName = HRegionInfo.encodeRegionName(regionName);
948 String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
949
950 if (!serverManager.isServerOnline(sn)
951 && !ignoreStatesRSOffline.contains(rt.getEventType())) {
952 LOG.warn("Attempted to handle region transition for server but " +
953 "it is not online: " + prettyPrintedRegionName + ", " + rt);
954 return;
955 }
956
957 RegionState regionState =
958 regionStates.getRegionState(encodedName);
959 long startTime = System.currentTimeMillis();
960 if (LOG.isDebugEnabled()) {
961 boolean lateEvent = createTime < (startTime - 15000);
962 LOG.debug("Handling " + rt.getEventType() +
963 ", server=" + sn + ", region=" +
964 (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
965 (lateEvent ? ", which is more than 15 seconds late" : "") +
966 ", current_state=" + regionState);
967 }
968
969
970 if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
971 return;
972 }
973
974
975 Lock lock = locker.acquireLock(encodedName);
976 try {
977 RegionState latestState =
978 regionStates.getRegionState(encodedName);
979 if ((regionState == null && latestState != null)
980 || (regionState != null && latestState == null)
981 || (regionState != null && latestState != null
982 && latestState.getState() != regionState.getState())) {
983 LOG.warn("Region state changed from " + regionState + " to "
984 + latestState + ", while acquiring lock");
985 }
986 long waitedTime = System.currentTimeMillis() - startTime;
987 if (waitedTime > 5000) {
988 LOG.warn("Took " + waitedTime + "ms to acquire the lock");
989 }
990 regionState = latestState;
991 switch (rt.getEventType()) {
992 case RS_ZK_REQUEST_REGION_SPLIT:
993 case RS_ZK_REGION_SPLITTING:
994 case RS_ZK_REGION_SPLIT:
995 if (!handleRegionSplitting(
996 rt, encodedName, prettyPrintedRegionName, sn)) {
997 deleteSplittingNode(encodedName, sn);
998 }
999 break;
1000
1001 case RS_ZK_REQUEST_REGION_MERGE:
1002 case RS_ZK_REGION_MERGING:
1003 case RS_ZK_REGION_MERGED:
1004
1005
1006 if (!handleRegionMerging(
1007 rt, encodedName, prettyPrintedRegionName, sn)) {
1008 deleteMergingNode(encodedName, sn);
1009 }
1010 break;
1011
1012 case M_ZK_REGION_CLOSING:
1013
1014
1015 if (regionState == null
1016 || !regionState.isPendingCloseOrClosingOnServer(sn)) {
1017 LOG.warn("Received CLOSING for " + prettyPrintedRegionName
1018 + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
1019 + regionStates.getRegionState(encodedName));
1020 return;
1021 }
1022
1023 regionStates.updateRegionState(rt, State.CLOSING);
1024 break;
1025
1026 case RS_ZK_REGION_CLOSED:
1027
1028 if (regionState == null
1029 || !regionState.isPendingCloseOrClosingOnServer(sn)) {
1030 LOG.warn("Received CLOSED for " + prettyPrintedRegionName
1031 + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
1032 + regionStates.getRegionState(encodedName));
1033 return;
1034 }
1035
1036
1037
1038 new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1039 updateClosedRegionHandlerTracker(regionState.getRegion());
1040 break;
1041
1042 case RS_ZK_REGION_FAILED_OPEN:
1043 if (regionState == null
1044 || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1045 LOG.warn("Received FAILED_OPEN for " + prettyPrintedRegionName
1046 + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1047 + regionStates.getRegionState(encodedName));
1048 return;
1049 }
1050 AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
1051 if (failedOpenCount == null) {
1052 failedOpenCount = new AtomicInteger();
1053
1054
1055
1056
1057 failedOpenTracker.put(encodedName, failedOpenCount);
1058 }
1059 if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
1060
1061 regionStates.updateRegionState(rt, State.FAILED_OPEN);
1062
1063
1064 failedOpenTracker.remove(encodedName);
1065 } else {
1066
1067 regionState = regionStates.setRegionStateTOCLOSED(rt.getRegionName(), sn);
1068 if (regionState != null) {
1069
1070
1071 try {
1072 getRegionPlan(regionState.getRegion(), sn, true);
1073 new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1074 } catch (HBaseIOException e) {
1075 LOG.warn("Failed to get region plan", e);
1076 }
1077 }
1078 }
1079 break;
1080
1081 case RS_ZK_REGION_OPENING:
1082
1083
1084 if (regionState == null
1085 || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1086 LOG.warn("Received OPENING for " + prettyPrintedRegionName
1087 + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1088 + regionStates.getRegionState(encodedName));
1089 return;
1090 }
1091
1092 regionStates.updateRegionState(rt, State.OPENING);
1093 break;
1094
1095 case RS_ZK_REGION_OPENED:
1096
1097 if (regionState == null
1098 || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1099 LOG.warn("Received OPENED for " + prettyPrintedRegionName
1100 + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1101 + regionStates.getRegionState(encodedName));
1102
1103 if (regionState != null) {
1104 if(regionState.isOpened() && regionState.getServerName().equals(sn)) {
1105
1106
1107 failedOpenTracker.remove(encodedName);
1108 new OpenedRegionHandler(
1109 server, this, regionState.getRegion(), coordination, ord).process();
1110 updateOpenedRegionHandlerTracker(regionState.getRegion());
1111 } else {
1112
1113
1114
1115 unassign(regionState.getRegion(), null, -1, null, false, sn);
1116 }
1117 }
1118 return;
1119 }
1120
1121 regionState =
1122 regionStates.transitionOpenFromPendingOpenOrOpeningOnServer(rt,regionState, sn);
1123 if (regionState != null) {
1124 failedOpenTracker.remove(encodedName);
1125 new OpenedRegionHandler(
1126 server, this, regionState.getRegion(), coordination, ord).process();
1127 updateOpenedRegionHandlerTracker(regionState.getRegion());
1128 }
1129 break;
1130
1131 default:
throw new IllegalStateException("Received event " + rt.getEventType() + " is not valid.");
1133 }
1134 } finally {
1135 lock.unlock();
1136 }
1137 }
1138
1139
1140 boolean wasClosedHandlerCalled(HRegionInfo hri) {
1141 AtomicBoolean b = closedRegionHandlerCalled.get(hri);
1142
1143
1144
1145 return b == null ? false : b.compareAndSet(true, false);
1146 }
1147
1148
1149 boolean wasOpenedHandlerCalled(HRegionInfo hri) {
1150 AtomicBoolean b = openedRegionHandlerCalled.get(hri);
1151
1152
1153
1154 return b == null ? false : b.compareAndSet(true, false);
1155 }
1156
1157
1158 void initializeHandlerTrackers() {
1159 closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1160 openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1161 }
1162
1163 void updateClosedRegionHandlerTracker(HRegionInfo hri) {
1164 if (closedRegionHandlerCalled != null) {
1165 closedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1166 }
1167 }
1168
1169 void updateOpenedRegionHandlerTracker(HRegionInfo hri) {
1170 if (openedRegionHandlerCalled != null) {
1171 openedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1172 }
1173 }
1174
1175
1176
1177
1178
1179
1180 void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
1181 if (!shouldAssignRegionsWithFavoredNodes) return;
1182
1183
1184 Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
1185 new HashMap<HRegionInfo, List<ServerName>>();
1186 for (HRegionInfo region : regions) {
1187 regionToFavoredNodes.put(region,
1188 ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
1189 }
1190 FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
1191 this.server.getConnection());
1192 }
1193
1194
1195
1196
1197
1198
1199
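/**
 * Handles an unassigned node transition triggered by the HBCK repair tool.
 * HBCK only creates OFFLINE nodes, so only the OFFLINE state is acted on here.
 */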
1200 @SuppressWarnings("deprecation")
1201 private void handleHBCK(RegionTransition rt) {
1202 String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
1203 LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
1204 ", server=" + rt.getServerName() + ", region=" +
1205 HRegionInfo.prettyPrint(encodedName));
1206 RegionState regionState = regionStates.getRegionTransitionState(encodedName);
1207 switch (rt.getEventType()) {
1208 case M_ZK_REGION_OFFLINE:
1209 HRegionInfo regionInfo;
1210 if (regionState != null) {
1211 regionInfo = regionState.getRegion();
1212 } else {
1213 try {
1214 byte [] name = rt.getRegionName();
1215 Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
1216 this.server.getConnection(), name);
1217 regionInfo = p.getFirst();
1218 } catch (IOException e) {
1219 LOG.info("Exception reading hbase:meta doing HBCK repair operation", e);
1220 return;
1221 }
1222 }
1223 LOG.info("HBCK repair is triggering assignment of region=" +
1224 regionInfo.getRegionNameAsString());
1225
1226 assign(regionInfo, false);
1227 break;
1228
1229 default:
1230 LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
1231 break;
1232 }
1233
1234 }
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250 @Override
1251 public void nodeCreated(String path) {
1252 handleAssignmentEvent(path);
1253 }
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267 @Override
1268 public void nodeDataChanged(String path) {
1269 handleAssignmentEvent(path);
1270 }
1271
1272
1273
1274
1275
1276 private final Set<String> regionsInProgress = new HashSet<String>();
1277
1278
1279 private final LinkedHashMultimap <String, RegionRunnable>
1280 zkEventWorkerWaitingList = LinkedHashMultimap.create();
1281
1282
1283
1284
1285 private interface RegionRunnable extends Runnable{
1286
1287
1288
1289 String getRegionName();
1290 }
1291
1292
1293
1294
1295
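/**
 * Submits a task to the ZK event workers, ensuring that tasks for the same
 * region are executed one at a time; if a task for the region is already in
 * progress, the new task is queued until the current one finishes.
 */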
1296 protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {
1297
1298 synchronized (regionsInProgress) {
1299
1300
1301 if (regionsInProgress.contains(regRunnable.getRegionName())) {
1302 synchronized (zkEventWorkerWaitingList){
1303 zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
1304 }
1305 return;
1306 }
1307
1308
1309 regionsInProgress.add(regRunnable.getRegionName());
1310 zkEventWorkers.submit(new Runnable() {
1311 @Override
1312 public void run() {
1313 try {
1314 regRunnable.run();
1315 } finally {
1316
1317
1318 synchronized (regionsInProgress) {
1319 regionsInProgress.remove(regRunnable.getRegionName());
1320 synchronized (zkEventWorkerWaitingList) {
1321 java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
1322 regRunnable.getRegionName());
1323 if (!waiting.isEmpty()) {
1324
1325 RegionRunnable toSubmit = waiting.iterator().next();
1326 zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
1327 zkEventWorkersSubmit(toSubmit);
1328 }
1329 }
1330 }
1331 }
1332 }
1333 });
1334 }
1335 }
1336
1337 @Override
1338 public void nodeDeleted(final String path) {
1339 if (path.startsWith(watcher.assignmentZNode)) {
1340 final String regionName = ZKAssign.getRegionName(watcher, path);
1341 zkEventWorkersSubmit(new RegionRunnable() {
1342 @Override
1343 public String getRegionName() {
1344 return regionName;
1345 }
1346
1347 @Override
1348 public void run() {
1349 Lock lock = locker.acquireLock(regionName);
1350 try {
1351 RegionState rs = regionStates.getRegionTransitionState(regionName);
1352 if (rs == null) {
1353 rs = regionStates.getRegionState(regionName);
1354 if (rs == null || !rs.isMergingNew()) {
1355
1356 return;
1357 }
1358 }
1359
1360 HRegionInfo regionInfo = rs.getRegion();
1361 String regionNameStr = regionInfo.getRegionNameAsString();
1362 LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);
1363
1364 boolean disabled = getTableStateManager().isTableState(regionInfo.getTable(),
1365 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
1366
1367 ServerName serverName = rs.getServerName();
1368 if (serverManager.isServerOnline(serverName)) {
1369 if (rs.isOnServer(serverName) && (rs.isOpened() || rs.isSplitting())) {
1370 synchronized (regionStates) {
1371 regionOnline(regionInfo, serverName);
1372 if (rs.isSplitting() && splitRegions.containsKey(regionInfo)) {
1373
1374
1375 HRegionInfo hri_a = splitRegions.get(regionInfo).getFirst();
1376 HRegionInfo hri_b = splitRegions.get(regionInfo).getSecond();
1377 if (!regionStates.isRegionInTransition(hri_a.getEncodedName())) {
1378 LOG.warn("Split daughter region not in transition " + hri_a);
1379 }
1380 if (!regionStates.isRegionInTransition(hri_b.getEncodedName())) {
LOG.warn("Split daughter region not in transition " + hri_b);
1382 }
1383 regionOffline(hri_a);
1384 regionOffline(hri_b);
1385 splitRegions.remove(regionInfo);
1386 }
1387 if (disabled) {
1388
1389 LOG.info("Opened " + regionNameStr
+ " but this table is disabled, triggering close of region");
1391 unassign(regionInfo);
1392 }
1393 }
1394 } else if (rs.isMergingNew()) {
1395 synchronized (regionStates) {
1396 String p = regionInfo.getEncodedName();
1397 PairOfSameType<HRegionInfo> regions = mergingRegions.get(p);
1398 if (regions != null) {
1399 onlineMergingRegion(disabled, regions.getFirst(), serverName);
1400 onlineMergingRegion(disabled, regions.getSecond(), serverName);
1401 }
1402 }
1403 }
1404 }
1405 } finally {
1406 lock.unlock();
1407 }
1408 }
1409
1410 private void onlineMergingRegion(boolean disabled,
1411 final HRegionInfo hri, final ServerName serverName) {
1412 RegionState regionState = regionStates.getRegionState(hri);
1413 if (regionState != null && regionState.isMerging()
1414 && regionState.isOnServer(serverName)) {
1415 regionOnline(regionState.getRegion(), serverName);
1416 if (disabled) {
1417 unassign(hri);
1418 }
1419 }
1420 }
1421 });
1422 }
1423 }
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437 @Override
1438 public void nodeChildrenChanged(String path) {
1439 if (path.equals(watcher.assignmentZNode)) {
1440 zkEventWorkers.submit(new Runnable() {
1441 @Override
1442 public void run() {
1443 try {
1444
1445 List<String> children =
1446 ZKUtil.listChildrenAndWatchForNewChildren(
1447 watcher, watcher.assignmentZNode);
1448 if (children != null) {
1449 Stat stat = new Stat();
1450 for (String child : children) {
1451
1452
1453
1454 if (!regionStates.isRegionInTransition(child)) {
1455 ZKAssign.getDataAndWatch(watcher, child, stat);
1456 }
1457 }
1458 }
1459 } catch (KeeperException e) {
1460 server.abort("Unexpected ZK exception reading unassigned children", e);
1461 }
1462 }
1463 });
1464 }
1465 }
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
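/**
 * Marks the region as online: updates the in-memory state, clears any pending
 * region plan, and notifies the balancer and registered listeners.
 */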
1476 void regionOnline(HRegionInfo regionInfo, ServerName sn) {
1477 regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
1478 }
1479
1480 void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
1481 numRegionsOpened.incrementAndGet();
1482 regionStates.regionOnline(regionInfo, sn, openSeqNum);
1483
1484
1485 clearRegionPlan(regionInfo);
1486 balancer.regionOnline(regionInfo, sn);
1487
1488
1489 sendRegionOpenedNotification(regionInfo, sn);
1490 }
1491
1492
1493
1494
1495
1496
1497
1498
1499
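/**
 * Passes an assignment ZK event to a worker for processing, reading the node
 * data and delegating to {@link #handleRegion}.
 */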
1500 private void handleAssignmentEvent(final String path) {
1501 if (path.startsWith(watcher.assignmentZNode)) {
1502 final String regionName = ZKAssign.getRegionName(watcher, path);
1503
1504 zkEventWorkersSubmit(new RegionRunnable() {
1505 @Override
1506 public String getRegionName() {
1507 return regionName;
1508 }
1509
1510 @Override
1511 public void run() {
1512 try {
1513 Stat stat = new Stat();
1514 byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
1515 if (data == null) return;
1516
1517 RegionTransition rt = RegionTransition.parseFrom(data);
1518
1519
1520
1521 BaseCoordinatedStateManager csm =
1522 (BaseCoordinatedStateManager) server.getCoordinatedStateManager();
1523 OpenRegionCoordination openRegionCoordination = csm.getOpenRegionCoordination();
1524
1525 ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
1526 new ZkOpenRegionCoordination.ZkOpenRegionDetails();
1527 zkOrd.setVersion(stat.getVersion());
1528 zkOrd.setServerName(csm.getServer().getServerName());
1529
1530 handleRegion(rt, openRegionCoordination, zkOrd);
1531 } catch (KeeperException e) {
1532 server.abort("Unexpected ZK exception reading unassigned node data", e);
1533 } catch (DeserializationException e) {
1534 server.abort("Unexpected exception deserializing node data", e);
1535 }
1536 }
1537 });
1538 }
1539 }
1540
1541
1542
1543
1544
1545
1546
1547
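/**
 * Marks the region as offline, unless it has already been marked MERGED or
 * SPLIT, in which case the call is a no-op.
 */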
1548 public void regionOffline(final HRegionInfo regionInfo) {
1549 if (regionStates.isRegionInState(regionInfo, State.MERGED, State.SPLIT)) {
LOG.info("Skipping offline of region " + regionInfo.getEncodedName() +
" because it is already in state " + regionStates.getRegionState(regionInfo).getState());
1552 return;
1553 }
1554 regionOffline(regionInfo, null);
1555 }
1556
1557 public void offlineDisabledRegion(HRegionInfo regionInfo) {
1558 if (useZKForAssignment) {
1559
1560 LOG.debug("Table being disabled so deleting ZK node and removing from " +
1561 "regions in transition, skipping assignment of region " +
1562 regionInfo.getRegionNameAsString());
1563 String encodedName = regionInfo.getEncodedName();
1564 deleteNodeInStates(encodedName, "closed", null,
1565 EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
1566 }
1567 replicasToClose.remove(regionInfo);
1568 regionOffline(regionInfo);
1569 }
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
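/**
 * Assigns the specified region.
 * <p>
 * If an existing RegionPlan with a valid destination is available it is used;
 * otherwise a new plan is obtained from the balancer.
 * @param region region to be assigned
 * @param setOfflineInZK whether to set the region OFFLINE in ZooKeeper before assigning
 */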
1591 public void assign(HRegionInfo region, boolean setOfflineInZK) {
1592 assign(region, setOfflineInZK, false);
1593 }
1594
1595
1596
1597
1598 @VisibleForTesting
1599 public void assign(HRegionInfo region,
1600 boolean setOfflineInZK, boolean forceNewPlan) {
1601 if (isDisabledorDisablingRegionInRIT(region)) {
1602 return;
1603 }
1604 String encodedName = region.getEncodedName();
1605 Lock lock = locker.acquireLock(encodedName);
1606 try {
1607 RegionState state = forceRegionStateToOffline(region, forceNewPlan);
1608 if (state != null) {
1609 if (regionStates.wasRegionOnDeadServer(encodedName)) {
1610 LOG.info("Skip assigning " + region.getRegionNameAsString()
+ ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
1612 + " is dead but not processed yet");
1613 return;
1614 }
1615 assign(state, setOfflineInZK && useZKForAssignment, forceNewPlan);
1616 }
1617 } finally {
1618 lock.unlock();
1619 }
1620 }
1621
1622
1623
1624
1625
1626
1627
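/**
 * Bulk-assigns the given regions to the destination server.
 * @param destination server to assign the regions to
 * @param regions regions to assign
 * @return true on success, false if the assignment could not be completed
 * @throws InterruptedException
 */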
1628 boolean assign(final ServerName destination, final List<HRegionInfo> regions)
1629 throws InterruptedException {
1630 long startTime = EnvironmentEdgeManager.currentTime();
1631 try {
1632 int regionCount = regions.size();
1633 if (regionCount == 0) {
1634 return true;
1635 }
1636 LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
1637 Set<String> encodedNames = new HashSet<String>(regionCount);
1638 for (HRegionInfo region : regions) {
1639 encodedNames.add(region.getEncodedName());
1640 }
1641
1642 List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
1643 Map<String, Lock> locks = locker.acquireLocks(encodedNames);
1644 try {
1645 AtomicInteger counter = new AtomicInteger(0);
1646 Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
1647 OfflineCallback cb = new OfflineCallback(
1648 watcher, destination, counter, offlineNodesVersions);
1649 Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
1650 List<RegionState> states = new ArrayList<RegionState>(regions.size());
1651 for (HRegionInfo region : regions) {
1652 String encodedName = region.getEncodedName();
1653 if (!isDisabledorDisablingRegionInRIT(region)) {
1654 RegionState state = forceRegionStateToOffline(region, false);
1655 boolean onDeadServer = false;
1656 if (state != null) {
1657 if (regionStates.wasRegionOnDeadServer(encodedName)) {
1658 LOG.info("Skip assigning " + region.getRegionNameAsString()
+ ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
1660 + " is dead but not processed yet");
1661 onDeadServer = true;
1662 } else if (!useZKForAssignment
1663 || asyncSetOfflineInZooKeeper(state, cb, destination)) {
1664 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
1665 plans.put(encodedName, plan);
1666 states.add(state);
1667 continue;
1668 }
1669 }
1670
1671 if (!onDeadServer) {
1672 LOG.info("failed to force region state to offline or "
1673 + "failed to set it offline in ZK, will reassign later: " + region);
1674 failedToOpenRegions.add(region);
1675 }
1676 }
1677
1678
1679 Lock lock = locks.remove(encodedName);
1680 lock.unlock();
1681 }
1682
1683 if (useZKForAssignment) {
1684
1685 int total = states.size();
1686 for (int oldCounter = 0; !server.isStopped();) {
1687 int count = counter.get();
1688 if (oldCounter != count) {
1689 LOG.debug(destination.toString() + " unassigned znodes=" + count +
1690 " of total=" + total + "; oldCounter=" + oldCounter);
1691 oldCounter = count;
1692 }
1693 if (count >= total) break;
1694 Thread.sleep(5);
1695 }
1696 }
1697
1698 if (server.isStopped()) {
1699 return false;
1700 }
1701
1702
1703
1704 this.addPlans(plans);
1705
1706 List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
1707 new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
1708 for (RegionState state: states) {
1709 HRegionInfo region = state.getRegion();
1710 String encodedRegionName = region.getEncodedName();
1711 Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
1712 if (useZKForAssignment && (nodeVersion == null || nodeVersion == -1)) {
1713 LOG.warn("failed to offline in zookeeper: " + region);
1714 failedToOpenRegions.add(region);
1715 Lock lock = locks.remove(encodedRegionName);
1716 lock.unlock();
1717 } else {
1718 regionStates.updateRegionState(
1719 region, State.PENDING_OPEN, destination);
1720 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1721 if (this.shouldAssignRegionsWithFavoredNodes) {
1722 favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1723 }
1724 regionOpenInfos.add(new Triple<HRegionInfo, Integer, List<ServerName>>(
1725 region, nodeVersion, favoredNodes));
1726 }
1727 }
1728
1729
1730 try {
1731
1732
1733 long maxWaitTime = System.currentTimeMillis() +
1734 this.server.getConfiguration().
1735 getLong("hbase.regionserver.rpc.startup.waittime", 60000);
1736 for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
1737 try {
1738
1739 if (regionOpenInfos.isEmpty()) {
1740 break;
1741 }
1742 List<RegionOpeningState> regionOpeningStateList = serverManager
1743 .sendRegionOpen(destination, regionOpenInfos);
1744 if (regionOpeningStateList == null) {
1745
1746 return false;
1747 }
1748 for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
1749 RegionOpeningState openingState = regionOpeningStateList.get(k);
1750 if (openingState != RegionOpeningState.OPENED) {
1751 HRegionInfo region = regionOpenInfos.get(k).getFirst();
1752 if (openingState == RegionOpeningState.ALREADY_OPENED) {
1753 processAlreadyOpenedRegion(region, destination);
1754 } else if (openingState == RegionOpeningState.FAILED_OPENING) {
1755
1756 failedToOpenRegions.add(region);
1757 } else {
1758 LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
1759 + openingState + " in assigning region " + region);
1760 }
1761 }
1762 }
1763 break;
1764 } catch (IOException e) {
1765 if (e instanceof RemoteException) {
1766 e = ((RemoteException)e).unwrapRemoteException();
1767 }
1768 if (e instanceof RegionServerStoppedException) {
1769 LOG.warn("The region server was shut down, ", e);
1770
1771 return false;
1772 } else if (e instanceof ServerNotRunningYetException) {
1773 long now = System.currentTimeMillis();
1774 if (now < maxWaitTime) {
1775 LOG.debug("Server is not yet up; waiting up to " +
1776 (maxWaitTime - now) + "ms", e);
1777 Thread.sleep(100);
1778 i--;
1779 continue;
1780 }
1781 } else if (e instanceof java.net.SocketTimeoutException
1782 && this.serverManager.isServerOnline(destination)) {
1783
1784
1785
1786
1787 if (LOG.isDebugEnabled()) {
1788 LOG.debug("Bulk assigner openRegion() to " + destination
1789 + " has timed out, but the regions might"
1790 + " already be opened on it.", e);
1791 }
1792
1793 Thread.sleep(100);
1794 i--;
1795 continue;
1796 }
1797 throw e;
1798 }
1799 }
1800 } catch (IOException e) {
1801
1802 LOG.info("Unable to communicate with " + destination
1803 + " in order to assign regions, ", e);
1804 return false;
1805 }
1806 } finally {
1807 for (Lock lock : locks.values()) {
1808 lock.unlock();
1809 }
1810 }
1811
1812 if (!failedToOpenRegions.isEmpty()) {
1813 for (HRegionInfo region : failedToOpenRegions) {
1814 if (!regionStates.isRegionOnline(region)) {
1815 invokeAssign(region);
1816 }
1817 }
1818 }
1819
1820
1821 ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
1822 for (HRegionInfo region: regions) {
1823 if (!region.getTable().isSystemTable()) {
1824 userRegionSet.add(region);
1825 }
1826 }
1827 if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
1828 System.currentTimeMillis())) {
1829 LOG.debug("some user regions are still in transition: " + userRegionSet);
1830 }
1831 LOG.debug("Bulk assigning done for " + destination);
1832 return true;
1833 } finally {
1834 metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
1835 }
1836 }
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
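/**
 * Sends a CLOSE RPC to the server hosting the region, retrying up to
 * maximumAttempts times; if the server is dead or no longer serving the region,
 * the region is simply marked offline. On repeated failure the region is moved
 * to FAILED_CLOSE.
 */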
1848 private void unassign(final HRegionInfo region,
1849 final RegionState state, final int versionOfClosingNode,
1850 final ServerName dest, final boolean transitionInZK,
1851 final ServerName src) {
1852 ServerName server = src;
1853 if (state != null) {
1854 server = state.getServerName();
1855 }
1856 long maxWaitTime = -1;
1857 for (int i = 1; i <= this.maximumAttempts; i++) {
1858 if (this.server.isStopped() || this.server.isAborted()) {
1859 LOG.debug("Server stopped/aborted; skipping unassign of " + region);
1860 return;
1861 }
1862
1863 if (!serverManager.isServerOnline(server)) {
1864 LOG.debug("Offline " + region.getRegionNameAsString()
1865 + ", no need to unassign since it's on a dead server: " + server);
1866 if (transitionInZK) {
1867
1868 deleteClosingOrClosedNode(region, server);
1869 }
1870 if (state != null) {
1871 regionOffline(region);
1872 }
1873 return;
1874 }
1875 try {
1876
1877 if (serverManager.sendRegionClose(server, region,
1878 versionOfClosingNode, dest, transitionInZK)) {
1879 LOG.debug("Sent CLOSE to " + server + " for region " +
1880 region.getRegionNameAsString());
1881 if (useZKForAssignment && !transitionInZK && state != null) {
1882
1883
1884 unassign(region, state, versionOfClosingNode,
1885 dest, transitionInZK, src);
1886 }
1887 return;
1888 }
1889
1890
1891 LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
1892 region.getRegionNameAsString());
1893 } catch (Throwable t) {
1894 long sleepTime = 0;
1895 Configuration conf = this.server.getConfiguration();
1896 if (t instanceof RemoteException) {
1897 t = ((RemoteException)t).unwrapRemoteException();
1898 }
1899 boolean logRetries = true;
1900 if (t instanceof RegionServerAbortedException
1901 || t instanceof RegionServerStoppedException
1902 || t instanceof ServerNotRunningYetException) {
1903
1904
1905 sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1906 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1907
1908 } else if (t instanceof NotServingRegionException) {
1909 LOG.debug("Offline " + region.getRegionNameAsString()
+ ", it is no longer on " + server, t);
1911 if (transitionInZK) {
1912 deleteClosingOrClosedNode(region, server);
1913 }
1914 if (state != null) {
1915 regionOffline(region);
1916 }
1917 return;
1918 } else if ((t instanceof FailedServerException) || (state != null &&
1919 t instanceof RegionAlreadyInTransitionException)) {
1920 if(t instanceof FailedServerException) {
1921 sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1922 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1923 } else {
1924
LOG.debug("Updating the timestamp for " + state);
1926 state.updateTimestampToNow();
1927 if (maxWaitTime < 0) {
1928 maxWaitTime =
1929 EnvironmentEdgeManager.currentTime()
1930 + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
1931 DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
1932 }
1933 long now = EnvironmentEdgeManager.currentTime();
1934 if (now < maxWaitTime) {
1935 LOG.debug("Region is already in transition; "
1936 + "waiting up to " + (maxWaitTime - now) + "ms", t);
1937 sleepTime = 100;
1938 i--;
1939 logRetries = false;
1940 }
1941 }
1942 }
1943
1944 try {
1945 if (sleepTime > 0) {
1946 Thread.sleep(sleepTime);
1947 }
1948 } catch (InterruptedException ie) {
1949 LOG.warn("Failed to unassign "
1950 + region.getRegionNameAsString() + " since interrupted", ie);
1951 Thread.currentThread().interrupt();
1952 if (state != null) {
1953 regionStates.updateRegionState(region, State.FAILED_CLOSE);
1954 }
1955 return;
1956 }
1957
1958 if (logRetries) {
1959 LOG.info("Server " + server + " returned " + t + " for "
1960 + region.getRegionNameAsString() + ", try=" + i
1961 + " of " + this.maximumAttempts, t);
1962
1963 }
1964 }
1965 }
1966
1967 if (state != null) {
1968 regionStates.updateRegionState(region, State.FAILED_CLOSE);
1969 }
1970 }
1971
1972
1973
1974
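/**
 * Forces the region state to OFFLINE so that it can be assigned, unassigning it
 * first if necessary. Returns null if the region should not be assigned (for
 * example, it is already being assigned, or its last host is a dead server that
 * has not been processed yet).
 */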
1975 private RegionState forceRegionStateToOffline(
1976 final HRegionInfo region, final boolean forceNewPlan) {
1977 RegionState state = regionStates.getRegionState(region);
1978 if (state == null) {
1979 LOG.warn("Assigning but not in region states: " + region);
1980 state = regionStates.createRegionState(region);
1981 }
1982
1983 ServerName sn = state.getServerName();
1984 if (forceNewPlan && LOG.isDebugEnabled()) {
1985 LOG.debug("Force region state offline " + state);
1986 }
1987
1988 switch (state.getState()) {
1989 case OPEN:
1990 case OPENING:
1991 case PENDING_OPEN:
1992 case CLOSING:
1993 case PENDING_CLOSE:
1994 if (!forceNewPlan) {
1995 LOG.debug("Skip assigning " +
1996 region + ", it is already " + state);
1997 return null;
1998 }
1999 case FAILED_CLOSE:
2000 case FAILED_OPEN:
2001 unassign(region, state, -1, null, false, null);
2002 state = regionStates.getRegionState(region);
2003 if (state.isFailedClose()) {
2004
2005
2006 LOG.info("Skip assigning " +
2007 region + ", we couldn't close it: " + state);
2008 return null;
2009 }
2010 case OFFLINE:
2011
2012
2013
2014
2015
2016
2017
2018
2019 if (useZKForAssignment
2020 && regionStates.isServerDeadAndNotProcessed(sn)
2021 && wasRegionOnDeadServerByMeta(region, sn)) {
2022 if (!regionStates.isRegionInTransition(region)) {
2023 LOG.info("Updating the state to " + State.OFFLINE + " to allow to be reassigned by SSH");
2024 regionStates.updateRegionState(region, State.OFFLINE);
2025 }
2026 LOG.info("Skip assigning " + region.getRegionNameAsString()
2027 + ", it is on a dead but not processed yet server: " + sn);
2028 return null;
2029 }
2030 case CLOSED:
2031 break;
2032 default:
2033 LOG.error("Trying to assign region " + region
2034 + ", which is " + state);
2035 return null;
2036 }
2037 return state;
2038 }
2039
2040 @SuppressWarnings("deprecation")
2041 protected boolean wasRegionOnDeadServerByMeta(
2042 final HRegionInfo region, final ServerName sn) {
2043 try {
2044 if (region.isMetaRegion()) {
2045 ServerName server = this.server.getMetaTableLocator().
2046 getMetaRegionLocation(this.server.getZooKeeper());
2047 return regionStates.isServerDeadAndNotProcessed(server);
2048 }
2049 while (!server.isStopped()) {
2050 try {
2051 this.server.getMetaTableLocator().waitMetaRegionLocation(server.getZooKeeper());
2052 Result r = MetaTableAccessor.getRegionResult(server.getConnection(),
2053 region.getRegionName());
2054 if (r == null || r.isEmpty()) return false;
2055 ServerName lastHost = HRegionInfo.getServerName(r);
2056 return regionStates.isServerDeadAndNotProcessed(lastHost);
2057 } catch (IOException ioe) {
2058 LOG.info("Received exception accessing hbase:meta during force assign "
2059 + region.getRegionNameAsString() + ", retrying", ioe);
2060 }
2061 }
2062 } catch (InterruptedException e) {
2063 Thread.currentThread().interrupt();
2064 LOG.info("Interrupted accessing hbase:meta", e);
2065 }
2066
2067 return regionStates.isServerDeadAndNotProcessed(sn);
2068 }
2069
2070
2071
2072
2073
2074
2075
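/**
 * Assign the region described by the passed state, retrying up to maximumAttempts.
 * Each attempt picks (or reuses) a RegionPlan, optionally creates the OFFLINE znode,
 * and sends the open RPC to the plan's destination; on failure it decides between
 * retrying the same server and generating a new plan.
 */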
2076 public void assign(RegionState state,
2077 boolean setOfflineInZK, final boolean forceNewPlan) {
2078 long startTime = EnvironmentEdgeManager.currentTime();
2079 try {
2080 Configuration conf = server.getConfiguration();
2081 RegionState currentState = state;
2082 int versionOfOfflineNode = -1;
2083 RegionPlan plan = null;
2084 long maxWaitTime = -1;
2085 HRegionInfo region = state.getRegion();
2086 RegionOpeningState regionOpenState;
2087 Throwable previousException = null;
2088 for (int i = 1; i <= maximumAttempts; i++) {
2089 if (server.isStopped() || server.isAborted()) {
2090 LOG.info("Skip assigning " + region.getRegionNameAsString()
2091 + ", the server is stopped/aborted");
2092 return;
2093 }
2094
2095 if (plan == null) {
2096 try {
2097 plan = getRegionPlan(region, forceNewPlan);
2098 } catch (HBaseIOException e) {
2099 LOG.warn("Failed to get region plan", e);
2100 }
2101 }
2102
2103 if (plan == null) {
2104 LOG.warn("Unable to determine a plan to assign " + region);
2105
2106
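// hbase:meta must eventually be assigned, so keep retrying with a reset attempt
// count; user regions are marked FAILED_OPEN and given up on.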
2107 if (region.isMetaRegion()) {
2108 if (i == maximumAttempts) {
2109 i = 0;
2110
2111 LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
2112 " after maximumAttempts (" + this.maximumAttempts +
2113 "). Reset attempts count and continue retrying.");
2114 }
2115 waitForRetryingMetaAssignment();
2116 continue;
2117 }
2118
2119 regionStates.updateRegionState(region, State.FAILED_OPEN);
2120 return;
2121 }
2122 if (setOfflineInZK && versionOfOfflineNode == -1) {
2123 LOG.info("Setting node as OFFLINED in ZooKeeper for region " + region);
2124
2125
2126 versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
2127 if (versionOfOfflineNode != -1) {
2128 if (isDisabledorDisablingRegionInRIT(region)) {
2129 return;
2130 }
2131
2132
2133
2134
2135
2136
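// The OFFLINE znode is in place; presumably an assigned region implies an enabled
// table, so flip the table to ENABLED unless it is already ENABLED or ENABLING.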
2137 TableName tableName = region.getTable();
2138 if (!tableStateManager.isTableState(tableName,
2139 ZooKeeperProtos.Table.State.ENABLED, ZooKeeperProtos.Table.State.ENABLING)) {
2140 LOG.debug("Setting table " + tableName + " to ENABLED state.");
2141 setEnabledTable(tableName);
2142 }
2143 }
2144 }
2145 if (setOfflineInZK && versionOfOfflineNode == -1) {
2146 LOG.info("Unable to set offline in ZooKeeper to assign " + region);
2147
2148
2149
2150
2151 if (!server.isAborted()) {
2152 continue;
2153 }
2154 }
2155 LOG.info("Assigning " + region.getRegionNameAsString() +
2156 " to " + plan.getDestination().toString());
2157
2158 currentState = regionStates.updateRegionState(region,
2159 State.PENDING_OPEN, plan.getDestination());
2160
2161 boolean needNewPlan;
2162 final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
2163 " to " + plan.getDestination();
2164 try {
2165 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
2166 if (this.shouldAssignRegionsWithFavoredNodes) {
2167 favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
2168 }
2169 regionOpenState = serverManager.sendRegionOpen(
2170 plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
2171
2172 if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
2173
2174 needNewPlan = true;
2175 LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
2176 " trying to assign elsewhere instead; " +
2177 "try=" + i + " of " + this.maximumAttempts);
2178 } else {
2179
2180 if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
2181 processAlreadyOpenedRegion(region, plan.getDestination());
2182 }
2183 return;
2184 }
2185
2186 } catch (Throwable t) {
2187 if (t instanceof RemoteException) {
2188 t = ((RemoteException) t).unwrapRemoteException();
2189 }
2190 previousException = t;
2191
2192
2193
2194
2195 boolean hold = (t instanceof RegionAlreadyInTransitionException ||
2196 t instanceof ServerNotRunningYetException);
2197
2198
2199
2200
2201
2202
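// Retry the same server only for a socket timeout while the server is still
// reported online.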
2203 boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
2204 && this.serverManager.isServerOnline(plan.getDestination()));
2205
2206
2207 if (hold) {
2208 LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
2209 "try=" + i + " of " + this.maximumAttempts, t);
2210
2211 if (maxWaitTime < 0) {
2212 if (t instanceof RegionAlreadyInTransitionException) {
2213 maxWaitTime = EnvironmentEdgeManager.currentTime()
2214 + this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
2215 DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
2216 } else {
2217 maxWaitTime = EnvironmentEdgeManager.currentTime()
2218 + this.server.getConfiguration().getLong(
2219 "hbase.regionserver.rpc.startup.waittime", 60000);
2220 }
2221 }
2222 try {
2223 needNewPlan = false;
2224 long now = EnvironmentEdgeManager.currentTime();
2225 if (now < maxWaitTime) {
2226 LOG.debug("Server is not yet up or region is already in transition; "
2227 + "waiting up to " + (maxWaitTime - now) + "ms", t);
2228 Thread.sleep(100);
2229 i--;
2230 } else if (!(t instanceof RegionAlreadyInTransitionException)) {
2231 LOG.debug("Server is not up for a while; try a new one", t);
2232 needNewPlan = true;
2233 }
2234 } catch (InterruptedException ie) {
2235 LOG.warn("Failed to assign "
2236 + region.getRegionNameAsString() + " since interrupted", ie);
2237 regionStates.updateRegionState(region, State.FAILED_OPEN);
2238 Thread.currentThread().interrupt();
2239 return;
2240 }
2241 } else if (retry) {
2242 needNewPlan = false;
2243 i--;
2244 LOG.warn(assignMsg + ", trying to assign to the same region server due to a socket timeout", t);
2245 } else {
2246 needNewPlan = true;
2247 LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
2248 " try=" + i + " of " + this.maximumAttempts, t);
2249 }
2250 }
2251
2252 if (i == this.maximumAttempts) {
2253
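// Out of attempts: hbase:meta keeps retrying with a reset counter, while a user
// region falls out of the loop and is marked FAILED_OPEN below.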
2254 if (region.isMetaRegion()) {
2255 i = 0;
2256 LOG.warn(assignMsg +
2257 ", trying to assign a hbase:meta region reached to maximumAttempts (" +
2258 this.maximumAttempts + "). Reset attempt counts and continue retrying.");
2259 waitForRetryingMetaAssignment();
2260 }
2261 else {
2262
2263
2264 continue;
2265 }
2266 }
2267
2268
2269
2270
2271 if (needNewPlan) {
2272
2273
2274
2275
2276 RegionPlan newPlan = null;
2277 try {
2278 newPlan = getRegionPlan(region, true);
2279 } catch (HBaseIOException e) {
2280 LOG.warn("Failed to get region plan", e);
2281 }
2282 if (newPlan == null) {
2283 regionStates.updateRegionState(region, State.FAILED_OPEN);
2284 LOG.warn("Unable to find a viable location to assign region " +
2285 region.getRegionNameAsString());
2286 return;
2287 }
2288
2289 if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
2290
2291
2292
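// The new plan targets a different server; reset the region to OFFLINE (and, with
// ZK assignment, re-create the OFFLINE znode) before the next attempt.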
2293 LOG.info("Region assignment plan changed from " + plan.getDestination() + " to "
2294 + newPlan.getDestination() + " server.");
2295 currentState = regionStates.updateRegionState(region, State.OFFLINE);
2296 versionOfOfflineNode = -1;
2297 if (useZKForAssignment) {
2298 setOfflineInZK = true;
2299 }
2300 plan = newPlan;
2301 } else if(plan.getDestination().equals(newPlan.getDestination()) &&
2302 previousException instanceof FailedServerException) {
2303 try {
2304 LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
2305 " to the same failed server.");
2306 Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
2307 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
2308 } catch (InterruptedException ie) {
2309 LOG.warn("Failed to assign "
2310 + region.getRegionNameAsString() + " since interrupted", ie);
2311 regionStates.updateRegionState(region, State.FAILED_OPEN);
2312 Thread.currentThread().interrupt();
2313 return;
2314 }
2315 }
2316 }
2317 }
2318
2319 regionStates.updateRegionState(region, State.FAILED_OPEN);
2320 } finally {
2321 metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
2322 }
2323 }
2324
2325 private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
2326
2327
2328
2329 LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2330 + " to " + sn);
2331 String encodedName = region.getEncodedName();
2332
2333
2334
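// With ZK-based assignment: if the regionserver already moved the znode to OPENED,
// leave it alone (presumably the opened-region handler finishes up); otherwise
// delete our stale OFFLINE node before marking the region online here.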
2335 if(useZKForAssignment){
2336 String node = ZKAssign.getNodeName(watcher, encodedName);
2337 Stat stat = new Stat();
2338 try {
2339 byte[] existingBytes = ZKUtil.getDataNoWatch(watcher, node, stat);
2340 if(existingBytes!=null){
2341 RegionTransition rt= RegionTransition.parseFrom(existingBytes);
2342 EventType et = rt.getEventType();
2343 if (et.equals(EventType.RS_ZK_REGION_OPENED)) {
2344 LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2345 + " and node in "+et+" state");
2346 return;
2347 }
2348 }
2349 } catch (KeeperException ke) {
2350 LOG.warn("Unexpected ZK exception getData " + node
2351 + " node for the region " + encodedName, ke);
2352 } catch (DeserializationException e) {
2353 LOG.warn("Get RegionTransition from zk deserialization failed! ", e);
2354 }
2355
2356 deleteNodeInStates(encodedName, "offline", sn, EventType.M_ZK_REGION_OFFLINE);
2357 }
2358
2359 regionStates.regionOnline(region, sn);
2360 }
2361
2362 private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
2363 if (this.tableStateManager.isTableState(region.getTable(),
2364 ZooKeeperProtos.Table.State.DISABLED,
2365 ZooKeeperProtos.Table.State.DISABLING) || replicasToClose.contains(region)) {
2366 LOG.info("Table " + region.getTable() + " is disabled or disabling;"
2367 + " skipping assign of " + region.getRegionNameAsString());
2368 offlineDisabledRegion(region);
2369 return true;
2370 }
2371 return false;
2372 }
2373
2374
2375
2376
2377
2378
2379
2380
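/**
 * Set the region as OFFLINE up in ZooKeeper. Returns the version of the OFFLINE
 * znode on success, or -1 on failure (aborting the master on unexpected states or
 * ZooKeeper errors).
 */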
2381 private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
2382 if (!state.isClosed() && !state.isOffline()) {
2383 String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE.";
2384 this.server.abort(msg, new IllegalStateException(msg));
2385 return -1;
2386 }
2387 regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
2388 int versionOfOfflineNode;
2389 try {
2390
2391 versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
2392 state.getRegion(), destination);
2393 if (versionOfOfflineNode == -1) {
2394 LOG.warn("Attempted to create/force node into OFFLINE state before "
2395 + "completing assignment but failed to do so for " + state);
2396 return -1;
2397 }
2398 } catch (KeeperException e) {
2399 server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
2400 return -1;
2401 }
2402 return versionOfOfflineNode;
2403 }
2404
2405
2406
2407
2408
2409
2410 private RegionPlan getRegionPlan(final HRegionInfo region,
2411 final boolean forceNewPlan) throws HBaseIOException {
2412 return getRegionPlan(region, null, forceNewPlan);
2413 }
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
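/**
 * Get the assignment plan for the region, excluding serverToExclude as a
 * destination. An existing plan is reused when it still points at an available
 * server and forceNewPlan is false; otherwise the balancer provides a random
 * assignment among the remaining destination servers.
 */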
2424 private RegionPlan getRegionPlan(final HRegionInfo region,
2425 final ServerName serverToExclude, final boolean forceNewPlan) throws HBaseIOException {
2426
2427 final String encodedName = region.getEncodedName();
2428 final List<ServerName> destServers =
2429 serverManager.createDestinationServersList(serverToExclude);
2430
2431 if (destServers.isEmpty()){
2432 LOG.warn("Can't move " + encodedName +
2433 ", there is no destination server available.");
2434 return null;
2435 }
2436
2437 RegionPlan randomPlan = null;
2438 boolean newPlan = false;
2439 RegionPlan existingPlan;
2440
2441 synchronized (this.regionPlans) {
2442 existingPlan = this.regionPlans.get(encodedName);
2443
2444 if (existingPlan != null && existingPlan.getDestination() != null) {
2445 LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
2446 + " destination server is " + existingPlan.getDestination() +
2447 " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
2448 }
2449
2450 if (forceNewPlan
2451 || existingPlan == null
2452 || existingPlan.getDestination() == null
2453 || !destServers.contains(existingPlan.getDestination())) {
2454 newPlan = true;
2455 }
2456 }
2457
2458 if (newPlan) {
2459 ServerName destination = balancer.randomAssignment(region, destServers);
2460 if (destination == null) {
2461 LOG.warn("Can't find a destination for " + encodedName);
2462 return null;
2463 }
2464 synchronized (this.regionPlans) {
2465 randomPlan = new RegionPlan(region, null, destination);
2466 if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
2467 List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
2468 regions.add(region);
2469 try {
2470 processFavoredNodes(regions);
2471 } catch (IOException ie) {
2472 LOG.warn("Ignoring exception in processFavoredNodes " + ie);
2473 }
2474 }
2475 this.regionPlans.put(encodedName, randomPlan);
2476 }
2477 LOG.debug("No previous transition plan found (or ignoring " + "an existing plan) for "
2478 + region.getRegionNameAsString() + "; generated random plan=" + randomPlan + "; "
2479 + destServers.size() + " (online=" + serverManager.getOnlineServers().size()
2480 + ") available servers, forceNewPlan=" + forceNewPlan);
2481 return randomPlan;
2482 }
2483 LOG.debug("Using pre-existing plan for " +
2484 region.getRegionNameAsString() + "; plan=" + existingPlan);
2485 return existingPlan;
2486 }
2487
2488
2489
2490
2491 private void waitForRetryingMetaAssignment() {
2492 try {
2493 Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
2494 } catch (InterruptedException e) {
2495 LOG.error("Got exception while waiting for hbase:meta assignment");
2496 Thread.currentThread().interrupt();
2497 }
2498 }
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
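/**
 * Unassign the given region, leaving any existing RegionPlan in place.
 */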
2513 public void unassign(HRegionInfo region) {
2514 unassign(region, false);
2515 }
2516
2517
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
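/**
 * Unassign the given region and send the CLOSE RPC to its current regionserver.
 * With force=true a CLOSE is re-sent even if the region is already pending close
 * or closing; dest, when non-null, is the intended destination after the close.
 */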
2532 public void unassign(HRegionInfo region, boolean force, ServerName dest) {
2533
2534 LOG.debug("Starting unassign of " + region.getRegionNameAsString()
2535 + " (offlining), current state: " + regionStates.getRegionState(region));
2536
2537 String encodedName = region.getEncodedName();
2538
2539 int versionOfClosingNode = -1;
2540
2541
2542 ReentrantLock lock = locker.acquireLock(encodedName);
2543 RegionState state = regionStates.getRegionTransitionState(encodedName);
2544 boolean reassign = true;
2545 try {
2546 if (state == null) {
2547
2548
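// Region is not in transition; look at its last known state to decide whether
// there is anything to close.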
2549 state = regionStates.getRegionState(encodedName);
2550 if (state != null && state.isUnassignable()) {
2551 LOG.info("Attempting to unassign " + state + ", ignored");
2552
2553 return;
2554 }
2555
2556 try {
2557 if (state == null || state.getServerName() == null) {
2558
2559
2560 LOG.warn("Attempting to unassign a region not in RegionStates "
2561 + region.getRegionNameAsString() + ", offlined");
2562 regionOffline(region);
2563 return;
2564 }
2565 if (useZKForAssignment) {
2566 versionOfClosingNode = ZKAssign.createNodeClosing(
2567 watcher, region, state.getServerName());
2568 if (versionOfClosingNode == -1) {
2569 LOG.info("Attempting to unassign " +
2570 region.getRegionNameAsString() + " but ZK closing node "
2571 + "can't be created.");
2572 reassign = false;
2573 return;
2574 }
2575 }
2576 } catch (KeeperException e) {
2577 if (e instanceof NodeExistsException) {
2578
2579
2580
2581
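// A node for this region already exists: if it records a split or merge, the
// region no longer exists and there is nothing to unassign.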
2582 NodeExistsException nee = (NodeExistsException)e;
2583 String path = nee.getPath();
2584 try {
2585 if (isSplitOrSplittingOrMergedOrMerging(path)) {
2586 LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
2587 "skipping unassign because the region no longer exists -- it was split or merged");
2588 reassign = false;
2589 return;
2590 }
2591 } catch (KeeperException.NoNodeException ke) {
2592 LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
2593 "; presuming split and that the region to unassign, " +
2594 encodedName + ", no longer exists -- confirm", ke);
2595 return;
2596 } catch (KeeperException ke) {
2597 LOG.error("Unexpected zk state", ke);
2598 } catch (DeserializationException de) {
2599 LOG.error("Failed parse", de);
2600 }
2601 }
2602
2603 server.abort("Unexpected ZK exception creating node CLOSING", e);
2604 reassign = false;
2605 return;
2606 }
2607 state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2608 } else if (state.isFailedOpen()) {
2609
2610 regionOffline(region);
2611 return;
2612 } else if (force && state.isPendingCloseOrClosing()) {
2613 LOG.debug("Attempting to unassign " + region.getRegionNameAsString() +
2614 " which is already " + state.getState() +
2615 " but forcing to send a CLOSE RPC again ");
2616 if (state.isFailedClose()) {
2617 state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2618 }
2619 state.updateTimestampToNow();
2620 } else {
2621 LOG.debug("Attempting to unassign " +
2622 region.getRegionNameAsString() + " but it is " +
2623 "already in transition (" + state.getState() + ", force=" + force + ")");
2624 return;
2625 }
2626
2627 unassign(region, state, versionOfClosingNode, dest, useZKForAssignment, null);
2628 } finally {
2629 lock.unlock();
2630
2631
2632 if (!replicasToClose.contains(region) && reassign && regionStates.isRegionOffline(region)) {
2633 assign(region, true);
2634 }
2635 }
2636 }
2637
2638 public void unassign(HRegionInfo region, boolean force){
2639 unassign(region, force, null);
2640 }
2641
2642
2643
2644
2645 public void deleteClosingOrClosedNode(HRegionInfo region, ServerName sn) {
2646 String encodedName = region.getEncodedName();
2647 deleteNodeInStates(encodedName, "closing", sn, EventType.M_ZK_REGION_CLOSING,
2648 EventType.RS_ZK_REGION_CLOSED);
2649 }
2650
2651
2652
2653
2654
2655
2656
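/**
 * Returns true if the znode at the given path is in a SPLIT/SPLITTING or
 * MERGED/MERGING state; false if the node is gone or in any other state.
 */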
2657 private boolean isSplitOrSplittingOrMergedOrMerging(final String path)
2658 throws KeeperException, DeserializationException {
2659 boolean result = false;
2660
2661
2662 byte [] data = ZKAssign.getData(watcher, path);
2663 if (data == null) {
2664 LOG.info("Node " + path + " is gone");
2665 return false;
2666 }
2667 RegionTransition rt = RegionTransition.parseFrom(data);
2668 switch (rt.getEventType()) {
2669 case RS_ZK_REQUEST_REGION_SPLIT:
2670 case RS_ZK_REGION_SPLIT:
2671 case RS_ZK_REGION_SPLITTING:
2672 case RS_ZK_REQUEST_REGION_MERGE:
2673 case RS_ZK_REGION_MERGED:
2674 case RS_ZK_REGION_MERGING:
2675 result = true;
2676 break;
2677 default:
2678 LOG.info("Node " + path + " is in " + rt.getEventType());
2679 break;
2680 }
2681 return result;
2682 }
2683
2684
2685
2686
2687
2688
2689 public int getNumRegionsOpened() {
2690 return numRegionsOpened.get();
2691 }
2692
2693
2694
2695
2696
2697
2698
2699
2700
2701
2702 public boolean waitForAssignment(HRegionInfo regionInfo)
2703 throws InterruptedException {
2704 ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
2705 regionSet.add(regionInfo);
2706 return waitForAssignment(regionSet, true, Long.MAX_VALUE);
2707 }
2708
2709
2710
2711
2712 protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
2713 final boolean waitTillAllAssigned, final int reassigningRegions,
2714 final long minEndTime) throws InterruptedException {
2715 long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
2716 if (deadline < 0) {
2717 deadline = Long.MAX_VALUE;
2718 }
2719 return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
2720 }
2721
2722
2723
2724
2725
2726
2727
2728
2729
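/**
 * Wait until the regions in the set are assigned (or split/merged away), the
 * deadline passes, the master stops, or every remaining region is FAILED_OPEN.
 * With waitTillAllAssigned=false only a single pass is made. Returns true only
 * if the set drained completely.
 */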
2730 protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
2731 final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
2732
2733 while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
2734 int failedOpenCount = 0;
2735 Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
2736 while (regionInfoIterator.hasNext()) {
2737 HRegionInfo hri = regionInfoIterator.next();
2738 if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
2739 State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
2740 regionInfoIterator.remove();
2741 } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
2742 failedOpenCount++;
2743 }
2744 }
2745 if (!waitTillAllAssigned) {
2746
2747 break;
2748 }
2749 if (!regionSet.isEmpty()) {
2750 if (failedOpenCount == regionSet.size()) {
2751
2752 break;
2753 }
2754 regionStates.waitForUpdate(100);
2755 }
2756 }
2757 return regionSet.isEmpty();
2758 }
2759
2760
2761
2762
2763
2764
2765
2766
2767
2768
2769
2770
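/**
 * Assign the hbase:meta region (or the given replica), first clearing its
 * recorded location from ZooKeeper.
 */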
2771 public void assignMeta(HRegionInfo hri) throws KeeperException {
2772 this.server.getMetaTableLocator().deleteMetaLocation(this.watcher, hri.getReplicaId());
2773 assign(hri, true);
2774 }
2775
2776
2777
2778
2779
2780
2781
2782
2783
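/**
 * Assign the given regions, retaining their previous server assignments where
 * the balancer allows it.
 */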
2784 public void assign(Map<HRegionInfo, ServerName> regions)
2785 throws IOException, InterruptedException {
2786 if (regions == null || regions.isEmpty()) {
2787 return;
2788 }
2789 List<ServerName> servers = serverManager.createDestinationServersList();
2790 if (servers == null || servers.isEmpty()) {
2791 throw new IOException("Found no destination server to assign region(s)");
2792 }
2793
2794
2795 Map<ServerName, List<HRegionInfo>> bulkPlan =
2796 balancer.retainAssignment(regions, servers);
2797 if (bulkPlan == null) {
2798 throw new IOException("Unable to determine a plan to assign region(s)");
2799 }
2800
2801 assign(regions.size(), servers.size(),
2802 "retainAssignment=true", bulkPlan);
2803 }
2804
2805
2806
2807
2808
2809
2810
2811
2812
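/**
 * Assign the given regions round-robin across the available destination servers.
 */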
2813 public void assign(List<HRegionInfo> regions)
2814 throws IOException, InterruptedException {
2815 if (regions == null || regions.isEmpty()) {
2816 return;
2817 }
2818
2819 List<ServerName> servers = serverManager.createDestinationServersList();
2820 if (servers == null || servers.isEmpty()) {
2821 throw new IOException("Found no destination server to assign region(s)");
2822 }
2823
2824
2825 Map<ServerName, List<HRegionInfo>> bulkPlan = balancer.roundRobinAssignment(regions, servers);
2826 if (bulkPlan == null) {
2827 throw new IOException("Unable to determine a plan to assign region(s)");
2828 }
2829
2830 processFavoredNodes(regions);
2831 assign(regions.size(), servers.size(), "round-robin=true", bulkPlan);
2832 }
2833
2834 private void assign(int regions, int totalServers,
2835 String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
2836 throws InterruptedException, IOException {
2837
2838 int servers = bulkPlan.size();
2839 if (servers == 1 || (regions < bulkAssignThresholdRegions
2840 && servers < bulkAssignThresholdServers)) {
2841
2842
2843
2844 if (LOG.isTraceEnabled()) {
2845 LOG.trace("Not using bulk assignment since we are assigning only " + regions +
2846 " region(s) to " + servers + " server(s)");
2847 }
2848
2849
2850 ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
2851 for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
2852 if (!assign(plan.getKey(), plan.getValue())) {
2853 for (HRegionInfo region: plan.getValue()) {
2854 if (!regionStates.isRegionOnline(region)) {
2855 invokeAssign(region);
2856 if (!region.getTable().isSystemTable()) {
2857 userRegionSet.add(region);
2858 }
2859 }
2860 }
2861 }
2862 }
2863
2864
2865 if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
2866 System.currentTimeMillis())) {
2867 LOG.debug("some user regions are still in transition: " + userRegionSet);
2868 }
2869 } else {
2870 LOG.info("Bulk assigning " + regions + " region(s) across "
2871 + totalServers + " server(s), " + message);
2872
2873
2874 BulkAssigner ba = new GeneralBulkAssigner(
2875 this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
2876 ba.bulkAssign();
2877 LOG.info("Bulk assigning done");
2878 }
2879 }
2880
2881
2882
2883
2884
2885
2886
2887
2888
2889
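/**
 * Assign all user regions found in hbase:meta; used during cluster startup.
 * Honors hbase.master.startup.retainassign to choose between retaining previous
 * locations and a fresh round-robin assignment, ensures the owning tables are
 * marked ENABLED, and assigns replica regions not yet recorded in meta.
 */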
2890 private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
2891 throws IOException, InterruptedException {
2892 if (allRegions == null || allRegions.isEmpty()) return;
2893
2894
2895 boolean retainAssignment = server.getConfiguration().
2896 getBoolean("hbase.master.startup.retainassign", true);
2897
2898 Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
2899 if (retainAssignment) {
2900 assign(allRegions);
2901 } else {
2902 List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
2903 assign(regions);
2904 }
2905
2906 for (HRegionInfo hri : regionsFromMetaScan) {
2907 TableName tableName = hri.getTable();
2908 if (!tableStateManager.isTableState(tableName,
2909 ZooKeeperProtos.Table.State.ENABLED)) {
2910 setEnabledTable(tableName);
2911 }
2912 }
2913
2914 assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, server));
2915 }
2916
2917
2918
2919
2920
2921
2922
2923
2924
2925
2926
2927
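/**
 * For the given primary regions, compute the replica regions (up to each table's
 * configured region replication) that are not yet recorded in hbase:meta.
 */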
2928 public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
2929 Set<HRegionInfo> regionsRecordedInMeta, MasterServices master)throws IOException {
2930 List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
2931 for (HRegionInfo hri : regionsRecordedInMeta) {
2932 TableName table = hri.getTable();
2933 HTableDescriptor htd = master.getTableDescriptors().get(table);
2934
2935 int desiredRegionReplication = htd.getRegionReplication();
2936 for (int i = 0; i < desiredRegionReplication; i++) {
2937 HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
2938 if (regionsRecordedInMeta.contains(replica)) continue;
2939 regionsNotRecordedInMeta.add(replica);
2940 }
2941 }
2942 return regionsNotRecordedInMeta;
2943 }
2944
2945
2946
2947
2948
2949
2950
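/**
 * Wait until there are no regions in transition, the master stops, or the
 * timeout elapses. Returns true if nothing is left in transition.
 */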
2951 boolean waitUntilNoRegionsInTransition(final long timeout)
2952 throws InterruptedException {
2953
2954
2955
2956
2957
2958
2959 final long endTime = System.currentTimeMillis() + timeout;
2960
2961 while (!this.server.isStopped() && regionStates.isRegionsInTransition()
2962 && endTime > System.currentTimeMillis()) {
2963 regionStates.waitForUpdate(100);
2964 }
2965
2966 return !regionStates.isRegionsInTransition();
2967 }
2968
2969
2970
2971
2972
2973
2974
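/**
 * Rebuild region states from a full scan of hbase:meta at master startup.
 * Returns the set of servers that host regions according to meta but are no
 * longer online.
 */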
2975 Set<ServerName> rebuildUserRegions() throws
2976 IOException, KeeperException, CoordinatedStateException {
2977 Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
2978 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.ENABLING);
2979
2980 Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
2981 ZooKeeperProtos.Table.State.DISABLED,
2982 ZooKeeperProtos.Table.State.DISABLING,
2983 ZooKeeperProtos.Table.State.ENABLING);
2984
2985
2986 List<Result> results = MetaTableAccessor.fullScanOfMeta(server.getConnection());
2987
2988 Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
2989
2990 Set<ServerName> offlineServers = new HashSet<ServerName>();
2991
2992 for (Result result : results) {
2993 if (result == null) {
2994 if (LOG.isDebugEnabled()) { LOG.debug("null result from meta - ignoring but this is strange."); }
2995 continue;
2996 }
2997
2998
2999
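// If this row records a completed merge, the replicas of both merge parents need
// to be closed.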
3000 PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
3001 if (p.getFirst() != null && p.getSecond() != null) {
3002 int numReplicas = server.getTableDescriptors().get(p.getFirst().
3003 getTable()).getRegionReplication();
3004 for (HRegionInfo merge : p) {
3005 for (int i = 1; i < numReplicas; i++) {
3006 replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
3007 }
3008 }
3009 }
3010 RegionLocations rl = MetaTableAccessor.getRegionLocations(result);
3011 if (rl == null) continue;
3012 HRegionLocation[] locations = rl.getRegionLocations();
3013 if (locations == null) continue;
3014 for (HRegionLocation hrl : locations) {
3015 if (hrl == null) continue;
3016 HRegionInfo regionInfo = hrl.getRegionInfo();
3017 if (regionInfo == null) continue;
3018 int replicaId = regionInfo.getReplicaId();
3019 State state = RegionStateStore.getRegionState(result, replicaId);
3020
3021
3022
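// The default replica is recorded as SPLIT: the parent is gone, so close all of
// its replicas as well.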
3023 if (replicaId == 0 && state.equals(State.SPLIT)) {
3024 for (HRegionLocation h : locations) {
3025 replicasToClose.add(h.getRegionInfo());
3026 }
3027 }
3028 ServerName lastHost = hrl.getServerName();
3029 ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
3030 if (tableStateManager.isTableState(regionInfo.getTable(),
3031 ZooKeeperProtos.Table.State.DISABLED)) {
3032
3033
3034 lastHost = null;
3035 regionLocation = null;
3036 }
3037 regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
3038 if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
3039
3040 continue;
3041 }
3042 TableName tableName = regionInfo.getTable();
3043 if (!onlineServers.contains(regionLocation)) {
3044
3045 offlineServers.add(regionLocation);
3046 if (useZKForAssignment) {
3047 regionStates.regionOffline(regionInfo);
3048 }
3049 } else if (!disabledOrEnablingTables.contains(tableName)) {
3050
3051
3052 regionStates.regionOnline(regionInfo, regionLocation);
3053 balancer.regionOnline(regionInfo, regionLocation);
3054 } else if (useZKForAssignment) {
3055 regionStates.regionOffline(regionInfo);
3056 }
3057
3058
3059 if (!disabledOrDisablingOrEnabling.contains(tableName)
3060 && !getTableStateManager().isTableState(tableName,
3061 ZooKeeperProtos.Table.State.ENABLED)) {
3062 setEnabledTable(tableName);
3063 }
3064 }
3065 }
3066 return offlineServers;
3067 }
3068
3069
3070
3071
3072
3073
3074
3075
3076
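/**
 * Recover tables left in DISABLING state by a previous master by resuming the
 * disable so they end up DISABLED.
 */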
3077 private void recoverTableInDisablingState()
3078 throws KeeperException, IOException, CoordinatedStateException {
3079 Set<TableName> disablingTables =
3080 tableStateManager.getTablesInStates(ZooKeeperProtos.Table.State.DISABLING);
3081 if (disablingTables.size() != 0) {
3082 for (TableName tableName : disablingTables) {
3083
3084 LOG.info("The table " + tableName
3085 + " is in DISABLING state. Hence recovering by moving the table"
3086 + " to DISABLED state.");
3087 new DisableTableHandler(this.server, tableName,
3088 this, tableLockManager, true).prepare().process();
3089 }
3090 }
3091 }
3092
3093
3094
3095
3096
3097
3098
3099
3100
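/**
 * Recover tables left in ENABLING state by a previous master by re-running the
 * enable handler so they end up ENABLED.
 */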
3101 private void recoverTableInEnablingState()
3102 throws KeeperException, IOException, CoordinatedStateException {
3103 Set<TableName> enablingTables = tableStateManager.
3104 getTablesInStates(ZooKeeperProtos.Table.State.ENABLING);
3105 if (enablingTables.size() != 0) {
3106 for (TableName tableName : enablingTables) {
3107
3108 LOG.info("The table " + tableName
3109 + " is in ENABLING state. Hence recovering by moving the table"
3110 + " to ENABLED state.");
3111
3112
3113 EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
3114 this, tableLockManager, true);
3115 try {
3116 eth.prepare();
3117 } catch (TableNotFoundException e) {
3118 LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
3119 continue;
3120 }
3121 eth.process();
3122 }
3123 }
3124 }
3125
3126
3127
3128
3129
3130
3131
3132
3133
3134
3135
3136
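/**
 * Expire any servers that hosted regions per hbase:meta but are no longer online,
 * then process the regions still in transition, either from the ZK unassigned
 * znodes or, with ZK-less assignment, from the in-memory region states.
 */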
3137 private void processDeadServersAndRecoverLostRegions(Set<ServerName> deadServers)
3138 throws IOException, KeeperException {
3139 if (deadServers != null && !deadServers.isEmpty()) {
3140 for (ServerName serverName: deadServers) {
3141 if (!serverManager.isServerDead(serverName)) {
3142 serverManager.expireServer(serverName);
3143 }
3144 }
3145 }
3146
3147 List<String> nodes = useZKForAssignment ?
3148 ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode)
3149 : ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
3150 if (nodes != null && !nodes.isEmpty()) {
3151 for (String encodedRegionName : nodes) {
3152 processRegionInTransition(encodedRegionName, null);
3153 }
3154 } else if (!useZKForAssignment) {
3155 processRegionInTransitionZkLess();
3156 }
3157 }
3158
3159 void processRegionInTransitionZkLess() {
3160
3161
3162
3163
3164
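// Re-drive regions stuck in transition after a master failover: re-assign CLOSED
// regions, re-send the RPC for PENDING_OPEN/PENDING_CLOSE, and unassign
// FAILED_OPEN/FAILED_CLOSE regions.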
3165 Map<String, RegionState> rits = regionStates.getRegionsInTransition();
3166 for (RegionState regionState : rits.values()) {
3167 LOG.info("Processing " + regionState);
3168 ServerName serverName = regionState.getServerName();
3169
3170
3171 if (serverName != null
3172 && !serverManager.getOnlineServers().containsKey(serverName)) {
3173 LOG.info("Server " + serverName + " isn't online. SSH will handle this");
3174 continue;
3175 }
3176 HRegionInfo regionInfo = regionState.getRegion();
3177 State state = regionState.getState();
3178
3179 switch (state) {
3180 case CLOSED:
3181 invokeAssign(regionInfo);
3182 break;
3183 case PENDING_OPEN:
3184 retrySendRegionOpen(regionState);
3185 break;
3186 case PENDING_CLOSE:
3187 retrySendRegionClose(regionState);
3188 break;
3189 case FAILED_CLOSE:
3190 case FAILED_OPEN:
3191 invokeUnAssign(regionInfo);
3192 break;
3193 default:
3194
3195 }
3196 }
3197 }
3198
3199
3200
3201
3202
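/**
 * Re-send the open RPC for a region left in PENDING_OPEN across a master
 * failover, falling back to a fresh assignment if the regionserver rejects it.
 */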
3203 private void retrySendRegionOpen(final RegionState regionState) {
3204 this.executorService.submit(
3205 new EventHandler(server, EventType.M_MASTER_RECOVERY) {
3206 @Override
3207 public void process() throws IOException {
3208 HRegionInfo hri = regionState.getRegion();
3209 ServerName serverName = regionState.getServerName();
3210 ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
3211 try {
3212 for (int i = 1; i <= maximumAttempts; i++) {
3213 if (!serverManager.isServerOnline(serverName)
3214 || server.isStopped() || server.isAborted()) {
3215 return;
3216 }
3217 try {
3218 if (!regionState.equals(regionStates.getRegionState(hri))) {
3219 return;
3220 }
3221 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
3222 if (shouldAssignRegionsWithFavoredNodes) {
3223 favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
3224 }
3225 RegionOpeningState regionOpenState = serverManager.sendRegionOpen(
3226 serverName, hri, -1, favoredNodes);
3227
3228 if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
3229
3230
3231 LOG.debug("Got failed_opening in retry sendRegionOpen for "
3232 + regionState + ", re-assign it");
3233 invokeAssign(hri, true);
3234 }
3235 return;
3236 } catch (Throwable t) {
3237 if (t instanceof RemoteException) {
3238 t = ((RemoteException) t).unwrapRemoteException();
3239 }
3240
3241 if (t instanceof java.net.SocketTimeoutException
3242 || t instanceof FailedServerException) {
3243 Threads.sleep(100);
3244 continue;
3245 }
3246
3247 LOG.debug("Got exception in retry sendRegionOpen for "
3248 + regionState + ", re-assign it", t);
3249 invokeAssign(hri);
3250 return;
3251 }
3252 }
3253 } finally {
3254 lock.unlock();
3255 }
3256 }
3257 });
3258 }
3259
3260
3261
3262
3263
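/**
 * Re-send the close RPC for a region left in PENDING_CLOSE across a master
 * failover, falling back to a fresh unassign if the regionserver rejects it.
 */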
3264 private void retrySendRegionClose(final RegionState regionState) {
3265 this.executorService.submit(
3266 new EventHandler(server, EventType.M_MASTER_RECOVERY) {
3267 @Override
3268 public void process() throws IOException {
3269 HRegionInfo hri = regionState.getRegion();
3270 ServerName serverName = regionState.getServerName();
3271 ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
3272 try {
3273 for (int i = 1; i <= maximumAttempts; i++) {
3274 if (!serverManager.isServerOnline(serverName)
3275 || server.isStopped() || server.isAborted()) {
3276 return;
3277 }
3278 try {
3279 if (!regionState.equals(regionStates.getRegionState(hri))) {
3280 return;
3281 }
3282 if (!serverManager.sendRegionClose(serverName, hri, -1, null, false)) {
3283
3284 LOG.debug("Got false in retry sendRegionClose for "
3285 + regionState + ", re-close it");
3286 invokeUnAssign(hri);
3287 }
3288 return;
3289 } catch (Throwable t) {
3290 if (t instanceof RemoteException) {
3291 t = ((RemoteException) t).unwrapRemoteException();
3292 }
3293
3294 if (t instanceof java.net.SocketTimeoutException
3295 || t instanceof FailedServerException) {
3296 Threads.sleep(100);
3297 continue;
3298 }
3299 if (!(t instanceof NotServingRegionException
3300 || t instanceof RegionAlreadyInTransitionException)) {
3301
3302
3303
3304 LOG.debug("Got exception in retry sendRegionClose for "
3305 + regionState + ", re-close it", t);
3306 invokeUnAssign(hri);
3307 }
3308 return;
3309 }
3310 }
3311 } finally {
3312 lock.unlock();
3313 }
3314 }
3315 });
3316 }
3317
3318
3319
3320
3321
3322
3323
3324
3325 public void updateRegionsInTransitionMetrics() {
3326 long currentTime = System.currentTimeMillis();
3327 int totalRITs = 0;
3328 int totalRITsOverThreshold = 0;
3329 long oldestRITTime = 0;
3330 int ritThreshold = this.server.getConfiguration().
3331 getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
3332 for (RegionState state: regionStates.getRegionsInTransition().values()) {
3333 totalRITs++;
3334 long ritTime = currentTime - state.getStamp();
3335 if (ritTime > ritThreshold) {
3336 totalRITsOverThreshold++;
3337 }
3338 if (oldestRITTime < ritTime) {
3339 oldestRITTime = ritTime;
3340 }
3341 }
3342 if (this.metricsAssignmentManager != null) {
3343 this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
3344 this.metricsAssignmentManager.updateRITCount(totalRITs);
3345 this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
3346 }
3347 }
3348
3349
3350
3351
3352 void clearRegionPlan(final HRegionInfo region) {
3353 synchronized (this.regionPlans) {
3354 this.regionPlans.remove(region.getEncodedName());
3355 }
3356 }
3357
3358
3359
3360
3361
3362
3363 public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
3364 throws IOException, InterruptedException {
3365 waitOnRegionToClearRegionsInTransition(hri, -1L);
3366 }
3367
3368
3369
3370
3371
3372
3373
3374
3375 public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
3376 throws InterruptedException {
3377 if (!regionStates.isRegionInTransition(hri)) return true;
3378 long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
3379 + timeOut;
3380
3381
3382 LOG.info("Waiting for " + hri.getEncodedName() +
3383 " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
3384 while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
3385 regionStates.waitForUpdate(100);
3386 if (EnvironmentEdgeManager.currentTime() > end) {
3387 LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
3388 return false;
3389 }
3390 }
3391 if (this.server.isStopped()) {
3392 LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
3393 return false;
3394 }
3395 return true;
3396 }
3397
3398 void invokeAssign(HRegionInfo regionInfo) {
3399 invokeAssign(regionInfo, true);
3400 }
3401
3402 void invokeAssign(HRegionInfo regionInfo, boolean newPlan) {
3403 threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, newPlan));
3404 }
3405
3406 void invokeUnAssign(HRegionInfo regionInfo) {
3407 threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
3408 }
3409
3410 public ServerHostRegion isCarryingMeta(ServerName serverName) {
3411 return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
3412 }
3413
3414 public ServerHostRegion isCarryingMetaReplica(ServerName serverName, int replicaId) {
3415 return isCarryingRegion(serverName,
3416 RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId));
3417 }
3418
3419 public ServerHostRegion isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) {
3420 return isCarryingRegion(serverName, metaHri);
3421 }
3422
3423
3424
3425
3426
3427
3428
3429
3430
3431
3432
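/**
 * Check whether the given server is carrying the region. Consults, in order, the
 * region's unassigned znode, the in-memory region states and, for the default
 * meta replica, the meta location in ZooKeeper; returns UNKNOWN if none of them
 * knows.
 */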
3433 private ServerHostRegion isCarryingRegion(ServerName serverName, HRegionInfo hri) {
3434 RegionTransition rt = null;
3435 try {
3436 byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
3437
3438 rt = data == null? null: RegionTransition.parseFrom(data);
3439 } catch (KeeperException e) {
3440 server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
3441 } catch (DeserializationException e) {
3442 server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
3443 }
3444
3445 ServerName addressFromZK = rt != null? rt.getServerName(): null;
3446 if (addressFromZK != null) {
3447
3448 boolean matchZK = addressFromZK.equals(serverName);
3449 LOG.debug("Checking region=" + hri.getRegionNameAsString() + ", zk server=" + addressFromZK +
3450 " current=" + serverName + ", matches=" + matchZK);
3451 return matchZK ? ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3452 }
3453
3454 ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
3455 if (LOG.isDebugEnabled()) {
3456 LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
3457 " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
3458 " server being checked: " + serverName);
3459 }
3460 if (addressFromAM != null) {
3461 return addressFromAM.equals(serverName) ?
3462 ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3463 }
3464
3465 if (hri.isMetaRegion() && RegionReplicaUtil.isDefaultReplica(hri)) {
3466
3467 final ServerName serverNameInZK =
3468 server.getMetaTableLocator().getMetaRegionLocation(this.server.getZooKeeper());
3469 if (LOG.isDebugEnabled()) {
3470 LOG.debug("Based on MetaTableLocator, the META region is on server=" +
3471 (serverNameInZK == null ? "null" : serverNameInZK) +
3472 " server being checked: " + serverName);
3473 }
3474 if (serverNameInZK != null) {
3475 return serverNameInZK.equals(serverName) ?
3476 ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3477 }
3478 }
3479
3480
3481 return ServerHostRegion.UNKNOWN;
3482 }
3483
3484
3485
3486
3487
3488
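/**
 * Clean up after a crashed server: drop region plans targeting it, clear stale
 * znodes, and return the regions it carried that still need to be reassigned.
 */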
3489 public List<HRegionInfo> cleanOutCrashedServerReferences(final ServerName sn) {
3490
3491 synchronized (this.regionPlans) {
3492 for (Iterator <Map.Entry<String, RegionPlan>> i = this.regionPlans.entrySet().iterator();
3493 i.hasNext();) {
3494 Map.Entry<String, RegionPlan> e = i.next();
3495 ServerName otherSn = e.getValue().getDestination();
3496
3497 if (otherSn != null && otherSn.equals(sn)) {
3498
3499 i.remove();
3500 }
3501 }
3502 }
3503 List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn);
3504 for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
3505 HRegionInfo hri = it.next();
3506 String encodedName = hri.getEncodedName();
3507
3508
3509 Lock lock = locker.acquireLock(encodedName);
3510 try {
3511 RegionState regionState = regionStates.getRegionTransitionState(encodedName);
3512 if (regionState == null
3513 || (regionState.getServerName() != null && !regionState.isOnServer(sn))
3514 || !(regionState.isFailedClose() || regionState.isOffline()
3515 || regionState.isPendingOpenOrOpening())) {
3516 LOG.info("Skip " + regionState + " since it is not opening/failed_close"
3517 + " on the dead server any more: " + sn);
3518 it.remove();
3519 } else {
3520 try {
3521
3522 ZKAssign.deleteNodeFailSilent(watcher, hri);
3523 } catch (KeeperException ke) {
3524 server.abort("Unexpected ZK exception deleting node " + hri, ke);
3525 }
3526 if (tableStateManager.isTableState(hri.getTable(),
3527 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3528 regionStates.regionOffline(hri);
3529 it.remove();
3530 continue;
3531 }
3532
3533 regionStates.updateRegionState(hri, State.OFFLINE);
3534 }
3535 } finally {
3536 lock.unlock();
3537 }
3538 }
3539 return regions;
3540 }
3541
3542
3543
3544
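/**
 * Move a region according to the given plan by unassigning it toward the plan's
 * destination; ignored for disabled/disabling tables and regions that are not
 * currently online.
 */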
3545 public void balance(final RegionPlan plan) {
3546
3547 HRegionInfo hri = plan.getRegionInfo();
3548 TableName tableName = hri.getTable();
3549 if (tableStateManager.isTableState(tableName,
3550 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3551 LOG.info("Ignored moving region of disabling/disabled table "
3552 + tableName);
3553 return;
3554 }
3555
3556
3557 String encodedName = hri.getEncodedName();
3558 ReentrantLock lock = locker.acquireLock(encodedName);
3559 try {
3560 if (!regionStates.isRegionOnline(hri)) {
3561 RegionState state = regionStates.getRegionState(encodedName);
3562 LOG.info("Ignored moving region not assigned: " + hri + ", "
3563 + (state == null ? "not in region states" : state));
3564 return;
3565 }
3566 synchronized (this.regionPlans) {
3567 this.regionPlans.put(plan.getRegionName(), plan);
3568 }
3569 unassign(hri, false, plan.getDestination());
3570 } finally {
3571 lock.unlock();
3572 }
3573 }
3574
3575 public void stop() {
3576 shutdown();
3577 }
3578
3579
3580
3581
3582 public void shutdown() {
3583
3584 synchronized (zkEventWorkerWaitingList){
3585 zkEventWorkerWaitingList.clear();
3586 }
3587
3588
3589 threadPoolExecutorService.shutdownNow();
3590 zkEventWorkers.shutdownNow();
3591 regionStateStore.stop();
3592 }
3593
3594 protected void setEnabledTable(TableName tableName) {
3595 try {
3596 this.tableStateManager.setTableState(tableName,
3597 ZooKeeperProtos.Table.State.ENABLED);
3598 } catch (CoordinatedStateException e) {
3599
3600 String errorMsg = "Unable to ensure that the table " + tableName
3601 + " will be enabled because of a ZooKeeper issue";
3602 LOG.error(errorMsg, e);
3603 this.server.abort(errorMsg, e);
3604 }
3605 }
3606
3607
3608
3609
3610
3611
3612
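/**
 * Asynchronously create the OFFLINE znode for the region, invoking the callback
 * when ZooKeeper responds. Returns false if the request could not be issued.
 */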
3613 private boolean asyncSetOfflineInZooKeeper(final RegionState state,
3614 final AsyncCallback.StringCallback cb, final ServerName destination) {
3615 if (!state.isClosed() && !state.isOffline()) {
3616 this.server.abort("Unexpected state trying to OFFLINE; " + state,
3617 new IllegalStateException());
3618 return false;
3619 }
3620 regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
3621 try {
3622 ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
3623 destination, cb, state);
3624 } catch (KeeperException e) {
3625 if (e instanceof NodeExistsException) {
3626 LOG.warn("Node for " + state.getRegion() + " already exists");
3627 } else {
3628 server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
3629 }
3630 return false;
3631 }
3632 return true;
3633 }
3634
3635 private boolean deleteNodeInStates(String encodedName,
3636 String desc, ServerName sn, EventType... types) {
3637 try {
3638 for (EventType et: types) {
3639 if (ZKAssign.deleteNode(watcher, encodedName, et, sn)) {
3640 return true;
3641 }
3642 }
3643 LOG.info("Failed to delete the " + desc + " node for "
3644 + encodedName + ". The node type may not match");
3645 } catch (NoNodeException e) {
3646 if (LOG.isDebugEnabled()) {
3647 LOG.debug("The " + desc + " node for " + encodedName + " already deleted");
3648 }
3649 } catch (KeeperException ke) {
3650 server.abort("Unexpected ZK exception deleting " + desc
3651 + " node for the region " + encodedName, ke);
3652 }
3653 return false;
3654 }
3655
3656 private void deleteMergingNode(String encodedName, ServerName sn) {
3657 deleteNodeInStates(encodedName, "merging", sn, EventType.RS_ZK_REGION_MERGING,
3658 EventType.RS_ZK_REQUEST_REGION_MERGE, EventType.RS_ZK_REGION_MERGED);
3659 }
3660
3661 private void deleteSplittingNode(String encodedName, ServerName sn) {
3662 deleteNodeInStates(encodedName, "splitting", sn, EventType.RS_ZK_REGION_SPLITTING,
3663 EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT);
3664 }
3665
3666 @edu.umd.cs.findbugs.annotations.SuppressWarnings(
3667 value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
3668 justification="Modification of Maps not ATOMIC!!!! FIX!!!")
3669 private void onRegionFailedOpen(
3670 final HRegionInfo hri, final ServerName sn) {
3671 String encodedName = hri.getEncodedName();
3672
3673 AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
3674 if (failedOpenCount == null) {
3675 failedOpenCount = new AtomicInteger();
3676
3677
3678
3679
3680 failedOpenTracker.put(encodedName, failedOpenCount);
3681 }
3682 if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion()) {
3683
3684 regionStates.updateRegionState(hri, State.FAILED_OPEN);
3685
3686
3687 failedOpenTracker.remove(encodedName);
3688 } else {
3689 if (hri.isMetaRegion() && failedOpenCount.get() >= maximumAttempts) {
3690
3691
3692 LOG.warn("Failed to open the hbase:meta region " +
3693 hri.getRegionNameAsString() + " after" +
3694 failedOpenCount.get() + " retries. Continue retrying.");
3695 }
3696
3697
3698 RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
3699 if (regionState != null) {
3700
3701
3702 if (getTableStateManager().isTableState(hri.getTable(),
3703 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
3704 replicasToClose.contains(hri)) {
3705 offlineDisabledRegion(hri);
3706 return;
3707 }
3708
3709 regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3710
3711 removeClosedRegion(hri);
3712 try {
3713 getRegionPlan(hri, sn, true);
3714 } catch (HBaseIOException e) {
3715 LOG.warn("Failed to get region plan", e);
3716 }
3717 invokeAssign(hri, false);
3718 }
3719 }
3720 }
3721
3722 private void onRegionOpen(final HRegionInfo hri, final ServerName sn, long openSeqNum) {
3723 regionOnline(hri, sn, openSeqNum);
3724 if (useZKForAssignment) {
3725 try {
3726
3727 ZKAssign.deleteNodeFailSilent(watcher, hri);
3728 } catch (KeeperException ke) {
3729 server.abort("Unexpected ZK exception deleting node " + hri, ke);
3730 }
3731 }
3732
3733
3734 failedOpenTracker.remove(hri.getEncodedName());
3735 if (getTableStateManager().isTableState(hri.getTable(),
3736 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3737 invokeUnAssign(hri);
3738 }
3739 }
3740
3741 private void onRegionClosed(final HRegionInfo hri) {
3742 if (getTableStateManager().isTableState(hri.getTable(),
3743 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
3744 replicasToClose.contains(hri)) {
3745 offlineDisabledRegion(hri);
3746 return;
3747 }
3748 regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3749 sendRegionClosedNotification(hri);
3750
3751 removeClosedRegion(hri);
3752 invokeAssign(hri, false);
3753 }
3754
3755 private String checkInStateForSplit(ServerName sn,
3756 final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3757 final RegionState rs_p = regionStates.getRegionState(p);
3758 RegionState rs_a = regionStates.getRegionState(a);
3759 RegionState rs_b = regionStates.getRegionState(b);
3760 if (!(rs_p.isOpenOrSplittingOnServer(sn)
3761 && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
3762 && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
3763 return "Not in state good for split";
3764 }
3765 return "";
3766 }
3767
3768 private String onRegionSplitReverted(ServerName sn,
3769 final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3770 String s = checkInStateForSplit(sn, p, a, b);
3771 if (!org.apache.commons.lang.StringUtils.isEmpty(s)) {
3772 return s;
3773 }
3774
3775
3776
3777 regionOnline(p, sn);
3778
3779
3780 RegionState regionStateA = regionStates.getRegionState(a);
3781 RegionState regionStateB = regionStates.getRegionState(b);
3782 if (regionStateA != null) {
3783 regionOffline(a);
3784 }
3785 if (regionStateB != null) {
3786 regionOffline(b);
3787 }
3788
3789 if (getTableStateManager().isTableState(p.getTable(),
3790 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3791 invokeUnAssign(p);
3792 }
3793 return null;
3794 }
3795
3796 private String onRegionSplit(ServerName sn, TransitionCode code,
3797 final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3798 String s = checkInStateForSplit(sn, p, a, b);
3799 if (!org.apache.commons.lang.StringUtils.isEmpty(s)) {
3800 return s;
3801 }
3802
3803 regionStates.updateRegionState(a, State.SPLITTING_NEW, sn);
3804 regionStates.updateRegionState(b, State.SPLITTING_NEW, sn);
3805 regionStates.updateRegionState(p, State.SPLITTING);
3806
3807 if (code == TransitionCode.SPLIT) {
3808 if (TEST_SKIP_SPLIT_HANDLING) {
3809 return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
3810 }
3811 regionOffline(p, State.SPLIT);
3812 regionOnline(a, sn, 1);
3813 regionOnline(b, sn, 1);
3814
3815
3816 if (getTableStateManager().isTableState(p.getTable(),
3817 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3818 invokeUnAssign(a);
3819 invokeUnAssign(b);
3820 } else {
3821 Callable<Object> splitReplicasCallable = new Callable<Object>() {
3822 @Override
3823 public Object call() {
3824 doSplittingOfReplicas(p, a, b);
3825 return null;
3826 }
3827 };
3828 threadPoolExecutorService.submit(splitReplicasCallable);
3829 }
3830 } else if (code == TransitionCode.SPLIT_PONR) {
3831 try {
3832 regionStates.splitRegion(p, a, b, sn);
3833 } catch (IOException ioe) {
3834 LOG.info("Failed to record split region " + p.getShortNameToLog());
3835 return "Failed to record the splitting in meta";
3836 }
3837 }
3838 return null;
3839 }
3840
3841 private String onRegionMerge(ServerName sn, TransitionCode code,
3842 final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3843 RegionState rs_p = regionStates.getRegionState(p);
3844 RegionState rs_a = regionStates.getRegionState(a);
3845 RegionState rs_b = regionStates.getRegionState(b);
3846 if (!(rs_a.isOpenOrMergingOnServer(sn) && rs_b.isOpenOrMergingOnServer(sn)
3847 && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3848 return "Not in state good for merge";
3849 }
3850
3851 regionStates.updateRegionState(a, State.MERGING);
3852 regionStates.updateRegionState(b, State.MERGING);
3853 regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3854
3855 String encodedName = p.getEncodedName();
3856 if (code == TransitionCode.READY_TO_MERGE) {
3857 mergingRegions.put(encodedName,
3858 new PairOfSameType<HRegionInfo>(a, b));
3859 } else if (code == TransitionCode.MERGED) {
3860
3861 if (TEST_SKIP_MERGE_HANDLING) {
3862 return "Skipping merge message, TEST_SKIP_MERGE_HANDLING is set for merge parent: " + p;
3863 }
3864
3865 mergingRegions.remove(encodedName);
3866 regionOffline(a, State.MERGED);
3867 regionOffline(b, State.MERGED);
3868 regionOnline(p, sn, 1);
3869
3870
3871 if (getTableStateManager().isTableState(p.getTable(),
3872 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3873 invokeUnAssign(p);
3874 } else {
3875 Callable<Object> mergeReplicasCallable = new Callable<Object>() {
3876 @Override
3877 public Object call() {
3878 doMergingOfReplicas(p, a, b);
3879 return null;
3880 }
3881 };
3882 threadPoolExecutorService.submit(mergeReplicasCallable);
3883 }
3884 } else if (code == TransitionCode.MERGE_PONR) {
3885 try {
3886 regionStates.mergeRegions(p, a, b, sn);
3887 } catch (IOException ioe) {
3888 LOG.info("Failed to record merged region " + p.getShortNameToLog());
3889 return "Failed to record the merging in meta";
3890 }
3891 }
3892 return null;
3893 }
3894
3895 private String onRegionMergeReverted(ServerName sn, TransitionCode code,
3896 final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3897 RegionState rs_p = regionStates.getRegionState(p);
3898 String encodedName = p.getEncodedName();
3899 mergingRegions.remove(encodedName);
3900
3901
3902
3903 regionOnline(a, sn);
3904 regionOnline(b, sn);
3905
3906
3907 if (rs_p != null) {
3908 regionOffline(p);
3909 }
3910
3911 if (getTableStateManager().isTableState(p.getTable(),
3912 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3913 invokeUnAssign(a);
3914 invokeUnAssign(b);
3915 }
3916
3917 return null;
3918 }
3919
3920
3921
3922
3923
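/**
 * Handle a merge transition reported through ZooKeeper: validate the merge parent
 * and the two merging regions, drive the merge coordination for a merge request,
 * and update region states, marking the two source regions MERGED and the
 * resulting region online once the merge completes.
 */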
3924 private boolean handleRegionMerging(final RegionTransition rt, final String encodedName,
3925 final String prettyPrintedRegionName, final ServerName sn) {
3926 if (!serverManager.isServerOnline(sn)) {
3927 LOG.warn("Dropped merging! ServerName=" + sn + " unknown.");
3928 return false;
3929 }
3930 byte [] payloadOfMerging = rt.getPayload();
3931 List<HRegionInfo> mergingRegions;
3932 try {
3933 mergingRegions = HRegionInfo.parseDelimitedFrom(
3934 payloadOfMerging, 0, payloadOfMerging.length);
3935 } catch (IOException e) {
3936 LOG.error("Dropped merging! Failed reading " + rt.getEventType()
3937 + " payload for " + prettyPrintedRegionName);
3938 return false;
3939 }
3940 assert mergingRegions.size() == 3;
3941 HRegionInfo p = mergingRegions.get(0);
3942 HRegionInfo hri_a = mergingRegions.get(1);
3943 HRegionInfo hri_b = mergingRegions.get(2);
3944
3945 RegionState rs_p = regionStates.getRegionState(p);
3946 RegionState rs_a = regionStates.getRegionState(hri_a);
3947 RegionState rs_b = regionStates.getRegionState(hri_b);
3948
3949 if (!((rs_a == null || rs_a.isOpenOrMergingOnServer(sn))
3950 && (rs_b == null || rs_b.isOpenOrMergingOnServer(sn))
3951 && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3952 LOG.warn("Dropped merging! Not in state good for MERGING; rs_p="
3953 + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
3954 return false;
3955 }
3956
3957 EventType et = rt.getEventType();
3958 if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) {
3959 try {
3960 RegionMergeCoordination.RegionMergeDetails std =
3961 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3962 .getRegionMergeCoordination().getDefaultDetails();
3963 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3964 .getRegionMergeCoordination().processRegionMergeRequest(p, hri_a, hri_b, sn, std);
3965 if (((ZkRegionMergeCoordination.ZkRegionMergeDetails) std).getZnodeVersion() == -1) {
3966 byte[] data = ZKAssign.getData(watcher, encodedName);
3967 EventType currentType = null;
3968 if (data != null) {
3969 RegionTransition newRt = RegionTransition.parseFrom(data);
3970 currentType = newRt.getEventType();
3971 }
3972 if (currentType == null || (currentType != EventType.RS_ZK_REGION_MERGED
3973 && currentType != EventType.RS_ZK_REGION_MERGING)) {
3974 LOG.warn("Failed to transition pending_merge node "
3975 + encodedName + " to merging, it's now " + currentType);
3976 return false;
3977 }
3978 }
3979 } catch (Exception e) {
3980 LOG.warn("Failed to transition pending_merge node "
3981 + encodedName + " to merging", e);
3982 return false;
3983 }
3984 }
3985
3986 synchronized (regionStates) {
3987 regionStates.updateRegionState(hri_a, State.MERGING);
3988 regionStates.updateRegionState(hri_b, State.MERGING);
3989 regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3990
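// Test-only hook (see setTestSkipMergeHandling): deliberately stop handling the
// merge at this point.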
3991 if (TEST_SKIP_MERGE_HANDLING) {
3992 LOG.warn("Skipping merge message, TEST_SKIP_MERGE_HANDLING is set for merge parent: " + p);
3993 return true;
3994 }
3995
3996 if (et != EventType.RS_ZK_REGION_MERGED) {
3997 this.mergingRegions.put(encodedName,
3998 new PairOfSameType<HRegionInfo>(hri_a, hri_b));
3999 } else {
4000 this.mergingRegions.remove(encodedName);
4001 regionOffline(hri_a, State.MERGED);
4002 regionOffline(hri_b, State.MERGED);
4003 regionOnline(p, sn);
4004 }
4005 }
4006
4007 if (et == EventType.RS_ZK_REGION_MERGED) {
4008 doMergingOfReplicas(p, hri_a, hri_b);
4009 LOG.debug("Handling MERGED event for " + encodedName + "; deleting node");
4010
4011 try {
4012 boolean successful = false;
4013 while (!successful) {
4014
4015
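// A concurrent update to the znode can make a single delete attempt fail;
// keep retrying until the MERGED node is gone.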
4016 successful = ZKAssign.deleteNode(watcher, encodedName,
4017 EventType.RS_ZK_REGION_MERGED, sn);
4018 }
4019 } catch (KeeperException e) {
4020 if (e instanceof NoNodeException) {
4021 String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
4022 LOG.debug("The znode " + znodePath + " does not exist. May be deleted already.");
4023 } else {
4024 server.abort("Error deleting MERGED node " + encodedName, e);
4025 }
4026 }
4027 LOG.info("Handled MERGED event; merged=" + p.getRegionNameAsString()
4028 + ", region_a=" + hri_a.getRegionNameAsString() + ", region_b="
4029 + hri_b.getRegionNameAsString() + ", on " + sn);
4030
4031
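// The merge is complete, but the table may have been disabled/disabling meanwhile;
// if so, the merged region must be closed again.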
4032 if (tableStateManager.isTableState(p.getTable(),
4033 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
4034 unassign(p);
4035 }
4036 }
4037 return true;
4038 }
4039
4040
4041
4042
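/**
 * Handles a ZK-based region split transition: verifies the reporting server and the
 * states of the parent and daughters, tracks the daughters as SPLITTING_NEW, and on a
 * SPLIT event handles the replicas and deletes the transition znode.
 * @return true if the transition was handled, false if it was dropped
 */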
4043 private boolean handleRegionSplitting(final RegionTransition rt, final String encodedName,
4044 final String prettyPrintedRegionName, final ServerName sn) {
4045 if (!serverManager.isServerOnline(sn)) {
4046 LOG.warn("Dropped splitting! ServerName=" + sn + " unknown.");
4047 return false;
4048 }
4049 byte [] payloadOfSplitting = rt.getPayload();
4050 List<HRegionInfo> splittingRegions;
4051 try {
4052 splittingRegions = HRegionInfo.parseDelimitedFrom(
4053 payloadOfSplitting, 0, payloadOfSplitting.length);
4054 } catch (IOException e) {
4055 LOG.error("Dropped splitting! Failed reading " + rt.getEventType()
4056 + " payload for " + prettyPrintedRegionName);
4057 return false;
4058 }
4059 assert splittingRegions.size() == 2;
4060 HRegionInfo hri_a = splittingRegions.get(0);
4061 HRegionInfo hri_b = splittingRegions.get(1);
4062
4063 RegionState rs_p = regionStates.getRegionState(encodedName);
4064 RegionState rs_a = regionStates.getRegionState(hri_a);
4065 RegionState rs_b = regionStates.getRegionState(hri_b);
4066
4067 if (!((rs_p == null || rs_p.isOpenOrSplittingOnServer(sn))
4068 && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
4069 && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
4070 LOG.warn("Dropped splitting! Not in state good for SPLITTING; rs_p="
4071 + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
4072 return false;
4073 }
4074
4075 if (rs_p == null) {
4076
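// No state is recorded for the splitting parent; mark it OPEN on the reporting
// server so the split can be tracked.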
4077 rs_p = regionStates.updateRegionState(rt, State.OPEN);
4078 if (rs_p == null) {
4079 LOG.warn("Received splitting for region " + prettyPrintedRegionName
4080 + " from server " + sn + " but it doesn't exist anymore,"
4081 + " probably already processed its split");
4082 return false;
4083 }
4084 regionStates.regionOnline(rs_p.getRegion(), sn);
4085 }
4086
4087 HRegionInfo p = rs_p.getRegion();
4088 EventType et = rt.getEventType();
4089 if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) {
4090 try {
4091 SplitTransactionDetails std =
4092 ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
4093 .getSplitTransactionCoordination().getDefaultDetails();
4094 if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
4095 .getSplitTransactionCoordination().processTransition(p, hri_a, hri_b, sn, std) == -1) {
4096 byte[] data = ZKAssign.getData(watcher, encodedName);
4097 EventType currentType = null;
4098 if (data != null) {
4099 RegionTransition newRt = RegionTransition.parseFrom(data);
4100 currentType = newRt.getEventType();
4101 }
4102 if (currentType == null
4103 || (currentType != EventType.RS_ZK_REGION_SPLIT && currentType != EventType.RS_ZK_REGION_SPLITTING)) {
4104 LOG.warn("Failed to transition pending_split node " + encodedName
4105 + " to splitting, it's now " + currentType);
4106 return false;
4107 }
4108 }
4109 } catch (Exception e) {
4110 LOG.warn("Failed to transition pending_split node " + encodedName + " to splitting", e);
4111 return false;
4112 }
4113 }
4114
4115 synchronized (regionStates) {
4116 splitRegions.put(p, new PairOfSameType<HRegionInfo>(hri_a, hri_b));
4117 regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn);
4118 regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn);
4119 regionStates.updateRegionState(rt, State.SPLITTING);
4120
4121
4122
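// Test-only hook (see setTestSkipSplitHandling): deliberately stop handling the
// split at this point.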
4123 if (TEST_SKIP_SPLIT_HANDLING) {
4124 LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set");
4125 return true;
4126 }
4127
4128 if (et == EventType.RS_ZK_REGION_SPLIT) {
4129 regionOffline(p, State.SPLIT);
4130 regionOnline(hri_a, sn);
4131 regionOnline(hri_b, sn);
4132 splitRegions.remove(p);
4133 }
4134 }
4135
4136 if (et == EventType.RS_ZK_REGION_SPLIT) {
4137
4138 doSplittingOfReplicas(rs_p.getRegion(), hri_a, hri_b);
4139 LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
4140
4141 try {
4142 boolean successful = false;
4143 while (!successful) {
4144
4145
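// A concurrent update to the znode can make a single delete attempt fail;
// keep retrying until the SPLIT node is gone.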
4146 successful = ZKAssign.deleteNode(watcher, encodedName,
4147 EventType.RS_ZK_REGION_SPLIT, sn);
4148 }
4149 } catch (KeeperException e) {
4150 if (e instanceof NoNodeException) {
4151 String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
4152 LOG.debug("The znode " + znodePath + " does not exist. May be deleted already.");
4153 } else {
4154 server.abort("Error deleting SPLIT node " + encodedName, e);
4155 }
4156 }
4157 LOG.info("Handled SPLIT event; parent=" + p.getRegionNameAsString()
4158 + ", daughter a=" + hri_a.getRegionNameAsString() + ", daughter b="
4159 + hri_b.getRegionNameAsString() + ", on " + sn);
4160
4161
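// The split is complete, but the table may have been disabled/disabling meanwhile;
// if so, the daughters must be closed again.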
4162 if (tableStateManager.isTableState(p.getTable(),
4163 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
4164 unassign(hri_a);
4165 unassign(hri_b);
4166 }
4167 }
4168 return true;
4169 }
4170
4171 private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
4172 final HRegionInfo hri_b) {
4173
4174
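// Close the replicas of the two regions that were merged away, then assign
// replicas for the new merged region.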
4175 List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
4176 unmergedRegions.add(hri_a);
4177 unmergedRegions.add(hri_b);
4178 Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
4179 Collection<List<HRegionInfo>> c = map.values();
4180 for (List<HRegionInfo> l : c) {
4181 for (HRegionInfo h : l) {
4182 if (!RegionReplicaUtil.isDefaultReplica(h)) {
4183 LOG.debug("Unassigning un-merged replica " + h);
4184 unassign(h);
4185 }
4186 }
4187 }
4188 int numReplicas = 1;
4189 try {
4190 numReplicas = server.getTableDescriptors().get(mergedHri.getTable()).
4191 getRegionReplication();
4192 } catch (IOException e) {
4193 LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
4194 " due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
4195 "will not be done");
4196 }
4197 List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
4198 for (int i = 1; i < numReplicas; i++) {
4199 regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
4200 }
4201 try {
4202 assign(regions);
4203 } catch (IOException ioe) {
4204 LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
4205 ioe.getMessage());
4206 } catch (InterruptedException ie) {
4207 LOG.warn("Couldn't assign all replica(s) of region " + mergedHri+ " because of " +
4208 ie.getMessage());
4209 }
4210 }
4211
4212 private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
4213 final HRegionInfo hri_b) {
4214
4215
4216
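// Unassign the parent's replicas and assign a replica of each daughter, preferring
// the server that hosted the corresponding parent replica.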
4217 int numReplicas = 1;
4218 try {
4219 numReplicas = server.getTableDescriptors().get(parentHri.getTable()).
4220 getRegionReplication();
4221 } catch (IOException e) {
4222 LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
4223 " due to " + e.getMessage() + ". The assignment of daughter replicas " +
4224 "replicas will not be done");
4225 }
4226
4227 List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
4228 parentRegion.add(parentHri);
4229 Map<ServerName, List<HRegionInfo>> currentAssign =
4230 regionStates.getRegionAssignments(parentRegion);
4231 Collection<List<HRegionInfo>> c = currentAssign.values();
4232 for (List<HRegionInfo> l : c) {
4233 for (HRegionInfo h : l) {
4234 if (!RegionReplicaUtil.isDefaultReplica(h)) {
4235 LOG.debug("Unassigning parent's replica " + h);
4236 unassign(h);
4237 }
4238 }
4239 }
4240
4241 Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
4242 for (int i = 1; i < numReplicas; i++) {
4243 prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
4244 prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
4245 }
4246 try {
4247 assign(map);
4248 } catch (IOException e) {
4249 LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
4250 } catch (InterruptedException e) {
4251 LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
4252 }
4253 }
4254
4255 private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
4256 int replicaId, Map<HRegionInfo, ServerName> map) {
4257 HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
4258 HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
4259 replicaId);
4260 LOG.debug("Created replica region for daughter " + daughterReplica);
4261 ServerName sn;
4262 if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
4263 map.put(daughterReplica, sn);
4264 } else {
4265 List<ServerName> servers = serverManager.getOnlineServersList();
4266 sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
4267 map.put(daughterReplica, sn);
4268 }
4269 }
4270
4271 public Set<HRegionInfo> getReplicasToClose() {
4272 return replicasToClose;
4273 }
4274
4275
4276
4277
4278
4279
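/**
 * Marks a region offline in the given state, clears its region plan, and notifies the
 * balancer and the assignment listeners. For SPLIT or MERGED parents, the parent's
 * replicas are remembered in replicasToClose so they can be closed later.
 */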
4280 private void regionOffline(final HRegionInfo regionInfo, final State state) {
4281 regionStates.regionOffline(regionInfo, state);
4282 removeClosedRegion(regionInfo);
4283
4284 clearRegionPlan(regionInfo);
4285 balancer.regionOffline(regionInfo);
4286
4287
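// Tell our listeners that the region was closed.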
4288 sendRegionClosedNotification(regionInfo);
4289
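// For a SPLIT or MERGED parent, remember the parent's replica assignments so the
// replicas can be closed once the new region(s) are in place.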
4290 if (state != null && (state.equals(State.SPLIT) || state.equals(State.MERGED))) {
4291 Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
4292 c.add(regionInfo);
4293 Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
4294 Collection<List<HRegionInfo>> allReplicas = map.values();
4295 for (List<HRegionInfo> list : allReplicas) {
4296 replicasToClose.addAll(list);
4297 }
4298 }
4308 }
4309
4310 private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
4311 final ServerName serverName) {
4312 if (!this.listeners.isEmpty()) {
4313 for (AssignmentListener listener : this.listeners) {
4314 listener.regionOpened(regionInfo, serverName);
4315 }
4316 }
4317 }
4318
4319 private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
4320 if (!this.listeners.isEmpty()) {
4321 for (AssignmentListener listener : this.listeners) {
4322 listener.regionClosed(regionInfo);
4323 }
4324 }
4325 }
4326
4327
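/**
 * Processes a region state transition reported by a region server over the
 * RegionStateTransition RPC. Returns null if the transition was applied, or an
 * error message explaining why the state machine rejected it.
 */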
4371 protected String onRegionTransition(final ServerName serverName,
4372 final RegionStateTransition transition) {
4373 TransitionCode code = transition.getTransitionCode();
4374 HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
4375 RegionState current = regionStates.getRegionState(hri);
4376 if (LOG.isDebugEnabled()) {
4377 LOG.debug("Got transition " + code + " for "
4378 + (current != null ? current.toString() : hri.getShortNameToLog())
4379 + " from " + serverName);
4380 }
4381 String errorMsg = null;
4382 switch (code) {
4383 case OPENED:
4384 if (current != null && current.isOpened() && current.isOnServer(serverName)) {
4385 LOG.info("Region " + hri.getShortNameToLog() + " is already " + current.getState() + " on "
4386 + serverName);
4387 break;
4388 }
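// Intentional fall-through: an OPENED report that is not already recorded as open
// shares the pending-open validation below with FAILED_OPEN.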
4389 case FAILED_OPEN:
4390 if (current == null
4391 || !current.isPendingOpenOrOpeningOnServer(serverName)) {
4392 errorMsg = hri.getShortNameToLog()
4393 + " is not pending open on " + serverName;
4394 } else if (code == TransitionCode.FAILED_OPEN) {
4395 onRegionFailedOpen(hri, serverName);
4396 } else {
4397 long openSeqNum = HConstants.NO_SEQNUM;
4398 if (transition.hasOpenSeqNum()) {
4399 openSeqNum = transition.getOpenSeqNum();
4400 }
4401 if (openSeqNum < 0) {
4402 errorMsg = "Newly opened region has invalid open seq num " + openSeqNum;
4403 } else {
4404 onRegionOpen(hri, serverName, openSeqNum);
4405 }
4406 }
4407 break;
4408
4409 case CLOSED:
4410 if (current == null
4411 || !current.isPendingCloseOrClosingOnServer(serverName)) {
4412 errorMsg = hri.getShortNameToLog()
4413 + " is not pending close on " + serverName;
4414 } else {
4415 onRegionClosed(hri);
4416 }
4417 break;
4418
4419 case READY_TO_SPLIT:
4420 try {
4421 regionStateListener.onRegionSplit(hri);
4422 } catch (IOException exp) {
4423 errorMsg = StringUtils.stringifyException(exp);
4424 }
4425 break;
4426 case SPLIT_PONR:
4427 case SPLIT:
4428 errorMsg =
4429 onRegionSplit(serverName, code, hri, HRegionInfo.convert(transition.getRegionInfo(1)),
4430 HRegionInfo.convert(transition.getRegionInfo(2)));
4431 break;
4432
4433 case SPLIT_REVERTED:
4434 errorMsg =
4435 onRegionSplitReverted(serverName, hri,
4436 HRegionInfo.convert(transition.getRegionInfo(1)),
4437 HRegionInfo.convert(transition.getRegionInfo(2)));
4438 if (org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
4439 try {
4440 regionStateListener.onRegionSplitReverted(hri);
4441 } catch (IOException exp) {
4442 LOG.warn(StringUtils.stringifyException(exp));
4443 }
4444 }
4445 break;
4446 case READY_TO_MERGE:
4447 case MERGE_PONR:
4448 case MERGED:
4449 errorMsg = onRegionMerge(serverName, code, hri,
4450 HRegionInfo.convert(transition.getRegionInfo(1)),
4451 HRegionInfo.convert(transition.getRegionInfo(2)));
4452 if (code == TransitionCode.MERGED && org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
4453 try {
4454 regionStateListener.onRegionMerged(hri);
4455 } catch (IOException exp) {
4456 errorMsg = StringUtils.stringifyException(exp);
4457 }
4458 }
4459 break;
4460 case MERGE_REVERTED:
4461 errorMsg = onRegionMergeReverted(serverName, code, hri,
4462 HRegionInfo.convert(transition.getRegionInfo(1)),
4463 HRegionInfo.convert(transition.getRegionInfo(2)));
4464 break;
4465
4466 default:
4467 errorMsg = "Unexpected transition code " + code;
4468 }
4469 if (errorMsg != null) {
4470 LOG.error("Failed to transition region from " + current + " to "
4471 + code + " by " + serverName + ": " + errorMsg);
4472 }
4473 return errorMsg;
4474 }
4475
4476
4477
4478
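/**
 * @return the load balancer used by this assignment manager
 */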
4479 public LoadBalancer getBalancer() {
4480 return this.balancer;
4481 }
4482
4483 public Map<ServerName, List<HRegionInfo>>
4484 getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
4485 return getRegionStates().getRegionAssignments(infos);
4486 }
4487
4488 void setRegionStateListener(RegionStateListener listener) {
4489 this.regionStateListener = listener;
4490 }
4491
4492
4493
4494
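/**
 * Test-only: sets whether split transition handling should be skipped.
 */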
4495 @VisibleForTesting
4496 public static void setTestSkipSplitHandling(boolean skipSplitHandling) {
4497 TEST_SKIP_SPLIT_HANDLING = skipSplitHandling;
4498 }
4499
4500
4501
4502
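/**
 * Test-only: sets whether merge transition handling should be skipped.
 */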
4503 @VisibleForTesting
4504 public static void setTestSkipMergeHandling(boolean skipMergeHandling) {
4505 TEST_SKIP_MERGE_HANDLING = skipMergeHandling;
4506 }
4507 }