1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.HashMap;
28  import java.util.HashSet;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.NavigableMap;
33  import java.util.Random;
34  import java.util.Set;
35  import java.util.TreeMap;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.ConcurrentHashMap;
38  import java.util.concurrent.CopyOnWriteArrayList;
39  import java.util.concurrent.ThreadFactory;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  import java.util.concurrent.atomic.AtomicInteger;
43  import java.util.concurrent.locks.Lock;
44  import java.util.concurrent.locks.ReentrantLock;
45  
46  import org.apache.commons.logging.Log;
47  import org.apache.commons.logging.LogFactory;
48  import org.apache.hadoop.classification.InterfaceAudience;
49  import org.apache.hadoop.conf.Configuration;
50  import org.apache.hadoop.fs.FileSystem;
51  import org.apache.hadoop.fs.Path;
52  import org.apache.hadoop.hbase.CoordinatedStateException;
53  import org.apache.hadoop.hbase.HBaseIOException;
54  import org.apache.hadoop.hbase.HConstants;
55  import org.apache.hadoop.hbase.HRegionInfo;
56  import org.apache.hadoop.hbase.HRegionLocation;
57  import org.apache.hadoop.hbase.HTableDescriptor;
58  import org.apache.hadoop.hbase.NotServingRegionException;
59  import org.apache.hadoop.hbase.RegionLocations;
60  import org.apache.hadoop.hbase.RegionTransition;
61  import org.apache.hadoop.hbase.Server;
62  import org.apache.hadoop.hbase.ServerName;
63  import org.apache.hadoop.hbase.TableName;
64  import org.apache.hadoop.hbase.TableNotFoundException;
65  import org.apache.hadoop.hbase.TableStateManager;
66  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
67  import org.apache.hadoop.hbase.MetaTableAccessor;
68  import org.apache.hadoop.hbase.client.Result;
69  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
70  import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
71  import org.apache.hadoop.hbase.coordination.RegionMergeCoordination;
72  import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
73  import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
74  import org.apache.hadoop.hbase.coordination.ZkRegionMergeCoordination;
75  import org.apache.hadoop.hbase.exceptions.DeserializationException;
76  import org.apache.hadoop.hbase.executor.EventHandler;
77  import org.apache.hadoop.hbase.executor.EventType;
78  import org.apache.hadoop.hbase.executor.ExecutorService;
79  import org.apache.hadoop.hbase.ipc.RpcClient;
80  import org.apache.hadoop.hbase.ipc.RpcClient.FailedServerException;
81  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
82  import org.apache.hadoop.hbase.master.RegionState.State;
83  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
84  import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
85  import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
86  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
87  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
88  import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
89  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
90  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
91  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
92  import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
93  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
94  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
95  import org.apache.hadoop.hbase.regionserver.wal.HLog;
96  import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
97  import org.apache.hadoop.hbase.util.ConfigUtil;
98  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
99  import org.apache.hadoop.hbase.util.FSUtils;
100 import org.apache.hadoop.hbase.util.KeyLocker;
101 import org.apache.hadoop.hbase.util.Pair;
102 import org.apache.hadoop.hbase.util.PairOfSameType;
103 import org.apache.hadoop.hbase.util.Threads;
104 import org.apache.hadoop.hbase.util.Triple;
105 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
106 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
107 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
108 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
109 import org.apache.hadoop.ipc.RemoteException;
110 import org.apache.zookeeper.AsyncCallback;
111 import org.apache.zookeeper.KeeperException;
112 import org.apache.zookeeper.KeeperException.NoNodeException;
113 import org.apache.zookeeper.KeeperException.NodeExistsException;
114 import org.apache.zookeeper.data.Stat;
115 
116 import com.google.common.annotations.VisibleForTesting;
117 import com.google.common.collect.LinkedHashMultimap;
118 
119 /**
120  * Manages and performs region assignment.
121  * <p>
122  * Monitors ZooKeeper for events related to regions in transition.
123  * <p>
124  * Handles existing regions in transition during master failover.
125  */
126 @InterfaceAudience.Private
127 public class AssignmentManager extends ZooKeeperListener {
128   private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
129 
130   public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(HConstants.HBCK_CODE_NAME,
131       -1, -1L);
132 
133   static final String ALREADY_IN_TRANSITION_WAITTIME
134     = "hbase.assignment.already.intransition.waittime";
135   static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute
136 
137   protected final Server server;
138 
139   private ServerManager serverManager;
140 
141   private boolean shouldAssignRegionsWithFavoredNodes;
142 
143   private LoadBalancer balancer;
144 
145   private final MetricsAssignmentManager metricsAssignmentManager;
146 
147   private final TableLockManager tableLockManager;
148 
149   private AtomicInteger numRegionsOpened = new AtomicInteger(0);
150 
151   final private KeyLocker<String> locker = new KeyLocker<String>();
152 
153   Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
154 
155   /**
156    * Map of regions to reopen after the schema of a table is changed. Key -
157    * encoded region name, value - HRegionInfo
158    */
159   private final Map <String, HRegionInfo> regionsToReopen;
160 
161   /*
162    * Maximum times we recurse an assignment/unassignment.
163    * See below in {@link #assign()} and {@link #unassign()}.
164    */
165   private final int maximumAttempts;
166 
167   /**
168    * Map from the encoded name of the region to be created to the two regions being merged.
169    */
170   private final Map<String, PairOfSameType<HRegionInfo>> mergingRegions
171     = new HashMap<String, PairOfSameType<HRegionInfo>>();
172 
173   /**
174    * The sleep time the assignment manager waits before retrying an hbase:meta assignment
175    * that failed because no region plan was available.
176    */
177   private final long sleepTimeBeforeRetryingMetaAssignment;
178 
179   /** Plans for region movement. Key is the encoded version of a region name. */
180   // TODO: When do plans get cleaned out?  Ever? In server open and in server
181   // shutdown processing -- St.Ack
182   // All access to this Map must be synchronized.
183   final NavigableMap<String, RegionPlan> regionPlans =
184     new TreeMap<String, RegionPlan>();
185 
186   private final TableStateManager tableStateManager;
187 
188   private final ExecutorService executorService;
189 
190   // For unit tests, keep track of calls to ClosedRegionHandler
191   private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = null;
192 
193   // For unit tests, keep track of calls to OpenedRegionHandler
194   private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null;
195 
196   //Thread pool executor service for timeout monitor
197   private java.util.concurrent.ExecutorService threadPoolExecutorService;
198 
199   // A bunch of ZK events workers. Each is a single thread executor service
200   private final java.util.concurrent.ExecutorService zkEventWorkers;
201 
202   private List<EventType> ignoreStatesRSOffline = Arrays.asList(
203       EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);
204 
205   private final RegionStates regionStates;
206 
207   // The thresholds for using bulk assignment. Bulk assignment is used
208   // only if assigning at least this many regions to at least this
209   // many servers. If assigning fewer regions to fewer servers,
210   // bulk assigning may not be as efficient.
211   private final int bulkAssignThresholdRegions;
212   private final int bulkAssignThresholdServers;
213 
214   // Should bulk assignment wait till all regions are assigned,
215   // or until it times out?  This is useful for measuring bulk assignment
216   // performance, but not needed in most use cases.
217   private final boolean bulkAssignWaitTillAllAssigned;
218 
219   /**
220    * Indicator that AssignmentManager has recovered the region states so
221    * that ServerShutdownHandler can be fully enabled and re-assign regions
222    * of dead servers; that way, when re-assignment happens, AssignmentManager
223    * already has the proper region states.
224    *
225    * Protected to ease testing.
226    */
227   protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
228 
229   /**
230    * A map to track how many times in a row a region has failed to open,
231    * so that we don't try to open a region forever if the failure is
232    * unrecoverable.  We don't put this information in region states
233    * because we don't expect this to happen frequently; we don't
234    * want to copy this information over during each state transition either.
235    */
236   private final ConcurrentHashMap<String, AtomicInteger>
237     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
238 
239   // A flag to indicate if we are using ZK for region assignment
240   private final boolean useZKForAssignment;
241 
242   // When not using ZK for region assignment, region states
243   // are persisted in meta with a state store
244   private final RegionStateStore regionStateStore;
245 
246   /**
247    * For testing only!  Set to true to skip handling of split.
248    */
249   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
250   public static boolean TEST_SKIP_SPLIT_HANDLING = false;
251 
252   /** Listeners that are called on assignment events. */
253   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
254 
255   /**
256    * Constructs a new assignment manager.
257    *
258    * @param server instance of HMaster this AM running inside
259    * @param serverManager serverManager for associated HMaster
260    * @param balancer implementation of {@link LoadBalancer}
261    * @param service Executor service
262    * @param metricsMaster metrics manager
263    * @param tableLockManager TableLock manager
264    * @throws KeeperException
265    * @throws IOException
266    */
267   public AssignmentManager(Server server, ServerManager serverManager,
268       final LoadBalancer balancer,
269       final ExecutorService service, MetricsMaster metricsMaster,
270       final TableLockManager tableLockManager) throws KeeperException,
271         IOException, CoordinatedStateException {
272     super(server.getZooKeeper());
273     this.server = server;
274     this.serverManager = serverManager;
275     this.executorService = service;
276     this.regionStateStore = new RegionStateStore(server);
277     this.regionsToReopen = Collections.synchronizedMap
278                            (new HashMap<String, HRegionInfo> ());
279     Configuration conf = server.getConfiguration();
280     // Only read favored nodes if using the favored nodes load balancer.
281     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
282            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
283            FavoredNodeLoadBalancer.class);
284     try {
285       if (server.getCoordinatedStateManager() != null) {
286         this.tableStateManager = server.getCoordinatedStateManager().getTableStateManager();
287       } else {
288         this.tableStateManager = null;
289       }
290     } catch (InterruptedException e) {
291       throw new InterruptedIOException();
292     }
293     // This is the max attempts, not retries, so it should be at least 1.
294     this.maximumAttempts = Math.max(1,
295       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
296     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
297         "hbase.meta.assignment.retry.sleeptime", 1000L);
298     this.balancer = balancer;
299     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
300     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
301       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
302     this.regionStates = new RegionStates(
303       server, tableStateManager, serverManager, regionStateStore);
304 
305     this.bulkAssignWaitTillAllAssigned =
306       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
307     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
308     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
309 
310     int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
311     ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
312     zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
313             TimeUnit.SECONDS, threadFactory);
314     this.tableLockManager = tableLockManager;
315 
316     this.metricsAssignmentManager = new MetricsAssignmentManager();
317     useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
318   }
319 
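  /**
   * Illustrative sketch only, not part of the original class: the constructor above reads its
   * tuning knobs from the server Configuration. Assuming a plain Hadoop Configuration, the same
   * keys could be overridden like this; the values below are arbitrary examples and the
   * defaults noted are the ones used in the constructor.
   */
  @VisibleForTesting
  static Configuration exampleAssignmentTuning() {
    Configuration conf = new Configuration();
    conf.setInt("hbase.assignment.maximum.attempts", 20);               // default 10
    conf.setInt("hbase.assignment.threads.max", 50);                    // default 30
    conf.setInt("hbase.assignment.zkevent.workers", 40);                // default 20
    conf.setBoolean("hbase.bulk.assignment.waittillallassigned", true); // default false
    return conf;
  }
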
320   /**
321    * Add the listener to the notification list.
322    * @param listener The AssignmentListener to register
323    */
324   public void registerListener(final AssignmentListener listener) {
325     this.listeners.add(listener);
326   }
327 
328   /**
329    * Remove the listener from the notification list.
330    * @param listener The AssignmentListener to unregister
331    */
332   public boolean unregisterListener(final AssignmentListener listener) {
333     return this.listeners.remove(listener);
334   }
335 
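  /**
   * Illustrative sketch only, not part of the original class: registering a listener for
   * assignment events. This assumes AssignmentListener exposes regionOpened/regionClosed
   * callbacks; adjust to the actual interface if it differs.
   */
  void exampleRegisterLoggingListener() {
    registerListener(new AssignmentListener() {
      @Override
      public void regionOpened(final HRegionInfo regionInfo, final ServerName serverName) {
        LOG.info("Region " + regionInfo.getEncodedName() + " opened on " + serverName);
      }

      @Override
      public void regionClosed(final HRegionInfo regionInfo) {
        LOG.info("Region " + regionInfo.getEncodedName() + " closed");
      }
    });
  }
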
336   /**
337    * @return Instance of ZKTableStateManager.
338    */
339   public TableStateManager getTableStateManager() {
340     // These are 'expensive' to make, involving a trip to the zk ensemble, so allow
341     // sharing.
342     return this.tableStateManager;
343   }
344 
345   /**
346    * This SHOULD not be public. It is public now
347    * because of some unit tests.
348    *
349    * TODO: make it package private and keep RegionStates in the master package
350    */
351   public RegionStates getRegionStates() {
352     return regionStates;
353   }
354 
355   /**
356    * Used in some tests to mock up region state in meta
357    */
358   @VisibleForTesting
359   RegionStateStore getRegionStateStore() {
360     return regionStateStore;
361   }
362 
363   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
364     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
365   }
366 
367   /**
368    * Add a regionPlan for the specified region.
369    * @param encodedName
370    * @param plan
371    */
372   public void addPlan(String encodedName, RegionPlan plan) {
373     synchronized (regionPlans) {
374       regionPlans.put(encodedName, plan);
375     }
376   }
377 
378   /**
379    * Add a map of region plans.
380    */
381   public void addPlans(Map<String, RegionPlan> plans) {
382     synchronized (regionPlans) {
383       regionPlans.putAll(plans);
384     }
385   }
386 
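  /**
   * Illustrative sketch only, not part of the original class: registering a plan that moves a
   * region to a specific destination server. The destination here is whatever the caller chose;
   * normally the balancer picks it in getRegionPlan.
   */
  void exampleAddMovePlan(final HRegionInfo hri, final ServerName destination) {
    // A RegionPlan pairs the region with its current location and the intended destination.
    ServerName source = regionStates.getRegionServerOfRegion(hri);
    addPlan(hri.getEncodedName(), new RegionPlan(hri, source, destination));
  }
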
387   /**
388    * Set the list of regions that will be reopened
389    * because of an update in table schema
390    *
391    * @param regions
392    *          list of regions that should be tracked for reopen
393    */
394   public void setRegionsToReopen(List <HRegionInfo> regions) {
395     for(HRegionInfo hri : regions) {
396       regionsToReopen.put(hri.getEncodedName(), hri);
397     }
398   }
399 
400   /**
401    * Used by the client to identify if all regions have the schema updates
402    *
403    * @param tableName
404    * @return Pair indicating the status of the alter command
405    * @throws IOException
406    */
407   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
408       throws IOException {
409     List <HRegionInfo> hris =
410       MetaTableAccessor.getTableRegions(this.watcher, this.server.getShortCircuitConnection(),
411         tableName, true);
412     Integer pending = 0;
413     for (HRegionInfo hri : hris) {
414       String name = hri.getEncodedName();
415       // No lock; concurrent access is ok: sequential consistency is respected.
416       if (regionsToReopen.containsKey(name)
417           || regionStates.isRegionInTransition(name)) {
418         pending++;
419       }
420     }
421     return new Pair<Integer, Integer>(pending, hris.size());
422   }
423 
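  /**
   * Illustrative sketch only, not part of the original class: how a caller could poll
   * getReopenStatus(...) until every region of an altered table has been reopened. The first
   * element of the pair is the number of regions still pending, the second is the total; the
   * one-second sleep is an arbitrary example value.
   */
  void exampleWaitForReopen(final TableName tableName)
      throws IOException, InterruptedException {
    Pair<Integer, Integer> status = getReopenStatus(tableName);
    while (status.getFirst() > 0) {
      Thread.sleep(1000);
      status = getReopenStatus(tableName);
    }
  }
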
424   /**
425    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
426    * the failover cleanup before re-assigning regions of dead servers. So that
427    * when re-assignment happens, AssignmentManager has proper region states.
428    */
429   public boolean isFailoverCleanupDone() {
430     return failoverCleanupDone.get();
431   }
432 
433   /**
434    * To avoid racing with AM, external entities may need to lock a region,
435    * for example, when SSH checks what regions to skip re-assigning.
436    */
437   public Lock acquireRegionLock(final String encodedName) {
438     return locker.acquireLock(encodedName);
439   }
440 
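  /**
   * Illustrative sketch only, not part of the original class: the intended usage pattern for
   * acquireRegionLock(...) -- release in a finally block, mirroring how this class uses its
   * own locker. The Runnable parameter is just a stand-in for the caller's region-scoped work.
   */
  void exampleWithRegionLock(final String encodedName, final Runnable work) {
    Lock lock = acquireRegionLock(encodedName);
    try {
      // No other assignment operation can touch this region while the lock is held.
      work.run();
    } finally {
      lock.unlock();
    }
  }
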
441   /**
442    * Now, failover cleanup is completed. Notify server manager to
443    * process the queued up dead servers, if any.
444    */
445   void failoverCleanupDone() {
446     failoverCleanupDone.set(true);
447     serverManager.processQueuedDeadServers();
448   }
449 
450   /**
451    * Called on startup.
452    * Figures out whether this is a fresh cluster start or we are joining an extant running cluster.
453    * @throws IOException
454    * @throws KeeperException
455    * @throws InterruptedException
456    * @throws CoordinatedStateException
457    */
458   void joinCluster() throws IOException,
459       KeeperException, InterruptedException, CoordinatedStateException {
460     long startTime = System.currentTimeMillis();
461     // Concurrency note: In the below the accesses on regionsInTransition are
462     // outside of a synchronization block where usually all accesses to RIT are
463     // synchronized.  The presumption is that in this case it is safe since this
464     // method is run by a single thread on startup.
465 
466     // TODO: Regions that have a null location and are not in regionsInTransitions
467     // need to be handled.
468 
469     // Scan hbase:meta to build list of existing regions, servers, and assignment
470     // Returns servers who have not checked in (assumed dead) that some regions
471     // were assigned to (according to the meta)
472     Set<ServerName> deadServers = rebuildUserRegions();
473 
474     // This method will assign all user regions if a clean server startup or
475     // it will reconstruct master state and cleanup any leftovers from
476     // previous master process.
477     boolean failover = processDeadServersAndRegionsInTransition(deadServers);
478 
479     if (!useZKForAssignment) {
480       // Not use ZK for assignment any more, remove the ZNode
481       ZKUtil.deleteNodeRecursively(watcher, watcher.assignmentZNode);
482     }
483     recoverTableInDisablingState();
484     recoverTableInEnablingState();
485     LOG.info("Joined the cluster in " + (System.currentTimeMillis()
486       - startTime) + "ms, failover=" + failover);
487   }
488 
489   /**
490    * Processes all regions that are in transition in zookeeper and also
491    * processes the list of dead servers by scanning the META.
492    * Used by a master joining a cluster.  If we figure this is a clean cluster
493    * startup, will assign all user regions.
494    * @param deadServers
495    *          Map of dead servers and their regions. Can be null.
496    * @throws KeeperException
497    * @throws IOException
498    * @throws InterruptedException
499    */
500   boolean processDeadServersAndRegionsInTransition(
501       final Set<ServerName> deadServers) throws KeeperException,
502         IOException, InterruptedException, CoordinatedStateException {
503     List<String> nodes = ZKUtil.listChildrenNoWatch(watcher,
504       watcher.assignmentZNode);
505 
506     if (useZKForAssignment && nodes == null) {
507       String errorMessage = "Failed to get the children from ZK";
508       server.abort(errorMessage, new IOException(errorMessage));
509       return true; // Doesn't matter in this case
510     }
511 
512     boolean failover = !serverManager.getDeadServers().isEmpty();
513     if (failover) {
514       // This may not be a failover actually, especially if meta is on this master.
515       if (LOG.isDebugEnabled()) {
516         LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
517       }
518     } else {
519       // If any one region except meta is assigned, it's a failover.
520       for (HRegionInfo hri: regionStates.getRegionAssignments().keySet()) {
521         if (!hri.isMetaTable()) {
522           LOG.debug("Found " + hri + " out on cluster");
523           failover = true;
524           break;
525         }
526       }
527     }
528     if (!failover && nodes != null) {
529       // If any one region except meta is in transition, it's a failover.
530       for (String encodedName: nodes) {
531         RegionState regionState = regionStates.getRegionState(encodedName);
532         if (regionState != null && !regionState.getRegion().isMetaRegion()) {
533           LOG.debug("Found " + regionState + " in RITs");
534           failover = true;
535           break;
536         }
537       }
538     }
539     if (!failover && !useZKForAssignment) {
540       // If any region except meta is in transition on a live server, it's a failover.
541       Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
542       if (!regionsInTransition.isEmpty()) {
543         Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
544         for (RegionState regionState: regionsInTransition.values()) {
545           if (!regionState.getRegion().isMetaRegion()
546               && onlineServers.contains(regionState.getServerName())) {
547             LOG.debug("Found " + regionState + " in RITs");
548             failover = true;
549             break;
550           }
551         }
552       }
553     }
554     if (!failover) {
555       // If we get here, we have a full cluster restart. It is a failover only
556       // if some HLogs are not split yet. Any meta HLogs should have
557       // been split already. We can walk through those queued dead servers:
558       // if they don't have any HLogs, this restart should be considered a clean one.
559       Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
560       if (!queuedDeadServers.isEmpty()) {
561         Configuration conf = server.getConfiguration();
562         Path rootdir = FSUtils.getRootDir(conf);
563         FileSystem fs = rootdir.getFileSystem(conf);
564         for (ServerName serverName: queuedDeadServers) {
565           Path logDir = new Path(rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
566           Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
567           if (fs.exists(logDir) || fs.exists(splitDir)) {
568             LOG.debug("Found queued dead server " + serverName);
569             failover = true;
570             break;
571           }
572         }
573         if (!failover) {
574           // We figured that it's not a failover, so no need to
575           // work on these re-queued dead servers any more.
576           LOG.info("AM figured that it's not a failover and cleaned up "
577             + queuedDeadServers.size() + " queued dead servers");
578           serverManager.removeRequeuedDeadServers();
579         }
580       }
581     }
582 
583     Set<TableName> disabledOrDisablingOrEnabling = null;
584     Map<HRegionInfo, ServerName> allRegions = null;
585 
586     if (!failover) {
587       disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
588         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING,
589         ZooKeeperProtos.Table.State.ENABLING);
590 
591       // Clean re/start, mark all user regions closed before reassignment
592       allRegions = regionStates.closeAllUserRegions(
593         disabledOrDisablingOrEnabling);
594     }
595 
596     // Now region states are restored
597     regionStateStore.start();
598 
599     // If we found user regions out on the cluster, it's a failover.
600     if (failover) {
601       LOG.info("Found regions out on cluster or in RIT; presuming failover");
602       // Process list of dead servers and regions in RIT.
603       // See HBASE-4580 for more information.
604       processDeadServersAndRecoverLostRegions(deadServers);
605     }
606 
607     if (!failover && useZKForAssignment) {
608       // Cleanup any existing ZK nodes and start watching
609       ZKAssign.deleteAllNodes(watcher);
610       ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
611         this.watcher.assignmentZNode);
612     }
613 
614     // Now we can safely claim failover cleanup completed and enable
615     // ServerShutdownHandler for further processing. The nodes (below)
616     // in transition, if any, are for regions not related to those
617     // dead servers at all, and can be done in parallel to SSH.
618     failoverCleanupDone();
619     if (!failover) {
620       // Fresh cluster startup.
621       LOG.info("Clean cluster startup. Assigning user regions");
622       assignAllUserRegions(allRegions);
623     }
624     // unassign replicas of the split parents and the merged regions
625     // the daughter replicas are opened in assignAllUserRegions if it was
626     // not already opened.
627     for (HRegionInfo h : replicasToClose) {
628       unassign(h);
629     }
630     replicasToClose.clear();
631     return failover;
632   }
633 
634   /**
635    * If region is up in zk in transition, then do fixup and block and wait until
636    * the region is assigned and out of transition.  Used on startup for
637    * catalog regions.
638    * @param hri Region to look for.
639    * @return True if we processed a region in transition else false if region
640    * was not up in zk in transition.
641    * @throws InterruptedException
642    * @throws KeeperException
643    * @throws IOException
644    */
645   boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
646       throws InterruptedException, KeeperException, IOException {
647     String encodedRegionName = hri.getEncodedName();
648     if (!processRegionInTransition(encodedRegionName, hri)) {
649       return false; // The region is not in transition
650     }
651     LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedRegionName));
652     while (!this.server.isStopped() &&
653         this.regionStates.isRegionInTransition(encodedRegionName)) {
654       RegionState state = this.regionStates.getRegionTransitionState(encodedRegionName);
655       if (state == null || !serverManager.isServerOnline(state.getServerName())) {
656         // The region is not in transition, or not in transition on an online
657         // server. Doesn't help to block here any more. Caller needs to
658         // verify the region is actually assigned.
659         break;
660       }
661       this.regionStates.waitForUpdate(100);
662     }
663     return true;
664   }
665 
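  /**
   * Illustrative sketch only, not part of the original class: how startup code could use
   * processRegionInTransitionAndBlockUntilAssigned(...) for a catalog region. When it returns
   * false the region was not in transition in zk, so the caller still has to verify (and, if
   * needed, trigger) the assignment itself, as the javadoc above notes.
   */
  void exampleProcessMetaInTransition()
      throws KeeperException, IOException, InterruptedException {
    HRegionInfo meta = HRegionInfo.FIRST_META_REGIONINFO;
    if (!processRegionInTransitionAndBlockUntilAssigned(meta)) {
      LOG.debug("Region " + meta.getRegionNameAsString() + " was not in transition in zk");
    }
  }
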
666   /**
667    * Process failover of new master for region <code>encodedRegionName</code>
668    * up in zookeeper.
669    * @param encodedRegionName Region to process failover for.
670    * @param regionInfo If null we'll go get it from meta table.
671    * @return True if we processed <code>regionInfo</code> as a RIT.
672    * @throws KeeperException
673    * @throws IOException
674    */
675   boolean processRegionInTransition(final String encodedRegionName,
676       final HRegionInfo regionInfo) throws KeeperException, IOException {
677     // We need a lock here to ensure that we will not put the same region twice
678     // It has no reason to be a lock shared with the other operations.
679     // We can do the lock on the region only, instead of a global lock: what we want to ensure
680     // is that we don't have two threads working on the same region.
681     Lock lock = locker.acquireLock(encodedRegionName);
682     try {
683       Stat stat = new Stat();
684       byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
685       if (data == null) return false;
686       RegionTransition rt;
687       try {
688         rt = RegionTransition.parseFrom(data);
689       } catch (DeserializationException e) {
690         LOG.warn("Failed to parse znode data", e);
691         return false;
692       }
693       HRegionInfo hri = regionInfo;
694       if (hri == null) {
695         // The region info is not passed in. We will try to find the region
696         // from region states map/meta based on the encoded region name. But we
697         // may not be able to find it. This is valid for an online merge, where
698         // the region may not have been created yet if the merge is not completed.
699         // Therefore, it is not in meta at master recovery time.
700         hri = regionStates.getRegionInfo(rt.getRegionName());
701         EventType et = rt.getEventType();
702         if (hri == null && et != EventType.RS_ZK_REGION_MERGING
703             && et != EventType.RS_ZK_REQUEST_REGION_MERGE) {
704           LOG.warn("Couldn't find the region in recovering " + rt);
705           return false;
706         }
707       }
708 
709       // TODO: This code is tied to ZK anyway, so for now leaving it as is,
710       // will refactor when whole region assignment will be abstracted from ZK
711       BaseCoordinatedStateManager cp =
712         (BaseCoordinatedStateManager) this.server.getCoordinatedStateManager();
713       OpenRegionCoordination openRegionCoordination = cp.getOpenRegionCoordination();
714 
715       ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
716         new ZkOpenRegionCoordination.ZkOpenRegionDetails();
717       zkOrd.setVersion(stat.getVersion());
718       zkOrd.setServerName(cp.getServer().getServerName());
719 
720       return processRegionsInTransition(
721         rt, hri, openRegionCoordination, zkOrd);
722     } finally {
723       lock.unlock();
724     }
725   }
726 
727   /**
728    * This call is invoked only when (1) the master assigns meta, or
729    * (2) during failover-mode startup, while processing zk assignment nodes.
730    * The locker is set in the caller. It returns true if the region
731    * is in transition for sure, false otherwise.
732    *
733    * It should be private but it is used by some tests too.
734    */
735   boolean processRegionsInTransition(
736       final RegionTransition rt, final HRegionInfo regionInfo,
737       OpenRegionCoordination coordination,
738       final OpenRegionCoordination.OpenRegionDetails ord) throws KeeperException {
739     EventType et = rt.getEventType();
740     // Get ServerName.  Cannot be null.
741     final ServerName sn = rt.getServerName();
742     final byte[] regionName = rt.getRegionName();
743     final String encodedName = HRegionInfo.encodeRegionName(regionName);
744     final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
745     LOG.info("Processing " + prettyPrintedRegionName + " in state: " + et);
746 
747     if (regionStates.isRegionInTransition(encodedName)
748         && (regionInfo.isMetaRegion() || !useZKForAssignment)) {
749       LOG.info("Processed region " + prettyPrintedRegionName + " in state: "
750         + et + ", does nothing since the region is already in transition "
751         + regionStates.getRegionTransitionState(encodedName));
752       // Just return
753       return true;
754     }
755     if (!serverManager.isServerOnline(sn)) {
756       // It was transitioning on a dead server, so it's closed now.
757       // Force to OFFLINE and put it in transition, but do not assign it
758       // since log splitting for the dead server is not done yet.
759       LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() +
760         " was on deadserver; forcing offline");
761       if (regionStates.isRegionOnline(regionInfo)) {
762         // Meta could still show the region is assigned to the previous
763         // server. If that server is online, when we reload the meta, the
764         // region is put back to online, we need to offline it.
765         regionStates.regionOffline(regionInfo);
766         sendRegionClosedNotification(regionInfo);
767       }
768       // Put it back in transition so that SSH can re-assign it
769       regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);
770 
771       if (regionInfo.isMetaRegion()) {
772         // If it's meta region, reset the meta location.
773         // So that master knows the right meta region server.
774         MetaTableLocator.setMetaLocation(watcher, sn);
775       } else {
776         // No matter the previous server is online or offline,
777         // we need to reset the last region server of the region.
778         regionStates.setLastRegionServerOfRegion(sn, encodedName);
779         // Make sure we know the server is dead.
780         if (!serverManager.isServerDead(sn)) {
781           serverManager.expireServer(sn);
782         }
783       }
784       return false;
785     }
786     switch (et) {
787       case M_ZK_REGION_CLOSING:
788         // Insert into RIT & resend the query to the region server: maybe the previous master
789         // died before sending the query the first time.
790         final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING);
791         this.executorService.submit(
792           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
793             @Override
794             public void process() throws IOException {
795               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
796               try {
797                 final int expectedVersion = ((ZkOpenRegionCoordination.ZkOpenRegionDetails) ord)
798                   .getVersion();
799                 unassign(regionInfo, rsClosing, expectedVersion, null, useZKForAssignment, null);
800                 if (regionStates.isRegionOffline(regionInfo)) {
801                   assign(regionInfo, true);
802                 }
803               } finally {
804                 lock.unlock();
805               }
806             }
807           });
808         break;
809 
810       case RS_ZK_REGION_CLOSED:
811       case RS_ZK_REGION_FAILED_OPEN:
812         // Region is closed, insert into RIT and handle it
813         regionStates.updateRegionState(regionInfo, State.CLOSED, sn);
814         if (!replicasToClose.contains(regionInfo)) {
815           invokeAssign(regionInfo);
816         } else {
817           offlineDisabledRegion(regionInfo);
818         }
819         break;
820 
821       case M_ZK_REGION_OFFLINE:
822         // Insert in RIT and resend to the regionserver
823         regionStates.updateRegionState(rt, State.PENDING_OPEN);
824         final RegionState rsOffline = regionStates.getRegionState(regionInfo);
825         this.executorService.submit(
826           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
827             @Override
828             public void process() throws IOException {
829               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
830               try {
831                 RegionPlan plan = new RegionPlan(regionInfo, null, sn);
832                 addPlan(encodedName, plan);
833                 assign(rsOffline, false, false);
834               } finally {
835                 lock.unlock();
836               }
837             }
838           });
839         break;
840 
841       case RS_ZK_REGION_OPENING:
842         regionStates.updateRegionState(rt, State.OPENING);
843         break;
844 
845       case RS_ZK_REGION_OPENED:
846         // Region is opened, insert into RIT and handle it
847         // This could be done asynchronously, we would need then to acquire the lock in the
848         //  handler.
849         regionStates.updateRegionState(rt, State.OPEN);
850         new OpenedRegionHandler(server, this, regionInfo, coordination, ord).process();
851         break;
852       case RS_ZK_REQUEST_REGION_SPLIT:
853       case RS_ZK_REGION_SPLITTING:
854       case RS_ZK_REGION_SPLIT:
855         // Splitting region should be online. We could have skipped it during
856         // user region rebuilding since we may have considered the split completed.
857         // Put it in SPLITTING state to avoid complications.
858         regionStates.regionOnline(regionInfo, sn);
859         regionStates.updateRegionState(rt, State.SPLITTING);
860         if (!handleRegionSplitting(
861             rt, encodedName, prettyPrintedRegionName, sn)) {
862           deleteSplittingNode(encodedName, sn);
863         }
864         break;
865       case RS_ZK_REQUEST_REGION_MERGE:
866       case RS_ZK_REGION_MERGING:
867       case RS_ZK_REGION_MERGED:
868         if (!handleRegionMerging(
869             rt, encodedName, prettyPrintedRegionName, sn)) {
870           deleteMergingNode(encodedName, sn);
871         }
872         break;
873       default:
874         throw new IllegalStateException("Received region in state:" + et + " is not valid.");
875     }
876     LOG.info("Processed region " + prettyPrintedRegionName + " in state "
877       + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ")
878       + "server: " + sn);
879     return true;
880   }
881 
882   /**
883    * When a region is closed, it should be removed from the regionsToReopen
884    * @param hri HRegionInfo of the region which was closed
885    */
886   public void removeClosedRegion(HRegionInfo hri) {
887     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
888       LOG.debug("Removed region from reopening regions because it was closed");
889     }
890   }
891 
892   /**
893    * Handles various states an unassigned node can be in.
894    * <p>
895    * Method is called when a state change is suspected for an unassigned node.
896    * <p>
897    * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
898    * yet).
899    * @param rt region transition
900    * @param coordination coordination for opening region
901    * @param ord details about opening region
902    */
903   void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination,
904                     OpenRegionCoordination.OpenRegionDetails ord) {
905     if (rt == null) {
906       LOG.warn("Unexpected NULL input for RegionTransition rt");
907       return;
908     }
909     final ServerName sn = rt.getServerName();
910     // Check if this is a special HBCK transition
911     if (sn.equals(HBCK_CODE_SERVERNAME)) {
912       handleHBCK(rt);
913       return;
914     }
915     final long createTime = rt.getCreateTime();
916     final byte[] regionName = rt.getRegionName();
917     String encodedName = HRegionInfo.encodeRegionName(regionName);
918     String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
919     // Verify this is a known server
920     if (!serverManager.isServerOnline(sn)
921       && !ignoreStatesRSOffline.contains(rt.getEventType())) {
922       LOG.warn("Attempted to handle region transition for server but " +
923         "it is not online: " + prettyPrintedRegionName + ", " + rt);
924       return;
925     }
926 
927     RegionState regionState =
928       regionStates.getRegionState(encodedName);
929     long startTime = System.currentTimeMillis();
930     if (LOG.isDebugEnabled()) {
931       boolean lateEvent = createTime < (startTime - 15000);
932       LOG.debug("Handling " + rt.getEventType() +
933         ", server=" + sn + ", region=" +
934         (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
935         (lateEvent ? ", which is more than 15 seconds late" : "") +
936         ", current_state=" + regionState);
937     }
938     // We don't do anything for this event,
939     // so separate it out, no need to lock/unlock anything
940     if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
941       return;
942     }
943 
944     // We need a lock on the region as we could update it
945     Lock lock = locker.acquireLock(encodedName);
946     try {
947       RegionState latestState =
948         regionStates.getRegionState(encodedName);
949       if ((regionState == null && latestState != null)
950           || (regionState != null && latestState == null)
951           || (regionState != null && latestState != null
952             && latestState.getState() != regionState.getState())) {
953         LOG.warn("Region state changed from " + regionState + " to "
954           + latestState + ", while acquiring lock");
955       }
956       long waitedTime = System.currentTimeMillis() - startTime;
957       if (waitedTime > 5000) {
958         LOG.warn("Took " + waitedTime + "ms to acquire the lock");
959       }
960       regionState = latestState;
961       switch (rt.getEventType()) {
962       case RS_ZK_REQUEST_REGION_SPLIT:
963       case RS_ZK_REGION_SPLITTING:
964       case RS_ZK_REGION_SPLIT:
965         if (!handleRegionSplitting(
966             rt, encodedName, prettyPrintedRegionName, sn)) {
967           deleteSplittingNode(encodedName, sn);
968         }
969         break;
970 
971       case RS_ZK_REQUEST_REGION_MERGE:
972       case RS_ZK_REGION_MERGING:
973       case RS_ZK_REGION_MERGED:
974         // The merged region is a new region, so we can't find it in the region states yet.
975         // However, the two merging regions are not new; they should be in the merging state.
976         if (!handleRegionMerging(
977             rt, encodedName, prettyPrintedRegionName, sn)) {
978           deleteMergingNode(encodedName, sn);
979         }
980         break;
981 
982       case M_ZK_REGION_CLOSING:
983         // Should see CLOSING after we have asked it to CLOSE or additional
984         // times after already being in state of CLOSING
985         if (regionState == null
986             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
987           LOG.warn("Received CLOSING for " + prettyPrintedRegionName
988             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
989             + regionStates.getRegionState(encodedName));
990           return;
991         }
992         // Transition to CLOSING (or update stamp if already CLOSING)
993         regionStates.updateRegionState(rt, State.CLOSING);
994         break;
995 
996       case RS_ZK_REGION_CLOSED:
997         // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
998         if (regionState == null
999             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
1000           LOG.warn("Received CLOSED for " + prettyPrintedRegionName
1001             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
1002             + regionStates.getRegionState(encodedName));
1003           return;
1004         }
1005         // Handle CLOSED by assigning elsewhere, or stopping here if a disable is in progress.
1006         // If we got here all is good.  Need to update RegionState -- else
1007         // what follows will fail because not in expected state.
1008         new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1009         updateClosedRegionHandlerTracker(regionState.getRegion());
1010         break;
1011 
1012         case RS_ZK_REGION_FAILED_OPEN:
1013           if (regionState == null
1014               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1015             LOG.warn("Received FAILED_OPEN for " + prettyPrintedRegionName
1016               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1017               + regionStates.getRegionState(encodedName));
1018             return;
1019           }
1020           AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
1021           if (failedOpenCount == null) {
1022             failedOpenCount = new AtomicInteger();
1023             // No need to use putIfAbsent, or extra synchronization since
1024             // this whole handleRegion block is locked on the encoded region
1025             // name, and failedOpenTracker is updated only in this block
1026             failedOpenTracker.put(encodedName, failedOpenCount);
1027           }
1028           if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
1029             regionStates.updateRegionState(rt, State.FAILED_OPEN);
1030             // remove the tracking info to save memory, also reset
1031             // the count for next open initiative
1032             failedOpenTracker.remove(encodedName);
1033           } else {
1034             // Handle this the same as if it were opened and then closed.
1035             regionState = regionStates.updateRegionState(rt, State.CLOSED);
1036             if (regionState != null) {
1037               // When there is more than one region server, a new RS is selected as the
1038               // destination and the same is updated in the regionplan. (HBASE-5546)
1039               try {
1040                 getRegionPlan(regionState.getRegion(), sn, true);
1041                 new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1042               } catch (HBaseIOException e) {
1043                 LOG.warn("Failed to get region plan", e);
1044               }
1045             }
1046           }
1047           break;
1048 
1049         case RS_ZK_REGION_OPENING:
1050           // Should see OPENING after we have asked it to OPEN or additional
1051           // times after already being in state of OPENING
1052           if (regionState == null
1053               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1054             LOG.warn("Received OPENING for " + prettyPrintedRegionName
1055               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1056               + regionStates.getRegionState(encodedName));
1057             return;
1058           }
1059           // Transition to OPENING (or update stamp if already OPENING)
1060           regionStates.updateRegionState(rt, State.OPENING);
1061           break;
1062 
1063         case RS_ZK_REGION_OPENED:
1064           // Should see OPENED after OPENING but possible after PENDING_OPEN.
1065           if (regionState == null
1066               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1067             LOG.warn("Received OPENED for " + prettyPrintedRegionName
1068               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1069               + regionStates.getRegionState(encodedName));
1070 
1071             if (regionState != null) {
1072               // Close it without updating the internal region states,
1073               // so as not to create double assignments in unlucky scenarios
1074               // mentioned in OpenRegionHandler#process
1075               unassign(regionState.getRegion(), null, -1, null, false, sn);
1076             }
1077             return;
1078           }
1079           // Handle OPENED by removing from transition and deleting the zk node
1080           regionState = regionStates.updateRegionState(rt, State.OPEN);
1081           if (regionState != null) {
1082             failedOpenTracker.remove(encodedName); // reset the count, if any
1083             new OpenedRegionHandler(
1084               server, this, regionState.getRegion(), coordination, ord).process();
1085             updateOpenedRegionHandlerTracker(regionState.getRegion());
1086           }
1087           break;
1088 
1089         default:
1090           throw new IllegalStateException("Received event is not valid.");
1091       }
1092     } finally {
1093       lock.unlock();
1094     }
1095   }
1096 
1097   //For unit tests only
1098   boolean wasClosedHandlerCalled(HRegionInfo hri) {
1099     AtomicBoolean b = closedRegionHandlerCalled.get(hri);
1100     //compareAndSet to be sure that unit tests don't see stale values. This means
1101     //we will return true exactly once unless the handler code resets this value
1102     //to true again.
1103     return b == null ? false : b.compareAndSet(true, false);
1104   }
1105 
1106   //For unit tests only
1107   boolean wasOpenedHandlerCalled(HRegionInfo hri) {
1108     AtomicBoolean b = openedRegionHandlerCalled.get(hri);
1109     //compareAndSet to be sure that unit tests don't see stale values. This means
1110     //we will return true exactly once unless the handler code resets this value
1111     //to true again.
1112     return b == null ? false : b.compareAndSet(true, false);
1113   }
1114 
1115   //For unit tests only
1116   void initializeHandlerTrackers() {
1117     closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1118     openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1119   }
1120 
1121   void updateClosedRegionHandlerTracker(HRegionInfo hri) {
1122     if (closedRegionHandlerCalled != null) { //only for unit tests this is true
1123       closedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1124     }
1125   }
1126 
1127   void updateOpenedRegionHandlerTracker(HRegionInfo hri) {
1128     if (openedRegionHandlerCalled != null) { //only for unit tests this is true
1129       openedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1130     }
1131   }
1132 
1133   // TODO: processFavoredNodes might throw an exception, e.g., if the
1134   // meta could not be contacted/updated. We need to see how seriously to treat
1135   // this problem. Should we fail the current assignment? We should be able
1136   // to recover from this problem eventually (if the meta couldn't be updated
1137   // things should work normally and eventually get fixed up).
1138   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
1139     if (!shouldAssignRegionsWithFavoredNodes) return;
1140     // The AM gets the favored nodes info for each region and updates the meta
1141     // table with that info
1142     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
1143         new HashMap<HRegionInfo, List<ServerName>>();
1144     for (HRegionInfo region : regions) {
1145       regionToFavoredNodes.put(region,
1146           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
1147     }
1148     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
1149       this.server.getShortCircuitConnection());
1150   }
1151 
1152   /**
1153    * Handle a ZK unassigned node transition triggered by HBCK repair tool.
1154    * <p>
1155    * This is handled in a separate code path because it breaks the normal rules.
1156    * @param rt
1157    */
1158   @SuppressWarnings("deprecation")
1159   private void handleHBCK(RegionTransition rt) {
1160     String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
1161     LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
1162       ", server=" + rt.getServerName() + ", region=" +
1163       HRegionInfo.prettyPrint(encodedName));
1164     RegionState regionState = regionStates.getRegionTransitionState(encodedName);
1165     switch (rt.getEventType()) {
1166       case M_ZK_REGION_OFFLINE:
1167         HRegionInfo regionInfo;
1168         if (regionState != null) {
1169           regionInfo = regionState.getRegion();
1170         } else {
1171           try {
1172             byte [] name = rt.getRegionName();
1173             Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
1174               this.server.getShortCircuitConnection(), name);
1175             regionInfo = p.getFirst();
1176           } catch (IOException e) {
1177             LOG.info("Exception reading hbase:meta doing HBCK repair operation", e);
1178             return;
1179           }
1180         }
1181         LOG.info("HBCK repair is triggering assignment of region=" +
1182             regionInfo.getRegionNameAsString());
1183         // trigger assign, node is already in OFFLINE so don't need to update ZK
1184         assign(regionInfo, false);
1185         break;
1186 
1187       default:
1188         LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
1189         break;
1190     }
1191 
1192   }
1193 
1194   // ZooKeeper events
1195 
1196   /**
1197    * New unassigned node has been created.
1198    *
1199    * <p>This happens when an RS begins the OPENING or CLOSING of a region by
1200    * creating an unassigned node.
1201    *
1202    * <p>When this happens we must:
1203    * <ol>
1204    *   <li>Watch the node for further events</li>
1205    *   <li>Read and handle the state in the node</li>
1206    * </ol>
1207    */
1208   @Override
1209   public void nodeCreated(String path) {
1210     handleAssignmentEvent(path);
1211   }
1212 
1213   /**
1214    * Existing unassigned node has had data changed.
1215    *
1216    * <p>This happens when an RS transitions from OFFLINE to OPENING, or between
1217    * OPENING/OPENED and CLOSING/CLOSED.
1218    *
1219    * <p>When this happens we must:
1220    * <ol>
1221    *   <li>Watch the node for further events</li>
1222    *   <li>Read and handle the state in the node</li>
1223    * </ol>
1224    */
1225   @Override
1226   public void nodeDataChanged(String path) {
1227     handleAssignmentEvent(path);
1228   }
1229 
1230 
1231   // We  don't want to have two events on the same region managed simultaneously.
1232   // For this reason, we need to wait if an event on the same region is currently in progress.
1233   // So we track the region names of the events in progress, and we keep a waiting list.
1234   private final Set<String> regionsInProgress = new HashSet<String>();
1235   // In a LinkedHashMultimap, the put order is kept when we retrieve the collection back. We need
1236   //  this as we want the events to be managed in the same order as we received them.
1237   private final LinkedHashMultimap <String, RegionRunnable>
1238       zkEventWorkerWaitingList = LinkedHashMultimap.create();
1239 
1240   /**
1241    * A specific runnable that works only on a region.
1242    */
1243   private interface RegionRunnable extends Runnable{
1244     /**
1245      * @return - the name of the region it works on.
1246      */
1247     String getRegionName();
1248   }
1249 
1250   /**
1251    * Submit a task, ensuring that there is only one task at a time working on a given region.
1252    * Order is respected.
1253    */
1254   protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {
1255 
1256     synchronized (regionsInProgress) {
1257       // If there is already a task in progress for this region, we add this one to the
1258       //  waiting list and return.
1259       if (regionsInProgress.contains(regRunnable.getRegionName())) {
1260         synchronized (zkEventWorkerWaitingList){
1261           zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
1262         }
1263         return;
1264       }
1265 
1266       // No event in progress on this region => we can submit a new task immediately.
1267       regionsInProgress.add(regRunnable.getRegionName());
1268       zkEventWorkers.submit(new Runnable() {
1269         @Override
1270         public void run() {
1271           try {
1272             regRunnable.run();
1273           } finally {
1274             // Now that we have finished, let's see if there is an event for the same region in the
1275             //  waiting list. If so, we can now submit it to the pool.
1276             synchronized (regionsInProgress) {
1277               regionsInProgress.remove(regRunnable.getRegionName());
1278               synchronized (zkEventWorkerWaitingList) {
1279                 java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
1280                     regRunnable.getRegionName());
1281                 if (!waiting.isEmpty()) {
1282                   // We want the first object only. The only way to get it is through an iterator.
1283                   RegionRunnable toSubmit = waiting.iterator().next();
1284                   zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
1285                   zkEventWorkersSubmit(toSubmit);
1286                 }
1287               }
1288             }
1289           }
1290         }
1291       });
1292     }
1293   }
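  /*
   * Illustrative sketch (not part of the original source): the "one task in flight per key,
   * FIFO otherwise" pattern that zkEventWorkersSubmit() implements, reduced to a
   * self-contained class. The class name and fields below are hypothetical, assuming the
   * usual java.util / java.util.concurrent / Guava imports.
   *
   *   class KeySerializedExecutor {
   *     private final ExecutorService pool;
   *     private final Set<String> inProgress = new HashSet<String>();
   *     private final LinkedHashMultimap<String, Runnable> waiting = LinkedHashMultimap.create();
   *
   *     KeySerializedExecutor(ExecutorService pool) { this.pool = pool; }
   *
   *     synchronized void submit(final String key, final Runnable task) {
   *       if (inProgress.contains(key)) {
   *         waiting.put(key, task);      // queue behind the running task, order preserved
   *         return;
   *       }
   *       inProgress.add(key);
   *       pool.submit(new Runnable() {
   *         public void run() {
   *           try {
   *             task.run();
   *           } finally {
   *             onDone(key);             // hand off to the next queued task for this key
   *           }
   *         }
   *       });
   *     }
   *
   *     private synchronized void onDone(String key) {
   *       inProgress.remove(key);
   *       Iterator<Runnable> it = waiting.get(key).iterator();
   *       if (it.hasNext()) {
   *         Runnable next = it.next();
   *         waiting.remove(key, next);
   *         submit(key, next);           // re-marks the key as in progress
   *       }
   *     }
   *   }
   */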
1294 
1295   @Override
1296   public void nodeDeleted(final String path) {
1297     if (path.startsWith(watcher.assignmentZNode)) {
1298       final String regionName = ZKAssign.getRegionName(watcher, path);
1299       zkEventWorkersSubmit(new RegionRunnable() {
1300         @Override
1301         public String getRegionName() {
1302           return regionName;
1303         }
1304 
1305         @Override
1306         public void run() {
1307           Lock lock = locker.acquireLock(regionName);
1308           try {
1309             RegionState rs = regionStates.getRegionTransitionState(regionName);
1310             if (rs == null) {
1311               rs = regionStates.getRegionState(regionName);
1312               if (rs == null || !rs.isMergingNew()) {
1313                 // MergingNew is an offline state
1314                 return;
1315               }
1316             }
1317 
1318             HRegionInfo regionInfo = rs.getRegion();
1319             String regionNameStr = regionInfo.getRegionNameAsString();
1320             LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);
1321 
1322             boolean disabled = getTableStateManager().isTableState(regionInfo.getTable(),
1323                 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
1324 
1325             ServerName serverName = rs.getServerName();
1326             if (serverManager.isServerOnline(serverName)) {
1327               if (rs.isOnServer(serverName)
1328                   && (rs.isOpened() || rs.isSplitting())) {
1329                 regionOnline(regionInfo, serverName);
1330                 if (disabled) {
1331                   // if the server is offline, there is no harm in unassigning again
1332                   LOG.info("Opened " + regionNameStr
1333                     + ", but this table is disabled, triggering close of region");
1334                   unassign(regionInfo);
1335                 }
1336               } else if (rs.isMergingNew()) {
1337                 synchronized (regionStates) {
1338                   String p = regionInfo.getEncodedName();
1339                   PairOfSameType<HRegionInfo> regions = mergingRegions.get(p);
1340                   if (regions != null) {
1341                     onlineMergingRegion(disabled, regions.getFirst(), serverName);
1342                     onlineMergingRegion(disabled, regions.getSecond(), serverName);
1343                   }
1344                 }
1345               }
1346             }
1347           } finally {
1348             lock.unlock();
1349           }
1350         }
1351 
1352         private void onlineMergingRegion(boolean disabled,
1353             final HRegionInfo hri, final ServerName serverName) {
1354           RegionState regionState = regionStates.getRegionState(hri);
1355           if (regionState != null && regionState.isMerging()
1356               && regionState.isOnServer(serverName)) {
1357             regionOnline(regionState.getRegion(), serverName);
1358             if (disabled) {
1359               unassign(hri);
1360             }
1361           }
1362         }
1363       });
1364     }
1365   }
1366 
1367   /**
1368    * New unassigned node has been created.
1369    *
1370    * <p>This happens when an RS begins the OPENING, SPLITTING or CLOSING of a
1371    * region by creating a znode.
1372    *
1373    * <p>When this happens we must:
1374    * <ol>
1375    *   <li>Watch the node for further children changed events</li>
1376    *   <li>Watch all new children for changed events</li>
1377    * </ol>
1378    */
1379   @Override
1380   public void nodeChildrenChanged(String path) {
1381     if (path.equals(watcher.assignmentZNode)) {
1382       zkEventWorkers.submit(new Runnable() {
1383         @Override
1384         public void run() {
1385           try {
1386             // Just make sure we see the changes for the new znodes
1387             List<String> children =
1388               ZKUtil.listChildrenAndWatchForNewChildren(
1389                 watcher, watcher.assignmentZNode);
1390             if (children != null) {
1391               Stat stat = new Stat();
1392               for (String child : children) {
1393                 // if the region is in transition, we already have a watch
1394                 // on it, so there is no need to watch it again. As far as we know,
1395                 // this is needed only to watch splitting nodes.
1396                 if (!regionStates.isRegionInTransition(child)) {
1397                   ZKAssign.getDataAndWatch(watcher, child, stat);
1398                 }
1399               }
1400             }
1401           } catch (KeeperException e) {
1402             server.abort("Unexpected ZK exception reading unassigned children", e);
1403           }
1404         }
1405       });
1406     }
1407   }
1408 
1409 
1410   /**
1411    * Marks the region as online.  Removes it from regions in transition and
1412    * updates the in-memory assignment information.
1413    * <p>
1414    * Used when a region has been successfully opened on a region server.
1415    * @param regionInfo
1416    * @param sn
1417    */
1418   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
1419     regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
1420   }
1421 
1422   void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
1423     numRegionsOpened.incrementAndGet();
1424     regionStates.regionOnline(regionInfo, sn, openSeqNum);
1425 
1426     // Remove plan if one.
1427     clearRegionPlan(regionInfo);
1428     balancer.regionOnline(regionInfo, sn);
1429 
1430     // Tell our listeners that a region was opened
1431     sendRegionOpenedNotification(regionInfo, sn);
1432   }
1433 
1434   /**
1435    * Pass the assignment event to a worker for processing.
1436    * Each worker is a single thread executor service.  The reason
1437    * for just one thread is to make sure all events for a given
1438    * region are processed in order.
1439    *
1440    * @param path
1441    */
1442   private void handleAssignmentEvent(final String path) {
1443     if (path.startsWith(watcher.assignmentZNode)) {
1444       final String regionName = ZKAssign.getRegionName(watcher, path);
1445 
1446       zkEventWorkersSubmit(new RegionRunnable() {
1447         @Override
1448         public String getRegionName() {
1449           return regionName;
1450         }
1451 
1452         @Override
1453         public void run() {
1454           try {
1455             Stat stat = new Stat();
1456             byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
1457             if (data == null) return;
1458 
1459             RegionTransition rt = RegionTransition.parseFrom(data);
1460 
1461             // TODO: This code is tied to ZK anyway, so for now leaving it as is,
1462             // will refactor when whole region assignment will be abstracted from ZK
1463             BaseCoordinatedStateManager csm =
1464               (BaseCoordinatedStateManager) server.getCoordinatedStateManager();
1465             OpenRegionCoordination openRegionCoordination = csm.getOpenRegionCoordination();
1466 
1467             ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
1468               new ZkOpenRegionCoordination.ZkOpenRegionDetails();
1469             zkOrd.setVersion(stat.getVersion());
1470             zkOrd.setServerName(csm.getServer().getServerName());
1471 
1472             handleRegion(rt, openRegionCoordination, zkOrd);
1473           } catch (KeeperException e) {
1474             server.abort("Unexpected ZK exception reading unassigned node data", e);
1475           } catch (DeserializationException e) {
1476             server.abort("Unexpected exception deserializing node data", e);
1477           }
1478         }
1479       });
1480     }
1481   }
1482 
1483   /**
1484    * Marks the region as offline.  Removes it from regions in transition and
1485    * removes in-memory assignment information.
1486    * <p>
1487    * Used when a region has been closed and should remain closed.
1488    * @param regionInfo
1489    */
1490   public void regionOffline(final HRegionInfo regionInfo) {
1491     regionOffline(regionInfo, null);
1492   }
1493 
1494   public void offlineDisabledRegion(HRegionInfo regionInfo) {
1495     if (useZKForAssignment) {
1496       // Disabling so should not be reassigned, just delete the CLOSED node
1497       LOG.debug("Table being disabled so deleting ZK node and removing from " +
1498         "regions in transition, skipping assignment of region " +
1499           regionInfo.getRegionNameAsString());
1500       String encodedName = regionInfo.getEncodedName();
1501       deleteNodeInStates(encodedName, "closed", null,
1502         EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
1503     }
1504     replicasToClose.remove(regionInfo);
1505     regionOffline(regionInfo);
1506   }
1507 
1508   // Assignment methods
1509 
1510   /**
1511    * Assigns the specified region.
1512    * <p>
1513    * If a RegionPlan is available with a valid destination then it will be used
1514    * to determine what server region is assigned to.  If no RegionPlan is
1515    * available, region will be assigned to a random available server.
1516    * <p>
1517    * Updates the RegionState and sends the OPEN RPC.
1518    * <p>
1519    * This will only succeed if the region is in transition and in a CLOSED or
1520    * OFFLINE state or not in transition (in-memory not zk), and of course, the
1521    * chosen server is up and running (It may have just crashed!).  If the
1522    * in-memory checks pass, the zk node is forced to OFFLINE before assigning.
1523    *
1524    * @param region region to be assigned
1525    * @param setOfflineInZK whether ZK node should be created/transitioned to an
1526    *                       OFFLINE state before assigning the region
1527    */
1528   public void assign(HRegionInfo region, boolean setOfflineInZK) {
1529     assign(region, setOfflineInZK, false);
1530   }
1531 
1532   /**
1533    * Use care with forceNewPlan. It could cause double assignment.
1534    */
1535   public void assign(HRegionInfo region,
1536       boolean setOfflineInZK, boolean forceNewPlan) {
1537     if (isDisabledorDisablingRegionInRIT(region)) {
1538       return;
1539     }
1540     if (this.serverManager.isClusterShutdown()) {
1541       LOG.info("Cluster shutdown is set; skipping assign of " +
1542         region.getRegionNameAsString());
1543       return;
1544     }
1545     String encodedName = region.getEncodedName();
1546     Lock lock = locker.acquireLock(encodedName);
1547     try {
1548       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
1549       if (state != null) {
1550         if (regionStates.wasRegionOnDeadServer(encodedName)) {
1551           LOG.info("Skip assigning " + region.getRegionNameAsString()
1552             + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
1553             + " is dead but not processed yet");
1554           return;
1555         }
1556         assign(state, setOfflineInZK && useZKForAssignment, forceNewPlan);
1557       }
1558     } finally {
1559       lock.unlock();
1560     }
1561   }
1562 
1563   /**
1564    * Bulk assign regions to <code>destination</code>.
1565    * @param destination
1566    * @param regions Regions to assign.
1567    * @return true if successful
1568    */
1569   boolean assign(final ServerName destination, final List<HRegionInfo> regions)
1570     throws InterruptedException {
1571     long startTime = EnvironmentEdgeManager.currentTimeMillis();
1572     try {
1573       int regionCount = regions.size();
1574       if (regionCount == 0) {
1575         return true;
1576       }
1577       LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
1578       Set<String> encodedNames = new HashSet<String>(regionCount);
1579       for (HRegionInfo region : regions) {
1580         encodedNames.add(region.getEncodedName());
1581       }
1582 
1583       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
1584       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
1585       try {
1586         AtomicInteger counter = new AtomicInteger(0);
1587         Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
1588         OfflineCallback cb = new OfflineCallback(
1589           watcher, destination, counter, offlineNodesVersions);
1590         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
1591         List<RegionState> states = new ArrayList<RegionState>(regions.size());
1592         for (HRegionInfo region : regions) {
1593           String encodedName = region.getEncodedName();
1594           if (!isDisabledorDisablingRegionInRIT(region)) {
1595             RegionState state = forceRegionStateToOffline(region, false);
1596             boolean onDeadServer = false;
1597             if (state != null) {
1598               if (regionStates.wasRegionOnDeadServer(encodedName)) {
1599                 LOG.info("Skip assigning " + region.getRegionNameAsString()
1600                   + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
1601                   + " is dead but not processed yet");
1602                 onDeadServer = true;
1603               } else if (!useZKForAssignment
1604                   || asyncSetOfflineInZooKeeper(state, cb, destination)) {
1605                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
1606                 plans.put(encodedName, plan);
1607                 states.add(state);
1608                 continue;
1609               }
1610             }
1611             // Reassign if the region wasn't on a dead server
1612             if (!onDeadServer) {
1613               LOG.info("Failed to force region state to offline or "
1614                 + "failed to set it offline in ZK; will reassign later: " + region);
1615               failedToOpenRegions.add(region); // assign individually later
1616             }
1617           }
1618           // Release the lock, this region is excluded from bulk assign because
1619           // we can't update its state, or set its znode to offline.
1620           Lock lock = locks.remove(encodedName);
1621           lock.unlock();
1622         }
1623 
1624         if (useZKForAssignment) {
1625           // Wait until all unassigned nodes have been put up and watchers set.
1626           int total = states.size();
1627           for (int oldCounter = 0; !server.isStopped();) {
1628             int count = counter.get();
1629             if (oldCounter != count) {
1630               LOG.debug(destination.toString() + " unassigned znodes=" + count +
1631                 " of total=" + total + "; oldCounter=" + oldCounter);
1632               oldCounter = count;
1633             }
1634             if (count >= total) break;
1635             Thread.sleep(5);
1636           }
1637         }
1638 
1639         if (server.isStopped()) {
1640           return false;
1641         }
1642 
1643         // Add region plans, so we can update timers when a region is opened,
1644         // reducing unnecessary timeouts on regions in transition.
1645         this.addPlans(plans);
1646 
1647         List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
1648           new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
1649         for (RegionState state: states) {
1650           HRegionInfo region = state.getRegion();
1651           String encodedRegionName = region.getEncodedName();
1652           Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
1653           if (useZKForAssignment && (nodeVersion == null || nodeVersion == -1)) {
1654             LOG.warn("Failed to set the region offline in ZooKeeper: " + region);
1655             failedToOpenRegions.add(region); // assign individually later
1656             Lock lock = locks.remove(encodedRegionName);
1657             lock.unlock();
1658           } else {
1659             regionStates.updateRegionState(
1660               region, State.PENDING_OPEN, destination);
1661             List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1662             if (this.shouldAssignRegionsWithFavoredNodes) {
1663               favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1664             }
1665             regionOpenInfos.add(new Triple<HRegionInfo, Integer,  List<ServerName>>(
1666               region, nodeVersion, favoredNodes));
1667           }
1668         }
1669 
1670         // Move on to open regions.
1671         try {
1672           // Send OPEN RPC. If it fails on an IOE or RemoteException,
1673           // regions will be assigned individually.
1674           long maxWaitTime = System.currentTimeMillis() +
1675             this.server.getConfiguration().
1676               getLong("hbase.regionserver.rpc.startup.waittime", 60000);
1677           for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
1678             try {
1679               List<RegionOpeningState> regionOpeningStateList = serverManager
1680                 .sendRegionOpen(destination, regionOpenInfos);
1681               if (regionOpeningStateList == null) {
1682                 // Failed getting RPC connection to this server
1683                 return false;
1684               }
1685               for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
1686                 RegionOpeningState openingState = regionOpeningStateList.get(k);
1687                 if (openingState != RegionOpeningState.OPENED) {
1688                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
1689                   if (openingState == RegionOpeningState.ALREADY_OPENED) {
1690                     processAlreadyOpenedRegion(region, destination);
1691                   } else if (openingState == RegionOpeningState.FAILED_OPENING) {
1692                     // Failed opening this region, reassign it later
1693                     failedToOpenRegions.add(region);
1694                   } else {
1695                     LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
1696                       + openingState + " in assigning region " + region);
1697                   }
1698                 }
1699               }
1700               break;
1701             } catch (IOException e) {
1702               if (e instanceof RemoteException) {
1703                 e = ((RemoteException)e).unwrapRemoteException();
1704               }
1705               if (e instanceof RegionServerStoppedException) {
1706                 LOG.warn("The region server was shut down", e);
1707                 // No need to retry, the region server is a goner.
1708                 return false;
1709               } else if (e instanceof ServerNotRunningYetException) {
1710                 long now = System.currentTimeMillis();
1711                 if (now < maxWaitTime) {
1712                   LOG.debug("Server is not yet up; waiting up to " +
1713                     (maxWaitTime - now) + "ms", e);
1714                   Thread.sleep(100);
1715                   i--; // reset the try count
1716                   continue;
1717                 }
1718               } else if (e instanceof java.net.SocketTimeoutException
1719                   && this.serverManager.isServerOnline(destination)) {
1720                 // In case socket is timed out and the region server is still online,
1721                 // the openRegion RPC could have been accepted by the server and
1722                 // just the response didn't go through.  So we will retry to
1723                 // open the region on the same server.
1724                 if (LOG.isDebugEnabled()) {
1725                   LOG.debug("Bulk assigner openRegion() to " + destination
1726                     + " has timed out, but the regions might"
1727                     + " already be opened on it.", e);
1728                 }
1729                 // wait and reset the retry count; the server might just be busy.
1730                 Thread.sleep(100);
1731                 i--;
1732                 continue;
1733               }
1734               throw e;
1735             }
1736           }
1737         } catch (IOException e) {
1738           // Can be a socket timeout, EOF, NoRouteToHost, etc
1739           LOG.info("Unable to communicate with " + destination
1740             + " in order to assign regions, ", e);
1741           return false;
1742         }
1743       } finally {
1744         for (Lock lock : locks.values()) {
1745           lock.unlock();
1746         }
1747       }
1748 
1749       if (!failedToOpenRegions.isEmpty()) {
1750         for (HRegionInfo region : failedToOpenRegions) {
1751           if (!regionStates.isRegionOnline(region)) {
1752             invokeAssign(region);
1753           }
1754         }
1755       }
1756       LOG.debug("Bulk assigning done for " + destination);
1757       return true;
1758     } finally {
1759       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
1760     }
1761   }
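  /*
   * Illustrative sketch (not part of the original source): how a caller might drive the
   * bulk assign above from a per-server plan. The "am" and "bulkPlan" names are
   * hypothetical; in the real code this path is driven by the master's bulk assigners.
   *
   *   Map<ServerName, List<HRegionInfo>> bulkPlan = ...;  // e.g. produced by the balancer
   *   for (Map.Entry<ServerName, List<HRegionInfo>> entry : bulkPlan.entrySet()) {
   *     try {
   *       if (!am.assign(entry.getKey(), entry.getValue())) {
   *         // false means the whole batch to that server failed (server stopped, RPC
   *         // unreachable, ...); the caller is expected to retry or assign individually.
   *       }
   *     } catch (InterruptedException ie) {
   *       Thread.currentThread().interrupt();
   *       break;
   *     }
   *   }
   */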
1762 
1763   /**
1764    * Send CLOSE RPC if the server is online, otherwise, offline the region.
1765    *
1766    * The RPC is sent only to the region server found in the region state,
1767    * if one is passed in; otherwise it goes to the specified src server. If no
1768    * region state is specified, we don't update the region state at all; we just
1769    * send the RPC call. This is useful for cleanup that should not touch the
1770    * region states (see handleRegion, the case of a region opened on an
1771    * unexpected server, for an example).
1772    */
1773   private void unassign(final HRegionInfo region,
1774       final RegionState state, final int versionOfClosingNode,
1775       final ServerName dest, final boolean transitionInZK,
1776       final ServerName src) {
1777     ServerName server = src;
1778     if (state != null) {
1779       server = state.getServerName();
1780     }
1781     long maxWaitTime = -1;
1782     for (int i = 1; i <= this.maximumAttempts; i++) {
1783       if (this.server.isStopped() || this.server.isAborted()) {
1784         LOG.debug("Server stopped/aborted; skipping unassign of " + region);
1785         return;
1786       }
1787       // ClosedRegionHandler can remove the server from this.regions
1788       if (!serverManager.isServerOnline(server)) {
1789         LOG.debug("Offline " + region.getRegionNameAsString()
1790           + ", no need to unassign since it's on a dead server: " + server);
1791         if (transitionInZK) {
1792           // delete the node; if no node exists, we need not bother.
1793           deleteClosingOrClosedNode(region, server);
1794         }
1795         if (state != null) {
1796           regionOffline(region);
1797         }
1798         return;
1799       }
1800       try {
1801         // Send CLOSE RPC
1802         if (serverManager.sendRegionClose(server, region,
1803           versionOfClosingNode, dest, transitionInZK)) {
1804           LOG.debug("Sent CLOSE to " + server + " for region " +
1805             region.getRegionNameAsString());
1806           if (useZKForAssignment && !transitionInZK && state != null) {
1807             // Retry to make sure the region is
1808             // closed so as to avoid double assignment.
1809             unassign(region, state, versionOfClosingNode,
1810               dest, transitionInZK, src);
1811           }
1812           return;
1813         }
1814         // This should never happen: currently a regionserver close always returns true.
1815         // TODO: it can now happen (0.96) if there is an exception in a coprocessor.
1816         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
1817           region.getRegionNameAsString());
1818       } catch (Throwable t) {
1819         if (t instanceof RemoteException) {
1820           t = ((RemoteException)t).unwrapRemoteException();
1821         }
1822         boolean logRetries = true;
1823         if (t instanceof NotServingRegionException
1824             || t instanceof RegionServerStoppedException
1825             || t instanceof ServerNotRunningYetException) {
1826           LOG.debug("Offline " + region.getRegionNameAsString()
1827             + ", it's not any more on " + server, t);
1828           if (transitionInZK) {
1829             deleteClosingOrClosedNode(region, server);
1830           }
1831           if (state != null) {
1832             regionOffline(region);
1833           }
1834           return;
1835         } else if ((t instanceof FailedServerException) || (state != null &&
1836             t instanceof RegionAlreadyInTransitionException)) {
1837           long sleepTime = 0;
1838           Configuration conf = this.server.getConfiguration();
1839           if(t instanceof FailedServerException) {
1840             sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1841                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1842           } else {
1843             // RS is already processing this region, only need to update the timestamp
1844             LOG.debug("Updating the timestamp for " + state);
1845             state.updateTimestampToNow();
1846             if (maxWaitTime < 0) {
1847               maxWaitTime =
1848                   EnvironmentEdgeManager.currentTimeMillis()
1849                       + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
1850                         DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
1851             }
1852             long now = EnvironmentEdgeManager.currentTimeMillis();
1853             if (now < maxWaitTime) {
1854               LOG.debug("Region is already in transition; "
1855                 + "waiting up to " + (maxWaitTime - now) + "ms", t);
1856               sleepTime = 100;
1857               i--; // reset the try count
1858               logRetries = false;
1859             }
1860           }
1861           try {
1862             if (sleepTime > 0) {
1863               Thread.sleep(sleepTime);
1864             }
1865           } catch (InterruptedException ie) {
1866             LOG.warn("Failed to unassign "
1867               + region.getRegionNameAsString() + " since interrupted", ie);
1868             Thread.currentThread().interrupt();
1869             if (state != null) {
1870               regionStates.updateRegionState(region, State.FAILED_CLOSE);
1871             }
1872             return;
1873           }
1874         }
1875 
1876         if (logRetries) {
1877           LOG.info("Server " + server + " returned " + t + " for "
1878             + region.getRegionNameAsString() + ", try=" + i
1879             + " of " + this.maximumAttempts, t);
1880           // Presume retry or server will expire.
1881         }
1882       }
1883     }
1884     // Ran out of attempts
1885     if (state != null) {
1886       regionStates.updateRegionState(region, State.FAILED_CLOSE);
1887     }
1888   }
1889 
1890   /**
1891    * Set region to OFFLINE unless it is opening and forceNewPlan is false.
1892    */
1893   private RegionState forceRegionStateToOffline(
1894       final HRegionInfo region, final boolean forceNewPlan) {
1895     RegionState state = regionStates.getRegionState(region);
1896     if (state == null) {
1897       LOG.warn("Assigning a region not in region states: " + region);
1898       state = regionStates.createRegionState(region);
1899     }
1900 
1901     ServerName sn = state.getServerName();
1902     if (forceNewPlan && LOG.isDebugEnabled()) {
1903       LOG.debug("Force region state offline " + state);
1904     }
1905 
1906     switch (state.getState()) {
1907     case OPEN:
1908     case OPENING:
1909     case PENDING_OPEN:
1910     case CLOSING:
1911     case PENDING_CLOSE:
1912       if (!forceNewPlan) {
1913         LOG.debug("Skip assigning " +
1914           region + ", it is already " + state);
1915         return null;
1916       }
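      // deliberate fall-through: when forceNewPlan is set, try to close the region below
      // before re-assigning it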
1917     case FAILED_CLOSE:
1918     case FAILED_OPEN:
1919       unassign(region, state, -1, null, false, null);
1920       state = regionStates.getRegionState(region);
1921       if (state.isFailedClose()) {
1922         // If we can't close the region, we can't re-assign
1923         // it so as to avoid possible double assignment/data loss.
1924         LOG.info("Skip assigning " +
1925           region + ", we couldn't close it: " + state);
1926         return null;
1927       }
1928     case OFFLINE:
1929       // This region could have been open on this server
1930       // for a while. If the server is dead and not processed
1931       // yet, we can move on only if meta shows that the region
1932       // is not actually on this server, or is on a server that is
1933       // not dead, or on one that is dead and already processed.
1934       // When not using ZK, we don't need this check because
1935       // we have the latest info in memory, and the caller
1936       // will do another round of checking anyway.
1937       if (useZKForAssignment
1938           && regionStates.isServerDeadAndNotProcessed(sn)
1939           && wasRegionOnDeadServerByMeta(region, sn)) {
1940         LOG.info("Skip assigning " + region.getRegionNameAsString()
1941           + ", it is on a dead but not processed yet server: " + sn);
1942         return null;
1943       }
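      // deliberate fall-through: if the region is not stuck on a dead, unprocessed server,
      // treat it like CLOSED and proceed with the assignment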
1944     case CLOSED:
1945       break;
1946     default:
1947       LOG.error("Trying to assign region " + region
1948         + ", which is " + state);
1949       return null;
1950     }
1951     return state;
1952   }
1953 
1954   @SuppressWarnings("deprecation")
1955   private boolean wasRegionOnDeadServerByMeta(
1956       final HRegionInfo region, final ServerName sn) {
1957     try {
1958       if (region.isMetaRegion()) {
1959         ServerName server = this.server.getMetaTableLocator().
1960           getMetaRegionLocation(this.server.getZooKeeper());
1961         return regionStates.isServerDeadAndNotProcessed(server);
1962       }
1963       while (!server.isStopped()) {
1964         try {
1965           this.server.getMetaTableLocator().waitMetaRegionLocation(server.getZooKeeper());
1966           Result r = MetaTableAccessor.getRegionResult(server.getShortCircuitConnection(),
1967             region.getRegionName());
1968           if (r == null || r.isEmpty()) return false;
1969           ServerName server = HRegionInfo.getServerName(r);
1970           return regionStates.isServerDeadAndNotProcessed(server);
1971         } catch (IOException ioe) {
1972           LOG.info("Received exception accessing hbase:meta during force assign "
1973             + region.getRegionNameAsString() + ", retrying", ioe);
1974         }
1975       }
1976     } catch (InterruptedException e) {
1977       Thread.currentThread().interrupt();
1978       LOG.info("Interrupted accessing hbase:meta", e);
1979     }
1980     // Call is interrupted or server is stopped.
1981     return regionStates.isServerDeadAndNotProcessed(sn);
1982   }
1983 
1984   /**
1985    * Caller must hold lock on the passed <code>state</code> object.
1986    * @param state
1987    * @param setOfflineInZK
1988    * @param forceNewPlan
1989    */
1990   private void assign(RegionState state,
1991       final boolean setOfflineInZK, final boolean forceNewPlan) {
1992     long startTime = EnvironmentEdgeManager.currentTimeMillis();
1993     try {
1994       Configuration conf = server.getConfiguration();
1995       RegionState currentState = state;
1996       int versionOfOfflineNode = -1;
1997       RegionPlan plan = null;
1998       long maxWaitTime = -1;
1999       HRegionInfo region = state.getRegion();
2000       RegionOpeningState regionOpenState;
2001       Throwable previousException = null;
2002       for (int i = 1; i <= maximumAttempts; i++) {
2003         if (server.isStopped() || server.isAborted()) {
2004           LOG.info("Skip assigning " + region.getRegionNameAsString()
2005             + ", the server is stopped/aborted");
2006           return;
2007         }
2008         if (plan == null) { // Get a server for the region at first
2009           try {
2010             plan = getRegionPlan(region, forceNewPlan);
2011           } catch (HBaseIOException e) {
2012             LOG.warn("Failed to get region plan", e);
2013           }
2014         }
2015         if (plan == null) {
2016           LOG.warn("Unable to determine a plan to assign " + region);
2017           if (region.isMetaRegion()) {
2018             try {
2019               Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
2020               if (i == maximumAttempts) i = 1;
2021               continue;
2022             } catch (InterruptedException e) {
2023               LOG.error("Got exception while waiting for hbase:meta assignment", e);
2024               Thread.currentThread().interrupt();
2025             }
2026           }
2027           regionStates.updateRegionState(region, State.FAILED_OPEN);
2028           return;
2029         }
2030         if (setOfflineInZK && versionOfOfflineNode == -1) {
2031           // get the version of the znode after setting it to OFFLINE.
2032           // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
2033           versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
2034           if (versionOfOfflineNode != -1) {
2035             if (isDisabledorDisablingRegionInRIT(region)) {
2036               return;
2037             }
2038             // In case of assignment from EnableTableHandler, the table state is ENABLING. Anyhow,
2039             // EnableTableHandler will set it to ENABLED after assigning all the table regions. If we
2040             // set ENABLED directly here, the client API may think the table is already enabled.
2041             // However, when all the regions are added directly into hbase:meta and assignRegion is
2042             // called, we do need to make the table ENABLED here. In such a case the table
2043             // will not be in the ENABLING or ENABLED state.
2044             TableName tableName = region.getTable();
2045             if (!tableStateManager.isTableState(tableName,
2046               ZooKeeperProtos.Table.State.ENABLED, ZooKeeperProtos.Table.State.ENABLING)) {
2047               LOG.debug("Setting table " + tableName + " to ENABLED state.");
2048               setEnabledTable(tableName);
2049             }
2050           }
2051         }
2052         if (setOfflineInZK && versionOfOfflineNode == -1) {
2053           LOG.info("Unable to set offline in ZooKeeper to assign " + region);
2054           // Setting offline in ZK must have failed due to ZK racing or some
2055           // exception which may make the server abort. If it is ZK racing,
2056           // we should retry since we already reset the region state;
2057           // any existing (re)assignment will fail anyway.
2058           if (!server.isAborted()) {
2059             continue;
2060           }
2061         }
2062         LOG.info("Assigning " + region.getRegionNameAsString() +
2063             " to " + plan.getDestination().toString());
2064         // Transition RegionState to PENDING_OPEN
2065         currentState = regionStates.updateRegionState(region,
2066           State.PENDING_OPEN, plan.getDestination());
2067 
2068         boolean needNewPlan;
2069         final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
2070             " to " + plan.getDestination();
2071         try {
2072           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
2073           if (this.shouldAssignRegionsWithFavoredNodes) {
2074             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
2075           }
2076           regionOpenState = serverManager.sendRegionOpen(
2077               plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
2078 
2079           if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
2080             // Failed opening this region, looping again on a new server.
2081             needNewPlan = true;
2082             LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
2083                 " trying to assign elsewhere instead; " +
2084                 "try=" + i + " of " + this.maximumAttempts);
2085           } else {
2086             // we're done
2087             if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
2088               processAlreadyOpenedRegion(region, plan.getDestination());
2089             }
2090             return;
2091           }
2092 
2093         } catch (Throwable t) {
2094           if (t instanceof RemoteException) {
2095             t = ((RemoteException) t).unwrapRemoteException();
2096           }
2097           previousException = t;
2098 
2099           // Should we wait a little before retrying? If the server is starting, yes.
2100           // If the region is already in transition, it's yes as well: we want to be sure that
2101           //  the region will get opened but we don't want a double assignment.
2102           boolean hold = (t instanceof RegionAlreadyInTransitionException ||
2103               t instanceof ServerNotRunningYetException);
2104 
2105           // In case socket is timed out and the region server is still online,
2106           // the openRegion RPC could have been accepted by the server and
2107           // just the response didn't go through.  So we will retry to
2108           // open the region on the same server to avoid possible
2109           // double assignment.
2110           boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
2111               && this.serverManager.isServerOnline(plan.getDestination()));
2112 
2113 
2114           if (hold) {
2115             LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
2116               "try=" + i + " of " + this.maximumAttempts, t);
2117 
2118             if (maxWaitTime < 0) {
2119               if (t instanceof RegionAlreadyInTransitionException) {
2120                 maxWaitTime = EnvironmentEdgeManager.currentTimeMillis()
2121                   + this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
2122                     DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
2123               } else {
2124                 maxWaitTime = this.server.getConfiguration().
2125                   getLong("hbase.regionserver.rpc.startup.waittime", 60000);
2126               }
2127             }
2128             try {
2129               needNewPlan = false;
2130               long now = EnvironmentEdgeManager.currentTimeMillis();
2131               if (now < maxWaitTime) {
2132                 LOG.debug("Server is not yet up or region is already in transition; "
2133                   + "waiting up to " + (maxWaitTime - now) + "ms", t);
2134                 Thread.sleep(100);
2135                 i--; // reset the try count
2136               } else if (!(t instanceof RegionAlreadyInTransitionException)) {
2137                 LOG.debug("Server is not up for a while; try a new one", t);
2138                 needNewPlan = true;
2139               }
2140             } catch (InterruptedException ie) {
2141               LOG.warn("Failed to assign "
2142                   + region.getRegionNameAsString() + " since interrupted", ie);
2143               regionStates.updateRegionState(region, State.FAILED_OPEN);
2144               Thread.currentThread().interrupt();
2145               return;
2146             }
2147           } else if (retry) {
2148             needNewPlan = false;
2149             i--; // we want to retry as many times as needed as long as the RS is not dead.
2150             LOG.warn(assignMsg + ", trying to assign to the same region server", t);
2151           } else {
2152             needNewPlan = true;
2153             LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
2154                 " try=" + i + " of " + this.maximumAttempts, t);
2155           }
2156         }
2157 
2158         if (i == this.maximumAttempts) {
2159           // Don't reset the region state or get a new plan any more.
2160           // This is the last try.
2161           continue;
2162         }
2163 
2164         // If the region opened on the destination of the present plan, reassigning to a new
2165         // RS may cause double assignments. In case of RegionAlreadyInTransitionException,
2166         // reassign to the same RS.
2167         if (needNewPlan) {
2168           // Force a new plan and reassign. Will return null if no servers.
2169           // The new plan could be the same as the existing plan since we don't
2170           // exclude the server of the original plan, which should not be
2171           // excluded since it could be the only server up now.
2172           RegionPlan newPlan = null;
2173           try {
2174             newPlan = getRegionPlan(region, true);
2175           } catch (HBaseIOException e) {
2176             LOG.warn("Failed to get region plan", e);
2177           }
2178           if (newPlan == null) {
2179             regionStates.updateRegionState(region, State.FAILED_OPEN);
2180             LOG.warn("Unable to find a viable location to assign region " +
2181                 region.getRegionNameAsString());
2182             return;
2183           }
2184 
2185           if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
2186             // Clean out the plan we failed to execute; it doesn't look like it'll
2187             // succeed anyway. We need a new plan!
2188             // Transition back to OFFLINE
2189             currentState = regionStates.updateRegionState(region, State.OFFLINE);
2190             versionOfOfflineNode = -1;
2191             plan = newPlan;
2192           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
2193               previousException instanceof FailedServerException) {
2194             try {
2195               LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
2196                 " to the same failed server.");
2197               Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
2198                 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
2199             } catch (InterruptedException ie) {
2200               LOG.warn("Failed to assign "
2201                   + region.getRegionNameAsString() + " since interrupted", ie);
2202               regionStates.updateRegionState(region, State.FAILED_OPEN);
2203               Thread.currentThread().interrupt();
2204               return;
2205             }
2206           }
2207         }
2208       }
2209       // Ran out of attempts
2210       regionStates.updateRegionState(region, State.FAILED_OPEN);
2211     } finally {
2212       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
2213     }
2214   }
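  /*
   * Illustrative sketch (not part of the original source): the retry idiom used above in
   * assign() and unassign() -- a bounded number of attempts, except that while a transient
   * condition holds (server still starting, region already in transition) the attempt
   * counter is decremented so the loop keeps retrying until a wall-clock deadline
   * (maxWaitTime) expires. The names doAttempt/TransientException/waitWindowMillis are
   * hypothetical.
   *
   *   long maxWaitTime = -1;
   *   for (int i = 1; i <= maximumAttempts; i++) {
   *     try {
   *       doAttempt();
   *       return;
   *     } catch (TransientException e) {
   *       if (maxWaitTime < 0) {
   *         maxWaitTime = EnvironmentEdgeManager.currentTimeMillis() + waitWindowMillis;
   *       }
   *       if (EnvironmentEdgeManager.currentTimeMillis() < maxWaitTime) {
   *         Thread.sleep(100);   // InterruptedException handling elided in this sketch
   *         i--;                 // burn wall-clock time, not attempts, inside the window
   *       }
   *       // once the window expires, attempts start counting down again
   *     }
   *   }
   */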
2215 
2216   private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
2217     // Remove the region from in-memory transition and the unassigned node from ZK.
2218     // While trying to enable the table, the regions of the table were
2219     // already enabled.
2220     LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2221       + " to " + sn);
2222     String encodedName = region.getEncodedName();
2223     deleteNodeInStates(encodedName, "offline", sn, EventType.M_ZK_REGION_OFFLINE);
2224     regionStates.regionOnline(region, sn);
2225   }
2226 
2227   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
2228     if (this.tableStateManager.isTableState(region.getTable(),
2229         ZooKeeperProtos.Table.State.DISABLED,
2230         ZooKeeperProtos.Table.State.DISABLING) || replicasToClose.contains(region)) {
2231       LOG.info("Table " + region.getTable() + " is disabled or disabling;"
2232         + " skipping assign of " + region.getRegionNameAsString());
2233       offlineDisabledRegion(region);
2234       return true;
2235     }
2236     return false;
2237   }
2238 
2239   /**
2240    * Set the region as OFFLINE up in ZooKeeper.
2241    *
2242    * @param state
2243    * @return the version of the offline node if setting of the OFFLINE node was
2244    *         successful, -1 otherwise.
2245    */
2246   private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
2247     if (!state.isClosed() && !state.isOffline()) {
2248       String msg = "Unexpected state: " + state + ". Cannot transition it to OFFLINE.";
2249       this.server.abort(msg, new IllegalStateException(msg));
2250       return -1;
2251     }
2252     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
2253     int versionOfOfflineNode;
2254     try {
2255       // get the version after setting the znode to OFFLINE
2256       versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
2257         state.getRegion(), destination);
2258       if (versionOfOfflineNode == -1) {
2259         LOG.warn("Attempted to create/force node into OFFLINE state before "
2260             + "completing assignment but failed to do so for " + state);
2261         return -1;
2262       }
2263     } catch (KeeperException e) {
2264       server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
2265       return -1;
2266     }
2267     return versionOfOfflineNode;
2268   }
2269 
2270   /**
2271    * @param region the region to assign
2272    * @return Plan for passed <code>region</code> (If none currently, it creates one or
2273    * if no servers to assign, it returns null).
2274    */
2275   private RegionPlan getRegionPlan(final HRegionInfo region,
2276       final boolean forceNewPlan)  throws HBaseIOException {
2277     return getRegionPlan(region, null, forceNewPlan);
2278   }
2279 
2280   /**
2281    * @param region the region to assign
2282    * @param serverToExclude Server to exclude (we know it's bad). Pass null if
2283    * all servers are thought to be assignable.
2284    * @param forceNewPlan If true, then if an existing plan exists, a new plan
2285    * will be generated.
2286    * @return Plan for passed <code>region</code> (If none currently, it creates one or
2287    * if no servers to assign, it returns null).
2288    */
2289   private RegionPlan getRegionPlan(final HRegionInfo region,
2290       final ServerName serverToExclude, final boolean forceNewPlan) throws HBaseIOException {
2291     // Pick up an existing plan or make a new one
2292     final String encodedName = region.getEncodedName();
2293     final List<ServerName> destServers =
2294       serverManager.createDestinationServersList(serverToExclude);
2295 
2296     if (destServers.isEmpty()){
2297       LOG.warn("Can't move " + encodedName +
2298         ", there is no destination server available.");
2299       return null;
2300     }
2301 
2302     RegionPlan randomPlan = null;
2303     boolean newPlan = false;
2304     RegionPlan existingPlan;
2305 
2306     synchronized (this.regionPlans) {
2307       existingPlan = this.regionPlans.get(encodedName);
2308 
2309       if (existingPlan != null && existingPlan.getDestination() != null) {
2310         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
2311           + "; destination server is " + existingPlan.getDestination() +
2312             ", accepted as a dest server: " + destServers.contains(existingPlan.getDestination()));
2313       }
2314 
2315       if (forceNewPlan
2316           || existingPlan == null
2317           || existingPlan.getDestination() == null
2318           || !destServers.contains(existingPlan.getDestination())) {
2319         newPlan = true;
2320         randomPlan = new RegionPlan(region, null,
2321             balancer.randomAssignment(region, destServers));
2322         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
2323           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
2324           regions.add(region);
2325           try {
2326             processFavoredNodes(regions);
2327           } catch (IOException ie) {
2328             LOG.warn("Ignoring exception in processFavoredNodes " + ie);
2329           }
2330         }
2331         this.regionPlans.put(encodedName, randomPlan);
2332       }
2333     }
2334 
2335     if (newPlan) {
2336       if (randomPlan.getDestination() == null) {
2337         LOG.warn("Can't find a destination for " + encodedName);
2338         return null;
2339       }
2340       LOG.debug("No previous transition plan found (or ignoring " +
2341         "an existing plan) for " + region.getRegionNameAsString() +
2342         "; generated random plan=" + randomPlan + "; " + destServers.size() +
2343         " (online=" + serverManager.getOnlineServers().size() +
2344         ") available servers, forceNewPlan=" + forceNewPlan);
2345       return randomPlan;
2346     }
2347     LOG.debug("Using pre-existing plan for " +
2348       region.getRegionNameAsString() + "; plan=" + existingPlan);
2349     return existingPlan;
2350   }
2351 
2352   /**
2353    * Unassigns the specified region.
2354    * <p>
2355    * Updates the RegionState and sends the CLOSE RPC unless region is being
2356    * split by regionserver; then the unassign fails (silently) because we
2357    * presume the region being unassigned no longer exists (it's been split out
2358    * of existence). TODO: What to do if split fails and is rolled back and
2359    * parent is revivified?
2360    * <p>
2361    * If a RegionPlan is already set, it will remain.
2362    *
2363    * @param region region to be unassigned
2364    */
2365   public void unassign(HRegionInfo region) {
2366     unassign(region, false);
2367   }
2368 
2369 
2370   /**
2371    * Unassigns the specified region.
2372    * <p>
2373    * Updates the RegionState and sends the CLOSE RPC unless region is being
2374    * split by regionserver; then the unassign fails (silently) because we
2375    * presume the region being unassigned no longer exists (it's been split out
2376    * of existence). TODO: What to do if split fails and is rolled back and
2377    * parent is revivified?
2378    * <p>
2379    * If a RegionPlan is already set, it will remain.
2380    *
2381    * @param region region to be unassigned
2382    * @param force if region should be closed even if already closing
2383    */
2384   public void unassign(HRegionInfo region, boolean force, ServerName dest) {
2385     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
2386     LOG.debug("Starting unassign of " + region.getRegionNameAsString()
2387       + " (offlining), current state: " + regionStates.getRegionState(region));
2388 
2389     String encodedName = region.getEncodedName();
2390     // Grab the state of this region and synchronize on it
2391     int versionOfClosingNode = -1;
2392     // We need a lock here as we're going to do a put later and we don't want multiple
2393     //  states created concurrently
2394     ReentrantLock lock = locker.acquireLock(encodedName);
2395     RegionState state = regionStates.getRegionTransitionState(encodedName);
2396     boolean reassign = true;
2397     try {
2398       if (state == null) {
2399         // Region is not in transition.
2400         // We can unassign it only if it's not SPLIT/MERGED.
2401         state = regionStates.getRegionState(encodedName);
2402         if (state != null && state.isUnassignable()) {
2403           LOG.info("Attempting to unassign " + state + ", ignored");
2404           // Offline region will be reassigned below
2405           return;
2406         }
2407         // Create the znode in CLOSING state
2408         try {
2409           if (state == null || state.getServerName() == null) {
2410             // We don't know where the region is, offline it.
2411             // No need to send CLOSE RPC
2412             LOG.warn("Attempting to unassign a region not in RegionStates: "
2413               + region.getRegionNameAsString() + ", offlined");
2414             regionOffline(region);
2415             return;
2416           }
2417           if (useZKForAssignment) {
2418             versionOfClosingNode = ZKAssign.createNodeClosing(
2419               watcher, region, state.getServerName());
2420             if (versionOfClosingNode == -1) {
2421               LOG.info("Attempting to unassign " +
2422                 region.getRegionNameAsString() + " but ZK closing node "
2423                 + "can't be created.");
2424               reassign = false; // not unassigned at all
2425               return;
2426             }
2427           }
2428         } catch (KeeperException e) {
2429           if (e instanceof NodeExistsException) {
2430             // Handle race between master initiated close and regionserver
2431             // orchestrated splitting. See if existing node is in a
2432             // SPLITTING or SPLIT state.  If so, the regionserver started
2433             // an op on node before we could get our CLOSING in.  Deal.
2434             NodeExistsException nee = (NodeExistsException)e;
2435             String path = nee.getPath();
2436             try {
2437               if (isSplitOrSplittingOrMergedOrMerging(path)) {
2438                 LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
2439                   "skipping unassign because region no longer exists -- its split or merge");
2440                 reassign = false; // no need to reassign for split/merged region
2441                 return;
2442               }
2443             } catch (KeeperException.NoNodeException ke) {
2444               LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
2445                 "; presuming split and that the region to unassign, " +
2446                 encodedName + ", no longer exists -- confirm", ke);
2447               return;
2448             } catch (KeeperException ke) {
2449               LOG.error("Unexpected zk state", ke);
2450             } catch (DeserializationException de) {
2451               LOG.error("Failed parse", de);
2452             }
2453           }
2454           // If we get here, we don't understand what's going on -- abort.
2455           server.abort("Unexpected ZK exception creating node CLOSING", e);
2456           reassign = false; // heading out already
2457           return;
2458         }
2459         state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2460       } else if (state.isFailedOpen()) {
2461         // The region is not open yet
2462         regionOffline(region);
2463         return;
2464       } else if (force && state.isPendingCloseOrClosing()) {
2465         LOG.debug("Attempting to unassign " + region.getRegionNameAsString() +
2466           " which is already " + state.getState()  +
2467           " but forcing to send a CLOSE RPC again ");
2468         if (state.isFailedClose()) {
2469           state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2470         }
2471         state.updateTimestampToNow();
2472       } else {
2473         LOG.debug("Attempting to unassign " +
2474           region.getRegionNameAsString() + " but it is " +
2475           "already in transition (" + state.getState() + ", force=" + force + ")");
2476         return;
2477       }
2478 
2479       unassign(region, state, versionOfClosingNode, dest, useZKForAssignment, null);
2480     } finally {
2481       lock.unlock();
2482 
2483       // Region is expected to be reassigned afterwards
2484       if (!replicasToClose.contains(region) && reassign && regionStates.isRegionOffline(region)) {
2485         assign(region, true);
2486       }
2487     }
2488   }
2489 
2490   public void unassign(HRegionInfo region, boolean force) {
2491     unassign(region, force, null);
2492   }
2493 
2494   /**
2495    * @param region region whose closing/closed znode is to be deleted.
2496    */
2497   public void deleteClosingOrClosedNode(HRegionInfo region, ServerName sn) {
2498     String encodedName = region.getEncodedName();
2499     deleteNodeInStates(encodedName, "closing", sn, EventType.M_ZK_REGION_CLOSING,
2500       EventType.RS_ZK_REGION_CLOSED);
2501   }
2502 
2503   /**
2504    * @param path znode path to check
2505    * @return True if the znode is in SPLIT, SPLITTING, MERGED or MERGING state.
2506    * @throws KeeperException Can happen if the znode went away in the meantime.
2507    * @throws DeserializationException If the znode data cannot be parsed.
2508    */
2509   private boolean isSplitOrSplittingOrMergedOrMerging(final String path)
2510       throws KeeperException, DeserializationException {
2511     boolean result = false;
2512     // This may fail if the SPLIT or SPLITTING or MERGED or MERGING znode gets
2513     // cleaned up before we can get data from it.
2514     byte [] data = ZKAssign.getData(watcher, path);
2515     if (data == null) {
2516       LOG.info("Node " + path + " is gone");
2517       return false;
2518     }
2519     RegionTransition rt = RegionTransition.parseFrom(data);
2520     switch (rt.getEventType()) {
2521     case RS_ZK_REQUEST_REGION_SPLIT:
2522     case RS_ZK_REGION_SPLIT:
2523     case RS_ZK_REGION_SPLITTING:
2524     case RS_ZK_REQUEST_REGION_MERGE:
2525     case RS_ZK_REGION_MERGED:
2526     case RS_ZK_REGION_MERGING:
2527       result = true;
2528       break;
2529     default:
2530       LOG.info("Node " + path + " is in " + rt.getEventType());
2531       break;
2532     }
2533     return result;
2534   }
2535 
2536   /**
2537    * Used by unit tests. Return the number of regions opened so far in the life
2538    * of the master. Increases by one every time the master opens a region
2539    * @return the counter value of the number of regions opened so far
2540    */
2541   public int getNumRegionsOpened() {
2542     return numRegionsOpened.get();
2543   }
2544 
2545   /**
2546    * Waits until the specified region has completed assignment.
2547    * <p>
2548    * If the region is already assigned, returns immediately.  Otherwise, method
2549    * blocks until the region is assigned.
2550    * @param regionInfo region to wait on assignment for
2551    * @throws InterruptedException
2552    */
2553   public boolean waitForAssignment(HRegionInfo regionInfo)
2554       throws InterruptedException {
2555     while (!regionStates.isRegionOnline(regionInfo)) {
2556       if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN)
2557           || this.server.isStopped()) {
2558         return false;
2559       }
2560 
2561       // We should receive a notification, but it's
2562       //  better to have a timeout to recheck the condition here:
2563       //  it lowers the impact of a race condition if any
2564       regionStates.waitForUpdate(100);
2565     }
2566     return true;
2567   }
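
  // Usage sketch: a caller that has just requested an assignment can block on the
  // outcome roughly like this (assuming an AssignmentManager instance "am" and an
  // HRegionInfo "hri"):
  //
  //   am.assign(hri, true);
  //   if (!am.waitForAssignment(hri)) {
  //     // the region ended up FAILED_OPEN, or the server is stopping
  //   }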
2568 
2569   /**
2570    * Assigns the hbase:meta region.
2571    * <p>
2572    * Assumes that hbase:meta is currently closed and is not being actively served by
2573    * any RegionServer.
2574    * <p>
2575    * Forcibly unsets the current meta region location in ZooKeeper and assigns
2576    * hbase:meta to a random RegionServer.
2577    * @throws KeeperException
2578    */
2579   public void assignMeta() throws KeeperException {
2580     this.server.getMetaTableLocator().deleteMetaLocation(this.watcher);
2581     assign(HRegionInfo.FIRST_META_REGIONINFO, true);
2582   }
2583 
2584   /**
2585    * Assigns specified regions retaining assignments, if any.
2586    * <p>
2587    * This is a synchronous call and will return once every region has been
2588    * assigned.  If anything fails, an exception is thrown
2589    * @throws InterruptedException
2590    * @throws IOException
2591    */
2592   public void assign(Map<HRegionInfo, ServerName> regions)
2593         throws IOException, InterruptedException {
2594     if (regions == null || regions.isEmpty()) {
2595       return;
2596     }
2597     List<ServerName> servers = serverManager.createDestinationServersList();
2598     if (servers == null || servers.isEmpty()) {
2599       throw new IOException("Found no destination server to assign region(s)");
2600     }
2601 
2602     // Reuse existing assignment info
2603     Map<ServerName, List<HRegionInfo>> bulkPlan =
2604       balancer.retainAssignment(regions, servers);
2605 
2606     assign(regions.size(), servers.size(),
2607       "retainAssignment=true", bulkPlan);
2608   }
2609 
2610   /**
2611    * Assigns specified regions round robin, if any.
2612    * <p>
2613    * This is a synchronous call and will return once every region has been
2614    * assigned.  If anything fails, an exception is thrown
2615    * @throws InterruptedException
2616    * @throws IOException
2617    */
2618   public void assign(List<HRegionInfo> regions)
2619         throws IOException, InterruptedException {
2620     if (regions == null || regions.isEmpty()) {
2621       return;
2622     }
2623 
2624     List<ServerName> servers = serverManager.createDestinationServersList();
2625     if (servers == null || servers.isEmpty()) {
2626       throw new IOException("Found no destination server to assign region(s)");
2627     }
2628 
2629     // Generate a round-robin bulk assignment plan
2630     Map<ServerName, List<HRegionInfo>> bulkPlan
2631       = balancer.roundRobinAssignment(regions, servers);
2632     processFavoredNodes(regions);
2633 
2634     assign(regions.size(), servers.size(),
2635       "round-robin=true", bulkPlan);
2636   }
2637 
2638   private void assign(int regions, int totalServers,
2639       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
2640           throws InterruptedException, IOException {
2641 
2642     int servers = bulkPlan.size();
2643     if (servers == 1 || (regions < bulkAssignThresholdRegions
2644         && servers < bulkAssignThresholdServers)) {
2645 
2646       // Not using bulk assignment.  This can be more efficient in a small
2647       // cluster, especially a mini cluster for testing, so that tests won't time out
2648       if (LOG.isTraceEnabled()) {
2649         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
2650           " region(s) to " + servers + " server(s)");
2651       }
2652       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
2653         if (!assign(plan.getKey(), plan.getValue())) {
2654           for (HRegionInfo region: plan.getValue()) {
2655             if (!regionStates.isRegionOnline(region)) {
2656               invokeAssign(region);
2657             }
2658           }
2659         }
2660       }
2661     } else {
2662       LOG.info("Bulk assigning " + regions + " region(s) across "
2663         + totalServers + " server(s), " + message);
2664 
2665       // Use fixed count thread pool assigning.
2666       BulkAssigner ba = new GeneralBulkAssigner(
2667         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
2668       ba.bulkAssign();
2669       LOG.info("Bulk assigning done");
2670     }
2671   }
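
  // Note on the branch above: the bulk path is taken only when the plan spans more than
  // one server and either threshold is met, i.e. roughly
  //   servers > 1 && (regions >= bulkAssignThresholdRegions
  //       || servers >= bulkAssignThresholdServers)
  // Otherwise each per-server plan is assigned directly and any region still offline
  // afterwards is retried via invokeAssign(region).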
2672 
2673   /**
2674    * Assigns all user regions, if any exist.  Used during cluster startup.
2675    * <p>
2676    * This is a synchronous call and will return once every region has been
2677    * assigned.  If anything fails, an exception is thrown and the cluster
2678    * should be shut down.
2679    * @throws InterruptedException
2680    * @throws IOException
2681    */
2682   private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
2683       throws IOException, InterruptedException {
2684     if (allRegions == null || allRegions.isEmpty()) return;
2685 
2686     // Determine what type of assignment to do on startup
2687     boolean retainAssignment = server.getConfiguration().
2688       getBoolean("hbase.master.startup.retainassign", true);
2689 
2690     Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
2691     if (retainAssignment) {
2692       assign(allRegions);
2693     } else {
2694       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
2695       assign(regions);
2696     }
2697 
2698     for (HRegionInfo hri : regionsFromMetaScan) {
2699       TableName tableName = hri.getTable();
2700       if (!tableStateManager.isTableState(tableName,
2701           ZooKeeperProtos.Table.State.ENABLED)) {
2702         setEnabledTable(tableName);
2703       }
2704     }
2705     // assign all the replicas that were not recorded in the meta
2706     assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server));
2707   }
2708 
2709   /**
2710    * Get a list of replica regions that are not recorded in meta yet. We might
2711    * not have recorded the locations for the replicas because the replicas may
2712    * not have been online yet, the master restarted in the middle of assigning,
2713    * ZK was erased, etc.
2714    * @param regionsRecordedInMeta the set of regions we know are recorded in meta,
2715    * either as a default, or, as the location of a replica
2716    * @param master the master services used to look up table descriptors
2717    * @return list of replica regions
2718    * @throws IOException
2719    */
2720   public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
2721       Set<HRegionInfo> regionsRecordedInMeta, MasterServices master)throws IOException {
2722     List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
2723     for (HRegionInfo hri : regionsRecordedInMeta) {
2724       TableName table = hri.getTable();
2725       HTableDescriptor htd = master.getTableDescriptors().get(table);
2726       // look at the HTD for the replica count. That's the source of truth
2727       int desiredRegionReplication = htd.getRegionReplication();
2728       for (int i = 0; i < desiredRegionReplication; i++) {
2729         HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
2730         if (regionsRecordedInMeta.contains(replica)) continue;
2731         regionsNotRecordedInMeta.add(replica);
2732       }
2733     }
2734     return regionsNotRecordedInMeta;
2735   }
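
  // For example: if a table's descriptor has getRegionReplication() == 3 and meta only
  // records the default replica of a region hri, the loop above returns the HRegionInfo
  // for replica ids 1 and 2 (via RegionReplicaUtil.getRegionInfoForReplica(hri, i));
  // replicas already present in regionsRecordedInMeta are skipped.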
2736 
2737   /**
2738    * Wait until no regions in transition.
2739    * @param timeout How long to wait.
2740    * @return True if nothing in regions in transition.
2741    * @throws InterruptedException
2742    */
2743   boolean waitUntilNoRegionsInTransition(final long timeout)
2744       throws InterruptedException {
2745     // Blocks until there are no regions in transition. It is possible
2746     // that there are regions in transition immediately after this
2747     // returns, but it guarantees that if it returns without an
2748     // exception, there was a period of time with no regions in
2749     // transition from the point-of-view of the in-memory state of
2750     // the Master.
2751     final long endTime = System.currentTimeMillis() + timeout;
2752 
2753     while (!this.server.isStopped() && regionStates.isRegionsInTransition()
2754         && endTime > System.currentTimeMillis()) {
2755       regionStates.waitForUpdate(100);
2756     }
2757 
2758     return !regionStates.isRegionsInTransition();
2759   }
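
  // Usage sketch (the poll interval is fixed at 100 ms via regionStates.waitForUpdate):
  //
  //   if (!waitUntilNoRegionsInTransition(60000)) {
  //     // timed out, the server stopped, or regions are still in transition
  //   }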
2760 
2761   /**
2762    * Rebuild the list of user regions and assignment information.
2763    * <p>
2764    * Returns a set of servers that hosted some regions but were not
2765    * found to be online.
2766    * @return set of servers not online that hosted some regions per meta
2767    * @throws IOException
2768    */
2769   Set<ServerName> rebuildUserRegions() throws
2770       IOException, KeeperException, CoordinatedStateException {
2771     Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
2772       ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.ENABLING);
2773 
2774     Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
2775       ZooKeeperProtos.Table.State.DISABLED,
2776       ZooKeeperProtos.Table.State.DISABLING,
2777       ZooKeeperProtos.Table.State.ENABLING);
2778 
2779     // Region assignment from META
2780     List<Result> results = MetaTableAccessor.fullScanOfMeta(server.getShortCircuitConnection());
2781     // Get any new but slow to checkin region server that joined the cluster
2782     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
2783     // Set of offline servers to be returned
2784     Set<ServerName> offlineServers = new HashSet<ServerName>();
2785     // Iterate regions in META
2786     for (Result result : results) {
2787       if (result == null) {
2788         if (LOG.isDebugEnabled()) LOG.debug("null result from meta - ignoring but this is strange.");
2789         continue;
2790       }
2791       // Keep a track of replicas to close. These were the replicas of the regions
2792       // that were merged. The master might have closed them before, but it may not
2793       // have, for example because it crashed.
2794       PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
2795       if (p.getFirst() != null && p.getSecond() != null) {
2796         int numReplicas = ((MasterServices)server).getTableDescriptors().get(p.getFirst().
2797             getTable()).getRegionReplication();
2798         for (HRegionInfo merge : p) {
2799           for (int i = 1; i < numReplicas; i++) {
2800             replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
2801           }
2802         }
2803       }
2804       RegionLocations rl =  MetaTableAccessor.getRegionLocations(result);
2805       if (rl == null) continue;
2806       HRegionLocation[] locations = rl.getRegionLocations();
2807       if (locations == null) continue;
2808       for (HRegionLocation hrl : locations) {
2809         HRegionInfo regionInfo = hrl.getRegionInfo();
2810         if (regionInfo == null) continue;
2811         int replicaId = regionInfo.getReplicaId();
2812         State state = RegionStateStore.getRegionState(result, replicaId);
2813         // Keep a track of replicas to close. These were the replicas of the split parents
2814         // from the previous life of the master. The master should have closed them before,
2815         // but it may not have, for example because it crashed.
2816         if (replicaId == 0 && state.equals(State.SPLIT)) {
2817           for (HRegionLocation h : locations) {
2818             replicasToClose.add(h.getRegionInfo());
2819           }
2820         }
2821         ServerName lastHost = hrl.getServerName();
2822         ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
2823         regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
2824         if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
2825           // Region is not open (either offline or in transition), skip
2826           continue;
2827         }
2828         TableName tableName = regionInfo.getTable();
2829         if (!onlineServers.contains(regionLocation)) {
2830           // Region is located on a server that isn't online
2831           offlineServers.add(regionLocation);
2832           if (useZKForAssignment) {
2833             regionStates.regionOffline(regionInfo);
2834           }
2835         } else if (!disabledOrEnablingTables.contains(tableName)) {
2836           // Region is being served and on an active server
2837           // add only if region not in disabled or enabling table
2838           regionStates.regionOnline(regionInfo, regionLocation);
2839           balancer.regionOnline(regionInfo, regionLocation);
2840         } else if (useZKForAssignment) {
2841           regionStates.regionOffline(regionInfo);
2842         }
2843         // need to enable the table if not disabled or disabling or enabling
2844         // this will be used in rolling restarts
2845         if (!disabledOrDisablingOrEnabling.contains(tableName)
2846           && !getTableStateManager().isTableState(tableName,
2847             ZooKeeperProtos.Table.State.ENABLED)) {
2848           setEnabledTable(tableName);
2849         }
2850       }
2851     }
2852     return offlineServers;
2853   }
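
  // In short: after the meta scan above, regionStates holds a state for every replica
  // found in meta, OPEN regions on live servers are marked online (and reported to the
  // balancer), and any server that hosted an OPEN region but is missing from
  // serverManager.getOnlineServers() ends up in the returned set, presumably to be
  // handled as a dead server by processDeadServersAndRecoverLostRegions(...).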
2854 
2855   /**
2856    * Recover the tables that were not fully moved to DISABLED state. These
2857    * tables are in DISABLING state when the master restarted/switched.
2858    *
2859    * @throws KeeperException
2860    * @throws TableNotFoundException
2861    * @throws IOException
2862    */
2863   private void recoverTableInDisablingState()
2864       throws KeeperException, IOException, CoordinatedStateException {
2865     Set<TableName> disablingTables =
2866       tableStateManager.getTablesInStates(ZooKeeperProtos.Table.State.DISABLING);
2867     if (disablingTables.size() != 0) {
2868       for (TableName tableName : disablingTables) {
2869         // Recover by calling DisableTableHandler
2870         LOG.info("The table " + tableName
2871             + " is in DISABLING state.  Hence recovering by moving the table"
2872             + " to DISABLED state.");
2873         new DisableTableHandler(this.server, tableName,
2874             this, tableLockManager, true).prepare().process();
2875       }
2876     }
2877   }
2878 
2879   /**
2880    * Recover the tables that are not fully moved to ENABLED state. These tables
2881    * are in ENABLING state when the master restarted/switched
2882    *
2883    * @throws KeeperException
2884    * @throws org.apache.hadoop.hbase.TableNotFoundException
2885    * @throws IOException
2886    */
2887   private void recoverTableInEnablingState()
2888       throws KeeperException, IOException, CoordinatedStateException {
2889     Set<TableName> enablingTables = tableStateManager.
2890       getTablesInStates(ZooKeeperProtos.Table.State.ENABLING);
2891     if (enablingTables.size() != 0) {
2892       for (TableName tableName : enablingTables) {
2893         // Recover by calling EnableTableHandler
2894         LOG.info("The table " + tableName
2895             + " is in ENABLING state.  Hence recovering by moving the table"
2896             + " to ENABLED state.");
2897         // enableTable in sync way during master startup,
2898         // no need to invoke coprocessor
2899         EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
2900           this, tableLockManager, true);
2901         try {
2902           eth.prepare();
2903         } catch (TableNotFoundException e) {
2904           LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
2905           continue;
2906         }
2907         eth.process();
2908       }
2909     }
2910   }
2911 
2912   /**
2913    * Processes list of dead servers from result of hbase:meta scan and regions in RIT
2914    * <p>
2915    * This is used for failover to recover the lost regions that belonged to
2916    * RegionServers which failed while there was no active master or regions
2917    * that were in RIT.
2918    * <p>
2919    *
2920    *
2921    * @param deadServers
2922    *          The list of dead servers which failed while there was no active
2923    *          master. Can be null.
2924    * @throws IOException
2925    * @throws KeeperException
2926    */
2927   private void processDeadServersAndRecoverLostRegions(
2928       Set<ServerName> deadServers) throws IOException, KeeperException {
2929     if (deadServers != null && !deadServers.isEmpty()) {
2930       for (ServerName serverName: deadServers) {
2931         if (!serverManager.isServerDead(serverName)) {
2932           serverManager.expireServer(serverName); // Let SSH do region re-assign
2933         }
2934       }
2935     }
2936 
2937     List<String> nodes = useZKForAssignment ?
2938       ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode)
2939       : ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
2940     if (nodes != null && !nodes.isEmpty()) {
2941       for (String encodedRegionName : nodes) {
2942         processRegionInTransition(encodedRegionName, null);
2943       }
2944     } else if (!useZKForAssignment) {
2945       // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
2946       // in case the RPC call is not sent out yet before the master was shut down
2947       // since we update the state before we send the RPC call. We can't update
2948       // the state after the RPC call. Otherwise, we don't know what's happened
2949       // to the region if the master dies right after the RPC call is out.
2950       Map<String, RegionState> rits = regionStates.getRegionsInTransition();
2951       for (RegionState regionState: rits.values()) {
2952         if (!serverManager.isServerOnline(regionState.getServerName())) {
2953           continue; // SSH will handle it
2954         }
2955         State state = regionState.getState();
2956         LOG.info("Processing " + regionState);
2957         switch (state) {
2958         case PENDING_OPEN:
2959           retrySendRegionOpen(regionState);
2960           break;
2961         case PENDING_CLOSE:
2962           retrySendRegionClose(regionState);
2963           break;
2964         default:
2965           // No process for other states
2966         }
2967       }
2968     }
2969   }
2970 
2971   /**
2972    * At master failover, for pending_open region, make sure
2973    * sendRegionOpen RPC call is sent to the target regionserver
2974    */
2975   private void retrySendRegionOpen(final RegionState regionState) {
2976     this.executorService.submit(
2977       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
2978         @Override
2979         public void process() throws IOException {
2980           HRegionInfo hri = regionState.getRegion();
2981           ServerName serverName = regionState.getServerName();
2982           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
2983           try {
2984             if (!regionState.equals(regionStates.getRegionState(hri))) {
2985               return; // Region is not in the expected state any more
2986             }
2987             while (serverManager.isServerOnline(serverName)
2988                 && !server.isStopped() && !server.isAborted()) {
2989               try {
2990                 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
2991                 if (shouldAssignRegionsWithFavoredNodes) {
2992                   favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
2993                 }
2994                 RegionOpeningState regionOpenState = serverManager.sendRegionOpen(
2995                   serverName, hri, -1, favoredNodes);
2996 
2997                 if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
2998                   // Failed opening this region, this means the target server didn't get
2999                   // the original region open RPC, so re-assign it with a new plan
3000                   LOG.debug("Got failed_opening in retry sendRegionOpen for "
3001                     + regionState + ", re-assign it");
3002                   invokeAssign(hri, true);
3003                 }
3004                 return; // Done.
3005               } catch (Throwable t) {
3006                 if (t instanceof RemoteException) {
3007                   t = ((RemoteException) t).unwrapRemoteException();
3008                 }
3009                 // In case SocketTimeoutException/FailedServerException, retry
3010                 if (t instanceof java.net.SocketTimeoutException
3011                     || t instanceof FailedServerException) {
3012                   Threads.sleep(100);
3013                   continue;
3014                 }
3015                 // For other exceptions, re-assign it
3016                 LOG.debug("Got exception in retry sendRegionOpen for "
3017                   + regionState + ", re-assign it", t);
3018                 invokeAssign(hri);
3019                 return; // Done.
3020               }
3021             }
3022           } finally {
3023             lock.unlock();
3024           }
3025         }
3026       });
3027   }
3028 
3029   /**
3030    * At master failover, for pending_close region, make sure
3031    * sendRegionClose RPC call is sent to the target regionserver
3032    */
3033   private void retrySendRegionClose(final RegionState regionState) {
3034     this.executorService.submit(
3035       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
3036         @Override
3037         public void process() throws IOException {
3038           HRegionInfo hri = regionState.getRegion();
3039           ServerName serverName = regionState.getServerName();
3040           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
3041           try {
3042             if (!regionState.equals(regionStates.getRegionState(hri))) {
3043               return; // Region is not in the expected state any more
3044             }
3045             while (serverManager.isServerOnline(serverName)
3046                 && !server.isStopped() && !server.isAborted()) {
3047               try {
3048                 if (!serverManager.sendRegionClose(serverName, hri, -1, null, false)) {
3049                   // This means the region is still on the target server
3050                   LOG.debug("Got false in retry sendRegionClose for "
3051                     + regionState + ", re-close it");
3052                   invokeUnAssign(hri);
3053                 }
3054                 return; // Done.
3055               } catch (Throwable t) {
3056                 if (t instanceof RemoteException) {
3057                   t = ((RemoteException) t).unwrapRemoteException();
3058                 }
3059                 // In case SocketTimeoutException/FailedServerException, retry
3060                 if (t instanceof java.net.SocketTimeoutException
3061                     || t instanceof FailedServerException) {
3062                   Threads.sleep(100);
3063                   continue;
3064                 }
3065                 if (!(t instanceof NotServingRegionException
3066                     || t instanceof RegionAlreadyInTransitionException)) {
3067                   // NotServingRegionException/RegionAlreadyInTransitionException
3068                   // means the target server got the original region close request.
3069                   // For other exceptions, re-close it
3070                   LOG.debug("Got exception in retry sendRegionClose for "
3071                     + regionState + ", re-close it", t);
3072                   invokeUnAssign(hri);
3073                 }
3074                 return; // Done.
3075               }
3076             }
3077           } finally {
3078             lock.unlock();
3079           }
3080         }
3081       });
3082   }
3083 
3084   /**
3085    * Set Regions in transitions metrics.
3086    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
3087    * This iterator is not fail-fast, which may lead to a stale read; but that's better than
3088    * creating a copy of the map for metrics computation, as this method will be invoked
3089    * at a frequent interval.
3090    */
3091   public void updateRegionsInTransitionMetrics() {
3092     long currentTime = System.currentTimeMillis();
3093     int totalRITs = 0;
3094     int totalRITsOverThreshold = 0;
3095     long oldestRITTime = 0;
3096     int ritThreshold = this.server.getConfiguration().
3097       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
3098     for (RegionState state: regionStates.getRegionsInTransition().values()) {
3099       totalRITs++;
3100       long ritTime = currentTime - state.getStamp();
3101       if (ritTime > ritThreshold) { // more than the threshold
3102         totalRITsOverThreshold++;
3103       }
3104       if (oldestRITTime < ritTime) {
3105         oldestRITTime = ritTime;
3106       }
3107     }
3108     if (this.metricsAssignmentManager != null) {
3109       this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
3110       this.metricsAssignmentManager.updateRITCount(totalRITs);
3111       this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
3112     }
3113   }
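
  // For example, with the default threshold of 60000 ms read above via
  // HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, three regions in transition for
  // 10s, 70s and 130s would be reported as totalRITs=3, totalRITsOverThreshold=2 and
  // oldestRITTime=130000 ms.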
3114 
3115   /**
3116    * @param region Region whose plan we are to clear.
3117    */
3118   void clearRegionPlan(final HRegionInfo region) {
3119     synchronized (this.regionPlans) {
3120       this.regionPlans.remove(region.getEncodedName());
3121     }
3122   }
3123 
3124   /**
3125    * Wait on region to clear regions-in-transition.
3126    * @param hri Region to wait on.
3127    * @throws IOException
3128    */
3129   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
3130       throws IOException, InterruptedException {
3131     waitOnRegionToClearRegionsInTransition(hri, -1L);
3132   }
3133 
3134   /**
3135    * Wait on region to clear regions-in-transition or time out
3136    * @param hri
3137    * @param timeOut Milliseconds to wait for current region to be out of transition state.
3138    * @return True when a region clears regions-in-transition before timeout otherwise false
3139    * @throws InterruptedException
3140    */
3141   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
3142       throws InterruptedException {
3143     if (!regionStates.isRegionInTransition(hri)) return true;
3144     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTimeMillis()
3145         + timeOut;
3146     // There is already a timeout monitor on regions in transition so I
3147     // should not have to have one here too?
3148     LOG.info("Waiting for " + hri.getEncodedName() +
3149         " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
3150     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
3151       regionStates.waitForUpdate(100);
3152       if (EnvironmentEdgeManager.currentTimeMillis() > end) {
3153         LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
3154         return false;
3155       }
3156     }
3157     if (this.server.isStopped()) {
3158       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
3159       return false;
3160     }
3161     return true;
3162   }
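
  // Note: a timeOut <= 0 means wait indefinitely (end is set to Long.MAX_VALUE), so
  // callers that need a bound should pass an explicit timeout, roughly:
  //
  //   if (!waitOnRegionToClearRegionsInTransition(hri, 30000)) {
  //     // still in transition after 30s, or the server is stopping
  //   }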
3163 
3164   void invokeAssign(HRegionInfo regionInfo) {
3165     invokeAssign(regionInfo, true);
3166   }
3167 
3168   void invokeAssign(HRegionInfo regionInfo, boolean newPlan) {
3169     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, newPlan));
3170   }
3171 
3172   void invokeUnAssign(HRegionInfo regionInfo) {
3173     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
3174   }
3175 
3176   public boolean isCarryingMeta(ServerName serverName) {
3177     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
3178   }
3179 
3180   /**
3181    * Check if the shutdown server carries the specific region.
3182    * We have a bunch of places that store region location, and
3183    * those values aren't consistent. There is a delay of notification.
3184    * The location from the zookeeper unassigned node has the most recent data,
3185    * but the node could be deleted after the region is opened by the AM.
3186    * The AM's info could be old when OpenedRegionHandler
3187    * processing hasn't finished yet when server shutdown occurs.
3188    * @return whether the serverName currently hosts the region
3189    */
3190   private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
3191     RegionTransition rt = null;
3192     try {
3193       byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
3194       // This call can legitimately return null
3195       rt = data == null? null: RegionTransition.parseFrom(data);
3196     } catch (KeeperException e) {
3197       server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
3198     } catch (DeserializationException e) {
3199       server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
3200     }
3201 
3202     ServerName addressFromZK = rt != null? rt.getServerName():  null;
3203     if (addressFromZK != null) {
3204       // if we get something from ZK, we will use the data
3205       boolean matchZK = addressFromZK.equals(serverName);
3206       LOG.debug("Checking region=" + hri.getRegionNameAsString() + ", zk server=" + addressFromZK +
3207         " current=" + serverName + ", matches=" + matchZK);
3208       return matchZK;
3209     }
3210 
3211     ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
3212     boolean matchAM = (addressFromAM != null &&
3213       addressFromAM.equals(serverName));
3214     LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
3215       " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
3216       " server being checked: " + serverName);
3217 
3218     return matchAM;
3219   }
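
  // Note: when the ZK unassigned node yields a server name it takes precedence; only if
  // nothing usable is found in ZK does the check fall back to the AssignmentManager's own
  // regionStates.getRegionServerOfRegion(hri), so a stale in-memory location cannot
  // override fresher data published in ZooKeeper.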
3220 
3221   /**
3222    * Process shutdown server removing any assignments.
3223    * @param sn Server that went down.
3224    * @return list of regions in transition on this server
3225    */
3226   public List<HRegionInfo> processServerShutdown(final ServerName sn) {
3227     // Clean out any existing assignment plans for this server
3228     synchronized (this.regionPlans) {
3229       for (Iterator <Map.Entry<String, RegionPlan>> i =
3230           this.regionPlans.entrySet().iterator(); i.hasNext();) {
3231         Map.Entry<String, RegionPlan> e = i.next();
3232         ServerName otherSn = e.getValue().getDestination();
3233         // The name will be null if the region is planned for a random assign.
3234         if (otherSn != null && otherSn.equals(sn)) {
3235           // Use iterator's remove else we'll get CME
3236           i.remove();
3237         }
3238       }
3239     }
3240     List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn);
3241     for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
3242       HRegionInfo hri = it.next();
3243       String encodedName = hri.getEncodedName();
3244 
3245       // We need a lock on the region as we could update it
3246       Lock lock = locker.acquireLock(encodedName);
3247       try {
3248         RegionState regionState =
3249           regionStates.getRegionTransitionState(encodedName);
3250         if (regionState == null
3251             || (regionState.getServerName() != null && !regionState.isOnServer(sn))
3252             || !(regionState.isFailedClose() || regionState.isOffline()
3253               || regionState.isPendingOpenOrOpening())) {
3254           LOG.info("Skip " + regionState + " since it is not opening/failed_close"
3255             + " on the dead server any more: " + sn);
3256           it.remove();
3257         } else {
3258           try {
3259             // Delete the ZNode if exists
3260             ZKAssign.deleteNodeFailSilent(watcher, hri);
3261           } catch (KeeperException ke) {
3262             server.abort("Unexpected ZK exception deleting node " + hri, ke);
3263           }
3264           if (tableStateManager.isTableState(hri.getTable(),
3265               ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3266             regionStates.regionOffline(hri);
3267             it.remove();
3268             continue;
3269           }
3270           // Mark the region offline and assign it again by SSH
3271           regionStates.updateRegionState(hri, State.OFFLINE);
3272         }
3273       } finally {
3274         lock.unlock();
3275       }
3276     }
3277     return regions;
3278   }
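
  // Note: the returned list contains only regions that were in transition on the dead
  // server and still need handling; regions whose table is DISABLED/DISABLING are
  // offlined and dropped from the list, and region plans targeting the dead server are
  // removed so later assignments pick a fresh destination.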
3279 
3280   /**
3281    * @param plan Plan to execute.
3282    */
3283   public void balance(final RegionPlan plan) {
3284     HRegionInfo hri = plan.getRegionInfo();
3285     TableName tableName = hri.getTable();
3286     if (tableStateManager.isTableState(tableName,
3287       ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3288       LOG.info("Ignored moving region of disabling/disabled table "
3289         + tableName);
3290       return;
3291     }
3292 
3293     // Move the region only if it's assigned
3294     String encodedName = hri.getEncodedName();
3295     ReentrantLock lock = locker.acquireLock(encodedName);
3296     try {
3297       if (!regionStates.isRegionOnline(hri)) {
3298         RegionState state = regionStates.getRegionState(encodedName);
3299         LOG.info("Ignored moving region not assigned: " + hri + ", "
3300           + (state == null ? "not in region states" : state));
3301         return;
3302       }
3303       synchronized (this.regionPlans) {
3304         this.regionPlans.put(plan.getRegionName(), plan);
3305       }
3306       unassign(hri, false, plan.getDestination());
3307     } finally {
3308       lock.unlock();
3309     }
3310   }
3311 
3312   public void stop() {
3313     shutdown(); // Stop executor service, etc
3314   }
3315 
3316   /**
3317    * Shutdown the threadpool executor service
3318    */
3319   public void shutdown() {
3320     // It's an immediate shutdown, so we're clearing the remaining tasks.
3321     synchronized (zkEventWorkerWaitingList) {
3322       zkEventWorkerWaitingList.clear();
3323     }
3324     threadPoolExecutorService.shutdownNow();
3325     zkEventWorkers.shutdownNow();
3326     regionStateStore.stop();
3327   }
3328 
3329   protected void setEnabledTable(TableName tableName) {
3330     try {
3331       this.tableStateManager.setTableState(tableName,
3332         ZooKeeperProtos.Table.State.ENABLED);
3333     } catch (CoordinatedStateException e) {
3334       // here we can abort as it is the start up flow
3335       String errorMsg = "Unable to ensure that the table " + tableName
3336           + " will be enabled because of a ZooKeeper issue";
3337       LOG.error(errorMsg);
3338       this.server.abort(errorMsg, e);
3339     }
3340   }
3341 
3342   /**
3343    * Set region as OFFLINED up in zookeeper asynchronously.
3344    * @param state
3345    * @return True if we succeeded, false otherwise (State was incorrect or failed
3346    * updating zk).
3347    */
3348   private boolean asyncSetOfflineInZooKeeper(final RegionState state,
3349       final AsyncCallback.StringCallback cb, final ServerName destination) {
3350     if (!state.isClosed() && !state.isOffline()) {
3351       this.server.abort("Unexpected state trying to OFFLINE; " + state,
3352         new IllegalStateException());
3353       return false;
3354     }
3355     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
3356     try {
3357       ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
3358         destination, cb, state);
3359     } catch (KeeperException e) {
3360       if (e instanceof NodeExistsException) {
3361         LOG.warn("Node for " + state.getRegion() + " already exists");
3362       } else {
3363         server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
3364       }
3365       return false;
3366     }
3367     return true;
3368   }
3369 
3370   private boolean deleteNodeInStates(String encodedName,
3371       String desc, ServerName sn, EventType... types) {
3372     try {
3373       for (EventType et: types) {
3374         if (ZKAssign.deleteNode(watcher, encodedName, et, sn)) {
3375           return true;
3376         }
3377       }
3378       LOG.info("Failed to delete the " + desc + " node for "
3379         + encodedName + ". The node type may not match");
3380     } catch (NoNodeException e) {
3381       if (LOG.isDebugEnabled()) {
3382         LOG.debug("The " + desc + " node for " + encodedName + " already deleted");
3383       }
3384     } catch (KeeperException ke) {
3385       server.abort("Unexpected ZK exception deleting " + desc
3386         + " node for the region " + encodedName, ke);
3387     }
3388     return false;
3389   }
3390 
3391   private void deleteMergingNode(String encodedName, ServerName sn) {
3392     deleteNodeInStates(encodedName, "merging", sn, EventType.RS_ZK_REGION_MERGING,
3393       EventType.RS_ZK_REQUEST_REGION_MERGE, EventType.RS_ZK_REGION_MERGED);
3394   }
3395 
3396   private void deleteSplittingNode(String encodedName, ServerName sn) {
3397     deleteNodeInStates(encodedName, "splitting", sn, EventType.RS_ZK_REGION_SPLITTING,
3398       EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT);
3399   }
3400 
3401   private void onRegionFailedOpen(
3402       final HRegionInfo hri, final ServerName sn) {
3403     String encodedName = hri.getEncodedName();
3404     AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
3405     if (failedOpenCount == null) {
3406       failedOpenCount = new AtomicInteger();
3407       // No need to use putIfAbsent, or extra synchronization since
3408       // this whole handleRegion block is locked on the encoded region
3409       // name, and failedOpenTracker is updated only in this block
3410       failedOpenTracker.put(encodedName, failedOpenCount);
3411     }
3412     if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
3413       regionStates.updateRegionState(hri, State.FAILED_OPEN);
3414       // remove the tracking info to save memory, also reset
3415       // the count for next open initiative
3416       failedOpenTracker.remove(encodedName);
3417     } else {
3418       // Handle this the same as if it were opened and then closed.
3419       RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
3420       if (regionState != null) {
3421         // When there are more than one region server a new RS is selected as the
3422         // destination and the same is updated in the region plan. (HBASE-5546)
3423         if (getTableStateManager().isTableState(hri.getTable(),
3424             ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
3425             replicasToClose.contains(hri)) {
3426           offlineDisabledRegion(hri);
3427           return;
3428         }
3429         // ZK Node is in CLOSED state, assign it.
3430         regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3431         // This below has to do w/ online enable/disable of a table
3432         removeClosedRegion(hri);
3433         try {
3434           getRegionPlan(hri, sn, true);
3435         } catch (HBaseIOException e) {
3436           LOG.warn("Failed to get region plan", e);
3437         }
3438         invokeAssign(hri, false);
3439       }
3440     }
3441   }
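
  // Note: failedOpenTracker counts consecutive failed opens per encoded region name.
  // Once the count reaches maximumAttempts the region is parked in FAILED_OPEN and the
  // tracker entry is dropped; otherwise the region is treated as CLOSED and, unless its
  // table is disabled/disabling, a fresh plan is requested via getRegionPlan(hri, sn, true)
  // and invokeAssign(hri, false) retries the open with it.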
3442 
3443   private void onRegionOpen(
3444       final HRegionInfo hri, final ServerName sn, long openSeqNum) {
3445     regionOnline(hri, sn, openSeqNum);
3446     if (useZKForAssignment) {
3447       try {
3448         // Delete the ZNode if exists
3449         ZKAssign.deleteNodeFailSilent(watcher, hri);
3450       } catch (KeeperException ke) {
3451         server.abort("Unexpected ZK exception deleting node " + hri, ke);
3452       }
3453     }
3454 
3455     // reset the count, if any
3456     failedOpenTracker.remove(hri.getEncodedName());
3457     if (getTableStateManager().isTableState(hri.getTable(),
3458         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3459       invokeUnAssign(hri);
3460     }
3461   }
3462 
3463   private void onRegionClosed(final HRegionInfo hri) {
3464     if (getTableStateManager().isTableState(hri.getTable(),
3465         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
3466         replicasToClose.contains(hri)) {
3467       offlineDisabledRegion(hri);
3468       return;
3469     }
3470     regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3471     sendRegionClosedNotification(hri);
3472     // This below has to do w/ online enable/disable of a table
3473     removeClosedRegion(hri);
3474     invokeAssign(hri, false);
3475   }
3476 
3477   private String onRegionSplit(ServerName sn, TransitionCode code,
3478       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3479     final RegionState rs_p = regionStates.getRegionState(p);
3480     RegionState rs_a = regionStates.getRegionState(a);
3481     RegionState rs_b = regionStates.getRegionState(b);
3482     if (!(rs_p.isOpenOrSplittingOnServer(sn)
3483         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
3484         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
3485       return "Not in state good for split";
3486     }
3487 
3488     regionStates.updateRegionState(a, State.SPLITTING_NEW, sn);
3489     regionStates.updateRegionState(b, State.SPLITTING_NEW, sn);
3490     regionStates.updateRegionState(p, State.SPLITTING);
3491 
3492     if (code == TransitionCode.SPLIT) {
3493       if (TEST_SKIP_SPLIT_HANDLING) {
3494         return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
3495       }
3496       regionOffline(p, State.SPLIT);
3497       regionOnline(a, sn, 1);
3498       regionOnline(b, sn, 1);
3499 
3500       // User could disable the table before master knows the new region.
3501       if (getTableStateManager().isTableState(p.getTable(),
3502           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3503         invokeUnAssign(a);
3504         invokeUnAssign(b);
3505       } else {
3506         Callable<Object> splitReplicasCallable = new Callable<Object>() {
3507           @Override
3508           public Object call() {
3509             doSplittingOfReplicas(p, a, b);
3510             return null;
3511           }
3512         };
3513         threadPoolExecutorService.submit(splitReplicasCallable);
3514       }
3515     } else if (code == TransitionCode.SPLIT_PONR) {
3516       try {
3517         regionStates.splitRegion(p, a, b, sn);
3518       } catch (IOException ioe) {
3519         LOG.info("Failed to record split region " + p.getShortNameToLog());
3520         return "Failed to record the splitting in meta";
3521       }
3522     } else if (code == TransitionCode.SPLIT_REVERTED) {
3523       regionOnline(p, sn);
3524       regionOffline(a);
3525       regionOffline(b);
3526 
3527       if (getTableStateManager().isTableState(p.getTable(),
3528           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3529         invokeUnAssign(p);
3530       }
3531     }
3532     return null;
3533   }
3534 
3535   private String onRegionMerge(ServerName sn, TransitionCode code,
3536       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3537     RegionState rs_p = regionStates.getRegionState(p);
3538     RegionState rs_a = regionStates.getRegionState(a);
3539     RegionState rs_b = regionStates.getRegionState(b);
3540     if (!(rs_a.isOpenOrMergingOnServer(sn) && rs_b.isOpenOrMergingOnServer(sn)
3541         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3542       return "Not in state good for merge";
3543     }
3544 
3545     regionStates.updateRegionState(a, State.MERGING);
3546     regionStates.updateRegionState(b, State.MERGING);
3547     regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3548 
3549     String encodedName = p.getEncodedName();
3550     if (code == TransitionCode.READY_TO_MERGE) {
3551       mergingRegions.put(encodedName,
3552         new PairOfSameType<HRegionInfo>(a, b));
3553     } else if (code == TransitionCode.MERGED) {
3554       mergingRegions.remove(encodedName);
3555       regionOffline(a, State.MERGED);
3556       regionOffline(b, State.MERGED);
3557       regionOnline(p, sn, 1);
3558 
3559       // User could disable the table before master knows the new region.
3560       if (getTableStateManager().isTableState(p.getTable(),
3561           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3562         invokeUnAssign(p);
3563       } else {
3564         Callable<Object> mergeReplicasCallable = new Callable<Object>() {
3565           @Override
3566           public Object call() {
3567             doMergingOfReplicas(p, a, b);
3568             return null;
3569           }
3570         };
3571         threadPoolExecutorService.submit(mergeReplicasCallable);
3572       }
3573     } else if (code == TransitionCode.MERGE_PONR) {
3574       try {
3575         regionStates.mergeRegions(p, a, b, sn);
3576       } catch (IOException ioe) {
3577         LOG.info("Failed to record merged region " + p.getShortNameToLog());
3578         return "Failed to record the merging in meta";
3579       }
3580     } else {
3581       mergingRegions.remove(encodedName);
3582       regionOnline(a, sn);
3583       regionOnline(b, sn);
3584       regionOffline(p);
3585 
3586       if (getTableStateManager().isTableState(p.getTable(),
3587           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3588         invokeUnAssign(a);
3589         invokeUnAssign(b);
3590       }
3591     }
3592     return null;
3593   }
3594 
3595   /**
3596    * A helper to handle region merging transition event.
3597    * It transitions merging regions to MERGING state.
3598    */
3599   private boolean handleRegionMerging(final RegionTransition rt, final String encodedName,
3600       final String prettyPrintedRegionName, final ServerName sn) {
3601     if (!serverManager.isServerOnline(sn)) {
3602       LOG.warn("Dropped merging! ServerName=" + sn + " unknown.");
3603       return false;
3604     }
3605     byte [] payloadOfMerging = rt.getPayload();
3606     List<HRegionInfo> mergingRegions;
3607     try {
3608       mergingRegions = HRegionInfo.parseDelimitedFrom(
3609         payloadOfMerging, 0, payloadOfMerging.length);
3610     } catch (IOException e) {
3611       LOG.error("Dropped merging! Failed reading "  + rt.getEventType()
3612         + " payload for " + prettyPrintedRegionName);
3613       return false;
3614     }
3615     assert mergingRegions.size() == 3;
3616     HRegionInfo p = mergingRegions.get(0);
3617     HRegionInfo hri_a = mergingRegions.get(1);
3618     HRegionInfo hri_b = mergingRegions.get(2);
3619 
3620     RegionState rs_p = regionStates.getRegionState(p);
3621     RegionState rs_a = regionStates.getRegionState(hri_a);
3622     RegionState rs_b = regionStates.getRegionState(hri_b);
3623 
3624     if (!((rs_a == null || rs_a.isOpenOrMergingOnServer(sn))
3625         && (rs_b == null || rs_b.isOpenOrMergingOnServer(sn))
3626         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3627       LOG.warn("Dropped merging! Not in state good for MERGING; rs_p="
3628         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
3629       return false;
3630     }
3631 
3632     EventType et = rt.getEventType();
3633     if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) {
3634       try {
3635         RegionMergeCoordination.RegionMergeDetails std =
3636             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3637                 .getRegionMergeCoordination().getDefaultDetails();
3638         ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3639             .getRegionMergeCoordination().processRegionMergeRequest(p, hri_a, hri_b, sn, std);
3640         if (((ZkRegionMergeCoordination.ZkRegionMergeDetails) std).getZnodeVersion() == -1) {
3641           byte[] data = ZKAssign.getData(watcher, encodedName);
3642           EventType currentType = null;
3643           if (data != null) {
3644             RegionTransition newRt = RegionTransition.parseFrom(data);
3645             currentType = newRt.getEventType();
3646           }
3647           if (currentType == null || (currentType != EventType.RS_ZK_REGION_MERGED
3648               && currentType != EventType.RS_ZK_REGION_MERGING)) {
3649             LOG.warn("Failed to transition pending_merge node "
3650               + encodedName + " to merging, it's now " + currentType);
3651             return false;
3652           }
3653         }
3654       } catch (Exception e) {
3655         LOG.warn("Failed to transition pending_merge node "
3656           + encodedName + " to merging", e);
3657         return false;
3658       }
3659     }
3660 
3661     synchronized (regionStates) {
3662       regionStates.updateRegionState(hri_a, State.MERGING);
3663       regionStates.updateRegionState(hri_b, State.MERGING);
3664       regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3665 
3666       if (et != EventType.RS_ZK_REGION_MERGED) {
3667         this.mergingRegions.put(encodedName,
3668           new PairOfSameType<HRegionInfo>(hri_a, hri_b));
3669       } else {
3670         this.mergingRegions.remove(encodedName);
3671         regionOffline(hri_a, State.MERGED);
3672         regionOffline(hri_b, State.MERGED);
3673         regionOnline(p, sn);
3674       }
3675     }
3676 
3677     if (et == EventType.RS_ZK_REGION_MERGED) {
3678       doMergingOfReplicas(p, hri_a, hri_b);
3679       LOG.debug("Handling MERGED event for " + encodedName + "; deleting node");
3680       // Remove region from ZK
3681       try {
3682         boolean successful = false;
3683         while (!successful) {
3684           // It's possible that the RS tickles in between the reading of the
3685           // znode and the deleting, so it's safe to retry.
3686           successful = ZKAssign.deleteNode(watcher, encodedName,
3687             EventType.RS_ZK_REGION_MERGED, sn);
3688         }
3689       } catch (KeeperException e) {
3690         if (e instanceof NoNodeException) {
3691           String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
3692           LOG.debug("The znode " + znodePath + " does not exist.  May be deleted already.");
3693         } else {
3694           server.abort("Error deleting MERGED node " + encodedName, e);
3695         }
3696       }
3697       LOG.info("Handled MERGED event; merged=" + p.getRegionNameAsString()
3698         + ", region_a=" + hri_a.getRegionNameAsString() + ", region_b="
3699         + hri_b.getRegionNameAsString() + ", on " + sn);
3700 
3701       // User could disable the table before master knows the new region.
3702       if (tableStateManager.isTableState(p.getTable(),
3703           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3704         unassign(p);
3705       }
3706     }
3707     return true;
3708   }
3709 
3710   /**
3711    * A helper to handle region splitting transition event.
3712    */
3713   private boolean handleRegionSplitting(final RegionTransition rt, final String encodedName,
3714       final String prettyPrintedRegionName, final ServerName sn) {
3715     if (!serverManager.isServerOnline(sn)) {
3716       LOG.warn("Dropped splitting! ServerName=" + sn + " unknown.");
3717       return false;
3718     }
3719     byte [] payloadOfSplitting = rt.getPayload();
3720     List<HRegionInfo> splittingRegions;
3721     try {
3722       splittingRegions = HRegionInfo.parseDelimitedFrom(
3723         payloadOfSplitting, 0, payloadOfSplitting.length);
3724     } catch (IOException e) {
3725       LOG.error("Dropped splitting! Failed reading " + rt.getEventType()
3726         + " payload for " + prettyPrintedRegionName);
3727       return false;
3728     }
3729     assert splittingRegions.size() == 2;
3730     HRegionInfo hri_a = splittingRegions.get(0);
3731     HRegionInfo hri_b = splittingRegions.get(1);
3732 
3733     RegionState rs_p = regionStates.getRegionState(encodedName);
3734     RegionState rs_a = regionStates.getRegionState(hri_a);
3735     RegionState rs_b = regionStates.getRegionState(hri_b);
3736 
3737     if (!((rs_p == null || rs_p.isOpenOrSplittingOnServer(sn))
3738         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
3739         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
3740       LOG.warn("Dropped splitting! Not in a state valid for SPLITTING; rs_p="
3741         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
3742       return false;
3743     }
3744 
3745     if (rs_p == null) {
3746       // Splitting region should be online
3747       rs_p = regionStates.updateRegionState(rt, State.OPEN);
3748       if (rs_p == null) {
3749         LOG.warn("Received splitting for region " + prettyPrintedRegionName
3750           + " from server " + sn + " but it doesn't exist anymore,"
3751           + " probably already processed its split");
3752         return false;
3753       }
3754       regionStates.regionOnline(rs_p.getRegion(), sn);
3755     }
3756 
3757     HRegionInfo p = rs_p.getRegion();
3758     EventType et = rt.getEventType();
3759     if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) {
3760       try {
3761         SplitTransactionDetails std =
3762             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3763                 .getSplitTransactionCoordination().getDefaultDetails();
3764         if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3765             .getSplitTransactionCoordination().processTransition(p, hri_a, hri_b, sn, std) == -1) {
3766           byte[] data = ZKAssign.getData(watcher, encodedName);
3767           EventType currentType = null;
3768           if (data != null) {
3769             RegionTransition newRt = RegionTransition.parseFrom(data);
3770             currentType = newRt.getEventType();
3771           }
3772           if (currentType == null
3773               || (currentType != EventType.RS_ZK_REGION_SPLIT && currentType != EventType.RS_ZK_REGION_SPLITTING)) {
3774             LOG.warn("Failed to transition pending_split node " + encodedName
3775                 + " to splitting, it's now " + currentType);
3776             return false;
3777           }
3778         }
3779       } catch (Exception e) {
3780         LOG.warn("Failed to transition pending_split node " + encodedName + " to splitting", e);
3781         return false;
3782       }
3783     }
3784 
3785     synchronized (regionStates) {
3786       regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn);
3787       regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn);
3788       regionStates.updateRegionState(rt, State.SPLITTING);
3789 
3790       // The below is for testing ONLY!  We can't do fault injection easily, so
3791       // resort to this kind of ugliness -- St.Ack 02/25/2011.
3792       if (TEST_SKIP_SPLIT_HANDLING) {
3793         LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set");
3794         return true; // return true so that the splitting node stays
3795       }
3796 
3797       if (et == EventType.RS_ZK_REGION_SPLIT) {
3798         regionOffline(p, State.SPLIT);
3799         regionOnline(hri_a, sn);
3800         regionOnline(hri_b, sn);
3801       }
3802     }
3803 
3804     if (et == EventType.RS_ZK_REGION_SPLIT) {
3805       // split replicas
3806       doSplittingOfReplicas(rs_p.getRegion(), hri_a, hri_b);
3807       LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
3808       // Remove region from ZK
3809       try {
3810         boolean successful = false;
3811         while (!successful) {
3812           // It's possible that the RS touches the znode between our read of it
3813           // and the delete, so it's safe to retry.
3814           successful = ZKAssign.deleteNode(watcher, encodedName,
3815             EventType.RS_ZK_REGION_SPLIT, sn);
3816         }
3817       } catch (KeeperException e) {
3818         if (e instanceof NoNodeException) {
3819           String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
3820           LOG.debug("The znode " + znodePath + " does not exist. It may have been deleted already.");
3821         } else {
3822           server.abort("Error deleting SPLIT node " + encodedName, e);
3823         }
3824       }
3825       LOG.info("Handled SPLIT event; parent=" + p.getRegionNameAsString()
3826         + ", daughter a=" + hri_a.getRegionNameAsString() + ", daughter b="
3827         + hri_b.getRegionNameAsString() + ", on " + sn);
3828 
3829       // User could disable the table before master knows the new region.
3830       if (tableStateManager.isTableState(p.getTable(),
3831           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3832         unassign(hri_a);
3833         unassign(hri_b);
3834       }
3835     }
3836     return true;
3837   }
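
  // For reference, a minimal sketch of how the splitting payload parsed at the top of
  // handleRegionSplitting() is expected to be produced. The encoding call is an
  // assumption based on HRegionInfo's delimited serialization API; the actual producer
  // lives in the region server's split transaction code, not in this class.
  //
  //   // region server side (sketch): the two daughters serialized back to back
  //   byte[] payload = HRegionInfo.toDelimitedByteArray(hri_a, hri_b);
  //   // master side (as done above): parse both daughters back out
  //   List<HRegionInfo> daughters =
  //       HRegionInfo.parseDelimitedFrom(payload, 0, payload.length);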
3838 
3839   private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
3840       final HRegionInfo hri_b) {
3841     // Close replicas for the original unmerged regions; create/assign new replicas
3842     // for the merged parent.
3843     List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
3844     unmergedRegions.add(hri_a);
3845     unmergedRegions.add(hri_b);
3846     Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
3847     Collection<List<HRegionInfo>> c = map.values();
3848     for (List<HRegionInfo> l : c) {
3849       for (HRegionInfo h : l) {
3850         if (!RegionReplicaUtil.isDefaultReplica(h)) {
3851           LOG.debug("Unassigning un-merged replica " + h);
3852           unassign(h);
3853         }
3854       }
3855     }
3856     int numReplicas = 1;
3857     try {
3858       numReplicas = ((MasterServices)server).getTableDescriptors().get(mergedHri.getTable()).
3859           getRegionReplication();
3860     } catch (IOException e) {
3861       LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
3862           " due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
3863           "will not be done");
3864     }
3865     List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
3866     for (int i = 1; i < numReplicas; i++) {
3867       regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
3868     }
3869     try {
3870       assign(regions);
3871     } catch (IOException ioe) {
3872       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
3873                 ioe.getMessage());
3874     } catch (InterruptedException ie) {
3875       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
3876                 ie.getMessage());
3877     }
3878   }
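
  // Worked example for the method above (a sketch; the replica count is hypothetical):
  // with region replication set to 3, replicaId 0 is the merged primary that is already
  // online, so only replicas 1 and 2 are created and assigned.
  //
  //   HRegionInfo replica1 = RegionReplicaUtil.getRegionInfoForReplica(mergedHri, 1);
  //   HRegionInfo replica2 = RegionReplicaUtil.getRegionInfoForReplica(mergedHri, 2);
  //   assign(Arrays.asList(replica1, replica2));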
3879 
3880   private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
3881       final HRegionInfo hri_b) {
3882     // Create new regions for the replicas, and assign them to match the
3883     // current replica assignments: if replica 1 of the parent is assigned to RS1,
3884     // replica 1 of each daughter will be placed on the same server.
3885     int numReplicas = 1;
3886     try {
3887       numReplicas = ((MasterServices)server).getTableDescriptors().get(parentHri.getTable()).
3888           getRegionReplication();
3889     } catch (IOException e) {
3890       LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
3891           " due to " + e.getMessage() + ". The assignment of daughter replicas " +
3892           "will not be done");
3893     }
3894     // unassign the old replicas
3895     List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
3896     parentRegion.add(parentHri);
3897     Map<ServerName, List<HRegionInfo>> currentAssign =
3898         regionStates.getRegionAssignments(parentRegion);
3899     Collection<List<HRegionInfo>> c = currentAssign.values();
3900     for (List<HRegionInfo> l : c) {
3901       for (HRegionInfo h : l) {
3902         if (!RegionReplicaUtil.isDefaultReplica(h)) {
3903           LOG.debug("Unassigning parent's replica " + h);
3904           unassign(h);
3905         }
3906       }
3907     }
3908     // assign daughter replicas
3909     Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
3910     for (int i = 1; i < numReplicas; i++) {
3911       prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
3912       prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
3913     }
3914     try {
3915       assign(map);
3916     } catch (IOException e) {
3917       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
3918     } catch (InterruptedException e) {
3919       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
3920     }
3921   }
3922 
3923   private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
3924       int replicaId, Map<HRegionInfo, ServerName> map) {
3925     HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
3926     HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
3927         replicaId);
3928     LOG.debug("Created replica region for daughter " + daughterReplica);
3929     ServerName sn;
3930     if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
3931       map.put(daughterReplica, sn);
3932     } else {
3933       List<ServerName> servers = serverManager.getOnlineServersList();
3934       sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
3935       map.put(daughterReplica, sn);
3936     }
3937   }
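
  // A minimal usage sketch of the helper above (the daughter/parent variables are
  // hypothetical): replica 1 of each daughter is mapped to whatever server hosts
  // replica 1 of the parent; a random online server is used only as a fallback.
  //
  //   Map<HRegionInfo, ServerName> plan = new HashMap<HRegionInfo, ServerName>();
  //   prepareDaughterReplicaForAssignment(daughterA, parentHri, 1, plan);
  //   prepareDaughterReplicaForAssignment(daughterB, parentHri, 1, plan);
  //   assign(plan);  // as in doSplittingOfReplicas() above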
3938 
3939   public Set<HRegionInfo> getReplicasToClose() {
3940     return replicasToClose;
3941   }
3942 
3943   /**
3944    * Marks a region as offline. The new state is the specified one if not null;
3945    * if the specified state is null, the new state is Offline. The specified
3946    * state can only be Split/Merged/Offline/null.
3947    */
3948   private void regionOffline(final HRegionInfo regionInfo, final State state) {
3949     regionStates.regionOffline(regionInfo, state);
3950     removeClosedRegion(regionInfo);
3951     // remove the region plan as well just in case.
3952     clearRegionPlan(regionInfo);
3953     balancer.regionOffline(regionInfo);
3954 
3955     // Tell our listeners that a region was closed
3956     sendRegionClosedNotification(regionInfo);
3957     // also note that all the replicas of the primary should be closed
3958     if (state != null && (state.equals(State.SPLIT) || state.equals(State.MERGED))) {
3959       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
3960       c.add(regionInfo);
3961       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
3962       Collection<List<HRegionInfo>> allReplicas = map.values();
3963       for (List<HRegionInfo> list : allReplicas) {
3964         replicasToClose.addAll(list);
3965       }
3966     }
3976   }
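
  // Typical invocations of the helper above, as used by the split/merge handlers in this
  // class (shown only to illustrate the allowed states; a null state defaults to Offline):
  //
  //   regionOffline(p, State.SPLIT);       // parent region after a completed split
  //   regionOffline(hri_a, State.MERGED);  // source region after a completed merge
  //   regionOffline(hri, null);            // plain offline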
3977 
3978   private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
3979       final ServerName serverName) {
3980     if (!this.listeners.isEmpty()) {
3981       for (AssignmentListener listener : this.listeners) {
3982         listener.regionOpened(regionInfo, serverName);
3983       }
3984     }
3985   }
3986 
3987   private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
3988     if (!this.listeners.isEmpty()) {
3989       for (AssignmentListener listener : this.listeners) {
3990         listener.regionClosed(regionInfo);
3991       }
3992     }
3993   }
3994 
3995   /**
3996    * Tries to update some region states. If the state machine prevents
3997    * such an update, an error message is returned to explain the reason.
3998    *
3999    * It's expected that each transition involves just one region for
4000    * opening/closing, and three regions for splitting/merging.
4001    * These regions should be on the server that requested the change.
4002    *
4003    * Region state machine. Only these transitions
4004    * are expected to be triggered by a region server.
4005    *
4006    * On the state transition:
4007    *  (1) Open/Close should be initiated by master
4008    *      (a) Master sets the region to pending_open/pending_close
4009    *        in memory and hbase:meta after sending the request
4010    *        to the region server
4011    *      (b) Region server reports back to the master
4012    *        after open/close is done (either success/failure)
4013    *      (c) If the region server has a problem reporting the status
4014    *        to the master, it must be because the master is down or there is
4015    *        some temporary network issue. Otherwise, the region server should
4016    *        abort since it must be a bug. If the master is not accessible,
4017    *        the region server should keep trying until the server is
4018    *        stopped or the status is reported to the (new) master
4019    *      (d) If region server dies in the middle of opening/closing
4020    *        a region, SSH picks it up and finishes it
4021    *      (e) If master dies in the middle, the new master recovers
4022    *        the state during initialization from hbase:meta. Region server
4023    *        can report any transition that has not been reported to
4024    *        the previous active master yet
4025    *  (2) Split/merge is initiated by region servers
4026    *      (a) To split a region, a region server sends a request
4027    *        to master to try to set a region to splitting, together with
4028    *        two daughters (to be created) to splitting new. If approved
4029    *        by the master, the splitting can then move ahead
4030    *      (b) To merge two regions, a region server sends a request to
4031    *        master to try to set the new merged region (to be created) to
4032    *        merging_new, together with two regions (to be merged) to merging.
4033    *        If it is ok with the master, the merge can then move ahead
4034    *      (c) Once the splitting/merging is done, the region server
4035    *        reports the status, either success or failure, back to the master.
4036    *      (d) Other scenarios should be handled similarly as for
4037    *        region open/close
4038    */
4039   protected String onRegionTransition(final ServerName serverName,
4040       final RegionStateTransition transition) {
4041     TransitionCode code = transition.getTransitionCode();
4042     HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
4043     RegionState current = regionStates.getRegionState(hri);
4044     if (LOG.isDebugEnabled()) {
4045       LOG.debug("Got transition " + code + " for "
4046         + (current != null ? current.toString() : hri.getShortNameToLog())
4047         + " from " + serverName);
4048     }
4049     String errorMsg = null;
4050     switch (code) {
4051     case OPENED:
4052     case FAILED_OPEN:
4053       if (current == null
4054           || !current.isPendingOpenOrOpeningOnServer(serverName)) {
4055         errorMsg = hri.getShortNameToLog()
4056           + " is not pending open on " + serverName;
4057       } else if (code == TransitionCode.FAILED_OPEN) {
4058         onRegionFailedOpen(hri, serverName);
4059       } else {
4060         long openSeqNum = HConstants.NO_SEQNUM;
4061         if (transition.hasOpenSeqNum()) {
4062           openSeqNum = transition.getOpenSeqNum();
4063         }
4064         if (openSeqNum < 0) {
4065           errorMsg = "Newly opened region has invalid open seq num " + openSeqNum;
4066         } else {
4067           onRegionOpen(hri, serverName, openSeqNum);
4068         }
4069       }
4070       break;
4071 
4072     case CLOSED:
4073       if (current == null
4074           || !current.isPendingCloseOrClosingOnServer(serverName)) {
4075         errorMsg = hri.getShortNameToLog()
4076           + " is not pending close on " + serverName;
4077       } else {
4078         onRegionClosed(hri);
4079       }
4080       break;
4081 
4082     case READY_TO_SPLIT:
4083     case SPLIT_PONR:
4084     case SPLIT:
4085     case SPLIT_REVERTED:
4086       errorMsg = onRegionSplit(serverName, code, hri,
4087         HRegionInfo.convert(transition.getRegionInfo(1)),
4088         HRegionInfo.convert(transition.getRegionInfo(2)));
4089       break;
4090 
4091     case READY_TO_MERGE:
4092     case MERGE_PONR:
4093     case MERGED:
4094     case MERGE_REVERTED:
4095       errorMsg = onRegionMerge(serverName, code, hri,
4096         HRegionInfo.convert(transition.getRegionInfo(1)),
4097         HRegionInfo.convert(transition.getRegionInfo(2)));
4098       break;
4099 
4100     default:
4101       errorMsg = "Unexpected transition code " + code;
4102     }
4103     if (errorMsg != null) {
4104       LOG.error("Failed to transition region from " + current + " to "
4105         + code + " by " + serverName + ": " + errorMsg);
4106     }
4107     return errorMsg;
4108   }
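
  // For illustration, a sketch of what the region server side sends for an OPENED report
  // handled above. This assumes the standard protobuf builder generated for
  // RegionStateTransition; the actual construction lives in the region server code, not
  // in this class, and the variable names are hypothetical.
  //
  //   RegionStateTransition transition = RegionStateTransition.newBuilder()
  //       .setTransitionCode(TransitionCode.OPENED)
  //       .addRegionInfo(HRegionInfo.convert(hri))  // the region that was opened
  //       .setOpenSeqNum(openSeqId)                 // must be non-negative
  //       .build();
  //   String error = onRegionTransition(serverName, transition);
  //   // a null return value means the reported transition was accepted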
4109 
4110   /**
4111    * @return Instance of load balancer
4112    */
4113   public LoadBalancer getBalancer() {
4114     return this.balancer;
4115   }
4116 
4117   public Map<ServerName, List<HRegionInfo>>
4118     getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
4119     return getRegionStates().getRegionAssignments(infos);
4120   }
4121 }