1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.NavigableMap;
32  import java.util.Set;
33  import java.util.TreeMap;
34  import java.util.concurrent.ConcurrentHashMap;
35  import java.util.concurrent.CopyOnWriteArrayList;
36  import java.util.concurrent.ThreadFactory;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.locks.Lock;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.hadoop.classification.InterfaceAudience;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.fs.FileSystem;
48  import org.apache.hadoop.fs.Path;
49  import org.apache.hadoop.hbase.CoordinatedStateException;
50  import org.apache.hadoop.hbase.HBaseIOException;
51  import org.apache.hadoop.hbase.HConstants;
52  import org.apache.hadoop.hbase.HRegionInfo;
53  import org.apache.hadoop.hbase.NotServingRegionException;
54  import org.apache.hadoop.hbase.RegionTransition;
55  import org.apache.hadoop.hbase.Server;
56  import org.apache.hadoop.hbase.ServerName;
57  import org.apache.hadoop.hbase.TableName;
58  import org.apache.hadoop.hbase.TableNotFoundException;
59  import org.apache.hadoop.hbase.TableStateManager;
60  import org.apache.hadoop.hbase.catalog.CatalogTracker;
61  import org.apache.hadoop.hbase.catalog.MetaReader;
62  import org.apache.hadoop.hbase.client.Result;
63  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
64  import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
65  import org.apache.hadoop.hbase.coordination.RegionMergeCoordination;
66  import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
67  import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
68  import org.apache.hadoop.hbase.coordination.ZkRegionMergeCoordination;
69  import org.apache.hadoop.hbase.exceptions.DeserializationException;
70  import org.apache.hadoop.hbase.executor.EventHandler;
71  import org.apache.hadoop.hbase.executor.EventType;
72  import org.apache.hadoop.hbase.executor.ExecutorService;
73  import org.apache.hadoop.hbase.ipc.RpcClient;
74  import org.apache.hadoop.hbase.ipc.RpcClient.FailedServerException;
75  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
76  import org.apache.hadoop.hbase.master.RegionState.State;
77  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
78  import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
79  import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
80  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
81  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
82  import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
83  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
84  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
85  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
86  import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
87  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
88  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
89  import org.apache.hadoop.hbase.regionserver.wal.HLog;
90  import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
91  import org.apache.hadoop.hbase.util.ConfigUtil;
92  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
93  import org.apache.hadoop.hbase.util.FSUtils;
94  import org.apache.hadoop.hbase.util.KeyLocker;
95  import org.apache.hadoop.hbase.util.Pair;
96  import org.apache.hadoop.hbase.util.PairOfSameType;
97  import org.apache.hadoop.hbase.util.Threads;
98  import org.apache.hadoop.hbase.util.Triple;
99  import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
100 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
101 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
102 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
103 import org.apache.hadoop.ipc.RemoteException;
104 import org.apache.zookeeper.AsyncCallback;
105 import org.apache.zookeeper.KeeperException;
106 import org.apache.zookeeper.KeeperException.NoNodeException;
107 import org.apache.zookeeper.KeeperException.NodeExistsException;
108 import org.apache.zookeeper.data.Stat;
109 
110 import com.google.common.annotations.VisibleForTesting;
111 import com.google.common.collect.LinkedHashMultimap;
112 
113 /**
114  * Manages and performs region assignment.
115  * <p>
116  * Monitors ZooKeeper for events related to regions in transition.
117  * <p>
118  * Handles existing regions in transition during master failover.
119  */
120 @InterfaceAudience.Private
121 public class AssignmentManager extends ZooKeeperListener {
122   private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
123 
124   public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(HConstants.HBCK_CODE_NAME,
125       -1, -1L);
126 
127   static final String ALREADY_IN_TRANSITION_WAITTIME
128     = "hbase.assignment.already.intransition.waittime";
129   static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute
130 
131   protected final Server server;
132 
133   private ServerManager serverManager;
134 
135   private boolean shouldAssignRegionsWithFavoredNodes;
136 
137   private CatalogTracker catalogTracker;
138 
139   private LoadBalancer balancer;
140 
141   private final MetricsAssignmentManager metricsAssignmentManager;
142 
143   private final TableLockManager tableLockManager;
144 
145   private AtomicInteger numRegionsOpened = new AtomicInteger(0);
146 
147   final private KeyLocker<String> locker = new KeyLocker<String>();
148 
149   /**
150    * Map of regions to reopen after the schema of a table is changed. Key -
151    * encoded region name, value - HRegionInfo
152    */
153   private final Map <String, HRegionInfo> regionsToReopen;
154 
155   /*
156    * Maximum times we recurse an assignment/unassignment.
157    * See below in {@link #assign()} and {@link #unassign()}.
158    */
159   private final int maximumAttempts;
160 
161   /**
162    * Map from the new region to be created (encoded name) to the pair of regions being merged.
163    */
164   private final Map<String, PairOfSameType<HRegionInfo>> mergingRegions
165     = new HashMap<String, PairOfSameType<HRegionInfo>>();
166 
167   /**
168    * The sleep time before retrying an hbase:meta assignment that failed
169    * because no region plan was available.
170    */
171   private final long sleepTimeBeforeRetryingMetaAssignment;
172 
173   /** Plans for region movement. Key is the encoded version of a region name */
174   // TODO: When do plans get cleaned out?  Ever? In server open and in server
175   // shutdown processing -- St.Ack
176   // All access to this Map must be synchronized.
177   final NavigableMap<String, RegionPlan> regionPlans =
178     new TreeMap<String, RegionPlan>();
179 
180   private final TableStateManager tableStateManager;
181 
182   private final ExecutorService executorService;
183 
184   // For unit tests, keep track of calls to ClosedRegionHandler
185   private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = null;
186 
187   // For unit tests, keep track of calls to OpenedRegionHandler
188   private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null;
189 
190   //Thread pool executor service for timeout monitor
191   private java.util.concurrent.ExecutorService threadPoolExecutorService;
192 
193   // A bunch of ZK event workers. Each is a single-threaded executor service
194   private final java.util.concurrent.ExecutorService zkEventWorkers;
195 
196   private List<EventType> ignoreStatesRSOffline = Arrays.asList(
197       EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);
198 
199   private final RegionStates regionStates;
200 
201   // The thresholds for using bulk assignment. Bulk assignment is used
202   // only if assigning at least this many regions to at least this
203   // many servers. If assigning fewer regions to fewer servers,
204   // bulk assignment may not be as efficient.
205   private final int bulkAssignThresholdRegions;
206   private final int bulkAssignThresholdServers;
207 
208   // Should bulk assignment wait till all regions are assigned,
209   // or until it times out?  This is useful for measuring bulk assignment
210   // performance, but not needed in most use cases.
211   private final boolean bulkAssignWaitTillAllAssigned;
212 
213   /**
214    * Indicator that AssignmentManager has recovered the region states so
215    * that ServerShutdownHandler can be fully enabled and re-assign regions
216    * of dead servers, ensuring AssignmentManager has proper region states
217    * when re-assignment happens.
218    *
219    * Protected to ease testing.
220    */
221   protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
222 
223   /**
224    * A map tracking how many times in a row a region has failed to open,
225    * so that we don't try to open a region forever if the failure is
226    * unrecoverable.  We don't put this information in region states
227    * because we don't expect this to happen frequently; we don't
228    * want to copy this information over during each state transition either.
229    */
230   private final ConcurrentHashMap<String, AtomicInteger>
231     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
232 
233   // A flag to indicate if we are using ZK for region assignment
234   private final boolean useZKForAssignment;
235 
236   // When not using ZK for region assignment, region states
237   // are persisted in meta via a state store
238   private final RegionStateStore regionStateStore;
239 
240   /**
241    * For testing only!  Set to true to skip handling of split.
242    */
243   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
244   public static boolean TEST_SKIP_SPLIT_HANDLING = false;
245 
246   /** Listeners that are called on assignment events. */
247   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
248 
249   /**
250    * Constructs a new assignment manager.
251    *
252    * @param server
253    * @param serverManager
254    * @param catalogTracker
255    * @param service
256    * @throws KeeperException
257    * @throws IOException
258    */
259   public AssignmentManager(Server server, ServerManager serverManager,
260       CatalogTracker catalogTracker, final LoadBalancer balancer,
261       final ExecutorService service, MetricsMaster metricsMaster,
262       final TableLockManager tableLockManager) throws KeeperException,
263         IOException, CoordinatedStateException {
264     super(server.getZooKeeper());
265     this.server = server;
266     this.serverManager = serverManager;
267     this.catalogTracker = catalogTracker;
268     this.executorService = service;
269     this.regionStateStore = new RegionStateStore(server);
270     this.regionsToReopen = Collections.synchronizedMap
271                            (new HashMap<String, HRegionInfo> ());
272     Configuration conf = server.getConfiguration();
273     // Only read favored nodes if using the favored nodes load balancer.
274     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
275            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
276            FavoredNodeLoadBalancer.class);
277     try {
278       if (server.getCoordinatedStateManager() != null) {
279         this.tableStateManager = server.getCoordinatedStateManager().getTableStateManager();
280       } else {
281         this.tableStateManager = null;
282       }
283     } catch (InterruptedException e) {
284       throw new InterruptedIOException();
285     }
286     // This is the max attempts, not retries, so it should be at least 1.
287     this.maximumAttempts = Math.max(1,
288       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
289     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
290         "hbase.meta.assignment.retry.sleeptime", 1000l);
291     this.balancer = balancer;
292     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
293     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
294       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
295     this.regionStates = new RegionStates(server, serverManager, regionStateStore);
296 
297     this.bulkAssignWaitTillAllAssigned =
298       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
299     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
300     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
301 
302     int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
303     ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
304     zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
305             TimeUnit.SECONDS, threadFactory);
306     this.tableLockManager = tableLockManager;
307 
308     this.metricsAssignmentManager = new MetricsAssignmentManager();
309     useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
310   }
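
  /*
   * Illustrative sketch (not part of the original source): the constructor above pulls
   * its tuning knobs from the server Configuration. A test or tool wiring up an
   * AssignmentManager might set them like this; the values shown are simply the defaults
   * read above and are only an example.
   *
   *   Configuration conf = server.getConfiguration();
   *   conf.setInt("hbase.assignment.maximum.attempts", 10);
   *   conf.setLong("hbase.meta.assignment.retry.sleeptime", 1000L);
   *   conf.setInt("hbase.assignment.threads.max", 30);
   *   conf.setInt("hbase.bulk.assignment.threshold.regions", 7);
   *   conf.setInt("hbase.bulk.assignment.threshold.servers", 3);
   *   conf.setInt("hbase.assignment.zkevent.workers", 20);
   */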
311 
312   /**
313    * Add the listener to the notification list.
314    * @param listener The AssignmentListener to register
315    */
316   public void registerListener(final AssignmentListener listener) {
317     this.listeners.add(listener);
318   }
319 
320   /**
321    * Remove the listener from the notification list.
322    * @param listener The AssignmentListener to unregister
323    */
324   public boolean unregisterListener(final AssignmentListener listener) {
325     return this.listeners.remove(listener);
326   }
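
  /*
   * Illustrative sketch (not part of the original source): registering a listener so that
   * another component is notified of assignment events. "assignmentManager" stands for an
   * AssignmentManager instance, and the callback names (regionOpened/regionClosed) are an
   * assumption about the AssignmentListener interface, not taken from this file.
   *
   *   assignmentManager.registerListener(new AssignmentListener() {
   *     @Override
   *     public void regionOpened(HRegionInfo regionInfo, ServerName serverName) {
   *       // react to the region coming online on serverName
   *     }
   *     @Override
   *     public void regionClosed(HRegionInfo regionInfo) {
   *       // react to the region going offline
   *     }
   *   });
   */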
327 
328   /**
329    * @return Instance of ZKTableStateManager.
330    */
331   public TableStateManager getTableStateManager() {
332     // These are 'expensive' to create since they involve a trip to the zk
333     // ensemble, so allow sharing.
334     return this.tableStateManager;
335   }
336 
337   /**
338    * This SHOULD not be public. It is public now
339    * because of some unit tests.
340    *
341    * TODO: make it package private and keep RegionStates in the master package
342    */
343   public RegionStates getRegionStates() {
344     return regionStates;
345   }
346 
347   /**
348    * Used in some tests to mock up region state in meta
349    */
350   @VisibleForTesting
351   RegionStateStore getRegionStateStore() {
352     return regionStateStore;
353   }
354 
355   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
356     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
357   }
358 
359   /**
360    * Add a regionPlan for the specified region.
361    * @param encodedName
362    * @param plan
363    */
364   public void addPlan(String encodedName, RegionPlan plan) {
365     synchronized (regionPlans) {
366       regionPlans.put(encodedName, plan);
367     }
368   }
369 
370   /**
371    * Add a map of region plans.
372    */
373   public void addPlans(Map<String, RegionPlan> plans) {
374     synchronized (regionPlans) {
375       regionPlans.putAll(plans);
376     }
377   }
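
  /*
   * Illustrative sketch (not part of the original source): queuing an explicit region plan
   * so the next assign of that region is directed to a chosen destination. The
   * RegionPlan(hri, source, destination) shape follows getRegionReopenPlan above;
   * "assignmentManager", "currentServer" and "destinationServer" are hypothetical names.
   *
   *   RegionPlan plan = new RegionPlan(hri, currentServer, destinationServer);
   *   assignmentManager.addPlan(hri.getEncodedName(), plan);
   */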
378 
379   /**
380    * Set the list of regions that will be reopened
381    * because of an update in table schema
382    *
383    * @param regions
384    *          list of regions that should be tracked for reopen
385    */
386   public void setRegionsToReopen(List <HRegionInfo> regions) {
387     for(HRegionInfo hri : regions) {
388       regionsToReopen.put(hri.getEncodedName(), hri);
389     }
390   }
391 
392   /**
393    * Used by the client to identify if all regions have the schema updates
394    *
395    * @param tableName
396    * @return Pair indicating the status of the alter command
397    * @throws IOException
398    */
399   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
400       throws IOException {
401     List <HRegionInfo> hris =
402       MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName, true);
403     Integer pending = 0;
404     for (HRegionInfo hri : hris) {
405       String name = hri.getEncodedName();
406       // no lock concurrent access ok: sequential consistency respected.
407       if (regionsToReopen.containsKey(name)
408           || regionStates.isRegionInTransition(name)) {
409         pending++;
410       }
411     }
412     return new Pair<Integer, Integer>(pending, hris.size());
413   }
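
  /*
   * Illustrative sketch (not part of the original source): a caller can use the returned
   * pair to report alter-table progress, where the first element is the number of regions
   * still pending reopen and the second is the total region count for the table.
   * "assignmentManager" is a hypothetical reference to this class.
   *
   *   Pair<Integer, Integer> status = assignmentManager.getReopenStatus(tableName);
   *   int pending = status.getFirst();
   *   int total = status.getSecond();
   *   LOG.info("Schema update applied to " + (total - pending) + "/" + total + " regions");
   */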
414 
415   /**
416    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
417    * the failover cleanup before re-assigning regions of dead servers, so that
418    * when re-assignment happens, AssignmentManager has proper region states.
419    */
420   public boolean isFailoverCleanupDone() {
421     return failoverCleanupDone.get();
422   }
423 
424   /**
425    * To avoid racing with AM, external entities may need to lock a region,
426    * for example, when SSH checks what regions to skip re-assigning.
427    */
428   public Lock acquireRegionLock(final String encodedName) {
429     return locker.acquireLock(encodedName);
430   }
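
  /*
   * Illustrative sketch (not part of the original source): an external caller (e.g. SSH)
   * should release the acquired lock in a finally block so a failure cannot leave the
   * region locked. "assignmentManager" is a hypothetical reference to this class.
   *
   *   Lock lock = assignmentManager.acquireRegionLock(hri.getEncodedName());
   *   try {
   *     // inspect or update state for this region without racing with the AM
   *   } finally {
   *     lock.unlock();
   *   }
   */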
431 
432   /**
433    * Now that failover cleanup is completed, notify the server manager to
434    * process any queued-up dead servers.
435    */
436   void failoverCleanupDone() {
437     failoverCleanupDone.set(true);
438     serverManager.processQueuedDeadServers();
439   }
440 
441   /**
442    * Called on startup.
443    * Figures out whether this is a fresh cluster start or we are joining an extant running cluster.
444    * @throws IOException
445    * @throws KeeperException
446    * @throws InterruptedException
447    * @throws CoordinatedStateException
448    */
449   void joinCluster() throws IOException,
450       KeeperException, InterruptedException, CoordinatedStateException {
451     long startTime = System.currentTimeMillis();
452     // Concurrency note: In the below the accesses on regionsInTransition are
453     // outside of a synchronization block where usually all accesses to RIT are
454     // synchronized.  The presumption is that in this case it is safe since this
455     // method is run by a single thread on startup.
456 
457     // TODO: Regions that have a null location and are not in regionsInTransitions
458     // need to be handled.
459 
460     // Scan hbase:meta to build list of existing regions, servers, and assignment
461     // Returns servers who have not checked in (assumed dead) and their regions
462     Map<ServerName, List<HRegionInfo>> deadServers;
463 
464     deadServers = rebuildUserRegions();
465     // This method will assign all user regions on a clean server startup,
466     // or it will reconstruct master state and clean up any leftovers from
467     // the previous master process.
468     boolean failover = processDeadServersAndRegionsInTransition(deadServers);
469 
470     if (!useZKForAssignment) {
471       // No longer using ZK for assignment, so remove the znode
472       ZKUtil.deleteNodeFailSilent(watcher, watcher.assignmentZNode);
473     }
474     recoverTableInDisablingState();
475     recoverTableInEnablingState();
476     LOG.info("Joined the cluster in " + (System.currentTimeMillis()
477       - startTime) + "ms, failover=" + failover);
478   }
479 
480   /**
481    * Processes all regions that are in transition in zookeeper and also
482    * processes the list of dead servers by scanning the META.
483    * Used by a master joining a cluster.  If we figure this is a clean cluster
484    * startup, will assign all user regions.
485    * @param deadServers
486    *          Map of dead servers and their regions. Can be null.
487    * @throws KeeperException
488    * @throws IOException
489    * @throws InterruptedException
490    */
491   boolean processDeadServersAndRegionsInTransition(
492       final Map<ServerName, List<HRegionInfo>> deadServers)
493     throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
494     List<String> nodes = ZKUtil.listChildrenNoWatch(watcher,
495       watcher.assignmentZNode);
496 
497     if (useZKForAssignment && nodes == null) {
498       String errorMessage = "Failed to get the children from ZK";
499       server.abort(errorMessage, new IOException(errorMessage));
500       return true; // Doesn't matter in this case
501     }
502 
503     boolean failover = !serverManager.getDeadServers().isEmpty();
504     if (failover) {
505       // This may not be a failover actually, especially if meta is on this master.
506       if (LOG.isDebugEnabled()) {
507         LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
508       }
509     } else {
510       // If any one region except meta is assigned, it's a failover.
511       for (HRegionInfo hri: regionStates.getRegionAssignments().keySet()) {
512         if (!hri.isMetaTable()) {
513           LOG.debug("Found " + hri + " out on cluster");
514           failover = true;
515           break;
516         }
517       }
518     }
519     if (!failover && nodes != null) {
520       // If any one region except meta is in transition, it's a failover.
521       for (String encodedName: nodes) {
522         RegionState regionState = regionStates.getRegionState(encodedName);
523         if (regionState != null && !regionState.getRegion().isMetaRegion()) {
524           LOG.debug("Found " + regionState + " in RITs");
525           failover = true;
526           break;
527         }
528       }
529     }
530     if (!failover && !useZKForAssignment) {
531       // If any region except meta is in transition on a live server, it's a failover.
532       Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
533       if (!regionsInTransition.isEmpty()) {
534         Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
535         for (RegionState regionState: regionsInTransition.values()) {
536           if (!regionState.getRegion().isMetaRegion()
537               && onlineServers.contains(regionState.getServerName())) {
538             LOG.debug("Found " + regionState + " in RITs");
539             failover = true;
540             break;
541           }
542         }
543       }
544     }
545     if (!failover) {
546       // If we get here, we have a full cluster restart. It is a failover only
547       // if there are some HLogs that are not split yet. Meta HLogs, if any, should
548       // have been split already. We can walk through those queued dead servers;
549       // if they don't have any HLogs, this restart should be considered a clean one
550       Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
551       if (!queuedDeadServers.isEmpty()) {
552         Configuration conf = server.getConfiguration();
553         Path rootdir = FSUtils.getRootDir(conf);
554         FileSystem fs = rootdir.getFileSystem(conf);
555         for (ServerName serverName: queuedDeadServers) {
556           Path logDir = new Path(rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
557           Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
558           if (fs.exists(logDir) || fs.exists(splitDir)) {
559             LOG.debug("Found queued dead server " + serverName);
560             failover = true;
561             break;
562           }
563         }
564         if (!failover) {
565           // We figured that it's not a failover, so no need to
566           // work on these re-queued dead servers any more.
567           LOG.info("AM figured that it's not a failover and cleaned up "
568             + queuedDeadServers.size() + " queued dead servers");
569           serverManager.removeRequeuedDeadServers();
570         }
571       }
572     }
573 
574     Set<TableName> disabledOrDisablingOrEnabling = null;
575     Map<HRegionInfo, ServerName> allRegions = null;
576 
577     if (!failover) {
578       disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
579         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING,
580         ZooKeeperProtos.Table.State.ENABLING);
581 
582       // Clean re/start, mark all user regions closed before reassignment
583       allRegions = regionStates.closeAllUserRegions(
584         disabledOrDisablingOrEnabling);
585     }
586 
587     // Now region states are restored
588     regionStateStore.start();
589 
590     // If we found user regions out on the cluster, it's a failover.
591     if (failover) {
592       LOG.info("Found regions out on cluster or in RIT; presuming failover");
593       // Process list of dead servers and regions in RIT.
594       // See HBASE-4580 for more information.
595       processDeadServersAndRecoverLostRegions(deadServers);
596     }
597 
598     if (!failover && useZKForAssignment) {
599       // Cleanup any existing ZK nodes and start watching
600       ZKAssign.deleteAllNodes(watcher);
601       ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
602         this.watcher.assignmentZNode);
603     }
604 
605     // Now we can safely claim failover cleanup completed and enable
606     // ServerShutdownHandler for further processing. The nodes (below)
607     // in transition, if any, are for regions not related to those
608     // dead servers at all, and can be done in parallel to SSH.
609     failoverCleanupDone();
610     if (!failover) {
611       // Fresh cluster startup.
612       LOG.info("Clean cluster startup. Assigning user regions");
613       assignAllUserRegions(allRegions);
614     }
615     return failover;
616   }
617 
618   /**
619    * If the region is up in zk and in transition, do fixup, then block and wait until
620    * the region is assigned and out of transition.  Used on startup for
621    * catalog regions.
622    * @param hri Region to look for.
623    * @return True if we processed a region in transition else false if region
624    * was not up in zk in transition.
625    * @throws InterruptedException
626    * @throws KeeperException
627    * @throws IOException
628    */
629   boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
630       throws InterruptedException, KeeperException, IOException {
631     String encodedRegionName = hri.getEncodedName();
632     if (!processRegionInTransition(encodedRegionName, hri)) {
633       return false; // The region is not in transition
634     }
635     LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedRegionName));
636     while (!this.server.isStopped() &&
637         this.regionStates.isRegionInTransition(encodedRegionName)) {
638       RegionState state = this.regionStates.getRegionTransitionState(encodedRegionName);
639       if (state == null || !serverManager.isServerOnline(state.getServerName())) {
640         // The region is not in transition, or not in transition on an online
641         // server. Doesn't help to block here any more. The caller needs to
642         // verify the region is actually assigned.
643         break;
644       }
645       this.regionStates.waitForUpdate(100);
646     }
647     return true;
648   }
649 
650   /**
651    * Process failover of new master for region <code>encodedRegionName</code>
652    * up in zookeeper.
653    * @param encodedRegionName Region to process failover for.
654    * @param regionInfo If null we'll go get it from meta table.
655    * @return True if we processed <code>regionInfo</code> as a RIT.
656    * @throws KeeperException
657    * @throws IOException
658    */
659   boolean processRegionInTransition(final String encodedRegionName,
660       final HRegionInfo regionInfo) throws KeeperException, IOException {
661     // We need a lock here to ensure that we will not put the same region twice
662     // It has no reason to be a lock shared with the other operations.
663     // We can do the lock on the region only, instead of a global lock: what we want to ensure
664     // is that we don't have two threads working on the same region.
665     Lock lock = locker.acquireLock(encodedRegionName);
666     try {
667       Stat stat = new Stat();
668       byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
669       if (data == null) return false;
670       RegionTransition rt;
671       try {
672         rt = RegionTransition.parseFrom(data);
673       } catch (DeserializationException e) {
674         LOG.warn("Failed parse znode data", e);
675         return false;
676       }
677       HRegionInfo hri = regionInfo;
678       if (hri == null) {
679         // The region info is not passed in. We will try to find the region
680         // from region states map/meta based on the encoded region name. But we
681         // may not be able to find it. This is valid for online merge that
682         // the region may have not been created if the merge is not completed.
683         // Therefore, it is not in meta at master recovery time.
684         hri = regionStates.getRegionInfo(rt.getRegionName());
685         EventType et = rt.getEventType();
686         if (hri == null && et != EventType.RS_ZK_REGION_MERGING
687             && et != EventType.RS_ZK_REQUEST_REGION_MERGE) {
688           LOG.warn("Couldn't find the region in recovering " + rt);
689           return false;
690         }
691       }
692 
693       // TODO: This code is tied to ZK anyway, so for now leaving it as is,
694       // will refactor when whole region assignment will be abstracted from ZK
695       BaseCoordinatedStateManager cp =
696         (BaseCoordinatedStateManager) this.server.getCoordinatedStateManager();
697       OpenRegionCoordination openRegionCoordination = cp.getOpenRegionCoordination();
698 
699       ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
700         new ZkOpenRegionCoordination.ZkOpenRegionDetails();
701       zkOrd.setVersion(stat.getVersion());
702       zkOrd.setServerName(cp.getServer().getServerName());
703 
704       return processRegionsInTransition(
705         rt, hri, openRegionCoordination, zkOrd);
706     } finally {
707       lock.unlock();
708     }
709   }
710 
711   /**
712    * This call is invoked only (1) when the master assigns meta, and
713    * (2) during failover-mode startup, when processing zk assignment nodes.
714    * The locker is set in the caller. It returns true if the region
715    * is in transition for sure, false otherwise.
716    *
717    * It should be private but it is used by some tests too.
718    */
719   boolean processRegionsInTransition(
720       final RegionTransition rt, final HRegionInfo regionInfo,
721       OpenRegionCoordination coordination,
722       final OpenRegionCoordination.OpenRegionDetails ord) throws KeeperException {
723     EventType et = rt.getEventType();
724     // Get ServerName.  Cannot be null.
725     final ServerName sn = rt.getServerName();
726     final byte[] regionName = rt.getRegionName();
727     final String encodedName = HRegionInfo.encodeRegionName(regionName);
728     final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
729     LOG.info("Processing " + prettyPrintedRegionName + " in state: " + et);
730 
731     if (regionStates.isRegionInTransition(encodedName)) {
732       LOG.info("Processed region " + prettyPrintedRegionName + " in state: "
733         + et + ", does nothing since the region is already in transition "
734         + regionStates.getRegionTransitionState(encodedName));
735       // Just return
736       return true;
737     }
738     if (!serverManager.isServerOnline(sn)) {
739       // It was transitioning on a dead server, so it's closed now.
740       // Force to OFFLINE and put it in transition, but do not assign it
741       // since log splitting for the dead server is not done yet.
742       LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() +
743         " was on deadserver; forcing offline");
744       if (regionStates.isRegionOnline(regionInfo)) {
745         // Meta could still show the region is assigned to the previous
746         // server. If that server is online, when we reload the meta, the
747         // region is put back online, so we need to offline it.
748         regionStates.regionOffline(regionInfo);
749         sendRegionClosedNotification(regionInfo);
750       }
751       // Put it back in transition so that SSH can re-assign it
752       regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);
753 
754       if (regionInfo.isMetaRegion()) {
755         // If it's meta region, reset the meta location.
756         // So that master knows the right meta region server.
757         MetaRegionTracker.setMetaLocation(watcher, sn);
758       } else {
759         // No matter whether the previous server is online or offline,
760         // we need to reset the last region server of the region.
761         regionStates.setLastRegionServerOfRegion(sn, encodedName);
762         // Make sure we know the server is dead.
763         if (!serverManager.isServerDead(sn)) {
764           serverManager.expireServer(sn);
765         }
766       }
767       return false;
768     }
769     switch (et) {
770       case M_ZK_REGION_CLOSING:
771         // Insert into RIT & resend the query to the region server: maybe the previous master
772         // died before sending the query the first time.
773         final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING);
774         this.executorService.submit(
775           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
776             @Override
777             public void process() throws IOException {
778               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
779               try {
780                 final int expectedVersion = ((ZkOpenRegionCoordination.ZkOpenRegionDetails) ord)
781                   .getVersion();
782                 unassign(regionInfo, rsClosing, expectedVersion, null, useZKForAssignment, null);
783                 if (regionStates.isRegionOffline(regionInfo)) {
784                   assign(regionInfo, true);
785                 }
786               } finally {
787                 lock.unlock();
788               }
789             }
790           });
791         break;
792 
793       case RS_ZK_REGION_CLOSED:
794       case RS_ZK_REGION_FAILED_OPEN:
795         // Region is closed, insert into RIT and handle it
796         regionStates.updateRegionState(regionInfo, State.CLOSED, sn);
797         invokeAssign(regionInfo);
798         break;
799 
800       case M_ZK_REGION_OFFLINE:
801         // Insert in RIT and resend to the regionserver
802         regionStates.updateRegionState(rt, State.PENDING_OPEN);
803         final RegionState rsOffline = regionStates.getRegionState(regionInfo);
804         this.executorService.submit(
805           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
806             @Override
807             public void process() throws IOException {
808               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
809               try {
810                 RegionPlan plan = new RegionPlan(regionInfo, null, sn);
811                 addPlan(encodedName, plan);
812                 assign(rsOffline, false, false);
813               } finally {
814                 lock.unlock();
815               }
816             }
817           });
818         break;
819 
820       case RS_ZK_REGION_OPENING:
821         regionStates.updateRegionState(rt, State.OPENING);
822         break;
823 
824       case RS_ZK_REGION_OPENED:
825         // Region is opened, insert into RIT and handle it
826         // This could be done asynchronously, but we would then need to acquire the lock in the
827         //  handler.
828         regionStates.updateRegionState(rt, State.OPEN);
829         new OpenedRegionHandler(server, this, regionInfo, coordination, ord).process();
830         break;
831       case RS_ZK_REQUEST_REGION_SPLIT:
832       case RS_ZK_REGION_SPLITTING:
833       case RS_ZK_REGION_SPLIT:
834         // Splitting region should be online. We could have skipped it during
835         // user region rebuilding since we may consider the split is completed.
836         // Put it in SPLITTING state to avoid complications.
837         regionStates.regionOnline(regionInfo, sn);
838         regionStates.updateRegionState(rt, State.SPLITTING);
839         if (!handleRegionSplitting(
840             rt, encodedName, prettyPrintedRegionName, sn)) {
841           deleteSplittingNode(encodedName, sn);
842         }
843         break;
844       case RS_ZK_REQUEST_REGION_MERGE:
845       case RS_ZK_REGION_MERGING:
846       case RS_ZK_REGION_MERGED:
847         if (!handleRegionMerging(
848             rt, encodedName, prettyPrintedRegionName, sn)) {
849           deleteMergingNode(encodedName, sn);
850         }
851         break;
852       default:
853         throw new IllegalStateException("Received region in state:" + et + " is not valid.");
854     }
855     LOG.info("Processed region " + prettyPrintedRegionName + " in state "
856       + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ")
857       + "server: " + sn);
858     return true;
859   }
860 
861   /**
862    * When a region is closed, it should be removed from the regionsToReopen
863    * @param hri HRegionInfo of the region which was closed
864    */
865   public void removeClosedRegion(HRegionInfo hri) {
866     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
867       LOG.debug("Removed region from reopening regions because it was closed");
868     }
869   }
870 
871   /**
872    * Handles various states an unassigned node can be in.
873    * <p>
874    * Method is called when a state change is suspected for an unassigned node.
875    * <p>
876    * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
877    * yet).
878    * @param rt region transition
879    * @param coordination coordination for opening region
880    * @param ord details about opening region
881    */
882   void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination,
883                     OpenRegionCoordination.OpenRegionDetails ord) {
884     if (rt == null) {
885       LOG.warn("Unexpected NULL input for RegionTransition rt");
886       return;
887     }
888     final ServerName sn = rt.getServerName();
889     // Check if this is a special HBCK transition
890     if (sn.equals(HBCK_CODE_SERVERNAME)) {
891       handleHBCK(rt);
892       return;
893     }
894     final long createTime = rt.getCreateTime();
895     final byte[] regionName = rt.getRegionName();
896     String encodedName = HRegionInfo.encodeRegionName(regionName);
897     String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
898     // Verify this is a known server
899     if (!serverManager.isServerOnline(sn)
900       && !ignoreStatesRSOffline.contains(rt.getEventType())) {
901       LOG.warn("Attempted to handle region transition for server but " +
902         "it is not online: " + prettyPrintedRegionName + ", " + rt);
903       return;
904     }
905 
906     RegionState regionState =
907       regionStates.getRegionState(encodedName);
908     long startTime = System.currentTimeMillis();
909     if (LOG.isDebugEnabled()) {
910       boolean lateEvent = createTime < (startTime - 15000);
911       LOG.debug("Handling " + rt.getEventType() +
912         ", server=" + sn + ", region=" +
913         (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
914         (lateEvent ? ", which is more than 15 seconds late" : "") +
915         ", current_state=" + regionState);
916     }
917     // We don't do anything for this event,
918     // so separate it out, no need to lock/unlock anything
919     if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
920       return;
921     }
922 
923     // We need a lock on the region as we could update it
924     Lock lock = locker.acquireLock(encodedName);
925     try {
926       RegionState latestState =
927         regionStates.getRegionState(encodedName);
928       if ((regionState == null && latestState != null)
929           || (regionState != null && latestState == null)
930           || (regionState != null && latestState != null
931             && latestState.getState() != regionState.getState())) {
932         LOG.warn("Region state changed from " + regionState + " to "
933           + latestState + ", while acquiring lock");
934       }
935       long waitedTime = System.currentTimeMillis() - startTime;
936       if (waitedTime > 5000) {
937         LOG.warn("Took " + waitedTime + "ms to acquire the lock");
938       }
939       regionState = latestState;
940       switch (rt.getEventType()) {
941       case RS_ZK_REQUEST_REGION_SPLIT:
942       case RS_ZK_REGION_SPLITTING:
943       case RS_ZK_REGION_SPLIT:
944         if (!handleRegionSplitting(
945             rt, encodedName, prettyPrintedRegionName, sn)) {
946           deleteSplittingNode(encodedName, sn);
947         }
948         break;
949 
950       case RS_ZK_REQUEST_REGION_MERGE:
951       case RS_ZK_REGION_MERGING:
952       case RS_ZK_REGION_MERGED:
953         // Merged region is a new region, we can't find it in the region states now.
954         // However, the two merging regions are not new. They should be in state for merging.
955         if (!handleRegionMerging(
956             rt, encodedName, prettyPrintedRegionName, sn)) {
957           deleteMergingNode(encodedName, sn);
958         }
959         break;
960 
961       case M_ZK_REGION_CLOSING:
962         // Should see CLOSING after we have asked it to CLOSE or additional
963         // times after already being in state of CLOSING
964         if (regionState == null
965             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
966           LOG.warn("Received CLOSING for " + prettyPrintedRegionName
967             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
968             + regionStates.getRegionState(encodedName));
969           return;
970         }
971         // Transition to CLOSING (or update stamp if already CLOSING)
972         regionStates.updateRegionState(rt, State.CLOSING);
973         break;
974 
975       case RS_ZK_REGION_CLOSED:
976         // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
977         if (regionState == null
978             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
979           LOG.warn("Received CLOSED for " + prettyPrintedRegionName
980             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
981             + regionStates.getRegionState(encodedName));
982           return;
983         }
984         // Handle CLOSED by assigning elsewhere or stopping if a disable
985         // If we got here all is good.  Need to update RegionState -- else
986         // what follows will fail because not in expected state.
987         new ClosedRegionHandler(server, this, regionState.getRegion()).process();
988         updateClosedRegionHandlerTracker(regionState.getRegion());
989         break;
990 
991         case RS_ZK_REGION_FAILED_OPEN:
992           if (regionState == null
993               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
994             LOG.warn("Received FAILED_OPEN for " + prettyPrintedRegionName
995               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
996               + regionStates.getRegionState(encodedName));
997             return;
998           }
999           AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
1000           if (failedOpenCount == null) {
1001             failedOpenCount = new AtomicInteger();
1002             // No need to use putIfAbsent, or extra synchronization since
1003             // this whole handleRegion block is locked on the encoded region
1004             // name, and failedOpenTracker is updated only in this block
1005             failedOpenTracker.put(encodedName, failedOpenCount);
1006           }
1007           if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
1008             regionStates.updateRegionState(rt, State.FAILED_OPEN);
1009             // remove the tracking info to save memory, also reset
1010             // the count for next open initiative
1011             failedOpenTracker.remove(encodedName);
1012           } else {
1013             // Handle this the same as if it were opened and then closed.
1014             regionState = regionStates.updateRegionState(rt, State.CLOSED);
1015             if (regionState != null) {
1016               // When there is more than one region server, a new RS is selected as the
1017               // destination and the region plan is updated accordingly. (HBASE-5546)
1018               try {
1019                 getRegionPlan(regionState.getRegion(), sn, true);
1020                 new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1021               } catch (HBaseIOException e) {
1022                 LOG.warn("Failed to get region plan", e);
1023               }
1024             }
1025           }
1026           break;
1027 
1028         case RS_ZK_REGION_OPENING:
1029           // Should see OPENING after we have asked it to OPEN or additional
1030           // times after already being in state of OPENING
1031           if (regionState == null
1032               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1033             LOG.warn("Received OPENING for " + prettyPrintedRegionName
1034               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1035               + regionStates.getRegionState(encodedName));
1036             return;
1037           }
1038           // Transition to OPENING (or update stamp if already OPENING)
1039           regionStates.updateRegionState(rt, State.OPENING);
1040           break;
1041 
1042         case RS_ZK_REGION_OPENED:
1043           // Should see OPENED after OPENING but possible after PENDING_OPEN.
1044           if (regionState == null
1045               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1046             LOG.warn("Received OPENED for " + prettyPrintedRegionName
1047               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1048               + regionStates.getRegionState(encodedName));
1049 
1050             if (regionState != null) {
1051               // Close it without updating the internal region states,
1052               // so as not to create double assignments in unlucky scenarios
1053               // mentioned in OpenRegionHandler#process
1054               unassign(regionState.getRegion(), null, -1, null, false, sn);
1055             }
1056             return;
1057           }
1058           // Handle OPENED by removing from transition and deleted zk node
1059           regionState = regionStates.updateRegionState(rt, State.OPEN);
1060           if (regionState != null) {
1061             failedOpenTracker.remove(encodedName); // reset the count, if any
1062             new OpenedRegionHandler(
1063               server, this, regionState.getRegion(), coordination, ord).process();
1064             updateOpenedRegionHandlerTracker(regionState.getRegion());
1065           }
1066           break;
1067 
1068         default:
1069           throw new IllegalStateException("Received event is not valid.");
1070       }
1071     } finally {
1072       lock.unlock();
1073     }
1074   }
1075 
1076   //For unit tests only
1077   boolean wasClosedHandlerCalled(HRegionInfo hri) {
1078     AtomicBoolean b = closedRegionHandlerCalled.get(hri);
1079     //compareAndSet to be sure that unit tests don't see stale values. This means
1080     //we will return true exactly once unless the handler code resets this value
1081     //to true.
1082     return b == null ? false : b.compareAndSet(true, false);
1083   }
1084 
1085   //For unit tests only
1086   boolean wasOpenedHandlerCalled(HRegionInfo hri) {
1087     AtomicBoolean b = openedRegionHandlerCalled.get(hri);
1088     //compareAndSet to be sure that unit tests don't see stale values. This means
1089     //we will return true exactly once unless the handler code resets this value
1090     //to true.
1091     return b == null ? false : b.compareAndSet(true, false);
1092   }
1093 
1094   //For unit tests only
1095   void initializeHandlerTrackers() {
1096     closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1097     openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1098   }
1099 
1100   void updateClosedRegionHandlerTracker(HRegionInfo hri) {
1101     if (closedRegionHandlerCalled != null) { //only for unit tests this is true
1102       closedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1103     }
1104   }
1105 
1106   void updateOpenedRegionHandlerTracker(HRegionInfo hri) {
1107     if (openedRegionHandlerCalled != null) { //only for unit tests this is true
1108       openedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1109     }
1110   }
1111 
1112   // TODO: processFavoredNodes might throw an exception, e.g., if the
1113   // meta could not be contacted/updated. We need to decide how seriously to treat
1114   // this problem. Should we fail the current assignment? We should be able
1115   // to recover from this problem eventually (if the meta couldn't be updated,
1116   // things should work normally and eventually get fixed up).
1117   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
1118     if (!shouldAssignRegionsWithFavoredNodes) return;
1119     // The AM gets the favored nodes info for each region and updates the meta
1120     // table with that info
1121     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
1122         new HashMap<HRegionInfo, List<ServerName>>();
1123     for (HRegionInfo region : regions) {
1124       regionToFavoredNodes.put(region,
1125           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
1126     }
1127     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes, catalogTracker);
1128   }
1129 
1130   /**
1131    * Handle a ZK unassigned node transition triggered by HBCK repair tool.
1132    * <p>
1133    * This is handled in a separate code path because it breaks the normal rules.
1134    * @param rt
1135    */
1136   private void handleHBCK(RegionTransition rt) {
1137     String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
1138     LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
1139       ", server=" + rt.getServerName() + ", region=" +
1140       HRegionInfo.prettyPrint(encodedName));
1141     RegionState regionState = regionStates.getRegionTransitionState(encodedName);
1142     switch (rt.getEventType()) {
1143       case M_ZK_REGION_OFFLINE:
1144         HRegionInfo regionInfo;
1145         if (regionState != null) {
1146           regionInfo = regionState.getRegion();
1147         } else {
1148           try {
1149             byte [] name = rt.getRegionName();
1150             Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(catalogTracker, name);
1151             regionInfo = p.getFirst();
1152           } catch (IOException e) {
1153             LOG.info("Exception reading hbase:meta doing HBCK repair operation", e);
1154             return;
1155           }
1156         }
1157         LOG.info("HBCK repair is triggering assignment of region=" +
1158             regionInfo.getRegionNameAsString());
1159         // trigger assign, node is already in OFFLINE so don't need to update ZK
1160         assign(regionInfo, false);
1161         break;
1162 
1163       default:
1164         LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
1165         break;
1166     }
1167 
1168   }
1169 
1170   // ZooKeeper events
1171 
1172   /**
1173    * New unassigned node has been created.
1174    *
1175    * <p>This happens when an RS begins the OPENING or CLOSING of a region by
1176    * creating an unassigned node.
1177    *
1178    * <p>When this happens we must:
1179    * <ol>
1180    *   <li>Watch the node for further events</li>
1181    *   <li>Read and handle the state in the node</li>
1182    * </ol>
1183    */
1184   @Override
1185   public void nodeCreated(String path) {
1186     handleAssignmentEvent(path);
1187   }
1188 
1189   /**
1190    * Existing unassigned node has had data changed.
1191    *
1192    * <p>This happens when an RS transitions from OFFLINE to OPENING, or between
1193    * OPENING/OPENED and CLOSING/CLOSED.
1194    *
1195    * <p>When this happens we must:
1196    * <ol>
1197    *   <li>Watch the node for further events</li>
1198    *   <li>Read and handle the state in the node</li>
1199    * </ol>
1200    */
1201   @Override
1202   public void nodeDataChanged(String path) {
1203     handleAssignmentEvent(path);
1204   }
1205 
1206 
1207   // We  don't want to have two events on the same region managed simultaneously.
1208   // For this reason, we need to wait if an event on the same region is currently in progress.
1209   // So we track the region names of the events in progress, and we keep a waiting list.
1210   private final Set<String> regionsInProgress = new HashSet<String>();
1211   // In a LinkedHashMultimap, the put order is kept when we retrieve the collection back. We need
1212   //  this as we want the events to be managed in the same order as we received them.
1213   private final LinkedHashMultimap <String, RegionRunnable>
1214       zkEventWorkerWaitingList = LinkedHashMultimap.create();
1215 
1216   /**
1217    * A specific runnable that works only on a region.
1218    */
1219   private interface RegionRunnable extends Runnable{
1220     /**
1221      * @return - the name of the region it works on.
1222      */
1223     String getRegionName();
1224   }
1225 
1226   /**
1227    * Submit a task, ensuring that there is only one task at a time that working on a given region.
1228    * Order is respected.
1229    */
1230   protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {
1231 
1232     synchronized (regionsInProgress) {
1233       // If there is already a task for this region, we add it to the
1234       //  waiting list and return.
1235       if (regionsInProgress.contains(regRunnable.getRegionName())) {
1236         synchronized (zkEventWorkerWaitingList){
1237           zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
1238         }
1239         return;
1240       }
1241 
1242       // No event in progress on this region => we can submit a new task immediately.
1243       regionsInProgress.add(regRunnable.getRegionName());
1244       zkEventWorkers.submit(new Runnable() {
1245         @Override
1246         public void run() {
1247           try {
1248             regRunnable.run();
1249           } finally {
1250             // now that we have finished, let's see if there is an event for the same region in the
1251             //  waiting list. If it's the case, we can now submit it to the pool.
1252             synchronized (regionsInProgress) {
1253               regionsInProgress.remove(regRunnable.getRegionName());
1254               synchronized (zkEventWorkerWaitingList) {
1255                 java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
1256                     regRunnable.getRegionName());
1257                 if (!waiting.isEmpty()) {
1258                   // We want the first object only. The only way to get it is through an iterator.
1259                   RegionRunnable toSubmit = waiting.iterator().next();
1260                   zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
1261                   zkEventWorkersSubmit(toSubmit);
1262                 }
1263               }
1264             }
1265           }
1266         }
1267       });
1268     }
1269   }
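
       // Usage sketch (illustrative only; the real callers are nodeDeleted() and
       // handleAssignmentEvent() below): work on a region is serialized by wrapping it in a
       // RegionRunnable before handing it to zkEventWorkersSubmit(), e.g.:
       //
       //   zkEventWorkersSubmit(new RegionRunnable() {
       //     @Override
       //     public String getRegionName() { return regionName; }
       //     @Override
       //     public void run() { /* handle the ZK event for this region */ }
       //   });
       //
       // Two runnables reporting the same region name are never run concurrently; the later one
       // waits in zkEventWorkerWaitingList until the earlier one finishes.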
1270 
1271   @Override
1272   public void nodeDeleted(final String path) {
1273     if (path.startsWith(watcher.assignmentZNode)) {
1274       final String regionName = ZKAssign.getRegionName(watcher, path);
1275       zkEventWorkersSubmit(new RegionRunnable() {
1276         @Override
1277         public String getRegionName() {
1278           return regionName;
1279         }
1280 
1281         @Override
1282         public void run() {
1283           Lock lock = locker.acquireLock(regionName);
1284           try {
1285             RegionState rs = regionStates.getRegionTransitionState(regionName);
1286             if (rs == null) {
1287               rs = regionStates.getRegionState(regionName);
1288               if (rs == null || !rs.isMergingNew()) {
1289                 // MergingNew is an offline state
1290                 return;
1291               }
1292             }
1293 
1294             HRegionInfo regionInfo = rs.getRegion();
1295             String regionNameStr = regionInfo.getRegionNameAsString();
1296             LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);
1297 
1298             boolean disabled = getTableStateManager().isTableState(regionInfo.getTable(),
1299                 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
1300 
1301             ServerName serverName = rs.getServerName();
1302             if (serverManager.isServerOnline(serverName)) {
1303               if (rs.isOnServer(serverName)
1304                   && (rs.isOpened() || rs.isSplitting())) {
1305                 regionOnline(regionInfo, serverName);
1306                 if (disabled) {
1307                   // if the server is offline, there is no harm in unassigning again
1308                   LOG.info("Opened " + regionNameStr
1309                     + " but this table is disabled, triggering close of region");
1310                   unassign(regionInfo);
1311                 }
1312               } else if (rs.isMergingNew()) {
1313                 synchronized (regionStates) {
1314                   String p = regionInfo.getEncodedName();
1315                   PairOfSameType<HRegionInfo> regions = mergingRegions.get(p);
1316                   if (regions != null) {
1317                     onlineMergingRegion(disabled, regions.getFirst(), serverName);
1318                     onlineMergingRegion(disabled, regions.getSecond(), serverName);
1319                   }
1320                 }
1321               }
1322             }
1323           } finally {
1324             lock.unlock();
1325           }
1326         }
1327 
1328         private void onlineMergingRegion(boolean disabled,
1329             final HRegionInfo hri, final ServerName serverName) {
1330           RegionState regionState = regionStates.getRegionState(hri);
1331           if (regionState != null && regionState.isMerging()
1332               && regionState.isOnServer(serverName)) {
1333             regionOnline(regionState.getRegion(), serverName);
1334             if (disabled) {
1335               unassign(hri);
1336             }
1337           }
1338         }
1339       });
1340     }
1341   }
1342 
1343   /**
1344    * New unassigned node has been created.
1345    *
1346    * <p>This happens when an RS begins the OPENING, SPLITTING or CLOSING of a
1347    * region by creating a znode.
1348    *
1349    * <p>When this happens we must:
1350    * <ol>
1351    *   <li>Watch the node for further children changed events</li>
1352    *   <li>Watch all new children for changed events</li>
1353    * </ol>
1354    */
1355   @Override
1356   public void nodeChildrenChanged(String path) {
1357     if (path.equals(watcher.assignmentZNode)) {
1358       zkEventWorkers.submit(new Runnable() {
1359         @Override
1360         public void run() {
1361           try {
1362             // Just make sure we see the changes for the new znodes
1363             List<String> children =
1364               ZKUtil.listChildrenAndWatchForNewChildren(
1365                 watcher, watcher.assignmentZNode);
1366             if (children != null) {
1367               Stat stat = new Stat();
1368               for (String child : children) {
1369                 // if the region is in transition, we already have a watch
1370                 // on it, so there is no need to watch it again. As far as we know for now,
1371                 // this is needed only to watch splitting nodes.
1372                 if (!regionStates.isRegionInTransition(child)) {
1373                   ZKAssign.getDataAndWatch(watcher, child, stat);
1374                 }
1375               }
1376             }
1377           } catch (KeeperException e) {
1378             server.abort("Unexpected ZK exception reading unassigned children", e);
1379           }
1380         }
1381       });
1382     }
1383   }
1384 
1385
1386   /**
1387    * Marks the region as online.  Removes it from regions in transition and
1388    * updates the in-memory assignment information.
1389    * <p>
1390    * Used when a region has been successfully opened on a region server.
1391    * @param regionInfo region that has been opened
1392    * @param sn server the region was opened on
1393    */
1394   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
1395     regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
1396   }
1397 
1398   void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
1399     numRegionsOpened.incrementAndGet();
1400     regionStates.regionOnline(regionInfo, sn, openSeqNum);
1401 
1402     // Remove plan if one.
1403     clearRegionPlan(regionInfo);
1404     balancer.regionOnline(regionInfo, sn);
1405 
1406     // Tell our listeners that a region was opened
1407     sendRegionOpenedNotification(regionInfo, sn);
1408   }
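
       // Example (illustrative): the two-argument overload is a shorthand for an open without a
       // known sequence id, i.e.
       //
       //   regionOnline(hri, sn);                 // same as below with HConstants.NO_SEQNUM
       //   regionOnline(hri, sn, openSeqNum);     // when the open sequence id is known
       //
       // Both clear any pending RegionPlan, update regionStates and the balancer, and notify
       // listeners that the region opened.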
1409 
1410   /**
1411    * Pass the assignment event to a worker for processing.
1412    * Each worker is a single thread executor service.  The reason
1413    * for just one thread is to make sure all events for a given
1414    * region are processed in order.
1415    *
1416    * @param path znode path associated with the assignment event
1417    */
1418   private void handleAssignmentEvent(final String path) {
1419     if (path.startsWith(watcher.assignmentZNode)) {
1420       final String regionName = ZKAssign.getRegionName(watcher, path);
1421 
1422       zkEventWorkersSubmit(new RegionRunnable() {
1423         @Override
1424         public String getRegionName() {
1425           return regionName;
1426         }
1427 
1428         @Override
1429         public void run() {
1430           try {
1431             Stat stat = new Stat();
1432             byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
1433             if (data == null) return;
1434 
1435             RegionTransition rt = RegionTransition.parseFrom(data);
1436 
1437             // TODO: This code is tied to ZK anyway, so for now leaving it as is,
1438             // will refactor when whole region assignment will be abstracted from ZK
1439             BaseCoordinatedStateManager csm =
1440               (BaseCoordinatedStateManager) server.getCoordinatedStateManager();
1441             OpenRegionCoordination openRegionCoordination = csm.getOpenRegionCoordination();
1442 
1443             ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
1444               new ZkOpenRegionCoordination.ZkOpenRegionDetails();
1445             zkOrd.setVersion(stat.getVersion());
1446             zkOrd.setServerName(csm.getServer().getServerName());
1447 
1448             handleRegion(rt, openRegionCoordination, zkOrd);
1449           } catch (KeeperException e) {
1450             server.abort("Unexpected ZK exception reading unassigned node data", e);
1451           } catch (DeserializationException e) {
1452             server.abort("Unexpected exception deserializing node data", e);
1453           }
1454         }
1455       });
1456     }
1457   }
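
       // Flow sketch (illustrative): a ZK watcher callback such as nodeCreated(path) or
       // nodeDataChanged(path) funnels into this method, which then
       //   1. reads the znode data and re-sets the watch (ZKAssign.getDataAndWatch),
       //   2. parses it into a RegionTransition,
       //   3. builds ZkOpenRegionDetails from the znode version and this server's name, and
       //   4. hands everything to handleRegion() via the OpenRegionCoordination.
       // All of this runs on a zkEventWorkers thread, serialized per region by
       // zkEventWorkersSubmit() above.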
1458 
1459   /**
1460    * Marks the region as offline.  Removes it from regions in transition and
1461    * removes in-memory assignment information.
1462    * <p>
1463    * Used when a region has been closed and should remain closed.
1464    * @param regionInfo
1465    */
1466   public void regionOffline(final HRegionInfo regionInfo) {
1467     regionOffline(regionInfo, null);
1468   }
1469 
1470   public void offlineDisabledRegion(HRegionInfo regionInfo) {
1471     if (useZKForAssignment) {
1472       // Disabling so should not be reassigned, just delete the CLOSED node
1473       LOG.debug("Table being disabled so deleting ZK node and removing from " +
1474         "regions in transition, skipping assignment of region " +
1475           regionInfo.getRegionNameAsString());
1476       String encodedName = regionInfo.getEncodedName();
1477       deleteNodeInStates(encodedName, "closed", null,
1478         EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
1479     }
1480     regionOffline(regionInfo);
1481   }
1482 
1483   // Assignment methods
1484 
1485   /**
1486    * Assigns the specified region.
1487    * <p>
1488    * If a RegionPlan is available with a valid destination then it will be used
1489    * to determine what server region is assigned to.  If no RegionPlan is
1490    * available, region will be assigned to a random available server.
1491    * <p>
1492    * Updates the RegionState and sends the OPEN RPC.
1493    * <p>
1494    * This will only succeed if the region is in transition and in a CLOSED or
1495    * OFFLINE state or not in transition (in-memory not zk), and of course, the
1496    * chosen server is up and running (It may have just crashed!).  If the
1497    * in-memory checks pass, the zk node is forced to OFFLINE before assigning.
1498    *
1499    * @param region region to be assigned
1500    * @param setOfflineInZK whether ZK node should be created/transitioned to an
1501    *                       OFFLINE state before assigning the region
1502    */
1503   public void assign(HRegionInfo region, boolean setOfflineInZK) {
1504     assign(region, setOfflineInZK, false);
1505   }
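
       // Example (illustrative): the boolean controls whether the unassigned znode is forced to
       // OFFLINE before the OPEN RPC is sent, e.g.
       //
       //   assign(hri, true);    // create/force the znode to OFFLINE, then assign
       //   assign(hri, false);   // znode is already OFFLINE (e.g. the HBCK case above), just assign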
1506 
1507   /**
1508    * Use care with forceNewPlan. It could cause double assignment.
1509    */
1510   public void assign(HRegionInfo region,
1511       boolean setOfflineInZK, boolean forceNewPlan) {
1512     if (isDisabledorDisablingRegionInRIT(region)) {
1513       return;
1514     }
1515     if (this.serverManager.isClusterShutdown()) {
1516       LOG.info("Cluster shutdown is set; skipping assign of " +
1517         region.getRegionNameAsString());
1518       return;
1519     }
1520     String encodedName = region.getEncodedName();
1521     Lock lock = locker.acquireLock(encodedName);
1522     try {
1523       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
1524       if (state != null) {
1525         if (regionStates.wasRegionOnDeadServer(encodedName)) {
1526           LOG.info("Skip assigning " + region.getRegionNameAsString()
1527             + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
1528             + " is dead but not processed yet");
1529           return;
1530         }
1531         assign(state, setOfflineInZK && useZKForAssignment, forceNewPlan);
1532       }
1533     } finally {
1534       lock.unlock();
1535     }
1536   }
1537 
1538   /**
1539    * Bulk assign regions to <code>destination</code>.
1540    * @param destination server to assign the regions to
1541    * @param regions Regions to assign.
1542    * @return true if successful
1543    */
1544   boolean assign(final ServerName destination, final List<HRegionInfo> regions)
1545     throws InterruptedException {
1546     long startTime = EnvironmentEdgeManager.currentTimeMillis();
1547     try {
1548       int regionCount = regions.size();
1549       if (regionCount == 0) {
1550         return true;
1551       }
1552       LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
1553       Set<String> encodedNames = new HashSet<String>(regionCount);
1554       for (HRegionInfo region : regions) {
1555         encodedNames.add(region.getEncodedName());
1556       }
1557 
1558       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
1559       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
1560       try {
1561         AtomicInteger counter = new AtomicInteger(0);
1562         Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
1563         OfflineCallback cb = new OfflineCallback(
1564           watcher, destination, counter, offlineNodesVersions);
1565         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
1566         List<RegionState> states = new ArrayList<RegionState>(regions.size());
1567         for (HRegionInfo region : regions) {
1568           String encodedName = region.getEncodedName();
1569           if (!isDisabledorDisablingRegionInRIT(region)) {
1570             RegionState state = forceRegionStateToOffline(region, false);
1571             boolean onDeadServer = false;
1572             if (state != null) {
1573               if (regionStates.wasRegionOnDeadServer(encodedName)) {
1574                 LOG.info("Skip assigning " + region.getRegionNameAsString()
1575                   + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
1576                   + " is dead but not processed yet");
1577                 onDeadServer = true;
1578               } else if (!useZKForAssignment
1579                   || asyncSetOfflineInZooKeeper(state, cb, destination)) {
1580                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
1581                 plans.put(encodedName, plan);
1582                 states.add(state);
1583                 continue;
1584               }
1585             }
1586             // Reassign if the region wasn't on a dead server
1587             if (!onDeadServer) {
1588               LOG.info("failed to force region state to offline or "
1589                 + "failed to set it offline in ZK, will reassign later: " + region);
1590               failedToOpenRegions.add(region); // assign individually later
1591             }
1592           }
1593           // Release the lock, this region is excluded from bulk assign because
1594           // we can't update its state, or set its znode to offline.
1595           Lock lock = locks.remove(encodedName);
1596           lock.unlock();
1597         }
1598 
1599         if (useZKForAssignment) {
1600           // Wait until all unassigned nodes have been put up and watchers set.
1601           int total = states.size();
1602           for (int oldCounter = 0; !server.isStopped();) {
1603             int count = counter.get();
1604             if (oldCounter != count) {
1605               LOG.debug(destination.toString() + " unassigned znodes=" + count +
1606                 " of total=" + total + "; oldCounter=" + oldCounter);
1607               oldCounter = count;
1608             }
1609             if (count >= total) break;
1610             Thread.sleep(5);
1611           }
1612         }
1613 
1614         if (server.isStopped()) {
1615           return false;
1616         }
1617 
1618         // Add region plans, so we can updateTimers when one region is opened so
1619         // that unnecessary timeout on RIT is reduced.
1620         this.addPlans(plans);
1621 
1622         List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
1623           new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
1624         for (RegionState state: states) {
1625           HRegionInfo region = state.getRegion();
1626           String encodedRegionName = region.getEncodedName();
1627           Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
1628           if (useZKForAssignment && (nodeVersion == null || nodeVersion == -1)) {
1629             LOG.warn("failed to offline in zookeeper: " + region);
1630             failedToOpenRegions.add(region); // assign individually later
1631             Lock lock = locks.remove(encodedRegionName);
1632             lock.unlock();
1633           } else {
1634             regionStates.updateRegionState(
1635               region, State.PENDING_OPEN, destination);
1636             List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1637             if (this.shouldAssignRegionsWithFavoredNodes) {
1638               favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1639             }
1640             regionOpenInfos.add(new Triple<HRegionInfo, Integer,  List<ServerName>>(
1641               region, nodeVersion, favoredNodes));
1642           }
1643         }
1644 
1645         // Move on to open regions.
1646         try {
1647           // Send OPEN RPC. If it fails on an IOE or RemoteException,
1648           // regions will be assigned individually.
1649           long maxWaitTime = System.currentTimeMillis() +
1650             this.server.getConfiguration().
1651               getLong("hbase.regionserver.rpc.startup.waittime", 60000);
1652           for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
1653             try {
1654               List<RegionOpeningState> regionOpeningStateList = serverManager
1655                 .sendRegionOpen(destination, regionOpenInfos);
1656               if (regionOpeningStateList == null) {
1657                 // Failed getting RPC connection to this server
1658                 return false;
1659               }
1660               for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
1661                 RegionOpeningState openingState = regionOpeningStateList.get(k);
1662                 if (openingState != RegionOpeningState.OPENED) {
1663                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
1664                   if (openingState == RegionOpeningState.ALREADY_OPENED) {
1665                     processAlreadyOpenedRegion(region, destination);
1666                   } else if (openingState == RegionOpeningState.FAILED_OPENING) {
1667                     // Failed opening this region, reassign it later
1668                     failedToOpenRegions.add(region);
1669                   } else {
1670                     LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
1671                       + openingState + " in assigning region " + region);
1672                   }
1673                 }
1674               }
1675               break;
1676             } catch (IOException e) {
1677               if (e instanceof RemoteException) {
1678                 e = ((RemoteException)e).unwrapRemoteException();
1679               }
1680               if (e instanceof RegionServerStoppedException) {
1681                 LOG.warn("The region server was shut down, ", e);
1682                 // No need to retry, the region server is a goner.
1683                 return false;
1684               } else if (e instanceof ServerNotRunningYetException) {
1685                 long now = System.currentTimeMillis();
1686                 if (now < maxWaitTime) {
1687                   LOG.debug("Server is not yet up; waiting up to " +
1688                     (maxWaitTime - now) + "ms", e);
1689                   Thread.sleep(100);
1690                   i--; // reset the try count
1691                   continue;
1692                 }
1693               } else if (e instanceof java.net.SocketTimeoutException
1694                   && this.serverManager.isServerOnline(destination)) {
1695                 // In case socket is timed out and the region server is still online,
1696                 // the openRegion RPC could have been accepted by the server and
1697                 // just the response didn't go through.  So we will retry to
1698                 // open the region on the same server.
1699                 if (LOG.isDebugEnabled()) {
1700                   LOG.debug("Bulk assigner openRegion() to " + destination
1701                     + " has timed out, but the regions might"
1702                     + " already be opened on it.", e);
1703                 }
1704                 // wait and reset the re-try count, server might be just busy.
1705                 Thread.sleep(100);
1706                 i--;
1707                 continue;
1708               }
1709               throw e;
1710             }
1711           }
1712         } catch (IOException e) {
1713           // Can be a socket timeout, EOF, NoRouteToHost, etc
1714           LOG.info("Unable to communicate with " + destination
1715             + " in order to assign regions, ", e);
1716           return false;
1717         }
1718       } finally {
1719         for (Lock lock : locks.values()) {
1720           lock.unlock();
1721         }
1722       }
1723 
1724       if (!failedToOpenRegions.isEmpty()) {
1725         for (HRegionInfo region : failedToOpenRegions) {
1726           if (!regionStates.isRegionOnline(region)) {
1727             invokeAssign(region);
1728           }
1729         }
1730       }
1731       LOG.debug("Bulk assigning done for " + destination);
1732       return true;
1733     } finally {
1734       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
1735     }
1736   }
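
       // Summary sketch (illustrative) of the bulk-assign path above:
       //
       //   boolean done = assign(destination, regionsToAssign);
       //
       // acquires a lock per region, forces each region OFFLINE (optionally creating the OFFLINE
       // znodes asynchronously via OfflineCallback), waits for the znodes, then sends a single
       // OPEN RPC carrying all regions. Regions that cannot be prepared or fail to open are
       // re-queued individually through invokeAssign(); false is returned only when the
       // destination cannot be reached or the master is stopping.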
1737 
1738   /**
1739    * Send CLOSE RPC if the server is online, otherwise, offline the region.
1740    *
1741    * The RPC will be sent only to the region server found in the region state
1742    * if one is passed in; otherwise, to the specified src server. If the region
1743    * state is not specified, we don't update the region state at all; instead
1744    * we just send the RPC call. This is useful for some cleanup without
1745    * messing with the region states (see handleRegion, the scenario of a region
1746    * opened on an unexpected server, for an example).
1747    */
1748   private void unassign(final HRegionInfo region,
1749       final RegionState state, final int versionOfClosingNode,
1750       final ServerName dest, final boolean transitionInZK,
1751       final ServerName src) {
1752     ServerName server = src;
1753     if (state != null) {
1754       server = state.getServerName();
1755     }
1756     long maxWaitTime = -1;
1757     for (int i = 1; i <= this.maximumAttempts; i++) {
1758       if (this.server.isStopped() || this.server.isAborted()) {
1759         LOG.debug("Server stopped/aborted; skipping unassign of " + region);
1760         return;
1761       }
1762       // ClosedRegionHandler can remove the server from this.regions
1763       if (!serverManager.isServerOnline(server)) {
1764         LOG.debug("Offline " + region.getRegionNameAsString()
1765           + ", no need to unassign since it's on a dead server: " + server);
1766         if (transitionInZK) {
1767           // Delete the node. If no node exists, we need not bother.
1768           deleteClosingOrClosedNode(region, server);
1769         }
1770         if (state != null) {
1771           regionOffline(region);
1772         }
1773         return;
1774       }
1775       try {
1776         // Send CLOSE RPC
1777         if (serverManager.sendRegionClose(server, region,
1778           versionOfClosingNode, dest, transitionInZK)) {
1779           LOG.debug("Sent CLOSE to " + server + " for region " +
1780             region.getRegionNameAsString());
1781           if (useZKForAssignment && !transitionInZK && state != null) {
1782             // Retry to make sure the region is
1783             // closed so as to avoid double assignment.
1784             unassign(region, state, versionOfClosingNode,
1785               dest, transitionInZK, src);
1786           }
1787           return;
1788         }
1789         // This never happens. Currently the regionserver close always returns true.
1790         // TODO: this can now happen (0.96) if there is an exception in a coprocessor
1791         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
1792           region.getRegionNameAsString());
1793       } catch (Throwable t) {
1794         if (t instanceof RemoteException) {
1795           t = ((RemoteException)t).unwrapRemoteException();
1796         }
1797         boolean logRetries = true;
1798         if (t instanceof NotServingRegionException
1799             || t instanceof RegionServerStoppedException
1800             || t instanceof ServerNotRunningYetException) {
1801           LOG.debug("Offline " + region.getRegionNameAsString()
1802             + ", it's not any more on " + server, t);
1803           if (transitionInZK) {
1804             deleteClosingOrClosedNode(region, server);
1805           }
1806           if (state != null) {
1807             regionOffline(region);
1808           }
1809           return;
1810         } else if ((t instanceof FailedServerException) || (state != null &&
1811             t instanceof RegionAlreadyInTransitionException)) {
1812           long sleepTime = 0;
1813           Configuration conf = this.server.getConfiguration();
1814           if(t instanceof FailedServerException) {
1815             sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1816                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1817           } else {
1818             // RS is already processing this region, only need to update the timestamp
1819             LOG.debug("Updating the timestamp of " + state);
1820             state.updateTimestampToNow();
1821             if (maxWaitTime < 0) {
1822               maxWaitTime =
1823                   EnvironmentEdgeManager.currentTimeMillis()
1824                       + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
1825                         DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
1826             }
1827             long now = EnvironmentEdgeManager.currentTimeMillis();
1828             if (now < maxWaitTime) {
1829               LOG.debug("Region is already in transition; "
1830                 + "waiting up to " + (maxWaitTime - now) + "ms", t);
1831               sleepTime = 100;
1832               i--; // reset the try count
1833               logRetries = false;
1834             }
1835           }
1836           try {
1837             if (sleepTime > 0) {
1838               Thread.sleep(sleepTime);
1839             }
1840           } catch (InterruptedException ie) {
1841             LOG.warn("Failed to unassign "
1842               + region.getRegionNameAsString() + " since interrupted", ie);
1843             Thread.currentThread().interrupt();
1844             if (state != null) {
1845               regionStates.updateRegionState(region, State.FAILED_CLOSE);
1846             }
1847             return;
1848           }
1849         }
1850 
1851         if (logRetries) {
1852           LOG.info("Server " + server + " returned " + t + " for "
1853             + region.getRegionNameAsString() + ", try=" + i
1854             + " of " + this.maximumAttempts, t);
1855           // Presume retry or server will expire.
1856         }
1857       }
1858     }
1859     // Run out of attempts
1860     if (state != null) {
1861       regionStates.updateRegionState(region, State.FAILED_CLOSE);
1862     }
1863   }
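
       // Behavior sketch (illustrative) of the retry loop above: the CLOSE RPC is attempted up to
       // maximumAttempts times. A dead/offline target server short-circuits to regionOffline();
       // NotServingRegionException and similar exceptions are treated the same way.
       // FailedServerException sleeps past the failed-server expiry, and
       // RegionAlreadyInTransitionException keeps retrying (without consuming attempts) until
       // ALREADY_IN_TRANSITION_WAITTIME elapses. If all attempts are exhausted the region is
       // marked FAILED_CLOSE.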
1864 
1865   /**
1866    * Set region to OFFLINE unless it is opening and forceNewPlan is false.
1867    */
1868   private RegionState forceRegionStateToOffline(
1869       final HRegionInfo region, final boolean forceNewPlan) {
1870     RegionState state = regionStates.getRegionState(region);
1871     if (state == null) {
1872       LOG.warn("Assigning a region not in region states: " + region);
1873       state = regionStates.createRegionState(region);
1874     }
1875 
1876     ServerName sn = state.getServerName();
1877     if (forceNewPlan && LOG.isDebugEnabled()) {
1878       LOG.debug("Force region state offline " + state);
1879     }
1880 
1881     switch (state.getState()) {
1882     case OPEN:
1883     case OPENING:
1884     case PENDING_OPEN:
1885     case CLOSING:
1886     case PENDING_CLOSE:
1887       if (!forceNewPlan) {
1888         LOG.debug("Skip assigning " +
1889           region + ", it is already " + state);
1890         return null;
1891       }
1892     case FAILED_CLOSE:
1893     case FAILED_OPEN:
1894       unassign(region, state, -1, null, false, null);
1895       state = regionStates.getRegionState(region);
1896       if (state.isFailedClose()) {
1897         // If we can't close the region, we can't re-assign
1898         // it so as to avoid possible double assignment/data loss.
1899         LOG.info("Skip assigning " +
1900           region + ", we couldn't close it: " + state);
1901         return null;
1902       }
1903     case OFFLINE:
1904       // This region could have been open on this server
1905       // for a while. If the server is dead and not processed
1906       // yet, we can move on only if the meta shows the
1907       // region is not on this server actually, or on a server
1908       // not dead, or dead and processed already.
1909       // In case not using ZK, we don't need this check because
1910       // we have the latest info in memory, and the caller
1911       // will do another round checking any way.
1912       if (useZKForAssignment
1913           && regionStates.isServerDeadAndNotProcessed(sn)
1914           && wasRegionOnDeadServerByMeta(region, sn)) {
1915         LOG.info("Skip assigning " + region.getRegionNameAsString()
1916           + ", it is on a dead but not processed yet server: " + sn);
1917         return null;
1918       }
1919     case CLOSED:
1920       break;
1921     default:
1922       LOG.error("Trying to assign region " + region
1923         + ", which is " + state);
1924       return null;
1925     }
1926     return state;
1927   }
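
       // NOTE (illustrative): the switch above relies on intentional fall-through. With
       // forceNewPlan, an OPEN/OPENING/PENDING_OPEN/CLOSING/PENDING_CLOSE region falls through to
       // the FAILED_CLOSE/FAILED_OPEN branch and is unassigned first; OFFLINE falls through to
       // CLOSED. The typical calling pattern, as in assign(HRegionInfo, boolean, boolean) above,
       // is roughly:
       //
       //   RegionState state = forceRegionStateToOffline(region, forceNewPlan);
       //   if (state != null) {
       //     assign(state, setOfflineInZK && useZKForAssignment, forceNewPlan);
       //   }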
1928 
1929   private boolean wasRegionOnDeadServerByMeta(
1930       final HRegionInfo region, final ServerName sn) {
1931     try {
1932       if (region.isMetaRegion()) {
1933         ServerName server = catalogTracker.getMetaLocation();
1934         return regionStates.isServerDeadAndNotProcessed(server);
1935       }
1936       while (!server.isStopped()) {
1937         try {
1938           catalogTracker.waitForMeta();
1939           Result r = MetaReader.getRegionResult(catalogTracker, region.getRegionName());
1940           if (r == null || r.isEmpty()) return false;
1941           ServerName server = HRegionInfo.getServerName(r);
1942           return regionStates.isServerDeadAndNotProcessed(server);
1943         } catch (IOException ioe) {
1944           LOG.info("Received exception accessing hbase:meta during force assign "
1945             + region.getRegionNameAsString() + ", retrying", ioe);
1946         }
1947       }
1948     } catch (InterruptedException e) {
1949       Thread.currentThread().interrupt();
1950       LOG.info("Interrupted accessing hbase:meta", e);
1951     }
1952     // Call is interrupted or server is stopped.
1953     return regionStates.isServerDeadAndNotProcessed(sn);
1954   }
1955 
1956   /**
1957    * Caller must hold lock on the passed <code>state</code> object.
1958    * @param state
1959    * @param setOfflineInZK
1960    * @param forceNewPlan
1961    */
1962   private void assign(RegionState state,
1963       final boolean setOfflineInZK, final boolean forceNewPlan) {
1964     long startTime = EnvironmentEdgeManager.currentTimeMillis();
1965     try {
1966       Configuration conf = server.getConfiguration();
1967       RegionState currentState = state;
1968       int versionOfOfflineNode = -1;
1969       RegionPlan plan = null;
1970       long maxWaitTime = -1;
1971       HRegionInfo region = state.getRegion();
1972       RegionOpeningState regionOpenState;
1973       Throwable previousException = null;
1974       for (int i = 1; i <= maximumAttempts; i++) {
1975         if (server.isStopped() || server.isAborted()) {
1976           LOG.info("Skip assigning " + region.getRegionNameAsString()
1977             + ", the server is stopped/aborted");
1978           return;
1979         }
1980         if (plan == null) { // Get a server for the region at first
1981           try {
1982             plan = getRegionPlan(region, forceNewPlan);
1983           } catch (HBaseIOException e) {
1984             LOG.warn("Failed to get region plan", e);
1985           }
1986         }
1987         if (plan == null) {
1988           LOG.warn("Unable to determine a plan to assign " + region);
1989           if (region.isMetaRegion()) {
1990             try {
1991               Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
1992               if (i == maximumAttempts) i = 1;
1993               continue;
1994             } catch (InterruptedException e) {
1995               LOG.error("Got exception while waiting for hbase:meta assignment");
1996               Thread.currentThread().interrupt();
1997             }
1998           }
1999           regionStates.updateRegionState(region, State.FAILED_OPEN);
2000           return;
2001         }
2002         if (setOfflineInZK && versionOfOfflineNode == -1) {
2003           // get the version of the znode after setting it to OFFLINE.
2004           // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
2005           versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
2006           if (versionOfOfflineNode != -1) {
2007             if (isDisabledorDisablingRegionInRIT(region)) {
2008               return;
2009             }
2010             // In case of assignment from EnableTableHandler, the table state is ENABLING. Anyhow,
2011             // EnableTableHandler will set ENABLED after assigning all the table's regions. If we
2012             // try to set it to ENABLED directly, then the client API may think the table is enabled.
2013             // When all the regions are added directly into hbase:meta and we call assignRegion,
2014             // then we need to make the table ENABLED. In such a case the table will not be in
2015             // ENABLING or ENABLED state.
2016             TableName tableName = region.getTable();
2017             if (!tableStateManager.isTableState(tableName,
2018               ZooKeeperProtos.Table.State.ENABLED, ZooKeeperProtos.Table.State.ENABLING)) {
2019               LOG.debug("Setting table " + tableName + " to ENABLED state.");
2020               setEnabledTable(tableName);
2021             }
2022           }
2023         }
2024         if (setOfflineInZK && versionOfOfflineNode == -1) {
2025           LOG.info("Unable to set offline in ZooKeeper to assign " + region);
2026           // Setting offline in ZK must have failed due to ZK racing or some
2027           // exception which may make the server abort. If it is ZK racing,
2028           // we should retry since we already reset the region state;
2029           // any existing (re)assignment will fail anyway.
2030           if (!server.isAborted()) {
2031             continue;
2032           }
2033         }
2034         LOG.info("Assigning " + region.getRegionNameAsString() +
2035             " to " + plan.getDestination().toString());
2036         // Transition RegionState to PENDING_OPEN
2037         currentState = regionStates.updateRegionState(region,
2038           State.PENDING_OPEN, plan.getDestination());
2039 
2040         boolean needNewPlan;
2041         final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
2042             " to " + plan.getDestination();
2043         try {
2044           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
2045           if (this.shouldAssignRegionsWithFavoredNodes) {
2046             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
2047           }
2048           regionOpenState = serverManager.sendRegionOpen(
2049               plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
2050 
2051           if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
2052             // Failed opening this region, looping again on a new server.
2053             needNewPlan = true;
2054             LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
2055                 " trying to assign elsewhere instead; " +
2056                 "try=" + i + " of " + this.maximumAttempts);
2057           } else {
2058             // we're done
2059             if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
2060               processAlreadyOpenedRegion(region, plan.getDestination());
2061             }
2062             return;
2063           }
2064 
2065         } catch (Throwable t) {
2066           if (t instanceof RemoteException) {
2067             t = ((RemoteException) t).unwrapRemoteException();
2068           }
2069           previousException = t;
2070 
2071           // Should we wait a little before retrying? If the server is starting, the answer is yes.
2072           // If the region is already in transition, it's yes as well: we want to be sure that
2073           //  the region will get opened but we don't want a double assignment.
2074           boolean hold = (t instanceof RegionAlreadyInTransitionException ||
2075               t instanceof ServerNotRunningYetException);
2076 
2077           // In case socket is timed out and the region server is still online,
2078           // the openRegion RPC could have been accepted by the server and
2079           // just the response didn't go through.  So we will retry to
2080           // open the region on the same server to avoid possible
2081           // double assignment.
2082           boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
2083               && this.serverManager.isServerOnline(plan.getDestination()));
2084 
2085 
2086           if (hold) {
2087             LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
2088               "try=" + i + " of " + this.maximumAttempts, t);
2089 
2090             if (maxWaitTime < 0) {
2091               if (t instanceof RegionAlreadyInTransitionException) {
2092                 maxWaitTime = EnvironmentEdgeManager.currentTimeMillis()
2093                   + this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
2094                     DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
2095               } else {
2096                 maxWaitTime = this.server.getConfiguration().
2097                   getLong("hbase.regionserver.rpc.startup.waittime", 60000);
2098               }
2099             }
2100             try {
2101               needNewPlan = false;
2102               long now = EnvironmentEdgeManager.currentTimeMillis();
2103               if (now < maxWaitTime) {
2104                 LOG.debug("Server is not yet up or region is already in transition; "
2105                   + "waiting up to " + (maxWaitTime - now) + "ms", t);
2106                 Thread.sleep(100);
2107                 i--; // reset the try count
2108               } else if (!(t instanceof RegionAlreadyInTransitionException)) {
2109                 LOG.debug("Server is not up for a while; try a new one", t);
2110                 needNewPlan = true;
2111               }
2112             } catch (InterruptedException ie) {
2113               LOG.warn("Failed to assign "
2114                   + region.getRegionNameAsString() + " since interrupted", ie);
2115               regionStates.updateRegionState(region, State.FAILED_OPEN);
2116               Thread.currentThread().interrupt();
2117               return;
2118             }
2119           } else if (retry) {
2120             needNewPlan = false;
2121             i--; // we want to retry as many times as needed as long as the RS is not dead.
2122             LOG.warn(assignMsg + ", trying to assign to the same region server", t);
2123           } else {
2124             needNewPlan = true;
2125             LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
2126                 " try=" + i + " of " + this.maximumAttempts, t);
2127           }
2128         }
2129 
2130         if (i == this.maximumAttempts) {
2131           // Don't reset the region state or get a new plan any more.
2132           // This is the last try.
2133           continue;
2134         }
2135 
2136         // If region opened on destination of present plan, reassigning to new
2137         // RS may cause double assignments. In case of RegionAlreadyInTransitionException
2138         // reassigning to same RS.
2139         if (needNewPlan) {
2140           // Force a new plan and reassign. Will return null if no servers.
2141           // The new plan could be the same as the existing plan since we don't
2142           // exclude the server of the original plan, which should not be
2143           // excluded since it could be the only server up now.
2144           RegionPlan newPlan = null;
2145           try {
2146             newPlan = getRegionPlan(region, true);
2147           } catch (HBaseIOException e) {
2148             LOG.warn("Failed to get region plan", e);
2149           }
2150           if (newPlan == null) {
2151             regionStates.updateRegionState(region, State.FAILED_OPEN);
2152             LOG.warn("Unable to find a viable location to assign region " +
2153                 region.getRegionNameAsString());
2154             return;
2155           }
2156 
2157           if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
2158             // Clean out the plan we failed to execute and one that doesn't look like it'll
2159             // succeed anyway; we need a new plan!
2160             // Transition back to OFFLINE
2161             currentState = regionStates.updateRegionState(region, State.OFFLINE);
2162             versionOfOfflineNode = -1;
2163             plan = newPlan;
2164           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
2165               previousException instanceof FailedServerException) {
2166             try {
2167               LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
2168                 " to the same failed server.");
2169               Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
2170                 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
2171             } catch (InterruptedException ie) {
2172               LOG.warn("Failed to assign "
2173                   + region.getRegionNameAsString() + " since interrupted", ie);
2174               regionStates.updateRegionState(region, State.FAILED_OPEN);
2175               Thread.currentThread().interrupt();
2176               return;
2177             }
2178           }
2179         }
2180       }
2181       // Run out of attempts
2182       regionStates.updateRegionState(region, State.FAILED_OPEN);
2183     } finally {
2184       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
2185     }
2186   }
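
       // Summary sketch (illustrative) of the retry logic above: on each attempt a RegionPlan is
       // (re)used, the znode is optionally forced OFFLINE, and an OPEN RPC is sent.
       //   - hold:  RegionAlreadyInTransitionException / ServerNotRunningYetException -> wait and
       //            retry the same server while maxWaitTime has not elapsed (attempt not counted).
       //   - retry: SocketTimeoutException with the server still online -> retry the same server,
       //            since the OPEN may actually have been accepted.
       //   - otherwise a new plan is requested and the region is tried elsewhere.
       // Exhausting maximumAttempts (or failing to find any plan) ends in FAILED_OPEN.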
2187 
2188   private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
2189     // Remove region from in-memory transition and unassigned node from ZK
2190     // While trying to enable the table the regions of the table were
2191     // already enabled.
2192     LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2193       + " to " + sn);
2194     String encodedName = region.getEncodedName();
2195     deleteNodeInStates(encodedName, "offline", sn, EventType.M_ZK_REGION_OFFLINE);
2196     regionStates.regionOnline(region, sn);
2197   }
2198 
2199   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
2200     if (this.tableStateManager.isTableState(region.getTable(),
2201         ZooKeeperProtos.Table.State.DISABLED,
2202         ZooKeeperProtos.Table.State.DISABLING)) {
2203       LOG.info("Table " + region.getTable() + " is disabled or disabling;"
2204         + " skipping assign of " + region.getRegionNameAsString());
2205       offlineDisabledRegion(region);
2206       return true;
2207     }
2208     return false;
2209   }
2210 
2211   /**
2212    * Set region as OFFLINED up in zookeeper
2213    *
2214    * @param state
2215    * @return the version of the offline node if setting of the OFFLINE node was
2216    *         successful, -1 otherwise.
2217    */
2218   private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
2219     if (!state.isClosed() && !state.isOffline()) {
2220       String msg = "Unexpected state: " + state + ". Cannot transition it to OFFLINE.";
2221       this.server.abort(msg, new IllegalStateException(msg));
2222       return -1;
2223     }
2224     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
2225     int versionOfOfflineNode;
2226     try {
2227       // get the version after setting the znode to OFFLINE
2228       versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
2229         state.getRegion(), destination);
2230       if (versionOfOfflineNode == -1) {
2231         LOG.warn("Attempted to create/force node into OFFLINE state before "
2232             + "completing assignment but failed to do so for " + state);
2233         return -1;
2234       }
2235     } catch (KeeperException e) {
2236       server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
2237       return -1;
2238     }
2239     return versionOfOfflineNode;
2240   }
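
       // Usage sketch (illustrative), mirroring the caller above:
       //
       //   int versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
       //   if (versionOfOfflineNode != -1) {
       //     // pass the version along with the OPEN RPC so the RS can transition the same znode
       //     serverManager.sendRegionOpen(plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
       //   }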
2241 
2242   /**
2243    * @param region the region to assign
2244    * @return Plan for the passed <code>region</code> (if none exists currently, it creates one;
2245    * if there are no servers to assign to, it returns null).
2246    */
2247   private RegionPlan getRegionPlan(final HRegionInfo region,
2248       final boolean forceNewPlan)  throws HBaseIOException {
2249     return getRegionPlan(region, null, forceNewPlan);
2250   }
2251 
2252   /**
2253    * @param region the region to assign
2254    * @param serverToExclude Server to exclude (we know it's bad). Pass null if
2255    * all servers are thought to be assignable.
2256    * @param forceNewPlan If true, then if an existing plan exists, a new plan
2257    * will be generated.
2258    * @return Plan for the passed <code>region</code> (if none exists currently, it creates one;
2259    * if there are no servers to assign to, it returns null).
2260    */
2261   private RegionPlan getRegionPlan(final HRegionInfo region,
2262       final ServerName serverToExclude, final boolean forceNewPlan) throws HBaseIOException {
2263     // Pickup existing plan or make a new one
2264     final String encodedName = region.getEncodedName();
2265     final List<ServerName> destServers =
2266       serverManager.createDestinationServersList(serverToExclude);
2267 
2268     if (destServers.isEmpty()){
2269       LOG.warn("Can't move " + encodedName +
2270         ", there is no destination server available.");
2271       return null;
2272     }
2273 
2274     RegionPlan randomPlan = null;
2275     boolean newPlan = false;
2276     RegionPlan existingPlan;
2277 
2278     synchronized (this.regionPlans) {
2279       existingPlan = this.regionPlans.get(encodedName);
2280 
2281       if (existingPlan != null && existingPlan.getDestination() != null) {
2282         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
2283           + " destination server is " + existingPlan.getDestination() +
2284             " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
2285       }
2286 
2287       if (forceNewPlan
2288           || existingPlan == null
2289           || existingPlan.getDestination() == null
2290           || !destServers.contains(existingPlan.getDestination())) {
2291         newPlan = true;
2292         randomPlan = new RegionPlan(region, null,
2293             balancer.randomAssignment(region, destServers));
2294         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
2295           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
2296           regions.add(region);
2297           try {
2298             processFavoredNodes(regions);
2299           } catch (IOException ie) {
2300             LOG.warn("Ignoring exception in processFavoredNodes " + ie);
2301           }
2302         }
2303         this.regionPlans.put(encodedName, randomPlan);
2304       }
2305     }
2306 
2307     if (newPlan) {
2308       if (randomPlan.getDestination() == null) {
2309         LOG.warn("Can't find a destination for " + encodedName);
2310         return null;
2311       }
2312       LOG.debug("No previous transition plan found (or ignoring " +
2313         "an existing plan) for " + region.getRegionNameAsString() +
2314         "; generated random plan=" + randomPlan + "; " + destServers.size() +
2315         " (online=" + serverManager.getOnlineServers().size() +
2316         ") available servers, forceNewPlan=" + forceNewPlan);
2317       return randomPlan;
2318     }
2319     LOG.debug("Using pre-existing plan for " +
2320       region.getRegionNameAsString() + "; plan=" + existingPlan);
2321     return existingPlan;
2322   }
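
       // Example (illustrative): an existing plan is reused only if its destination is still a
       // valid candidate; otherwise (or when forced) a random plan is generated:
       //
       //   RegionPlan plan = getRegionPlan(region, true);   // always generate a fresh random plan
       //   if (plan == null) {
       //     // no destination servers available (or the balancer could not pick one)
       //   }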
2323 
2324   /**
2325    * Unassigns the specified region.
2326    * <p>
2327    * Updates the RegionState and sends the CLOSE RPC unless region is being
2328    * split by regionserver; then the unassign fails (silently) because we
2329    * presume the region being unassigned no longer exists (it's been split out
2330    * of existence). TODO: What to do if split fails and is rolled back and
2331    * parent is revivified?
2332    * <p>
2333    * If a RegionPlan is already set, it will remain.
2334    *
2335    * @param region region to be unassigned
2336    */
2337   public void unassign(HRegionInfo region) {
2338     unassign(region, false);
2339   }
2340 
2341 
2342   /**
2343    * Unassigns the specified region.
2344    * <p>
2345    * Updates the RegionState and sends the CLOSE RPC unless region is being
2346    * split by regionserver; then the unassign fails (silently) because we
2347    * presume the region being unassigned no longer exists (it's been split out
2348    * of existence). TODO: What to do if split fails and is rolled back and
2349    * parent is revivified?
2350    * <p>
2351    * If a RegionPlan is already set, it will remain.
2352    *
2353    * @param region region to be unassigned
2354    * @param force if region should be closed even if already closing
2355    */
2356   public void unassign(HRegionInfo region, boolean force, ServerName dest) {
2357     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
2358     LOG.debug("Starting unassign of " + region.getRegionNameAsString()
2359       + " (offlining), current state: " + regionStates.getRegionState(region));
2360 
2361     String encodedName = region.getEncodedName();
2362     // Grab the state of this region and synchronize on it
2363     int versionOfClosingNode = -1;
2364     // We need a lock here as we're going to do a put later and we don't want multiple
2365     //  states to be created
2366     ReentrantLock lock = locker.acquireLock(encodedName);
2367     RegionState state = regionStates.getRegionTransitionState(encodedName);
2368     boolean reassign = true;
2369     try {
2370       if (state == null) {
2371         // Region is not in transition.
2372         // We can unassign it only if it's not SPLIT/MERGED.
2373         state = regionStates.getRegionState(encodedName);
2374         if (state != null && state.isUnassignable()) {
2375           LOG.info("Attempting to unassign " + state + ", ignored");
2376           // Offline region will be reassigned below
2377           return;
2378         }
2379         // Create the znode in CLOSING state
2380         try {
2381           if (state == null || state.getServerName() == null) {
2382             // We don't know where the region is, offline it.
2383             // No need to send CLOSE RPC
2384             LOG.warn("Attempting to unassign a region not in RegionStates: "
2385               + region.getRegionNameAsString() + ", offlined");
2386             regionOffline(region);
2387             return;
2388           }
2389           if (useZKForAssignment) {
2390             versionOfClosingNode = ZKAssign.createNodeClosing(
2391               watcher, region, state.getServerName());
2392             if (versionOfClosingNode == -1) {
2393               LOG.info("Attempting to unassign " +
2394                 region.getRegionNameAsString() + " but ZK closing node "
2395                 + "can't be created.");
2396               reassign = false; // not unassigned at all
2397               return;
2398             }
2399           }
2400         } catch (KeeperException e) {
2401           if (e instanceof NodeExistsException) {
2402             // Handle race between master initiated close and regionserver
2403             // orchestrated splitting. See if existing node is in a
2404             // SPLITTING or SPLIT state.  If so, the regionserver started
2405             // an op on node before we could get our CLOSING in.  Deal.
2406             NodeExistsException nee = (NodeExistsException)e;
2407             String path = nee.getPath();
2408             try {
2409               if (isSplitOrSplittingOrMergedOrMerging(path)) {
2410                 LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
2411                   "skipping unassign because region no longer exists -- it's been split or merged");
2412                 reassign = false; // no need to reassign for split/merged region
2413                 return;
2414               }
2415             } catch (KeeperException.NoNodeException ke) {
2416               LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
2417                 "; presuming split and that the region to unassign, " +
2418                 encodedName + ", no longer exists -- confirm", ke);
2419               return;
2420             } catch (KeeperException ke) {
2421               LOG.error("Unexpected zk state", ke);
2422             } catch (DeserializationException de) {
2423               LOG.error("Failed parse", de);
2424             }
2425           }
2426           // If we get here, we don't understand what's going on -- abort.
2427           server.abort("Unexpected ZK exception creating node CLOSING", e);
2428           reassign = false; // heading out already
2429           return;
2430         }
2431         state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2432       } else if (state.isFailedOpen()) {
2433         // The region is not open yet
2434         regionOffline(region);
2435         return;
2436       } else if (force && state.isPendingCloseOrClosing()) {
2437         LOG.debug("Attempting to unassign " + region.getRegionNameAsString() +
2438           " which is already " + state.getState()  +
2439           " but forcing to send a CLOSE RPC again ");
2440         if (state.isFailedClose()) {
2441           state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2442         }
2443         state.updateTimestampToNow();
2444       } else {
2445         LOG.debug("Attempting to unassign " +
2446           region.getRegionNameAsString() + " but it is " +
2447           "already in transition (" + state.getState() + ", force=" + force + ")");
2448         return;
2449       }
2450 
2451       unassign(region, state, versionOfClosingNode, dest, useZKForAssignment, null);
2452     } finally {
2453       lock.unlock();
2454 
2455       // Region is expected to be reassigned afterwards
2456       if (reassign && regionStates.isRegionOffline(region)) {
2457         assign(region, true);
2458       }
2459     }
2460   }
2461 
2462   public void unassign(HRegionInfo region, boolean force) {
2463     unassign(region, force, null);
2464   }
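
       // Example (illustrative) of the public unassign entry points defined above:
       //
       //   unassign(hri);               // same as unassign(hri, false)
       //   unassign(hri, true);         // force a CLOSE RPC even if the region is already closing
       //   unassign(hri, true, dest);   // also pass a destination server along with the CLOSE RPC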
2465 
2466   /**
2467    * @param region regioninfo of znode to be deleted.
2468    */
2469   public void deleteClosingOrClosedNode(HRegionInfo region, ServerName sn) {
2470     String encodedName = region.getEncodedName();
2471     deleteNodeInStates(encodedName, "closing", sn, EventType.M_ZK_REGION_CLOSING,
2472       EventType.RS_ZK_REGION_CLOSED);
2473   }
2474 
2475   /**
2476    * @param path znode path to check
2477    * @return True if the znode is in SPLIT, SPLITTING, MERGED or MERGING state.
2478    * @throws KeeperException Can happen if the znode went away in the meantime.
2479    * @throws DeserializationException if the znode data cannot be parsed
2480    */
2481   private boolean isSplitOrSplittingOrMergedOrMerging(final String path)
2482       throws KeeperException, DeserializationException {
2483     boolean result = false;
2484     // This may fail if the SPLIT or SPLITTING or MERGED or MERGING znode gets
2485     // cleaned up before we can get data from it.
2486     byte [] data = ZKAssign.getData(watcher, path);
2487     if (data == null) {
2488       LOG.info("Node " + path + " is gone");
2489       return false;
2490     }
2491     RegionTransition rt = RegionTransition.parseFrom(data);
2492     switch (rt.getEventType()) {
2493     case RS_ZK_REQUEST_REGION_SPLIT:
2494     case RS_ZK_REGION_SPLIT:
2495     case RS_ZK_REGION_SPLITTING:
2496     case RS_ZK_REQUEST_REGION_MERGE:
2497     case RS_ZK_REGION_MERGED:
2498     case RS_ZK_REGION_MERGING:
2499       result = true;
2500       break;
2501     default:
2502       LOG.info("Node " + path + " is in " + rt.getEventType());
2503       break;
2504     }
2505     return result;
2506   }
2507 
2508   /**
2509    * Used by unit tests. Return the number of regions opened so far in the life
2510    * of the master. Increases by one every time the master opens a region.
2511    * @return the counter value of the number of regions opened so far
2512    */
2513   public int getNumRegionsOpened() {
2514     return numRegionsOpened.get();
2515   }
2516 
2517   /**
2518    * Waits until the specified region has completed assignment.
2519    * <p>
2520    * If the region is already assigned, returns immediately.  Otherwise, the
2521    * method blocks until the region is assigned.
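   * <p>
   * A hypothetical usage sketch (assumes an assignment for {@code hri} has
   * already been requested via one of the assign methods on this class):
   * <pre>
   * {@code
   * if (!assignmentManager.waitForAssignment(hri)) {
   *   // the region went FAILED_OPEN or the master is stopping
   * }
   * }
   * </pre>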
2522    * @param regionInfo region to wait on assignment for
2523    * @throws InterruptedException
2524    */
2525   public boolean waitForAssignment(HRegionInfo regionInfo)
2526       throws InterruptedException {
2527     while (!regionStates.isRegionOnline(regionInfo)) {
2528       if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN)
2529           || this.server.isStopped()) {
2530         return false;
2531       }
2532 
2533       // We should receive a notification, but it's
2534       //  better to have a timeout to recheck the condition here:
2535       //  it lowers the impact of a race condition if any
2536       regionStates.waitForUpdate(100);
2537     }
2538     return true;
2539   }
2540 
2541   /**
2542    * Assigns the hbase:meta region.
2543    * <p>
2544    * Assumes that hbase:meta is currently closed and is not being actively served by
2545    * any RegionServer.
2546    * <p>
2547    * Forcibly unsets the current meta region location in ZooKeeper and assigns
2548    * hbase:meta to a random RegionServer.
2549    * @throws KeeperException
2550    */
2551   public void assignMeta() throws KeeperException {
2552     MetaRegionTracker.deleteMetaLocation(this.watcher);
2553     assign(HRegionInfo.FIRST_META_REGIONINFO, true);
2554   }
2555 
2556   /**
2557    * Assigns specified regions retaining assignments, if any.
2558    * <p>
2559    * This is a synchronous call and will return once every region has been
2560    * assigned.  If anything fails, an exception is thrown.
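   * <p>
   * A hypothetical sketch of a caller retaining prior locations
   * ({@code hri} and {@code lastKnownServer} are illustrative, not part of this class):
   * <pre>
   * {@code
   * Map<HRegionInfo, ServerName> lastLocations = new HashMap<HRegionInfo, ServerName>();
   * lastLocations.put(hri, lastKnownServer);   // last server that hosted hri
   * assignmentManager.assign(lastLocations);   // balancer tries to keep regions where they were
   * }
   * </pre>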
2561    * @throws InterruptedException
2562    * @throws IOException
2563    */
2564   public void assign(Map<HRegionInfo, ServerName> regions)
2565         throws IOException, InterruptedException {
2566     if (regions == null || regions.isEmpty()) {
2567       return;
2568     }
2569     List<ServerName> servers = serverManager.createDestinationServersList();
2570     if (servers == null || servers.isEmpty()) {
2571       throw new IOException("Found no destination server to assign region(s)");
2572     }
2573 
2574     // Reuse existing assignment info
2575     Map<ServerName, List<HRegionInfo>> bulkPlan =
2576       balancer.retainAssignment(regions, servers);
2577 
2578     assign(regions.size(), servers.size(),
2579       "retainAssignment=true", bulkPlan);
2580   }
2581 
2582   /**
2583    * Assigns specified regions round robin, if any.
2584    * <p>
2585    * This is a synchronous call and will return once every region has been
2586    * assigned.  If anything fails, an exception is thrown.
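   * <p>
   * A hypothetical sketch ({@code newRegions} is an illustrative list of regions,
   * e.g. freshly created ones, to be spread round-robin across live servers):
   * <pre>
   * {@code
   * List<HRegionInfo> newRegions = new ArrayList<HRegionInfo>();
   * newRegions.add(hri);
   * assignmentManager.assign(newRegions);
   * }
   * </pre>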
2587    * @throws InterruptedException
2588    * @throws IOException
2589    */
2590   public void assign(List<HRegionInfo> regions)
2591         throws IOException, InterruptedException {
2592     if (regions == null || regions.isEmpty()) {
2593       return;
2594     }
2595 
2596     List<ServerName> servers = serverManager.createDestinationServersList();
2597     if (servers == null || servers.isEmpty()) {
2598       throw new IOException("Found no destination server to assign region(s)");
2599     }
2600 
2601     // Generate a round-robin bulk assignment plan
2602     Map<ServerName, List<HRegionInfo>> bulkPlan
2603       = balancer.roundRobinAssignment(regions, servers);
2604     processFavoredNodes(regions);
2605 
2606     assign(regions.size(), servers.size(),
2607       "round-robin=true", bulkPlan);
2608   }
2609 
2610   private void assign(int regions, int totalServers,
2611       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
2612           throws InterruptedException, IOException {
2613 
2614     int servers = bulkPlan.size();
2615     if (servers == 1 || (regions < bulkAssignThresholdRegions
2616         && servers < bulkAssignThresholdServers)) {
2617 
2618       // Not using bulk assignment.  This can be more efficient in a small
2619       // cluster, especially a mini cluster used for testing, so that tests won't time out.
2620       if (LOG.isTraceEnabled()) {
2621         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
2622           " region(s) to " + servers + " server(s)");
2623       }
2624       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
2625         if (!assign(plan.getKey(), plan.getValue())) {
2626           for (HRegionInfo region: plan.getValue()) {
2627             if (!regionStates.isRegionOnline(region)) {
2628               invokeAssign(region);
2629             }
2630           }
2631         }
2632       }
2633     } else {
2634       LOG.info("Bulk assigning " + regions + " region(s) across "
2635         + totalServers + " server(s), " + message);
2636 
2637       // Use a fixed-count thread pool for bulk assigning.
2638       BulkAssigner ba = new GeneralBulkAssigner(
2639         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
2640       ba.bulkAssign();
2641       LOG.info("Bulk assigning done");
2642     }
2643   }
2644 
2645   /**
2646    * Assigns all user regions, if any exist.  Used during cluster startup.
2647    * <p>
2648    * This is a synchronous call and will return once every region has been
2649    * assigned.  If anything fails, an exception is thrown and the cluster
2650    * should be shut down.
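   * <p>
   * Whether old assignments are retained is controlled by the
   * {@code hbase.master.startup.retainassign} configuration key (true by default
   * here); a sketch of overriding it before master startup:
   * <pre>
   * {@code
   * Configuration conf = HBaseConfiguration.create();
   * conf.setBoolean("hbase.master.startup.retainassign", false);  // force round-robin on startup
   * }
   * </pre>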
2651    * @throws InterruptedException
2652    * @throws IOException
2653    */
2654   private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
2655       throws IOException, InterruptedException {
2656     if (allRegions == null || allRegions.isEmpty()) return;
2657 
2658     // Determine what type of assignment to do on startup
2659     boolean retainAssignment = server.getConfiguration().
2660       getBoolean("hbase.master.startup.retainassign", true);
2661 
2662     if (retainAssignment) {
2663       assign(allRegions);
2664     } else {
2665       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(allRegions.keySet());
2666       assign(regions);
2667     }
2668 
2669     for (HRegionInfo hri : allRegions.keySet()) {
2670       TableName tableName = hri.getTable();
2671       if (!tableStateManager.isTableState(tableName,
2672           ZooKeeperProtos.Table.State.ENABLED)) {
2673         setEnabledTable(tableName);
2674       }
2675     }
2676   }
2677 
2678   /**
2679    * Wait until no regions in transition.
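   * <p>
   * A hypothetical sketch (the timeout value is illustrative):
   * <pre>
   * {@code
   * if (!waitUntilNoRegionsInTransition(60000)) {
   *   // regions were still in transition after 60 seconds (or the master stopped)
   * }
   * }
   * </pre>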
2680    * @param timeout How long to wait.
2681    * @return True if nothing in regions in transition.
2682    * @throws InterruptedException
2683    */
2684   boolean waitUntilNoRegionsInTransition(final long timeout)
2685       throws InterruptedException {
2686     // Blocks until there are no regions in transition. It is possible
2687     // that there are regions in transition immediately after this
2688     // returns, but this method guarantees that, if it returns without
2689     // an exception, there was a period of time with no regions in
2690     // transition from the point-of-view of the in-memory state of
2691     // the Master.
2692     final long endTime = System.currentTimeMillis() + timeout;
2693 
2694     while (!this.server.isStopped() && regionStates.isRegionsInTransition()
2695         && endTime > System.currentTimeMillis()) {
2696       regionStates.waitForUpdate(100);
2697     }
2698 
2699     return !regionStates.isRegionsInTransition();
2700   }
2701 
2702   /**
2703    * Rebuild the list of user regions and assignment information.
2704    * <p>
2705    * Returns a map of servers that are not found to be online and the regions
2706    * they were hosting.
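   * <p>
   * A rough sketch of how the result is used during master failover (simplified;
   * the follow-up step shown is an assumption, not a verbatim trace):
   * <pre>
   * {@code
   * Map<ServerName, List<HRegionInfo>> deadServers = rebuildUserRegions();
   * // servers in this map are not online; their regions are recovered next,
   * // e.g. by expiring those servers so the shutdown handler re-assigns them
   * }
   * </pre>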
2707    * @return map of servers not online to their assigned regions, as stored
2708    *         in META
2709    * @throws IOException
2710    */
2711   Map<ServerName, List<HRegionInfo>> rebuildUserRegions() throws
2712       IOException, KeeperException, CoordinatedStateException {
2713     Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
2714       ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.ENABLING);
2715 
2716     Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
2717       ZooKeeperProtos.Table.State.DISABLED,
2718       ZooKeeperProtos.Table.State.DISABLING,
2719       ZooKeeperProtos.Table.State.ENABLING);
2720 
2721     // Region assignment from META
2722     List<Result> results = MetaReader.fullScan(this.catalogTracker);
2723     // Get any new but slow-to-check-in region servers that joined the cluster
2724     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
2725     // Map of offline servers and their regions to be returned
2726     Map<ServerName, List<HRegionInfo>> offlineServers =
2727       new TreeMap<ServerName, List<HRegionInfo>>();
2728     // Iterate regions in META
2729     for (Result result : results) {
2730       HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(result);
2731       if (regionInfo == null) continue;
2732       State state = RegionStateStore.getRegionState(result);
2733       ServerName regionLocation = RegionStateStore.getRegionServer(result);
2734       regionStates.createRegionState(regionInfo, state, regionLocation);
2735       if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
2736         // Region is not open (either offline or in transition), skip
2737         continue;
2738       }
2739       TableName tableName = regionInfo.getTable();
2740       if (!onlineServers.contains(regionLocation)) {
2741         // Region is located on a server that isn't online
2742         List<HRegionInfo> offlineRegions = offlineServers.get(regionLocation);
2743         if (offlineRegions == null) {
2744           offlineRegions = new ArrayList<HRegionInfo>(1);
2745           offlineServers.put(regionLocation, offlineRegions);
2746         }
2747         regionStates.regionOffline(regionInfo);
2748         offlineRegions.add(regionInfo);
2749       } else if (!disabledOrEnablingTables.contains(tableName)) {
2750         // Region is being served and on an active server
2751         // add only if region not in disabled or enabling table
2752         regionStates.regionOnline(regionInfo, regionLocation);
2753         balancer.regionOnline(regionInfo, regionLocation);
2754       } else if (useZKForAssignment) {
2755         regionStates.regionOffline(regionInfo);
2756       }
2757       // need to enable the table if not disabled or disabling or enabling
2758       // this will be used in rolling restarts
2759       if (!disabledOrDisablingOrEnabling.contains(tableName)
2760         && !getTableStateManager().isTableState(tableName,
2761           ZooKeeperProtos.Table.State.ENABLED)) {
2762         setEnabledTable(tableName);
2763       }
2764     }
2765     return offlineServers;
2766   }
2767 
2768   /**
2769    * Recover the tables that were not fully moved to DISABLED state. These
2770    * tables were in DISABLING state when the master restarted or switched over.
2771    *
2772    * @throws KeeperException
2773    * @throws TableNotFoundException
2774    * @throws IOException
2775    */
2776   private void recoverTableInDisablingState()
2777       throws KeeperException, IOException, CoordinatedStateException {
2778     Set<TableName> disablingTables =
2779       tableStateManager.getTablesInStates(ZooKeeperProtos.Table.State.DISABLING);
2780     if (disablingTables.size() != 0) {
2781       for (TableName tableName : disablingTables) {
2782         // Recover by calling DisableTableHandler
2783         LOG.info("The table " + tableName
2784             + " is in DISABLING state.  Hence recovering by moving the table"
2785             + " to DISABLED state.");
2786         new DisableTableHandler(this.server, tableName, catalogTracker,
2787             this, tableLockManager, true).prepare().process();
2788       }
2789     }
2790   }
2791 
2792   /**
2793    * Recover the tables that were not fully moved to ENABLED state. These tables
2794    * were in ENABLING state when the master restarted or switched over.
2795    *
2796    * @throws KeeperException
2797    * @throws org.apache.hadoop.hbase.TableNotFoundException
2798    * @throws IOException
2799    */
2800   private void recoverTableInEnablingState()
2801       throws KeeperException, IOException, CoordinatedStateException {
2802     Set<TableName> enablingTables = tableStateManager.
2803       getTablesInStates(ZooKeeperProtos.Table.State.ENABLING);
2804     if (enablingTables.size() != 0) {
2805       for (TableName tableName : enablingTables) {
2806         // Recover by calling EnableTableHandler
2807         LOG.info("The table " + tableName
2808             + " is in ENABLING state.  Hence recovering by moving the table"
2809             + " to ENABLED state.");
2810         // enableTable in sync way during master startup,
2811         // no need to invoke coprocessor
2812         EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
2813           catalogTracker, this, tableLockManager, true);
2814         try {
2815           eth.prepare();
2816         } catch (TableNotFoundException e) {
2817           LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
2818           continue;
2819         }
2820         eth.process();
2821       }
2822     }
2823   }
2824 
2825   /**
2826    * Processes list of dead servers from result of hbase:meta scan and regions in RIT
2827    * <p>
2828    * This is used for failover to recover the lost regions that belonged to
2829    * RegionServers which failed while there was no active master or regions
2830    * that were in RIT.
2831    * <p>
2832    *
2833    *
2834    * @param deadServers
2835    *          The list of dead servers which failed while there was no active
2836    *          master. Can be null.
2837    * @throws IOException
2838    * @throws KeeperException
2839    */
2840   private void processDeadServersAndRecoverLostRegions(
2841       Map<ServerName, List<HRegionInfo>> deadServers)
2842           throws IOException, KeeperException {
2843     if (deadServers != null) {
2844       for (Map.Entry<ServerName, List<HRegionInfo>> server: deadServers.entrySet()) {
2845         ServerName serverName = server.getKey();
2846         // We need to keep such info even if the server is known to be dead
2847         regionStates.setLastRegionServerOfRegions(serverName, server.getValue());
2848         if (!serverManager.isServerDead(serverName)) {
2849           serverManager.expireServer(serverName); // Let SSH do region re-assign
2850         }
2851       }
2852     }
2853 
2854     List<String> nodes = useZKForAssignment ?
2855       ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode)
2856       : ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
2857     if (nodes != null && !nodes.isEmpty()) {
2858       for (String encodedRegionName : nodes) {
2859         processRegionInTransition(encodedRegionName, null);
2860       }
2861     } else if (!useZKForAssignment) {
2862       // We need to send the RPC call again for PENDING_OPEN/PENDING_CLOSE regions
2863       // in case the RPC call was not sent out before the master was shut down,
2864       // since we update the state before we send the RPC call. We can't update
2865       // the state after the RPC call; otherwise, we wouldn't know what happened
2866       // to the region if the master died right after the RPC call went out.
2867       Map<String, RegionState> rits = regionStates.getRegionsInTransition();
2868       for (RegionState regionState: rits.values()) {
2869         if (!serverManager.isServerOnline(regionState.getServerName())) {
2870           continue; // SSH will handle it
2871         }
2872         State state = regionState.getState();
2873         LOG.info("Processing " + regionState);
2874         switch (state) {
2875         case PENDING_OPEN:
2876           retrySendRegionOpen(regionState);
2877           break;
2878         case PENDING_CLOSE:
2879           retrySendRegionClose(regionState);
2880           break;
2881         default:
2882           // No process for other states
2883         }
2884       }
2885     }
2886   }
2887 
2888   /**
2889    * At master failover, for a pending_open region, make sure the
2890    * sendRegionOpen RPC call is sent to the target regionserver
2891    */
2892   private void retrySendRegionOpen(final RegionState regionState) {
2893     this.executorService.submit(
2894       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
2895         @Override
2896         public void process() throws IOException {
2897           HRegionInfo hri = regionState.getRegion();
2898           ServerName serverName = regionState.getServerName();
2899           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
2900           try {
2901             if (!regionState.equals(regionStates.getRegionState(hri))) {
2902               return; // Region is not in the expected state any more
2903             }
2904             while (serverManager.isServerOnline(serverName)
2905                 && !server.isStopped() && !server.isAborted()) {
2906               try {
2907                 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
2908                 if (shouldAssignRegionsWithFavoredNodes) {
2909                   favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
2910                 }
2911                 RegionOpeningState regionOpenState = serverManager.sendRegionOpen(
2912                   serverName, hri, -1, favoredNodes);
2913 
2914                 if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
2915                   // Failed opening this region, this means the target server didn't get
2916                   // the original region open RPC, so re-assign it with a new plan
2917                   LOG.debug("Got failed_opening in retry sendRegionOpen for "
2918                     + regionState + ", re-assign it");
2919                   invokeAssign(hri, true);
2920                 }
2921                 return; // Done.
2922               } catch (Throwable t) {
2923                 if (t instanceof RemoteException) {
2924                   t = ((RemoteException) t).unwrapRemoteException();
2925                 }
2926                 // In case SocketTimeoutException/FailedServerException, retry
2927                 if (t instanceof java.net.SocketTimeoutException
2928                     || t instanceof FailedServerException) {
2929                   Threads.sleep(100);
2930                   continue;
2931                 }
2932                 // For other exceptions, re-assign it
2933                 LOG.debug("Got exception in retry sendRegionOpen for "
2934                   + regionState + ", re-assign it", t);
2935                 invokeAssign(hri);
2936                 return; // Done.
2937               }
2938             }
2939           } finally {
2940             lock.unlock();
2941           }
2942         }
2943       });
2944   }
2945 
2946   /**
2947    * At master failover, for a pending_close region, make sure the
2948    * sendRegionClose RPC call is sent to the target regionserver
2949    */
2950   private void retrySendRegionClose(final RegionState regionState) {
2951     this.executorService.submit(
2952       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
2953         @Override
2954         public void process() throws IOException {
2955           HRegionInfo hri = regionState.getRegion();
2956           ServerName serverName = regionState.getServerName();
2957           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
2958           try {
2959             if (!regionState.equals(regionStates.getRegionState(hri))) {
2960               return; // Region is not in the expected state any more
2961             }
2962             while (serverManager.isServerOnline(serverName)
2963                 && !server.isStopped() && !server.isAborted()) {
2964               try {
2965                 if (!serverManager.sendRegionClose(serverName, hri, -1, null, false)) {
2966                   // This means the region is still on the target server
2967                   LOG.debug("Got false in retry sendRegionClose for "
2968                     + regionState + ", re-close it");
2969                   invokeUnAssign(hri);
2970                 }
2971                 return; // Done.
2972               } catch (Throwable t) {
2973                 if (t instanceof RemoteException) {
2974                   t = ((RemoteException) t).unwrapRemoteException();
2975                 }
2976                 // In case SocketTimeoutException/FailedServerException, retry
2977                 if (t instanceof java.net.SocketTimeoutException
2978                     || t instanceof FailedServerException) {
2979                   Threads.sleep(100);
2980                   continue;
2981                 }
2982                 if (!(t instanceof NotServingRegionException
2983                     || t instanceof RegionAlreadyInTransitionException)) {
2984                   // NotServingRegionException/RegionAlreadyInTransitionException
2985                   // means the target server got the original region close request.
2986                   // For other exceptions, re-close it
2987                   LOG.debug("Got exception in retry sendRegionClose for "
2988                     + regionState + ", re-close it", t);
2989                   invokeUnAssign(hri);
2990                 }
2991                 return; // Done.
2992               }
2993             }
2994           } finally {
2995             lock.unlock();
2996           }
2997         }
2998       });
2999   }
3000 
3001   /**
3002    * Set Regions in transitions metrics.
3003    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
3004    * This iterator is not fail fast, which may lead to stale read; but that's better than
3005    * creating a copy of the map for metrics computation, as this method will be invoked
3006    * on a frequent interval.
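   * <p>
   * The "over threshold" cutoff comes from
   * {@link HConstants#METRICS_RIT_STUCK_WARNING_THRESHOLD} (60000 ms default here);
   * a sketch of raising it in the master configuration:
   * <pre>
   * {@code
   * conf.setInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 120000);  // 2 minutes
   * }
   * </pre>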
3007    */
3008   public void updateRegionsInTransitionMetrics() {
3009     long currentTime = System.currentTimeMillis();
3010     int totalRITs = 0;
3011     int totalRITsOverThreshold = 0;
3012     long oldestRITTime = 0;
3013     int ritThreshold = this.server.getConfiguration().
3014       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
3015     for (RegionState state: regionStates.getRegionsInTransition().values()) {
3016       totalRITs++;
3017       long ritTime = currentTime - state.getStamp();
3018       if (ritTime > ritThreshold) { // more than the threshold
3019         totalRITsOverThreshold++;
3020       }
3021       if (oldestRITTime < ritTime) {
3022         oldestRITTime = ritTime;
3023       }
3024     }
3025     if (this.metricsAssignmentManager != null) {
3026       this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
3027       this.metricsAssignmentManager.updateRITCount(totalRITs);
3028       this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
3029     }
3030   }
3031 
3032   /**
3033    * @param region Region whose plan we are to clear.
3034    */
3035   void clearRegionPlan(final HRegionInfo region) {
3036     synchronized (this.regionPlans) {
3037       this.regionPlans.remove(region.getEncodedName());
3038     }
3039   }
3040 
3041   /**
3042    * Wait on region to clear regions-in-transition.
3043    * @param hri Region to wait on.
3044    * @throws IOException
3045    */
3046   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
3047       throws IOException, InterruptedException {
3048     waitOnRegionToClearRegionsInTransition(hri, -1L);
3049   }
3050 
3051   /**
3052    * Wait on region to clear regions-in-transition or time out
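   * <p>
   * A hypothetical sketch (the 30 second timeout is illustrative):
   * <pre>
   * {@code
   * if (!assignmentManager.waitOnRegionToClearRegionsInTransition(hri, 30000)) {
   *   // timed out, the master stopped, or the region is still in transition
   * }
   * }
   * </pre>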
3053    * @param hri Region to wait on.
3054    * @param timeOut Milliseconds to wait for current region to be out of transition state.
3055    * @return True when a region clears regions-in-transition before timeout otherwise false
3056    * @throws InterruptedException
3057    */
3058   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
3059       throws InterruptedException {
3060     if (!regionStates.isRegionInTransition(hri)) return true;
3061     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTimeMillis()
3062         + timeOut;
3063     // There is already a timeout monitor on regions in transition, so we
3064     // should not need another one here.
3065     LOG.info("Waiting for " + hri.getEncodedName() +
3066         " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
3067     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
3068       regionStates.waitForUpdate(100);
3069       if (EnvironmentEdgeManager.currentTimeMillis() > end) {
3070         LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
3071         return false;
3072       }
3073     }
3074     if (this.server.isStopped()) {
3075       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
3076       return false;
3077     }
3078     return true;
3079   }
3080 
3081   void invokeAssign(HRegionInfo regionInfo) {
3082     invokeAssign(regionInfo, true);
3083   }
3084 
3085   void invokeAssign(HRegionInfo regionInfo, boolean newPlan) {
3086     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, newPlan));
3087   }
3088 
3089   void invokeUnAssign(HRegionInfo regionInfo) {
3090     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
3091   }
3092 
3093   public boolean isCarryingMeta(ServerName serverName) {
3094     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
3095   }
3096 
3097   /**
3098    * Check if the shutdown server carries the specific region.
3099    * We have a bunch of places that store the region location, and
3100    * those values aren't always consistent: there is a delay in notification.
3101    * The location from the zookeeper unassigned node has the most recent data,
3102    * but the node could be deleted after the region is opened by the AM.
3103    * The AM's info could be stale if OpenedRegionHandler
3104    * processing hasn't finished yet when the server shutdown occurs.
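   * <p>
   * A hypothetical sketch via the public {@code isCarryingMeta} wrapper
   * ({@code deadServer} is illustrative):
   * <pre>
   * {@code
   * if (assignmentManager.isCarryingMeta(deadServer)) {
   *   // hbase:meta was on this server and needs to be re-assigned first
   * }
   * }
   * </pre>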
3105    * @return whether the serverName currently hosts the region
3106    */
3107   private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
3108     RegionTransition rt = null;
3109     try {
3110       byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
3111       // This call can legitimately return null
3112       rt = data == null? null: RegionTransition.parseFrom(data);
3113     } catch (KeeperException e) {
3114       server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
3115     } catch (DeserializationException e) {
3116       server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
3117     }
3118 
3119     ServerName addressFromZK = rt != null? rt.getServerName():  null;
3120     if (addressFromZK != null) {
3121       // if we get something from ZK, we will use the data
3122       boolean matchZK = addressFromZK.equals(serverName);
3123       LOG.debug("Checking region=" + hri.getRegionNameAsString() + ", zk server=" + addressFromZK +
3124         " current=" + serverName + ", matches=" + matchZK);
3125       return matchZK;
3126     }
3127 
3128     ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
3129     boolean matchAM = (addressFromAM != null &&
3130       addressFromAM.equals(serverName));
3131     LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
3132       " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
3133       " server being checked: " + serverName);
3134 
3135     return matchAM;
3136   }
3137 
3138   /**
3139    * Process a shut-down server, removing any assignments.
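   * <p>
   * A hypothetical sketch of a shutdown-handler style caller ({@code deadServer}
   * is illustrative):
   * <pre>
   * {@code
   * List<HRegionInfo> ritOnDeadServer = assignmentManager.processServerShutdown(deadServer);
   * // these regions were in transition on the dead server and still need to be
   * // handled by the caller
   * }
   * </pre>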
3140    * @param sn Server that went down.
3141    * @return list of regions in transition on this server
3142    */
3143   public List<HRegionInfo> processServerShutdown(final ServerName sn) {
3144     // Clean out any existing assignment plans for this server
3145     synchronized (this.regionPlans) {
3146       for (Iterator <Map.Entry<String, RegionPlan>> i =
3147           this.regionPlans.entrySet().iterator(); i.hasNext();) {
3148         Map.Entry<String, RegionPlan> e = i.next();
3149         ServerName otherSn = e.getValue().getDestination();
3150         // The name will be null if the region is planned for a random assign.
3151         if (otherSn != null && otherSn.equals(sn)) {
3152           // Use iterator's remove else we'll get CME
3153           i.remove();
3154         }
3155       }
3156     }
3157     List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn);
3158     for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
3159       HRegionInfo hri = it.next();
3160       String encodedName = hri.getEncodedName();
3161 
3162       // We need a lock on the region as we could update it
3163       Lock lock = locker.acquireLock(encodedName);
3164       try {
3165         RegionState regionState =
3166           regionStates.getRegionTransitionState(encodedName);
3167         if (regionState == null
3168             || (regionState.getServerName() != null && !regionState.isOnServer(sn))
3169             || !(regionState.isFailedClose() || regionState.isOffline()
3170               || regionState.isPendingOpenOrOpening())) {
3171           LOG.info("Skip " + regionState + " since it is not opening/failed_close"
3172             + " on the dead server any more: " + sn);
3173           it.remove();
3174         } else {
3175           try {
3176             // Delete the ZNode if exists
3177             ZKAssign.deleteNodeFailSilent(watcher, hri);
3178           } catch (KeeperException ke) {
3179             server.abort("Unexpected ZK exception deleting node " + hri, ke);
3180           }
3181           if (tableStateManager.isTableState(hri.getTable(),
3182               ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3183             regionStates.regionOffline(hri);
3184             it.remove();
3185             continue;
3186           }
3187           // Mark the region offline and assign it again by SSH
3188           regionStates.updateRegionState(hri, State.OFFLINE);
3189         }
3190       } finally {
3191         lock.unlock();
3192       }
3193     }
3194     return regions;
3195   }
3196 
3197   /**
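   * Moves the region according to the given plan: the region is unassigned so it
   * can be opened on the plan's destination (ignored if the table is
   * disabled/disabling or the region is not currently assigned).
   * <p>
   * A hypothetical sketch ({@code hri}, {@code sourceServer} and
   * {@code destServer} are illustrative):
   * <pre>
   * {@code
   * RegionPlan plan = new RegionPlan(hri, sourceServer, destServer);
   * assignmentManager.balance(plan);
   * }
   * </pre>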
3198    * @param plan Plan to execute.
3199    */
3200   public void balance(final RegionPlan plan) {
3201     HRegionInfo hri = plan.getRegionInfo();
3202     TableName tableName = hri.getTable();
3203     if (tableStateManager.isTableState(tableName,
3204       ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3205       LOG.info("Ignored moving region of disabling/disabled table "
3206         + tableName);
3207       return;
3208     }
3209 
3210     // Move the region only if it's assigned
3211     String encodedName = hri.getEncodedName();
3212     ReentrantLock lock = locker.acquireLock(encodedName);
3213     try {
3214       if (!regionStates.isRegionOnline(hri)) {
3215         RegionState state = regionStates.getRegionState(encodedName);
3216         LOG.info("Ignored moving region not assigned: " + hri + ", "
3217           + (state == null ? "not in region states" : state));
3218         return;
3219       }
3220       synchronized (this.regionPlans) {
3221         this.regionPlans.put(plan.getRegionName(), plan);
3222       }
3223       unassign(hri, false, plan.getDestination());
3224     } finally {
3225       lock.unlock();
3226     }
3227   }
3228 
3229   public void stop() {
3230     shutdown(); // Stop executor service, etc
3231   }
3232 
3233   /**
3234    * Shutdown the threadpool executor service
3235    */
3236   public void shutdown() {
3237     // It's an immediate shutdown, so we're clearing the remaining tasks.
3238     synchronized (zkEventWorkerWaitingList){
3239       zkEventWorkerWaitingList.clear();
3240     }
3241     threadPoolExecutorService.shutdownNow();
3242     zkEventWorkers.shutdownNow();
3243     regionStateStore.stop();
3244   }
3245 
3246   protected void setEnabledTable(TableName tableName) {
3247     try {
3248       this.tableStateManager.setTableState(tableName,
3249         ZooKeeperProtos.Table.State.ENABLED);
3250     } catch (CoordinatedStateException e) {
3251       // here we can abort as it is the start up flow
3252       String errorMsg = "Unable to ensure that the table " + tableName
3253           + " will be enabled because of a ZooKeeper issue";
3254       LOG.error(errorMsg);
3255       this.server.abort(errorMsg, e);
3256     }
3257   }
3258 
3259   /**
3260    * Set region as OFFLINED up in zookeeper asynchronously.
3261    * @param state
3262    * @return True if we succeeded, false otherwise (State was incorrect or failed
3263    * updating zk).
3264    */
3265   private boolean asyncSetOfflineInZooKeeper(final RegionState state,
3266       final AsyncCallback.StringCallback cb, final ServerName destination) {
3267     if (!state.isClosed() && !state.isOffline()) {
3268       this.server.abort("Unexpected state trying to OFFLINE; " + state,
3269         new IllegalStateException());
3270       return false;
3271     }
3272     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
3273     try {
3274       ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
3275         destination, cb, state);
3276     } catch (KeeperException e) {
3277       if (e instanceof NodeExistsException) {
3278         LOG.warn("Node for " + state.getRegion() + " already exists");
3279       } else {
3280         server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
3281       }
3282       return false;
3283     }
3284     return true;
3285   }
3286 
3287   private boolean deleteNodeInStates(String encodedName,
3288       String desc, ServerName sn, EventType... types) {
3289     try {
3290       for (EventType et: types) {
3291         if (ZKAssign.deleteNode(watcher, encodedName, et, sn)) {
3292           return true;
3293         }
3294       }
3295       LOG.info("Failed to delete the " + desc + " node for "
3296         + encodedName + ". The node type may not match");
3297     } catch (NoNodeException e) {
3298       if (LOG.isDebugEnabled()) {
3299         LOG.debug("The " + desc + " node for " + encodedName + " already deleted");
3300       }
3301     } catch (KeeperException ke) {
3302       server.abort("Unexpected ZK exception deleting " + desc
3303         + " node for the region " + encodedName, ke);
3304     }
3305     return false;
3306   }
3307 
3308   private void deleteMergingNode(String encodedName, ServerName sn) {
3309     deleteNodeInStates(encodedName, "merging", sn, EventType.RS_ZK_REGION_MERGING,
3310       EventType.RS_ZK_REQUEST_REGION_MERGE, EventType.RS_ZK_REGION_MERGED);
3311   }
3312 
3313   private void deleteSplittingNode(String encodedName, ServerName sn) {
3314     deleteNodeInStates(encodedName, "splitting", sn, EventType.RS_ZK_REGION_SPLITTING,
3315       EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT);
3316   }
3317 
3318   private void onRegionFailedOpen(
3319       final HRegionInfo hri, final ServerName sn) {
3320     String encodedName = hri.getEncodedName();
3321     AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
3322     if (failedOpenCount == null) {
3323       failedOpenCount = new AtomicInteger();
3324       // No need to use putIfAbsent, or extra synchronization since
3325       // this whole handleRegion block is locked on the encoded region
3326       // name, and failedOpenTracker is updated only in this block
3327       failedOpenTracker.put(encodedName, failedOpenCount);
3328     }
3329     if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
3330       regionStates.updateRegionState(hri, State.FAILED_OPEN);
3331       // remove the tracking info to save memory, also reset
3332       // the count for next open initiative
3333       failedOpenTracker.remove(encodedName);
3334     } else {
3335       // Handle this the same as if it were opened and then closed.
3336       RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
3337       if (regionState != null) {
3338         // When there are more than one region server a new RS is selected as the
3339         // When there is more than one region server, a new RS is selected as the
3340         // destination and the region plan is updated accordingly. (HBASE-5546)
3341             ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3342           offlineDisabledRegion(hri);
3343           return;
3344         }
3345         // ZK Node is in CLOSED state, assign it.
3346         regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3347         // This below has to do w/ online enable/disable of a table
3348         removeClosedRegion(hri);
3349         try {
3350           getRegionPlan(hri, sn, true);
3351         } catch (HBaseIOException e) {
3352           LOG.warn("Failed to get region plan", e);
3353         }
3354         invokeAssign(hri, false);
3355       }
3356     }
3357   }
3358 
3359   private void onRegionOpen(
3360       final HRegionInfo hri, final ServerName sn, long openSeqNum) {
3361     regionOnline(hri, sn, openSeqNum);
3362     if (useZKForAssignment) {
3363       try {
3364         // Delete the ZNode if exists
3365         ZKAssign.deleteNodeFailSilent(watcher, hri);
3366       } catch (KeeperException ke) {
3367         server.abort("Unexpected ZK exception deleting node " + hri, ke);
3368       }
3369     }
3370 
3371     // reset the count, if any
3372     failedOpenTracker.remove(hri.getEncodedName());
3373     if (getTableStateManager().isTableState(hri.getTable(),
3374         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3375       invokeUnAssign(hri);
3376     }
3377   }
3378 
3379   private void onRegionClosed(final HRegionInfo hri) {
3380     if (getTableStateManager().isTableState(hri.getTable(),
3381         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3382       offlineDisabledRegion(hri);
3383       return;
3384     }
3385     regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3386     sendRegionClosedNotification(hri);
3387     // This below has to do w/ online enable/disable of a table
3388     removeClosedRegion(hri);
3389     invokeAssign(hri, false);
3390   }
3391 
3392   private String onRegionSplit(ServerName sn, TransitionCode code,
3393       HRegionInfo p, HRegionInfo a, HRegionInfo b) {
3394     RegionState rs_p = regionStates.getRegionState(p);
3395     RegionState rs_a = regionStates.getRegionState(a);
3396     RegionState rs_b = regionStates.getRegionState(b);
3397     if (!(rs_p.isOpenOrSplittingOnServer(sn)
3398         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
3399         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
3400       return "Not in state good for split";
3401     }
3402 
3403     regionStates.updateRegionState(a, State.SPLITTING_NEW, sn);
3404     regionStates.updateRegionState(b, State.SPLITTING_NEW, sn);
3405     regionStates.updateRegionState(p, State.SPLITTING);
3406 
3407     if (code == TransitionCode.SPLIT) {
3408       if (TEST_SKIP_SPLIT_HANDLING) {
3409         return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
3410       }
3411       regionOffline(p, State.SPLIT);
3412       regionOnline(a, sn, 1);
3413       regionOnline(b, sn, 1);
3414 
3415       // User could disable the table before master knows the new region.
3416       if (getTableStateManager().isTableState(p.getTable(),
3417           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3418         invokeUnAssign(a);
3419         invokeUnAssign(b);
3420       }
3421     } else if (code == TransitionCode.SPLIT_PONR) {
3422       try {
3423         regionStateStore.splitRegion(p, a, b, sn);
3424       } catch (IOException ioe) {
3425         LOG.info("Failed to record split region " + p.getShortNameToLog());
3426         return "Failed to record the splitting in meta";
3427       }
3428     } else if (code == TransitionCode.SPLIT_REVERTED) {
3429       regionOnline(p, sn);
3430       regionOffline(a);
3431       regionOffline(b);
3432 
3433       if (getTableStateManager().isTableState(p.getTable(),
3434           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3435         invokeUnAssign(p);
3436       }
3437     }
3438     return null;
3439   }
3440 
3441   private String onRegionMerge(ServerName sn, TransitionCode code,
3442       HRegionInfo p, HRegionInfo a, HRegionInfo b) {
3443     RegionState rs_p = regionStates.getRegionState(p);
3444     RegionState rs_a = regionStates.getRegionState(a);
3445     RegionState rs_b = regionStates.getRegionState(b);
3446     if (!(rs_a.isOpenOrMergingOnServer(sn) && rs_b.isOpenOrMergingOnServer(sn)
3447         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3448       return "Not in state good for merge";
3449     }
3450 
3451     regionStates.updateRegionState(a, State.MERGING);
3452     regionStates.updateRegionState(b, State.MERGING);
3453     regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3454 
3455     String encodedName = p.getEncodedName();
3456     if (code == TransitionCode.READY_TO_MERGE) {
3457       mergingRegions.put(encodedName,
3458         new PairOfSameType<HRegionInfo>(a, b));
3459     } else if (code == TransitionCode.MERGED) {
3460       mergingRegions.remove(encodedName);
3461       regionOffline(a, State.MERGED);
3462       regionOffline(b, State.MERGED);
3463       regionOnline(p, sn, 1);
3464 
3465       // User could disable the table before master knows the new region.
3466       if (getTableStateManager().isTableState(p.getTable(),
3467           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3468         invokeUnAssign(p);
3469       }
3470     } else if (code == TransitionCode.MERGE_PONR) {
3471       try {
3472         regionStateStore.mergeRegions(p, a, b, sn);
3473       } catch (IOException ioe) {
3474         LOG.info("Failed to record merged region " + p.getShortNameToLog());
3475         return "Failed to record the merging in meta";
3476       }
3477     } else {
3478       mergingRegions.remove(encodedName);
3479       regionOnline(a, sn);
3480       regionOnline(b, sn);
3481       regionOffline(p);
3482 
3483       if (getTableStateManager().isTableState(p.getTable(),
3484           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3485         invokeUnAssign(a);
3486         invokeUnAssign(b);
3487       }
3488     }
3489     return null;
3490   }
3491 
3492   /**
3493    * A helper to handle region merging transition event.
3494    * It transitions merging regions to MERGING state.
3495    */
3496   private boolean handleRegionMerging(final RegionTransition rt, final String encodedName,
3497       final String prettyPrintedRegionName, final ServerName sn) {
3498     if (!serverManager.isServerOnline(sn)) {
3499       LOG.warn("Dropped merging! ServerName=" + sn + " unknown.");
3500       return false;
3501     }
3502     byte [] payloadOfMerging = rt.getPayload();
3503     List<HRegionInfo> mergingRegions;
3504     try {
3505       mergingRegions = HRegionInfo.parseDelimitedFrom(
3506         payloadOfMerging, 0, payloadOfMerging.length);
3507     } catch (IOException e) {
3508       LOG.error("Dropped merging! Failed reading "  + rt.getEventType()
3509         + " payload for " + prettyPrintedRegionName);
3510       return false;
3511     }
3512     assert mergingRegions.size() == 3;
3513     HRegionInfo p = mergingRegions.get(0);
3514     HRegionInfo hri_a = mergingRegions.get(1);
3515     HRegionInfo hri_b = mergingRegions.get(2);
3516 
3517     RegionState rs_p = regionStates.getRegionState(p);
3518     RegionState rs_a = regionStates.getRegionState(hri_a);
3519     RegionState rs_b = regionStates.getRegionState(hri_b);
3520 
3521     if (!((rs_a == null || rs_a.isOpenOrMergingOnServer(sn))
3522         && (rs_b == null || rs_b.isOpenOrMergingOnServer(sn))
3523         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3524       LOG.warn("Dropped merging! Not in state good for MERGING; rs_p="
3525         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
3526       return false;
3527     }
3528 
3529     EventType et = rt.getEventType();
3530     if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) {
3531       try {
3532         RegionMergeCoordination.RegionMergeDetails std =
3533             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3534                 .getRegionMergeCoordination().getDefaultDetails();
3535         ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3536             .getRegionMergeCoordination().processRegionMergeRequest(p, hri_a, hri_b, sn, std);
3537         if (((ZkRegionMergeCoordination.ZkRegionMergeDetails) std).getZnodeVersion() == -1) {
3538           byte[] data = ZKAssign.getData(watcher, encodedName);
3539           EventType currentType = null;
3540           if (data != null) {
3541             RegionTransition newRt = RegionTransition.parseFrom(data);
3542             currentType = newRt.getEventType();
3543           }
3544           if (currentType == null || (currentType != EventType.RS_ZK_REGION_MERGED
3545               && currentType != EventType.RS_ZK_REGION_MERGING)) {
3546             LOG.warn("Failed to transition pending_merge node "
3547               + encodedName + " to merging, it's now " + currentType);
3548             return false;
3549           }
3550         }
3551       } catch (Exception e) {
3552         LOG.warn("Failed to transition pending_merge node "
3553           + encodedName + " to merging", e);
3554         return false;
3555       }
3556     }
3557 
3558     synchronized (regionStates) {
3559       regionStates.updateRegionState(hri_a, State.MERGING);
3560       regionStates.updateRegionState(hri_b, State.MERGING);
3561       regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3562 
3563       if (et != EventType.RS_ZK_REGION_MERGED) {
3564         this.mergingRegions.put(encodedName,
3565           new PairOfSameType<HRegionInfo>(hri_a, hri_b));
3566       } else {
3567         this.mergingRegions.remove(encodedName);
3568         regionOffline(hri_a, State.MERGED);
3569         regionOffline(hri_b, State.MERGED);
3570         regionOnline(p, sn);
3571       }
3572     }
3573 
3574     if (et == EventType.RS_ZK_REGION_MERGED) {
3575       LOG.debug("Handling MERGED event for " + encodedName + "; deleting node");
3576       // Remove region from ZK
3577       try {
3578         boolean successful = false;
3579         while (!successful) {
3580           // It's possible that the RS touches the znode in between the reading
3581           // of the znode and the deleting, so it's safe to retry.
3582           successful = ZKAssign.deleteNode(watcher, encodedName,
3583             EventType.RS_ZK_REGION_MERGED, sn);
3584         }
3585       } catch (KeeperException e) {
3586         if (e instanceof NoNodeException) {
3587           String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
3588           LOG.debug("The znode " + znodePath + " does not exist.  May be deleted already.");
3589         } else {
3590           server.abort("Error deleting MERGED node " + encodedName, e);
3591         }
3592       }
3593       LOG.info("Handled MERGED event; merged=" + p.getRegionNameAsString()
3594         + ", region_a=" + hri_a.getRegionNameAsString() + ", region_b="
3595         + hri_b.getRegionNameAsString() + ", on " + sn);
3596 
3597       // User could disable the table before master knows the new region.
3598       if (tableStateManager.isTableState(p.getTable(),
3599           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3600         unassign(p);
3601       }
3602     }
3603     return true;
3604   }
3605 
3606   /**
3607    * A helper to handle region splitting transition event.
3608    */
3609   private boolean handleRegionSplitting(final RegionTransition rt, final String encodedName,
3610       final String prettyPrintedRegionName, final ServerName sn) {
3611     if (!serverManager.isServerOnline(sn)) {
3612       LOG.warn("Dropped splitting! ServerName=" + sn + " unknown.");
3613       return false;
3614     }
3615     byte [] payloadOfSplitting = rt.getPayload();
3616     List<HRegionInfo> splittingRegions;
3617     try {
3618       splittingRegions = HRegionInfo.parseDelimitedFrom(
3619         payloadOfSplitting, 0, payloadOfSplitting.length);
3620     } catch (IOException e) {
3621       LOG.error("Dropped splitting! Failed reading " + rt.getEventType()
3622         + " payload for " + prettyPrintedRegionName);
3623       return false;
3624     }
3625     assert splittingRegions.size() == 2;
3626     HRegionInfo hri_a = splittingRegions.get(0);
3627     HRegionInfo hri_b = splittingRegions.get(1);
3628 
3629     RegionState rs_p = regionStates.getRegionState(encodedName);
3630     RegionState rs_a = regionStates.getRegionState(hri_a);
3631     RegionState rs_b = regionStates.getRegionState(hri_b);
3632 
3633     if (!((rs_p == null || rs_p.isOpenOrSplittingOnServer(sn))
3634         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
3635         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
3636       LOG.warn("Dropped splitting! Not in state good for SPLITTING; rs_p="
3637         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
3638       return false;
3639     }
3640 
3641     if (rs_p == null) {
3642       // Splitting region should be online
3643       rs_p = regionStates.updateRegionState(rt, State.OPEN);
3644       if (rs_p == null) {
3645         LOG.warn("Received splitting for region " + prettyPrintedRegionName
3646           + " from server " + sn + " but it doesn't exist anymore,"
3647           + " probably already processed its split");
3648         return false;
3649       }
3650       regionStates.regionOnline(rs_p.getRegion(), sn);
3651     }
3652 
3653     HRegionInfo p = rs_p.getRegion();
3654     EventType et = rt.getEventType();
3655     if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) {
3656       try {
3657         SplitTransactionDetails std =
3658             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3659                 .getSplitTransactionCoordination().getDefaultDetails();
3660         if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3661             .getSplitTransactionCoordination().processTransition(p, hri_a, hri_b, sn, std) == -1) {
3662           byte[] data = ZKAssign.getData(watcher, encodedName);
3663           EventType currentType = null;
3664           if (data != null) {
3665             RegionTransition newRt = RegionTransition.parseFrom(data);
3666             currentType = newRt.getEventType();
3667           }
3668           if (currentType == null
3669               || (currentType != EventType.RS_ZK_REGION_SPLIT && currentType != EventType.RS_ZK_REGION_SPLITTING)) {
3670             LOG.warn("Failed to transition pending_split node " + encodedName
3671                 + " to splitting, it's now " + currentType);
3672             return false;
3673           }
3674         }
3675       } catch (Exception e) {
3676         LOG.warn("Failed to transition pending_split node " + encodedName + " to splitting", e);
3677         return false;
3678       }
3679     }
3680 
3681     synchronized (regionStates) {
3682       regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn);
3683       regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn);
3684       regionStates.updateRegionState(rt, State.SPLITTING);
3685 
3686       // The below is for testing ONLY!  We can't do fault injection easily, so
3687       // resort to this kind of ugliness -- St.Ack 02/25/2011.
3688       if (TEST_SKIP_SPLIT_HANDLING) {
3689         LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set");
3690         return true; // return true so that the splitting node stays
3691       }
3692 
3693       if (et == EventType.RS_ZK_REGION_SPLIT) {
3694         regionOffline(p, State.SPLIT);
3695         regionOnline(hri_a, sn);
3696         regionOnline(hri_b, sn);
3697       }
3698     }
3699 
3700     if (et == EventType.RS_ZK_REGION_SPLIT) {
3701       LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
3702       // Remove region from ZK
3703       try {
3704         boolean successful = false;
3705         while (!successful) {
3706           // It's possible that the RS touches the znode in between the reading
3707           // of the znode and the deleting, so it's safe to retry.
3708           successful = ZKAssign.deleteNode(watcher, encodedName,
3709             EventType.RS_ZK_REGION_SPLIT, sn);
3710         }
3711       } catch (KeeperException e) {
3712         if (e instanceof NoNodeException) {
3713           String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
3714           LOG.debug("The znode " + znodePath + " does not exist.  May be deleted already.");
3715         } else {
3716           server.abort("Error deleting SPLIT node " + encodedName, e);
3717         }
3718       }
3719       LOG.info("Handled SPLIT event; parent=" + p.getRegionNameAsString()
3720         + ", daughter a=" + hri_a.getRegionNameAsString() + ", daughter b="
3721         + hri_b.getRegionNameAsString() + ", on " + sn);
3722 
3723       // User could disable the table before master knows the new region.
3724       if (tableStateManager.isTableState(p.getTable(),
3725           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3726         unassign(hri_a);
3727         unassign(hri_b);
3728       }
3729     }
3730     return true;
3731   }
3732 
3733   /**
3734    * Marks a region offline.  The new state is the specified one,
3735    * if not null.  If the specified state is null, the new state is OFFLINE.
3736    * The specified state can only be SPLIT, MERGED, OFFLINE or null.
3737    */
3738   private void regionOffline(final HRegionInfo regionInfo, final State state) {
3739     regionStates.regionOffline(regionInfo, state);
3740     removeClosedRegion(regionInfo);
3741     // remove the region plan as well just in case.
3742     clearRegionPlan(regionInfo);
3743     balancer.regionOffline(regionInfo);
3744 
3745     // Tell our listeners that a region was closed
3746     sendRegionClosedNotification(regionInfo);
3747   }
3748 
3749   private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
3750       final ServerName serverName) {
3751     if (!this.listeners.isEmpty()) {
3752       for (AssignmentListener listener : this.listeners) {
3753         listener.regionOpened(regionInfo, serverName);
3754       }
3755     }
3756   }
3757 
3758   private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
3759     if (!this.listeners.isEmpty()) {
3760       for (AssignmentListener listener : this.listeners) {
3761         listener.regionClosed(regionInfo);
3762       }
3763     }
3764   }
3765 
3766   /**
3767    * Try to update some region states. If the state machine prevents
3768    * such update, an error message is returned to explain the reason.
3769    *
3770    * It's expected that each transition involves just one
3771    * region for opening/closing, and 3 regions for splitting/merging.
3772    * These regions should be on the server that requested the change.
3773    *
3774    * Region state machine. Only these transitions
3775    * are expected to be triggered by a region server.
3776    *
3777    * On the state transition:
3778    *  (1) Open/Close should be initiated by master
3779    *      (a) Master sets the region to pending_open/pending_close
3780    *        in memory and hbase:meta after sending the request
3781    *        to the region server
3782    *      (b) Region server reports back to the master
3783    *        after open/close is done (either success/failure)
3784    *      (c) If the region server has a problem reporting the status
3785    *        to the master, it must be because the master is down or there is
3786    *        a temporary network issue. Otherwise, the region server should
3787    *        abort since it must be a bug. If the master is not accessible,
3788    *        the region server should keep trying until the server is
3789    *        stopped or until the status is reported to the (new) master
3790    *      (d) If region server dies in the middle of opening/closing
3791    *        a region, SSH picks it up and finishes it
3792    *      (e) If master dies in the middle, the new master recovers
3793    *        the state during initialization from hbase:meta. Region server
3794    *        can report any transition that has not been reported to
3795    *        the previous active master yet
3796    *  (2) Split/merge is initiated by region servers
3797    *      (a) To split a region, a region server sends a request
3798    *        to master to try to set a region to splitting, together with
3799    *        two daughters (to be created) to splitting new. If approved
3800    *        by the master, the splitting can then move ahead
3801    *      (b) To merge two regions, a region server sends a request to
3802    *        master to try to set the new merged region (to be created) to
3803    *        merging_new, together with two regions (to be merged) to merging.
3804    *        If it is ok with the master, the merge can then move ahead
3805    *      (c) Once the splitting/merging is done, the region server
3806    *        reports the status (success or failure) back to the master.
3807    *      (d) Other scenarios should be handled similarly to
3808    *        region open/close (see the illustrative sketch after this method)
3809    */
3810   protected String onRegionTransition(final ServerName serverName,
3811       final RegionStateTransition transition) {
3812     TransitionCode code = transition.getTransitionCode();
3813     HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
3814     RegionState current = regionStates.getRegionState(hri);
3815     if (LOG.isDebugEnabled()) {
3816       LOG.debug("Got transition " + code + " for "
3817         + (current != null ? current.toString() : hri.getShortNameToLog())
3818         + " from " + serverName);
3819     }
3820     String errorMsg = null;
3821     switch (code) {
3822     case OPENED:
3823     case FAILED_OPEN:
3824       if (current == null
3825           || !current.isPendingOpenOrOpeningOnServer(serverName)) {
3826         errorMsg = hri.getShortNameToLog()
3827           + " is not pending open on " + serverName;
3828       } else if (code == TransitionCode.FAILED_OPEN) {
3829         onRegionFailedOpen(hri, serverName);
3830       } else {
3831         long openSeqNum = HConstants.NO_SEQNUM;
3832         if (transition.hasOpenSeqNum()) {
3833           openSeqNum = transition.getOpenSeqNum();
3834         }
3835         if (openSeqNum < 0) {
3836           errorMsg = "Newly opened region has invalid open seq num " + openSeqNum;
3837         } else {
3838           onRegionOpen(hri, serverName, openSeqNum);
3839         }
3840       }
3841       break;
3842 
3843     case CLOSED:
3844       if (current == null
3845           || !current.isPendingCloseOrClosingOnServer(serverName)) {
3846         errorMsg = hri.getShortNameToLog()
3847           + " is not pending close on " + serverName;
3848       } else {
3849         onRegionClosed(hri);
3850       }
3851       break;
3852 
3853     case READY_TO_SPLIT:
3854     case SPLIT_PONR:
3855     case SPLIT:
3856     case SPLIT_REVERTED:
3857       errorMsg = onRegionSplit(serverName, code, hri,
3858         HRegionInfo.convert(transition.getRegionInfo(1)),
3859         HRegionInfo.convert(transition.getRegionInfo(2)));
3860       break;
3861 
3862     case READY_TO_MERGE:
3863     case MERGE_PONR:
3864     case MERGED:
3865     case MERGE_REVERTED:
3866       errorMsg = onRegionMerge(serverName, code, hri,
3867         HRegionInfo.convert(transition.getRegionInfo(1)),
3868         HRegionInfo.convert(transition.getRegionInfo(2)));
3869       break;
3870 
3871     default:
3872       errorMsg = "Unexpected transition code " + code;
3873     }
3874     if (errorMsg != null) {
3875     LOG.error("Failed to transition region from " + current + " to "
3876         + code + " by " + serverName + ": " + errorMsg);
3877     }
3878     return errorMsg;
3879   }
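  // Illustrative sketch (not part of the original class): a hedged example of the kind of
  // report a region server might send for case (2)(a) above, i.e. asking the master to
  // approve a split. It assumes the standard protobuf-generated builder API of
  // RegionStateTransition that onRegionTransition() consumes; the names parent, daughterA,
  // daughterB and serverName are hypothetical.
  //
  //   RegionStateTransition transition = RegionStateTransition.newBuilder()
  //       .setTransitionCode(TransitionCode.READY_TO_SPLIT)
  //       .addRegionInfo(HRegionInfo.convert(parent))     // region to be split
  //       .addRegionInfo(HRegionInfo.convert(daughterA))  // first daughter to be created
  //       .addRegionInfo(HRegionInfo.convert(daughterB))  // second daughter to be created
  //       .build();
  //   String errorMsg = onRegionTransition(serverName, transition);
  //   if (errorMsg != null) {
  //     // The master rejected the request; the region server must not proceed with the split.
  //   }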
3880 
3881   /**
3882    * @return Instance of load balancer
3883    */
3884   public LoadBalancer getBalancer() {
3885     return this.balancer;
3886   }
3887 }