View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.HashMap;
28  import java.util.HashSet;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.NavigableMap;
33  import java.util.Random;
34  import java.util.Set;
35  import java.util.TreeMap;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.ConcurrentHashMap;
38  import java.util.concurrent.CopyOnWriteArrayList;
39  import java.util.concurrent.ThreadFactory;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  import java.util.concurrent.atomic.AtomicInteger;
43  import java.util.concurrent.locks.Lock;
44  import java.util.concurrent.locks.ReentrantLock;
45  
46  import org.apache.commons.logging.Log;
47  import org.apache.commons.logging.LogFactory;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.fs.FileSystem;
50  import org.apache.hadoop.fs.Path;
51  import org.apache.hadoop.hbase.CoordinatedStateException;
52  import org.apache.hadoop.hbase.HBaseIOException;
53  import org.apache.hadoop.hbase.HConstants;
54  import org.apache.hadoop.hbase.HRegionInfo;
55  import org.apache.hadoop.hbase.HRegionLocation;
56  import org.apache.hadoop.hbase.HTableDescriptor;
57  import org.apache.hadoop.hbase.MetaTableAccessor;
58  import org.apache.hadoop.hbase.NotServingRegionException;
59  import org.apache.hadoop.hbase.RegionLocations;
60  import org.apache.hadoop.hbase.RegionStateListener;
61  import org.apache.hadoop.hbase.RegionTransition;
62  import org.apache.hadoop.hbase.ServerName;
63  import org.apache.hadoop.hbase.TableName;
64  import org.apache.hadoop.hbase.TableNotFoundException;
65  import org.apache.hadoop.hbase.TableStateManager;
66  import org.apache.hadoop.hbase.classification.InterfaceAudience;
67  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
68  import org.apache.hadoop.hbase.client.Result;
69  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
70  import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
71  import org.apache.hadoop.hbase.coordination.RegionMergeCoordination;
72  import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
73  import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
74  import org.apache.hadoop.hbase.coordination.ZkRegionMergeCoordination;
75  import org.apache.hadoop.hbase.exceptions.DeserializationException;
76  import org.apache.hadoop.hbase.executor.EventHandler;
77  import org.apache.hadoop.hbase.executor.EventType;
78  import org.apache.hadoop.hbase.executor.ExecutorService;
79  import org.apache.hadoop.hbase.ipc.FailedServerException;
80  import org.apache.hadoop.hbase.ipc.RpcClient;
81  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
82  import org.apache.hadoop.hbase.master.RegionState.State;
83  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
84  import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
85  import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
86  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
87  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
88  import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
89  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
90  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
91  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
92  import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
93  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
94  import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
95  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
96  import org.apache.hadoop.hbase.util.ConfigUtil;
97  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
98  import org.apache.hadoop.hbase.util.FSUtils;
99  import org.apache.hadoop.hbase.util.KeyLocker;
100 import org.apache.hadoop.hbase.util.Pair;
101 import org.apache.hadoop.hbase.util.PairOfSameType;
102 import org.apache.hadoop.hbase.util.Threads;
103 import org.apache.hadoop.hbase.util.Triple;
104 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
105 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
106 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
107 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
108 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
109 import org.apache.hadoop.ipc.RemoteException;
110 import org.apache.hadoop.util.StringUtils;
111 import org.apache.zookeeper.AsyncCallback;
112 import org.apache.zookeeper.KeeperException;
113 import org.apache.zookeeper.KeeperException.NoNodeException;
114 import org.apache.zookeeper.KeeperException.NodeExistsException;
115 import org.apache.zookeeper.data.Stat;
116 
117 import com.google.common.annotations.VisibleForTesting;
118 import com.google.common.collect.LinkedHashMultimap;
119 
120 /**
121  * Manages and performs region assignment.
122  * <p>
123  * Monitors ZooKeeper for events related to regions in transition.
124  * <p>
125  * Handles existing regions in transition during master failover.
126  */
127 @InterfaceAudience.Private
128 public class AssignmentManager extends ZooKeeperListener {
129   private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
130 
131   public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(HConstants.HBCK_CODE_NAME,
132       -1, -1L);
133 
134   static final String ALREADY_IN_TRANSITION_WAITTIME
135     = "hbase.assignment.already.intransition.waittime";
136   static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute
137 
138   protected final MasterServices server;
139 
140   private ServerManager serverManager;
141 
142   private boolean shouldAssignRegionsWithFavoredNodes;
143 
144   private LoadBalancer balancer;
145 
146   private final MetricsAssignmentManager metricsAssignmentManager;
147 
148   private final TableLockManager tableLockManager;
149 
150   private AtomicInteger numRegionsOpened = new AtomicInteger(0);
151 
152   final private KeyLocker<String> locker = new KeyLocker<String>();
153 
154   Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
155 
156   /**
157    * Map of regions to reopen after the schema of a table is changed. Key -
158    * encoded region name, value - HRegionInfo
159    */
160   private final Map <String, HRegionInfo> regionsToReopen;
161 
162   /*
163    * Maximum times we recurse an assignment/unassignment.
164    * See below in {@link #assign()} and {@link #unassign()}.
165    */
166   private final int maximumAttempts;
167 
168   /**
169    * Map of two merging regions from the region to be created.
170    */
171   private final Map<String, PairOfSameType<HRegionInfo>> mergingRegions
172     = new HashMap<String, PairOfSameType<HRegionInfo>>();
173 
174   private final Map<HRegionInfo, PairOfSameType<HRegionInfo>> splitRegions
175   = new HashMap<HRegionInfo, PairOfSameType<HRegionInfo>>();
176 
177   /**
178    * The sleep time for which the assignment will wait before retrying in case of hbase:meta assignment
179    * failure due to lack of availability of region plan or bad region plan
180    */
181   private final long sleepTimeBeforeRetryingMetaAssignment;
182 
183   /** Plans for region movement. Key is the encoded version of a region name*/
184   // TODO: When do plans get cleaned out?  Ever? In server open and in server
185   // shutdown processing -- St.Ack
186   // All access to this Map must be synchronized.
187   final NavigableMap<String, RegionPlan> regionPlans =
188     new TreeMap<String, RegionPlan>();
189 
190   private final TableStateManager tableStateManager;
191 
192   private final ExecutorService executorService;
193 
194   // For unit tests, keep track of calls to ClosedRegionHandler
195   private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = null;
196 
197   // For unit tests, keep track of calls to OpenedRegionHandler
198   private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null;
199 
200   //Thread pool executor service for timeout monitor
201   private java.util.concurrent.ExecutorService threadPoolExecutorService;
202 
203   // A bunch of ZK events workers. Each is a single thread executor service
204   private final java.util.concurrent.ExecutorService zkEventWorkers;
205 
206   private List<EventType> ignoreStatesRSOffline = Arrays.asList(
207       EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);
208 
209   private final RegionStates regionStates;
210 
211   // The threshold to use bulk assigning. Using bulk assignment
212   // only if assigning at least this many regions to at least this
213   // many servers. If assigning fewer regions to fewer servers,
214   // bulk assigning may be not as efficient.
215   private final int bulkAssignThresholdRegions;
216   private final int bulkAssignThresholdServers;
217   private final int bulkPerRegionOpenTimeGuesstimate;
218 
219   // Should bulk assignment wait till all regions are assigned,
220   // or it is timed out?  This is useful to measure bulk assignment
221   // performance, but not needed in most use cases.
222   private final boolean bulkAssignWaitTillAllAssigned;
223 
224   /**
225    * Indicator that AssignmentManager has recovered the region states so
226    * that ServerShutdownHandler can be fully enabled and re-assign regions
227    * of dead servers. So that when re-assignment happens, AssignmentManager
228    * has proper region states.
229    *
230    * Protected to ease testing.
231    */
232   protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
233 
234   /**
235    * A map to track the count a region fails to open in a row.
236    * So that we don't try to open a region forever if the failure is
237    * unrecoverable.  We don't put this information in region states
238    * because we don't expect this to happen frequently; we don't
239    * want to copy this information over during each state transition either.
240    */
241   private final ConcurrentHashMap<String, AtomicInteger>
242     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
243 
244   // A flag to indicate if we are using ZK for region assignment
245   private final boolean useZKForAssignment;
246 
247   // In case not using ZK for region assignment, region states
248   // are persisted in meta with a state store
249   private final RegionStateStore regionStateStore;
250 
251   /**
252    * For testing only!  Set to true to skip handling of split.
253    */
254   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
255   public static boolean TEST_SKIP_SPLIT_HANDLING = false;
256 
257   /** Listeners that are called on assignment events. */
258   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
259 
260   private RegionStateListener regionStateListener;
261 
262   public enum ServerHostRegion {
263     NOT_HOSTING_REGION, HOSTING_REGION, UNKNOWN,
264   }
265 
266   /**
267    * Constructs a new assignment manager.
268    *
269    * @param server instance of HMaster this AM running inside
270    * @param serverManager serverManager for associated HMaster
271    * @param balancer implementation of {@link LoadBalancer}
272    * @param service Executor service
273    * @param metricsMaster metrics manager
274    * @param tableLockManager TableLock manager
275    * @throws KeeperException
276    * @throws IOException
277    */
278   public AssignmentManager(MasterServices server, ServerManager serverManager,
279       final LoadBalancer balancer,
280       final ExecutorService service, MetricsMaster metricsMaster,
281       final TableLockManager tableLockManager) throws KeeperException,
282         IOException, CoordinatedStateException {
283     super(server.getZooKeeper());
284     this.server = server;
285     this.serverManager = serverManager;
286     this.executorService = service;
287     this.regionStateStore = new RegionStateStore(server);
288     this.regionsToReopen = Collections.synchronizedMap
289                            (new HashMap<String, HRegionInfo> ());
290     Configuration conf = server.getConfiguration();
291     // Only read favored nodes if using the favored nodes load balancer.
292     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
293            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
294            FavoredNodeLoadBalancer.class);
295     try {
296       if (server.getCoordinatedStateManager() != null) {
297         this.tableStateManager = server.getCoordinatedStateManager().getTableStateManager();
298       } else {
299         this.tableStateManager = null;
300       }
301     } catch (InterruptedException e) {
302       throw new InterruptedIOException();
303     }
304     // This is the max attempts, not retries, so it should be at least 1.
305     this.maximumAttempts = Math.max(1,
306       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
307     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
308         "hbase.meta.assignment.retry.sleeptime", 1000l);
309     this.balancer = balancer;
310     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
311     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
312       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
313     this.regionStates = new RegionStates(
314       server, tableStateManager, serverManager, regionStateStore);
315 
316     this.bulkAssignWaitTillAllAssigned =
317       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
318     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
319     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
320     this.bulkPerRegionOpenTimeGuesstimate =
321       conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
322 
323     int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
324     ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
325     zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
326             TimeUnit.SECONDS, threadFactory);
327     this.tableLockManager = tableLockManager;
328 
329     this.metricsAssignmentManager = new MetricsAssignmentManager();
330     useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
331   }
332 
333   MetricsAssignmentManager getAssignmentManagerMetrics() {
334     return this.metricsAssignmentManager;
335   }
336 
337   /**
338    * Add the listener to the notification list.
339    * @param listener The AssignmentListener to register
340    */
341   public void registerListener(final AssignmentListener listener) {
342     this.listeners.add(listener);
343   }
344 
345   /**
346    * Remove the listener from the notification list.
347    * @param listener The AssignmentListener to unregister
348    */
349   public boolean unregisterListener(final AssignmentListener listener) {
350     return this.listeners.remove(listener);
351   }
352 
353   /**
354    * @return Instance of ZKTableStateManager.
355    */
356   public TableStateManager getTableStateManager() {
357     // These are 'expensive' to make involving trip to zk ensemble so allow
358     // sharing.
359     return this.tableStateManager;
360   }
361 
362   /**
363    * This SHOULD not be public. It is public now
364    * because of some unit tests.
365    *
366    * TODO: make it package private and keep RegionStates in the master package
367    */
368   public RegionStates getRegionStates() {
369     return regionStates;
370   }
371 
372   /**
373    * Used in some tests to mock up region state in meta
374    */
375   @VisibleForTesting
376   RegionStateStore getRegionStateStore() {
377     return regionStateStore;
378   }
379 
380   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
381     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
382   }
383 
384   /**
385    * Add a regionPlan for the specified region.
386    * @param encodedName
387    * @param plan
388    */
389   public void addPlan(String encodedName, RegionPlan plan) {
390     synchronized (regionPlans) {
391       regionPlans.put(encodedName, plan);
392     }
393   }
394 
395   /**
396    * Add a map of region plans.
397    */
398   public void addPlans(Map<String, RegionPlan> plans) {
399     synchronized (regionPlans) {
400       regionPlans.putAll(plans);
401     }
402   }
403 
404   /**
405    * Set the list of regions that will be reopened
406    * because of an update in table schema
407    *
408    * @param regions
409    *          list of regions that should be tracked for reopen
410    */
411   public void setRegionsToReopen(List <HRegionInfo> regions) {
412     for(HRegionInfo hri : regions) {
413       regionsToReopen.put(hri.getEncodedName(), hri);
414     }
415   }
416 
417   /**
418    * Used by the client to identify if all regions have the schema updates
419    *
420    * @param tableName
421    * @return Pair indicating the status of the alter command
422    * @throws IOException
423    */
424   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
425       throws IOException {
426     List<HRegionInfo> hris;
427     if (TableName.META_TABLE_NAME.equals(tableName)) {
428       hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
429     } else {
430       hris = MetaTableAccessor.getTableRegions(server.getZooKeeper(),
431         server.getConnection(), tableName, true);
432     }
433 
434     Integer pending = 0;
435     for (HRegionInfo hri : hris) {
436       String name = hri.getEncodedName();
437       // no lock concurrent access ok: sequential consistency respected.
438       if (regionsToReopen.containsKey(name)
439           || regionStates.isRegionInTransition(name)) {
440         pending++;
441       }
442     }
443     return new Pair<Integer, Integer>(pending, hris.size());
444   }
445 
446   /**
447    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
448    * the failover cleanup before re-assigning regions of dead servers. So that
449    * when re-assignment happens, AssignmentManager has proper region states.
450    */
451   public boolean isFailoverCleanupDone() {
452     return failoverCleanupDone.get();
453   }
454 
455   /**
456    * To avoid racing with AM, external entities may need to lock a region,
457    * for example, when SSH checks what regions to skip re-assigning.
458    */
459   public Lock acquireRegionLock(final String encodedName) {
460     return locker.acquireLock(encodedName);
461   }
462 
463   /**
464    * Now, failover cleanup is completed. Notify server manager to
465    * process queued up dead servers processing, if any.
466    */
467   void failoverCleanupDone() {
468     failoverCleanupDone.set(true);
469     serverManager.processQueuedDeadServers();
470   }
471 
472   /**
473    * Called on startup.
474    * Figures whether a fresh cluster start of we are joining extant running cluster.
475    * @throws IOException
476    * @throws KeeperException
477    * @throws InterruptedException
478    * @throws CoordinatedStateException
479    */
480   void joinCluster() throws IOException,
481       KeeperException, InterruptedException, CoordinatedStateException {
482     long startTime = System.currentTimeMillis();
483     // Concurrency note: In the below the accesses on regionsInTransition are
484     // outside of a synchronization block where usually all accesses to RIT are
485     // synchronized.  The presumption is that in this case it is safe since this
486     // method is being played by a single thread on startup.
487 
488     // TODO: Regions that have a null location and are not in regionsInTransitions
489     // need to be handled.
490 
491     // Scan hbase:meta to build list of existing regions, servers, and assignment
492     // Returns servers who have not checked in (assumed dead) that some regions
493     // were assigned to (according to the meta)
494     Set<ServerName> deadServers = rebuildUserRegions();
495 
496     // This method will assign all user regions if a clean server startup or
497     // it will reconstruct master state and cleanup any leftovers from previous master process.
498     boolean failover = processDeadServersAndRegionsInTransition(deadServers);
499 
500     if (!useZKForAssignment) {
501       // Not use ZK for assignment any more, remove the ZNode
502       ZKUtil.deleteNodeRecursively(watcher, watcher.assignmentZNode);
503     }
504     recoverTableInDisablingState();
505     recoverTableInEnablingState();
506     LOG.info("Joined the cluster in " + (System.currentTimeMillis()
507       - startTime) + "ms, failover=" + failover);
508   }
509 
510   /**
511    * Process all regions that are in transition in zookeeper and also
512    * processes the list of dead servers.
513    * Used by master joining an cluster.  If we figure this is a clean cluster
514    * startup, will assign all user regions.
515    * @param deadServers Set of servers that are offline probably legitimately that were carrying
516    * regions according to a scan of hbase:meta. Can be null.
517    * @throws KeeperException
518    * @throws IOException
519    * @throws InterruptedException
520    */
521   boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
522   throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
523     List<String> nodes = ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
524 
525     if (useZKForAssignment && nodes == null) {
526       String errorMessage = "Failed to get the children from ZK";
527       server.abort(errorMessage, new IOException(errorMessage));
528       return true; // Doesn't matter in this case
529     }
530 
531     boolean failover = !serverManager.getDeadServers().isEmpty();
532     if (failover) {
533       // This may not be a failover actually, especially if meta is on this master.
534       if (LOG.isDebugEnabled()) {
535         LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
536       }
537     } else {
538       // If any one region except meta is assigned, it's a failover.
539       Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
540       for (Map.Entry<HRegionInfo, ServerName> en:
541           regionStates.getRegionAssignments().entrySet()) {
542         HRegionInfo hri = en.getKey();
543         if (!hri.isMetaTable()
544             && onlineServers.contains(en.getValue())) {
545           LOG.debug("Found " + hri + " out on cluster");
546           failover = true;
547           break;
548         }
549       }
550       if (!failover && nodes != null) {
551         // If any one region except meta is in transition, it's a failover.
552         for (String encodedName: nodes) {
553           RegionState regionState = regionStates.getRegionState(encodedName);
554           if (regionState != null && !regionState.getRegion().isMetaRegion()) {
555             LOG.debug("Found " + regionState + " in RITs");
556             failover = true;
557             break;
558           }
559         }
560       }
561     }
562     if (!failover && !useZKForAssignment) {
563       // If any region except meta is in transition on a live server, it's a failover.
564       Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
565       if (!regionsInTransition.isEmpty()) {
566         Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
567         for (RegionState regionState: regionsInTransition.values()) {
568           ServerName serverName = regionState.getServerName();
569           if (!regionState.getRegion().isMetaRegion()
570               && serverName != null && onlineServers.contains(serverName)) {
571             LOG.debug("Found " + regionState + " in RITs");
572             failover = true;
573             break;
574           }
575         }
576       }
577     }
578     if (!failover) {
579       // If we get here, we have a full cluster restart. It is a failover only
580       // if there are some WALs are not split yet. For meta WALs, they should have
581       // been split already, if any. We can walk through those queued dead servers,
582       // if they don't have any WALs, this restart should be considered as a clean one
583       Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
584       if (!queuedDeadServers.isEmpty()) {
585         Configuration conf = server.getConfiguration();
586         Path rootdir = FSUtils.getRootDir(conf);
587         FileSystem fs = rootdir.getFileSystem(conf);
588         for (ServerName serverName: queuedDeadServers) {
589           // In the case of a clean exit, the shutdown handler would have presplit any WALs and
590           // removed empty directories.
591           Path logDir = new Path(rootdir,
592               DefaultWALProvider.getWALDirectoryName(serverName.toString()));
593           Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
594           if (fs.exists(logDir) || fs.exists(splitDir)) {
595             LOG.debug("Found queued dead server " + serverName);
596             failover = true;
597             break;
598           }
599         }
600         if (!failover) {
601           // We figured that it's not a failover, so no need to
602           // work on these re-queued dead servers any more.
603           LOG.info("AM figured that it's not a failover and cleaned up "
604             + queuedDeadServers.size() + " queued dead servers");
605           serverManager.removeRequeuedDeadServers();
606         }
607       }
608     }
609 
610     Set<TableName> disabledOrDisablingOrEnabling = null;
611     Map<HRegionInfo, ServerName> allRegions = null;
612 
613     if (!failover) {
614       disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
615         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING,
616         ZooKeeperProtos.Table.State.ENABLING);
617 
618       // Clean re/start, mark all user regions closed before reassignment
619       allRegions = regionStates.closeAllUserRegions(
620         disabledOrDisablingOrEnabling);
621     }
622 
623     // Now region states are restored
624     regionStateStore.start();
625 
626     // If we found user regions out on cluster, its a failover.
627     if (failover) {
628       LOG.info("Found regions out on cluster or in RIT; presuming failover");
629       // Process list of dead servers and regions in RIT.
630       // See HBASE-4580 for more information.
631       processDeadServersAndRecoverLostRegions(deadServers);
632     }
633 
634     if (!failover && useZKForAssignment) {
635       // Cleanup any existing ZK nodes and start watching
636       ZKAssign.deleteAllNodes(watcher);
637       ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
638         this.watcher.assignmentZNode);
639     }
640 
641     // Now we can safely claim failover cleanup completed and enable
642     // ServerShutdownHandler for further processing. The nodes (below)
643     // in transition, if any, are for regions not related to those
644     // dead servers at all, and can be done in parallel to SSH.
645     failoverCleanupDone();
646     if (!failover) {
647       // Fresh cluster startup.
648       LOG.info("Clean cluster startup. Assigning user regions");
649       assignAllUserRegions(allRegions);
650     }
651     // unassign replicas of the split parents and the merged regions
652     // the daughter replicas are opened in assignAllUserRegions if it was
653     // not already opened.
654     for (HRegionInfo h : replicasToClose) {
655       unassign(h);
656     }
657     replicasToClose.clear();
658     return failover;
659   }
660 
661   /**
662    * If region is up in zk in transition, then do fixup and block and wait until
663    * the region is assigned and out of transition.  Used on startup for
664    * catalog regions.
665    * @param hri Region to look for.
666    * @return True if we processed a region in transition else false if region
667    * was not up in zk in transition.
668    * @throws InterruptedException
669    * @throws KeeperException
670    * @throws IOException
671    */
672   boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
673       throws InterruptedException, KeeperException, IOException {
674     String encodedRegionName = hri.getEncodedName();
675     if (!processRegionInTransition(encodedRegionName, hri)) {
676       return false; // The region is not in transition
677     }
678     LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedRegionName));
679     while (!this.server.isStopped() &&
680         this.regionStates.isRegionInTransition(encodedRegionName)) {
681       RegionState state = this.regionStates.getRegionTransitionState(encodedRegionName);
682       if (state == null || !serverManager.isServerOnline(state.getServerName())) {
683         // The region is not in transition, or not in transition on an online
684         // server. Doesn't help to block here any more. Caller need to
685         // verify the region is actually assigned.
686         break;
687       }
688       this.regionStates.waitForUpdate(100);
689     }
690     return true;
691   }
692 
693   /**
694    * Process failover of new master for region <code>encodedRegionName</code>
695    * up in zookeeper.
696    * @param encodedRegionName Region to process failover for.
697    * @param regionInfo If null we'll go get it from meta table.
698    * @return True if we processed <code>regionInfo</code> as a RIT.
699    * @throws KeeperException
700    * @throws IOException
701    */
702   boolean processRegionInTransition(final String encodedRegionName,
703       final HRegionInfo regionInfo) throws KeeperException, IOException {
704     // We need a lock here to ensure that we will not put the same region twice
705     // It has no reason to be a lock shared with the other operations.
706     // We can do the lock on the region only, instead of a global lock: what we want to ensure
707     // is that we don't have two threads working on the same region.
708     Lock lock = locker.acquireLock(encodedRegionName);
709     try {
710       Stat stat = new Stat();
711       byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
712       if (data == null) return false;
713       RegionTransition rt;
714       try {
715         rt = RegionTransition.parseFrom(data);
716       } catch (DeserializationException e) {
717         LOG.warn("Failed parse znode data", e);
718         return false;
719       }
720       HRegionInfo hri = regionInfo;
721       if (hri == null) {
722         // The region info is not passed in. We will try to find the region
723         // from region states map/meta based on the encoded region name. But we
724         // may not be able to find it. This is valid for online merge that
725         // the region may have not been created if the merge is not completed.
726         // Therefore, it is not in meta at master recovery time.
727         hri = regionStates.getRegionInfo(rt.getRegionName());
728         EventType et = rt.getEventType();
729         if (hri == null && et != EventType.RS_ZK_REGION_MERGING
730             && et != EventType.RS_ZK_REQUEST_REGION_MERGE) {
731           LOG.warn("Couldn't find the region in recovering " + rt);
732           return false;
733         }
734       }
735 
736       // TODO: This code is tied to ZK anyway, so for now leaving it as is,
737       // will refactor when whole region assignment will be abstracted from ZK
738       BaseCoordinatedStateManager cp =
739         (BaseCoordinatedStateManager) this.server.getCoordinatedStateManager();
740       OpenRegionCoordination openRegionCoordination = cp.getOpenRegionCoordination();
741 
742       ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
743         new ZkOpenRegionCoordination.ZkOpenRegionDetails();
744       zkOrd.setVersion(stat.getVersion());
745       zkOrd.setServerName(cp.getServer().getServerName());
746 
747       return processRegionsInTransition(
748         rt, hri, openRegionCoordination, zkOrd);
749     } finally {
750       lock.unlock();
751     }
752   }
753 
754   /**
755    * This call is invoked only (1) master assign meta;
756    * (2) during failover mode startup, zk assignment node processing.
757    * The locker is set in the caller. It returns true if the region
758    * is in transition for sure, false otherwise.
759    *
760    * It should be private but it is used by some test too.
761    */
762   boolean processRegionsInTransition(
763       final RegionTransition rt, final HRegionInfo regionInfo,
764       OpenRegionCoordination coordination,
765       final OpenRegionCoordination.OpenRegionDetails ord) throws KeeperException {
766     EventType et = rt.getEventType();
767     // Get ServerName.  Could not be null.
768     final ServerName sn = rt.getServerName();
769     final byte[] regionName = rt.getRegionName();
770     final String encodedName = HRegionInfo.encodeRegionName(regionName);
771     final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
772     LOG.info("Processing " + prettyPrintedRegionName + " in state: " + et);
773 
774     if (regionStates.isRegionInTransition(encodedName)
775         && (regionInfo.isMetaRegion() || !useZKForAssignment)) {
776       LOG.info("Processed region " + prettyPrintedRegionName + " in state: "
777         + et + ", does nothing since the region is already in transition "
778         + regionStates.getRegionTransitionState(encodedName));
779       // Just return
780       return true;
781     }
782     if (!serverManager.isServerOnline(sn)) {
783       // It was transitioning on a dead server, so it's closed now.
784       // Force to OFFLINE and put it in transition, but not assign it
785       // since log splitting for the dead server is not done yet.
786       LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() +
787         " was on deadserver; forcing offline");
788       if (regionStates.isRegionOnline(regionInfo)) {
789         // Meta could still show the region is assigned to the previous
790         // server. If that server is online, when we reload the meta, the
791         // region is put back to online, we need to offline it.
792         regionStates.regionOffline(regionInfo);
793         sendRegionClosedNotification(regionInfo);
794       }
795       // Put it back in transition so that SSH can re-assign it
796       regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);
797 
798       if (regionInfo.isMetaRegion()) {
799         // If it's meta region, reset the meta location.
800         // So that master knows the right meta region server.
801         MetaTableLocator.setMetaLocation(watcher, sn, State.OPEN);
802       } else {
803         // No matter the previous server is online or offline,
804         // we need to reset the last region server of the region.
805         regionStates.setLastRegionServerOfRegion(sn, encodedName);
806         // Make sure we know the server is dead.
807         if (!serverManager.isServerDead(sn)) {
808           serverManager.expireServer(sn);
809         }
810       }
811       return false;
812     }
813     switch (et) {
814       case M_ZK_REGION_CLOSING:
815         // Insert into RIT & resend the query to the region server: may be the previous master
816         // died before sending the query the first time.
817         final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING);
818         this.executorService.submit(
819           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
820             @Override
821             public void process() throws IOException {
822               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
823               try {
824                 final int expectedVersion = ((ZkOpenRegionCoordination.ZkOpenRegionDetails) ord)
825                   .getVersion();
826                 unassign(regionInfo, rsClosing, expectedVersion, null, useZKForAssignment, null);
827                 if (regionStates.isRegionOffline(regionInfo)) {
828                   assign(regionInfo, true);
829                 }
830               } finally {
831                 lock.unlock();
832               }
833             }
834           });
835         break;
836 
837       case RS_ZK_REGION_CLOSED:
838       case RS_ZK_REGION_FAILED_OPEN:
839         // Region is closed, insert into RIT and handle it
840         regionStates.setLastRegionServerOfRegion(sn, encodedName);
841         regionStates.updateRegionState(regionInfo, State.CLOSED, sn);
842         if (!replicasToClose.contains(regionInfo)) {
843           invokeAssign(regionInfo);
844         } else {
845           offlineDisabledRegion(regionInfo);
846         }
847         break;
848 
849       case M_ZK_REGION_OFFLINE:
850         // Insert in RIT and resend to the regionserver
851         regionStates.updateRegionState(rt, State.PENDING_OPEN);
852         final RegionState rsOffline = regionStates.getRegionState(regionInfo);
853         this.executorService.submit(
854           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
855             @Override
856             public void process() throws IOException {
857               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
858               try {
859                 RegionPlan plan = new RegionPlan(regionInfo, null, sn);
860                 addPlan(encodedName, plan);
861                 assign(rsOffline, false, false);
862               } finally {
863                 lock.unlock();
864               }
865             }
866           });
867         break;
868 
869       case RS_ZK_REGION_OPENING:
870         regionStates.updateRegionState(rt, State.OPENING);
871         break;
872 
873       case RS_ZK_REGION_OPENED:
874         // Region is opened, insert into RIT and handle it
875         // This could be done asynchronously, we would need then to acquire the lock in the
876         //  handler.
877         regionStates.updateRegionState(rt, State.OPEN);
878         new OpenedRegionHandler(server, this, regionInfo, coordination, ord).process();
879         break;
880       case RS_ZK_REQUEST_REGION_SPLIT:
881       case RS_ZK_REGION_SPLITTING:
882       case RS_ZK_REGION_SPLIT:
883         // Splitting region should be online. We could have skipped it during
884         // user region rebuilding since we may consider the split is completed.
885         // Put it in SPLITTING state to avoid complications.
886         regionStates.regionOnline(regionInfo, sn);
887         regionStates.updateRegionState(rt, State.SPLITTING);
888         if (!handleRegionSplitting(
889             rt, encodedName, prettyPrintedRegionName, sn)) {
890           deleteSplittingNode(encodedName, sn);
891         }
892         break;
893       case RS_ZK_REQUEST_REGION_MERGE:
894       case RS_ZK_REGION_MERGING:
895       case RS_ZK_REGION_MERGED:
896         if (!handleRegionMerging(
897             rt, encodedName, prettyPrintedRegionName, sn)) {
898           deleteMergingNode(encodedName, sn);
899         }
900         break;
901       default:
902         throw new IllegalStateException("Received region in state:" + et + " is not valid.");
903     }
904     LOG.info("Processed region " + prettyPrintedRegionName + " in state "
905       + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ")
906       + "server: " + sn);
907     return true;
908   }
909 
910   /**
911    * When a region is closed, it should be removed from the regionsToReopen
912    * @param hri HRegionInfo of the region which was closed
913    */
914   public void removeClosedRegion(HRegionInfo hri) {
915     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
916       LOG.debug("Removed region from reopening regions because it was closed");
917     }
918   }
919 
920   /**
921    * Handles various states an unassigned node can be in.
922    * <p>
923    * Method is called when a state change is suspected for an unassigned node.
924    * <p>
925    * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
926    * yet).
927    * @param rt region transition
928    * @param coordination coordination for opening region
929    * @param ord details about opening region
930    */
931   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
932       value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
933       justification="Needs work; says access to ConcurrentHashMaps not ATOMIC!!!")
934   void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination,
935                     OpenRegionCoordination.OpenRegionDetails ord) {
936     if (rt == null) {
937       LOG.warn("Unexpected NULL input for RegionTransition rt");
938       return;
939     }
940     final ServerName sn = rt.getServerName();
941     // Check if this is a special HBCK transition
942     if (sn.equals(HBCK_CODE_SERVERNAME)) {
943       handleHBCK(rt);
944       return;
945     }
946     final long createTime = rt.getCreateTime();
947     final byte[] regionName = rt.getRegionName();
948     String encodedName = HRegionInfo.encodeRegionName(regionName);
949     String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
950     // Verify this is a known server
951     if (!serverManager.isServerOnline(sn)
952       && !ignoreStatesRSOffline.contains(rt.getEventType())) {
953       LOG.warn("Attempted to handle region transition for server but " +
954         "it is not online: " + prettyPrintedRegionName + ", " + rt);
955       return;
956     }
957 
958     RegionState regionState =
959       regionStates.getRegionState(encodedName);
960     long startTime = System.currentTimeMillis();
961     if (LOG.isDebugEnabled()) {
962       boolean lateEvent = createTime < (startTime - 15000);
963       LOG.debug("Handling " + rt.getEventType() +
964         ", server=" + sn + ", region=" +
965         (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
966         (lateEvent ? ", which is more than 15 seconds late" : "") +
967         ", current_state=" + regionState);
968     }
969     // We don't do anything for this event,
970     // so separate it out, no need to lock/unlock anything
971     if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
972       return;
973     }
974 
975     // We need a lock on the region as we could update it
976     Lock lock = locker.acquireLock(encodedName);
977     try {
978       RegionState latestState =
979         regionStates.getRegionState(encodedName);
980       if ((regionState == null && latestState != null)
981           || (regionState != null && latestState == null)
982           || (regionState != null && latestState != null
983             && latestState.getState() != regionState.getState())) {
984         LOG.warn("Region state changed from " + regionState + " to "
985           + latestState + ", while acquiring lock");
986       }
987       long waitedTime = System.currentTimeMillis() - startTime;
988       if (waitedTime > 5000) {
989         LOG.warn("Took " + waitedTime + "ms to acquire the lock");
990       }
991       regionState = latestState;
992       switch (rt.getEventType()) {
993       case RS_ZK_REQUEST_REGION_SPLIT:
994       case RS_ZK_REGION_SPLITTING:
995       case RS_ZK_REGION_SPLIT:
996         if (!handleRegionSplitting(
997             rt, encodedName, prettyPrintedRegionName, sn)) {
998           deleteSplittingNode(encodedName, sn);
999         }
1000         break;
1001 
1002       case RS_ZK_REQUEST_REGION_MERGE:
1003       case RS_ZK_REGION_MERGING:
1004       case RS_ZK_REGION_MERGED:
1005         // Merged region is a new region, we can't find it in the region states now.
1006         // However, the two merging regions are not new. They should be in state for merging.
1007         if (!handleRegionMerging(
1008             rt, encodedName, prettyPrintedRegionName, sn)) {
1009           deleteMergingNode(encodedName, sn);
1010         }
1011         break;
1012 
1013       case M_ZK_REGION_CLOSING:
1014         // Should see CLOSING after we have asked it to CLOSE or additional
1015         // times after already being in state of CLOSING
1016         if (regionState == null
1017             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
1018           LOG.warn("Received CLOSING for " + prettyPrintedRegionName
1019             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
1020             + regionStates.getRegionState(encodedName));
1021           return;
1022         }
1023         // Transition to CLOSING (or update stamp if already CLOSING)
1024         regionStates.updateRegionState(rt, State.CLOSING);
1025         break;
1026 
1027       case RS_ZK_REGION_CLOSED:
1028         // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
1029         if (regionState == null
1030             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
1031           LOG.warn("Received CLOSED for " + prettyPrintedRegionName
1032             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
1033             + regionStates.getRegionState(encodedName));
1034           return;
1035         }
1036         // Handle CLOSED by assigning elsewhere or stopping if a disable
1037         // If we got here all is good.  Need to update RegionState -- else
1038         // what follows will fail because not in expected state.
1039         new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1040         updateClosedRegionHandlerTracker(regionState.getRegion());
1041         break;
1042 
1043         case RS_ZK_REGION_FAILED_OPEN:
1044           if (regionState == null
1045               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1046             LOG.warn("Received FAILED_OPEN for " + prettyPrintedRegionName
1047               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1048               + regionStates.getRegionState(encodedName));
1049             return;
1050           }
1051           AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
1052           if (failedOpenCount == null) {
1053             failedOpenCount = new AtomicInteger();
1054             // No need to use putIfAbsent, or extra synchronization since
1055             // this whole handleRegion block is locked on the encoded region
1056             // name, and failedOpenTracker is updated only in this block
1057             // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
1058             failedOpenTracker.put(encodedName, failedOpenCount);
1059           }
1060           if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
1061             // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
1062             regionStates.updateRegionState(rt, State.FAILED_OPEN);
1063             // remove the tracking info to save memory, also reset
1064             // the count for next open initiative
1065             failedOpenTracker.remove(encodedName);
1066           } else {
1067             // Handle this the same as if it were opened and then closed.
1068             regionState = regionStates.updateRegionState(rt, State.CLOSED);
1069             if (regionState != null) {
1070               // When there are more than one region server a new RS is selected as the
1071               // destination and the same is updated in the regionplan. (HBASE-5546)
1072               try {
1073                 getRegionPlan(regionState.getRegion(), sn, true);
1074                 new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1075               } catch (HBaseIOException e) {
1076                 LOG.warn("Failed to get region plan", e);
1077               }
1078             }
1079           }
1080           break;
1081 
1082         case RS_ZK_REGION_OPENING:
1083           // Should see OPENING after we have asked it to OPEN or additional
1084           // times after already being in state of OPENING
1085           if (regionState == null
1086               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1087             LOG.warn("Received OPENING for " + prettyPrintedRegionName
1088               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1089               + regionStates.getRegionState(encodedName));
1090             return;
1091           }
1092           // Transition to OPENING (or update stamp if already OPENING)
1093           regionStates.updateRegionState(rt, State.OPENING);
1094           break;
1095 
1096         case RS_ZK_REGION_OPENED:
1097           // Should see OPENED after OPENING but possible after PENDING_OPEN.
1098           if (regionState == null
1099               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1100             LOG.warn("Received OPENED for " + prettyPrintedRegionName
1101               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1102               + regionStates.getRegionState(encodedName));
1103 
1104             if (regionState != null) {
1105               // Close it without updating the internal region states,
1106               // so as not to create double assignments in unlucky scenarios
1107               // mentioned in OpenRegionHandler#process
1108               unassign(regionState.getRegion(), null, -1, null, false, sn);
1109             }
1110             return;
1111           }
1112           // Handle OPENED by removing from transition and deleted zk node
1113           regionState =
1114               regionStates.transitionOpenFromPendingOpenOrOpeningOnServer(rt,regionState, sn);
1115           if (regionState != null) {
1116             failedOpenTracker.remove(encodedName); // reset the count, if any
1117             new OpenedRegionHandler(
1118               server, this, regionState.getRegion(), coordination, ord).process();
1119             updateOpenedRegionHandlerTracker(regionState.getRegion());
1120           }
1121           break;
1122 
1123         default:
1124           throw new IllegalStateException("Received event is not valid.");
1125       }
1126     } finally {
1127       lock.unlock();
1128     }
1129   }
1130 
1131   //For unit tests only
1132   boolean wasClosedHandlerCalled(HRegionInfo hri) {
1133     AtomicBoolean b = closedRegionHandlerCalled.get(hri);
1134     //compareAndSet to be sure that unit tests don't see stale values. Means,
1135     //we will return true exactly once unless the handler code resets to true
1136     //this value.
1137     return b == null ? false : b.compareAndSet(true, false);
1138   }
1139 
1140   //For unit tests only
1141   boolean wasOpenedHandlerCalled(HRegionInfo hri) {
1142     AtomicBoolean b = openedRegionHandlerCalled.get(hri);
1143     //compareAndSet to be sure that unit tests don't see stale values. Means,
1144     //we will return true exactly once unless the handler code resets to true
1145     //this value.
1146     return b == null ? false : b.compareAndSet(true, false);
1147   }
1148 
1149   //For unit tests only
1150   void initializeHandlerTrackers() {
1151     closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1152     openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1153   }
1154 
1155   void updateClosedRegionHandlerTracker(HRegionInfo hri) {
1156     if (closedRegionHandlerCalled != null) { //only for unit tests this is true
1157       closedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1158     }
1159   }
1160 
1161   void updateOpenedRegionHandlerTracker(HRegionInfo hri) {
1162     if (openedRegionHandlerCalled != null) { //only for unit tests this is true
1163       openedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1164     }
1165   }
1166 
1167   // TODO: processFavoredNodes might throw an exception, for e.g., if the
1168   // meta could not be contacted/updated. We need to see how seriously to treat
1169   // this problem as. Should we fail the current assignment. We should be able
1170   // to recover from this problem eventually (if the meta couldn't be updated
1171   // things should work normally and eventually get fixed up).
1172   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
1173     if (!shouldAssignRegionsWithFavoredNodes) return;
1174     // The AM gets the favored nodes info for each region and updates the meta
1175     // table with that info
1176     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
1177         new HashMap<HRegionInfo, List<ServerName>>();
1178     for (HRegionInfo region : regions) {
1179       regionToFavoredNodes.put(region,
1180           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
1181     }
1182     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
1183       this.server.getConnection());
1184   }
1185 
1186   /**
1187    * Handle a ZK unassigned node transition triggered by HBCK repair tool.
1188    * <p>
1189    * This is handled in a separate code path because it breaks the normal rules.
1190    * @param rt
1191    */
1192   @SuppressWarnings("deprecation")
1193   private void handleHBCK(RegionTransition rt) {
1194     String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
1195     LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
1196       ", server=" + rt.getServerName() + ", region=" +
1197       HRegionInfo.prettyPrint(encodedName));
1198     RegionState regionState = regionStates.getRegionTransitionState(encodedName);
1199     switch (rt.getEventType()) {
1200       case M_ZK_REGION_OFFLINE:
1201         HRegionInfo regionInfo;
1202         if (regionState != null) {
1203           regionInfo = regionState.getRegion();
1204         } else {
1205           try {
1206             byte [] name = rt.getRegionName();
1207             Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
1208               this.server.getConnection(), name);
1209             regionInfo = p.getFirst();
1210           } catch (IOException e) {
1211             LOG.info("Exception reading hbase:meta doing HBCK repair operation", e);
1212             return;
1213           }
1214         }
1215         LOG.info("HBCK repair is triggering assignment of region=" +
1216             regionInfo.getRegionNameAsString());
1217         // trigger assign, node is already in OFFLINE so don't need to update ZK
1218         assign(regionInfo, false);
1219         break;
1220 
1221       default:
1222         LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
1223         break;
1224     }
1225 
1226   }
1227 
1228   // ZooKeeper events
1229 
1230   /**
1231    * New unassigned node has been created.
1232    *
1233    * <p>This happens when an RS begins the OPENING or CLOSING of a region by
1234    * creating an unassigned node.
1235    *
1236    * <p>When this happens we must:
1237    * <ol>
1238    *   <li>Watch the node for further events</li>
1239    *   <li>Read and handle the state in the node</li>
1240    * </ol>
1241    */
1242   @Override
1243   public void nodeCreated(String path) {
1244     handleAssignmentEvent(path);
1245   }
1246 
1247   /**
1248    * Existing unassigned node has had data changed.
1249    *
1250    * <p>This happens when an RS transitions from OFFLINE to OPENING, or between
1251    * OPENING/OPENED and CLOSING/CLOSED.
1252    *
1253    * <p>When this happens we must:
1254    * <ol>
1255    *   <li>Watch the node for further events</li>
1256    *   <li>Read and handle the state in the node</li>
1257    * </ol>
1258    */
1259   @Override
1260   public void nodeDataChanged(String path) {
1261     handleAssignmentEvent(path);
1262   }
1263 
1264 
1265   // We  don't want to have two events on the same region managed simultaneously.
1266   // For this reason, we need to wait if an event on the same region is currently in progress.
1267   // So we track the region names of the events in progress, and we keep a waiting list.
1268   private final Set<String> regionsInProgress = new HashSet<String>();
1269   // In a LinkedHashMultimap, the put order is kept when we retrieve the collection back. We need
1270   //  this as we want the events to be managed in the same order as we received them.
1271   private final LinkedHashMultimap <String, RegionRunnable>
1272       zkEventWorkerWaitingList = LinkedHashMultimap.create();
1273 
1274   /**
1275    * A specific runnable that works only on a region.
1276    */
1277   private interface RegionRunnable extends Runnable{
1278     /**
1279      * @return - the name of the region it works on.
1280      */
1281     String getRegionName();
1282   }
1283 
1284   /**
1285    * Submit a task, ensuring that there is only one task at a time that working on a given region.
1286    * Order is respected.
1287    */
1288   protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {
1289 
1290     synchronized (regionsInProgress) {
1291       // If we're there is already a task with this region, we add it to the
1292       //  waiting list and return.
1293       if (regionsInProgress.contains(regRunnable.getRegionName())) {
1294         synchronized (zkEventWorkerWaitingList){
1295           zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
1296         }
1297         return;
1298       }
1299 
1300       // No event in progress on this region => we can submit a new task immediately.
1301       regionsInProgress.add(regRunnable.getRegionName());
1302       zkEventWorkers.submit(new Runnable() {
1303         @Override
1304         public void run() {
1305           try {
1306             regRunnable.run();
1307           } finally {
1308             // now that we have finished, let's see if there is an event for the same region in the
1309             //  waiting list. If it's the case, we can now submit it to the pool.
1310             synchronized (regionsInProgress) {
1311               regionsInProgress.remove(regRunnable.getRegionName());
1312               synchronized (zkEventWorkerWaitingList) {
1313                 java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
1314                     regRunnable.getRegionName());
1315                 if (!waiting.isEmpty()) {
1316                   // We want the first object only. The only way to get it is through an iterator.
1317                   RegionRunnable toSubmit = waiting.iterator().next();
1318                   zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
1319                   zkEventWorkersSubmit(toSubmit);
1320                 }
1321               }
1322             }
1323           }
1324         }
1325       });
1326     }
1327   }
1328 
1329   @Override
1330   public void nodeDeleted(final String path) {
1331     if (path.startsWith(watcher.assignmentZNode)) {
1332       final String regionName = ZKAssign.getRegionName(watcher, path);
1333       zkEventWorkersSubmit(new RegionRunnable() {
1334         @Override
1335         public String getRegionName() {
1336           return regionName;
1337         }
1338 
1339         @Override
1340         public void run() {
1341           Lock lock = locker.acquireLock(regionName);
1342           try {
1343             RegionState rs = regionStates.getRegionTransitionState(regionName);
1344             if (rs == null) {
1345               rs = regionStates.getRegionState(regionName);
1346               if (rs == null || !rs.isMergingNew()) {
1347                 // MergingNew is an offline state
1348                 return;
1349               }
1350             }
1351 
1352             HRegionInfo regionInfo = rs.getRegion();
1353             String regionNameStr = regionInfo.getRegionNameAsString();
1354             LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);
1355 
1356             boolean disabled = getTableStateManager().isTableState(regionInfo.getTable(),
1357                 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
1358 
1359             ServerName serverName = rs.getServerName();
1360             if (serverManager.isServerOnline(serverName)) {
1361               if (rs.isOnServer(serverName) && (rs.isOpened() || rs.isSplitting())) {
1362                 synchronized (regionStates) {
1363                   regionOnline(regionInfo, serverName);
1364                   if (rs.isSplitting() && splitRegions.containsKey(regionInfo)) {
1365                     // Check if the daugter regions are still there, if they are present, offline
1366                     // as its the case of a rollback.
1367                     HRegionInfo hri_a = splitRegions.get(regionInfo).getFirst();
1368                     HRegionInfo hri_b = splitRegions.get(regionInfo).getSecond();
1369                     if (!regionStates.isRegionInTransition(hri_a.getEncodedName())) {
1370                       LOG.warn("Split daughter region not in transition " + hri_a);
1371                     }
1372                     if (!regionStates.isRegionInTransition(hri_b.getEncodedName())) {
1373                       LOG.warn("Split daughter region not in transition" + hri_b);
1374                     }
1375                     regionOffline(hri_a);
1376                     regionOffline(hri_b);
1377                     splitRegions.remove(regionInfo);
1378                   }
1379                   if (disabled) {
1380                     // if server is offline, no hurt to unassign again
1381                     LOG.info("Opened " + regionNameStr
1382                         + "but this table is disabled, triggering close of region");
1383                     unassign(regionInfo);
1384                   }
1385                 }
1386               } else if (rs.isMergingNew()) {
1387                 synchronized (regionStates) {
1388                   String p = regionInfo.getEncodedName();
1389                   PairOfSameType<HRegionInfo> regions = mergingRegions.get(p);
1390                   if (regions != null) {
1391                     onlineMergingRegion(disabled, regions.getFirst(), serverName);
1392                     onlineMergingRegion(disabled, regions.getSecond(), serverName);
1393                   }
1394                 }
1395               }
1396             }
1397           } finally {
1398             lock.unlock();
1399           }
1400         }
1401 
1402         private void onlineMergingRegion(boolean disabled,
1403             final HRegionInfo hri, final ServerName serverName) {
1404           RegionState regionState = regionStates.getRegionState(hri);
1405           if (regionState != null && regionState.isMerging()
1406               && regionState.isOnServer(serverName)) {
1407             regionOnline(regionState.getRegion(), serverName);
1408             if (disabled) {
1409               unassign(hri);
1410             }
1411           }
1412         }
1413       });
1414     }
1415   }
1416 
1417   /**
1418    * New unassigned node has been created.
1419    *
1420    * <p>This happens when an RS begins the OPENING, SPLITTING or CLOSING of a
1421    * region by creating a znode.
1422    *
1423    * <p>When this happens we must:
1424    * <ol>
1425    *   <li>Watch the node for further children changed events</li>
1426    *   <li>Watch all new children for changed events</li>
1427    * </ol>
1428    */
1429   @Override
1430   public void nodeChildrenChanged(String path) {
1431     if (path.equals(watcher.assignmentZNode)) {
1432       zkEventWorkers.submit(new Runnable() {
1433         @Override
1434         public void run() {
1435           try {
1436             // Just make sure we see the changes for the new znodes
1437             List<String> children =
1438               ZKUtil.listChildrenAndWatchForNewChildren(
1439                 watcher, watcher.assignmentZNode);
1440             if (children != null) {
1441               Stat stat = new Stat();
1442               for (String child : children) {
1443                 // if region is in transition, we already have a watch
1444                 // on it, so no need to watch it again. So, as I know for now,
1445                 // this is needed to watch splitting nodes only.
1446                 if (!regionStates.isRegionInTransition(child)) {
1447                   ZKAssign.getDataAndWatch(watcher, child, stat);
1448                 }
1449               }
1450             }
1451           } catch (KeeperException e) {
1452             server.abort("Unexpected ZK exception reading unassigned children", e);
1453           }
1454         }
1455       });
1456     }
1457   }
1458 
1459 
1460   /**
1461    * Marks the region as online.  Removes it from regions in transition and
1462    * updates the in-memory assignment information.
1463    * <p>
1464    * Used when a region has been successfully opened on a region server.
1465    * @param regionInfo
1466    * @param sn
1467    */
1468   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
1469     regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
1470   }
1471 
1472   void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
1473     numRegionsOpened.incrementAndGet();
1474     regionStates.regionOnline(regionInfo, sn, openSeqNum);
1475 
1476     // Remove plan if one.
1477     clearRegionPlan(regionInfo);
1478     balancer.regionOnline(regionInfo, sn);
1479 
1480     // Tell our listeners that a region was opened
1481     sendRegionOpenedNotification(regionInfo, sn);
1482   }
1483 
1484   /**
1485    * Pass the assignment event to a worker for processing.
1486    * Each worker is a single thread executor service.  The reason
1487    * for just one thread is to make sure all events for a given
1488    * region are processed in order.
1489    *
1490    * @param path
1491    */
1492   private void handleAssignmentEvent(final String path) {
1493     if (path.startsWith(watcher.assignmentZNode)) {
1494       final String regionName = ZKAssign.getRegionName(watcher, path);
1495 
1496       zkEventWorkersSubmit(new RegionRunnable() {
1497         @Override
1498         public String getRegionName() {
1499           return regionName;
1500         }
1501 
1502         @Override
1503         public void run() {
1504           try {
1505             Stat stat = new Stat();
1506             byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
1507             if (data == null) return;
1508 
1509             RegionTransition rt = RegionTransition.parseFrom(data);
1510 
1511             // TODO: This code is tied to ZK anyway, so for now leaving it as is,
1512             // will refactor when whole region assignment will be abstracted from ZK
1513             BaseCoordinatedStateManager csm =
1514               (BaseCoordinatedStateManager) server.getCoordinatedStateManager();
1515             OpenRegionCoordination openRegionCoordination = csm.getOpenRegionCoordination();
1516 
1517             ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
1518               new ZkOpenRegionCoordination.ZkOpenRegionDetails();
1519             zkOrd.setVersion(stat.getVersion());
1520             zkOrd.setServerName(csm.getServer().getServerName());
1521 
1522             handleRegion(rt, openRegionCoordination, zkOrd);
1523           } catch (KeeperException e) {
1524             server.abort("Unexpected ZK exception reading unassigned node data", e);
1525           } catch (DeserializationException e) {
1526             server.abort("Unexpected exception deserializing node data", e);
1527           }
1528         }
1529       });
1530     }
1531   }
1532 
1533   /**
1534    * Marks the region as offline.  Removes it from regions in transition and
1535    * removes in-memory assignment information.
1536    * <p>
1537    * Used when a region has been closed and should remain closed.
1538    * @param regionInfo
1539    */
1540   public void regionOffline(final HRegionInfo regionInfo) {
1541     regionOffline(regionInfo, null);
1542   }
1543 
1544   public void offlineDisabledRegion(HRegionInfo regionInfo) {
1545     if (useZKForAssignment) {
1546       // Disabling so should not be reassigned, just delete the CLOSED node
1547       LOG.debug("Table being disabled so deleting ZK node and removing from " +
1548         "regions in transition, skipping assignment of region " +
1549           regionInfo.getRegionNameAsString());
1550       String encodedName = regionInfo.getEncodedName();
1551       deleteNodeInStates(encodedName, "closed", null,
1552         EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
1553     }
1554     replicasToClose.remove(regionInfo);
1555     regionOffline(regionInfo);
1556   }
1557 
1558   // Assignment methods
1559 
1560   /**
1561    * Assigns the specified region.
1562    * <p>
1563    * If a RegionPlan is available with a valid destination then it will be used
1564    * to determine what server region is assigned to.  If no RegionPlan is
1565    * available, region will be assigned to a random available server.
1566    * <p>
1567    * Updates the RegionState and sends the OPEN RPC.
1568    * <p>
1569    * This will only succeed if the region is in transition and in a CLOSED or
1570    * OFFLINE state or not in transition (in-memory not zk), and of course, the
1571    * chosen server is up and running (It may have just crashed!).  If the
1572    * in-memory checks pass, the zk node is forced to OFFLINE before assigning.
1573    *
1574    * @param region server to be assigned
1575    * @param setOfflineInZK whether ZK node should be created/transitioned to an
1576    *                       OFFLINE state before assigning the region
1577    */
1578   public void assign(HRegionInfo region, boolean setOfflineInZK) {
1579     assign(region, setOfflineInZK, false);
1580   }
1581 
1582   /**
1583    * Use care with forceNewPlan. It could cause double assignment.
1584    */
1585   public void assign(HRegionInfo region,
1586       boolean setOfflineInZK, boolean forceNewPlan) {
1587     if (isDisabledorDisablingRegionInRIT(region)) {
1588       return;
1589     }
1590     String encodedName = region.getEncodedName();
1591     Lock lock = locker.acquireLock(encodedName);
1592     try {
1593       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
1594       if (state != null) {
1595         if (regionStates.wasRegionOnDeadServer(encodedName)) {
1596           LOG.info("Skip assigning " + region.getRegionNameAsString()
1597             + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
1598             + " is dead but not processed yet");
1599           return;
1600         }
1601         assign(state, setOfflineInZK && useZKForAssignment, forceNewPlan);
1602       }
1603     } finally {
1604       lock.unlock();
1605     }
1606   }
1607 
1608   /**
1609    * Bulk assign regions to <code>destination</code>.
1610    * @param destination
1611    * @param regions Regions to assign.
1612    * @return true if successful
1613    */
1614   boolean assign(final ServerName destination, final List<HRegionInfo> regions)
1615     throws InterruptedException {
1616     long startTime = EnvironmentEdgeManager.currentTime();
1617     try {
1618       int regionCount = regions.size();
1619       if (regionCount == 0) {
1620         return true;
1621       }
1622       LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
1623       Set<String> encodedNames = new HashSet<String>(regionCount);
1624       for (HRegionInfo region : regions) {
1625         encodedNames.add(region.getEncodedName());
1626       }
1627 
1628       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
1629       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
1630       try {
1631         AtomicInteger counter = new AtomicInteger(0);
1632         Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
1633         OfflineCallback cb = new OfflineCallback(
1634           watcher, destination, counter, offlineNodesVersions);
1635         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
1636         List<RegionState> states = new ArrayList<RegionState>(regions.size());
1637         for (HRegionInfo region : regions) {
1638           String encodedName = region.getEncodedName();
1639           if (!isDisabledorDisablingRegionInRIT(region)) {
1640             RegionState state = forceRegionStateToOffline(region, false);
1641             boolean onDeadServer = false;
1642             if (state != null) {
1643               if (regionStates.wasRegionOnDeadServer(encodedName)) {
1644                 LOG.info("Skip assigning " + region.getRegionNameAsString()
1645                   + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
1646                   + " is dead but not processed yet");
1647                 onDeadServer = true;
1648               } else if (!useZKForAssignment
1649                   || asyncSetOfflineInZooKeeper(state, cb, destination)) {
1650                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
1651                 plans.put(encodedName, plan);
1652                 states.add(state);
1653                 continue;
1654               }
1655             }
1656             // Reassign if the region wasn't on a dead server
1657             if (!onDeadServer) {
1658               LOG.info("failed to force region state to offline or "
1659                 + "failed to set it offline in ZK, will reassign later: " + region);
1660               failedToOpenRegions.add(region); // assign individually later
1661             }
1662           }
1663           // Release the lock, this region is excluded from bulk assign because
1664           // we can't update its state, or set its znode to offline.
1665           Lock lock = locks.remove(encodedName);
1666           lock.unlock();
1667         }
1668 
1669         if (useZKForAssignment) {
1670           // Wait until all unassigned nodes have been put up and watchers set.
1671           int total = states.size();
1672           for (int oldCounter = 0; !server.isStopped();) {
1673             int count = counter.get();
1674             if (oldCounter != count) {
1675               LOG.debug(destination.toString() + " unassigned znodes=" + count +
1676                 " of total=" + total + "; oldCounter=" + oldCounter);
1677               oldCounter = count;
1678             }
1679             if (count >= total) break;
1680             Thread.sleep(5);
1681           }
1682         }
1683 
1684         if (server.isStopped()) {
1685           return false;
1686         }
1687 
1688         // Add region plans, so we can updateTimers when one region is opened so
1689         // that unnecessary timeout on RIT is reduced.
1690         this.addPlans(plans);
1691 
1692         List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
1693           new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
1694         for (RegionState state: states) {
1695           HRegionInfo region = state.getRegion();
1696           String encodedRegionName = region.getEncodedName();
1697           Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
1698           if (useZKForAssignment && (nodeVersion == null || nodeVersion == -1)) {
1699             LOG.warn("failed to offline in zookeeper: " + region);
1700             failedToOpenRegions.add(region); // assign individually later
1701             Lock lock = locks.remove(encodedRegionName);
1702             lock.unlock();
1703           } else {
1704             regionStates.updateRegionState(
1705               region, State.PENDING_OPEN, destination);
1706             List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1707             if (this.shouldAssignRegionsWithFavoredNodes) {
1708               favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1709             }
1710             regionOpenInfos.add(new Triple<HRegionInfo, Integer,  List<ServerName>>(
1711               region, nodeVersion, favoredNodes));
1712           }
1713         }
1714 
1715         // Move on to open regions.
1716         try {
1717           // Send OPEN RPC. If it fails on a IOE or RemoteException,
1718           // regions will be assigned individually.
1719           long maxWaitTime = System.currentTimeMillis() +
1720             this.server.getConfiguration().
1721               getLong("hbase.regionserver.rpc.startup.waittime", 60000);
1722           for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
1723             try {
1724               // regionOpenInfos is empty if all regions are in failedToOpenRegions list
1725               if (regionOpenInfos.isEmpty()) {
1726                 break;
1727               }
1728               List<RegionOpeningState> regionOpeningStateList = serverManager
1729                 .sendRegionOpen(destination, regionOpenInfos);
1730               if (regionOpeningStateList == null) {
1731                 // Failed getting RPC connection to this server
1732                 return false;
1733               }
1734               for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
1735                 RegionOpeningState openingState = regionOpeningStateList.get(k);
1736                 if (openingState != RegionOpeningState.OPENED) {
1737                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
1738                   if (openingState == RegionOpeningState.ALREADY_OPENED) {
1739                     processAlreadyOpenedRegion(region, destination);
1740                   } else if (openingState == RegionOpeningState.FAILED_OPENING) {
1741                     // Failed opening this region, reassign it later
1742                     failedToOpenRegions.add(region);
1743                   } else {
1744                     LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
1745                       + openingState + " in assigning region " + region);
1746                   }
1747                 }
1748               }
1749               break;
1750             } catch (IOException e) {
1751               if (e instanceof RemoteException) {
1752                 e = ((RemoteException)e).unwrapRemoteException();
1753               }
1754               if (e instanceof RegionServerStoppedException) {
1755                 LOG.warn("The region server was shut down, ", e);
1756                 // No need to retry, the region server is a goner.
1757                 return false;
1758               } else if (e instanceof ServerNotRunningYetException) {
1759                 long now = System.currentTimeMillis();
1760                 if (now < maxWaitTime) {
1761                   LOG.debug("Server is not yet up; waiting up to " +
1762                     (maxWaitTime - now) + "ms", e);
1763                   Thread.sleep(100);
1764                   i--; // reset the try count
1765                   continue;
1766                 }
1767               } else if (e instanceof java.net.SocketTimeoutException
1768                   && this.serverManager.isServerOnline(destination)) {
1769                 // In case socket is timed out and the region server is still online,
1770                 // the openRegion RPC could have been accepted by the server and
1771                 // just the response didn't go through.  So we will retry to
1772                 // open the region on the same server.
1773                 if (LOG.isDebugEnabled()) {
1774                   LOG.debug("Bulk assigner openRegion() to " + destination
1775                     + " has timed out, but the regions might"
1776                     + " already be opened on it.", e);
1777                 }
1778                 // wait and reset the re-try count, server might be just busy.
1779                 Thread.sleep(100);
1780                 i--;
1781                 continue;
1782               }
1783               throw e;
1784             }
1785           }
1786         } catch (IOException e) {
1787           // Can be a socket timeout, EOF, NoRouteToHost, etc
1788           LOG.info("Unable to communicate with " + destination
1789             + " in order to assign regions, ", e);
1790           return false;
1791         }
1792       } finally {
1793         for (Lock lock : locks.values()) {
1794           lock.unlock();
1795         }
1796       }
1797 
1798       if (!failedToOpenRegions.isEmpty()) {
1799         for (HRegionInfo region : failedToOpenRegions) {
1800           if (!regionStates.isRegionOnline(region)) {
1801             invokeAssign(region);
1802           }
1803         }
1804       }
1805 
1806       // wait for assignment completion
1807       ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
1808       for (HRegionInfo region: regions) {
1809         if (!region.getTable().isSystemTable()) {
1810           userRegionSet.add(region);
1811         }
1812       }
1813       if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
1814             System.currentTimeMillis())) {
1815         LOG.debug("some user regions are still in transition: " + userRegionSet);
1816       }
1817       LOG.debug("Bulk assigning done for " + destination);
1818       return true;
1819     } finally {
1820       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
1821     }
1822   }
1823 
1824   /**
1825    * Send CLOSE RPC if the server is online, otherwise, offline the region.
1826    *
1827    * The RPC will be sent only to the region sever found in the region state
1828    * if it is passed in, otherwise, to the src server specified. If region
1829    * state is not specified, we don't update region state at all, instead
1830    * we just send the RPC call. This is useful for some cleanup without
1831    * messing around the region states (see handleRegion, on region opened
1832    * on an unexpected server scenario, for an example)
1833    */
1834   private void unassign(final HRegionInfo region,
1835       final RegionState state, final int versionOfClosingNode,
1836       final ServerName dest, final boolean transitionInZK,
1837       final ServerName src) {
1838     ServerName server = src;
1839     if (state != null) {
1840       server = state.getServerName();
1841     }
1842     long maxWaitTime = -1;
1843     for (int i = 1; i <= this.maximumAttempts; i++) {
1844       if (this.server.isStopped() || this.server.isAborted()) {
1845         LOG.debug("Server stopped/aborted; skipping unassign of " + region);
1846         return;
1847       }
1848       // ClosedRegionhandler can remove the server from this.regions
1849       if (!serverManager.isServerOnline(server)) {
1850         LOG.debug("Offline " + region.getRegionNameAsString()
1851           + ", no need to unassign since it's on a dead server: " + server);
1852         if (transitionInZK) {
1853           // delete the node. if no node exists need not bother.
1854           deleteClosingOrClosedNode(region, server);
1855         }
1856         if (state != null) {
1857           regionOffline(region);
1858         }
1859         return;
1860       }
1861       try {
1862         // Send CLOSE RPC
1863         if (serverManager.sendRegionClose(server, region,
1864           versionOfClosingNode, dest, transitionInZK)) {
1865           LOG.debug("Sent CLOSE to " + server + " for region " +
1866             region.getRegionNameAsString());
1867           if (useZKForAssignment && !transitionInZK && state != null) {
1868             // Retry to make sure the region is
1869             // closed so as to avoid double assignment.
1870             unassign(region, state, versionOfClosingNode,
1871               dest, transitionInZK, src);
1872           }
1873           return;
1874         }
1875         // This never happens. Currently regionserver close always return true.
1876         // Todo; this can now happen (0.96) if there is an exception in a coprocessor
1877         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
1878           region.getRegionNameAsString());
1879       } catch (Throwable t) {
1880         long sleepTime = 0;
1881         Configuration conf = this.server.getConfiguration();
1882         if (t instanceof RemoteException) {
1883           t = ((RemoteException)t).unwrapRemoteException();
1884         }
1885         boolean logRetries = true;
1886         if (t instanceof RegionServerAbortedException
1887             || t instanceof RegionServerStoppedException
1888             || t instanceof ServerNotRunningYetException) {
1889           // RS is aborting or stopping, we cannot offline the region since the region may need
1890           // to do WAL recovery. Until we see  the RS expiration, we should retry.
1891           sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1892             RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1893 
1894         } else if (t instanceof NotServingRegionException) {
1895           LOG.debug("Offline " + region.getRegionNameAsString()
1896             + ", it's not any more on " + server, t);
1897           if (transitionInZK) {
1898             deleteClosingOrClosedNode(region, server);
1899           }
1900           if (state != null) {
1901             regionOffline(region);
1902           }
1903           return;
1904         } else if ((t instanceof FailedServerException) || (state != null &&
1905             t instanceof RegionAlreadyInTransitionException)) {
1906           if(t instanceof FailedServerException) {
1907             sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1908                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1909           } else {
1910             // RS is already processing this region, only need to update the timestamp
1911             LOG.debug("update " + state + " the timestamp.");
1912             state.updateTimestampToNow();
1913             if (maxWaitTime < 0) {
1914               maxWaitTime =
1915                   EnvironmentEdgeManager.currentTime()
1916                       + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
1917                         DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
1918             }
1919             long now = EnvironmentEdgeManager.currentTime();
1920             if (now < maxWaitTime) {
1921               LOG.debug("Region is already in transition; "
1922                 + "waiting up to " + (maxWaitTime - now) + "ms", t);
1923               sleepTime = 100;
1924               i--; // reset the try count
1925               logRetries = false;
1926             }
1927           }
1928         }
1929 
1930         try {
1931           if (sleepTime > 0) {
1932             Thread.sleep(sleepTime);
1933           }
1934         } catch (InterruptedException ie) {
1935           LOG.warn("Failed to unassign "
1936             + region.getRegionNameAsString() + " since interrupted", ie);
1937           Thread.currentThread().interrupt();
1938           if (state != null) {
1939             regionStates.updateRegionState(region, State.FAILED_CLOSE);
1940           }
1941           return;
1942         }
1943 
1944         if (logRetries) {
1945           LOG.info("Server " + server + " returned " + t + " for "
1946             + region.getRegionNameAsString() + ", try=" + i
1947             + " of " + this.maximumAttempts, t);
1948           // Presume retry or server will expire.
1949         }
1950       }
1951     }
1952     // Run out of attempts
1953     if (state != null) {
1954       regionStates.updateRegionState(region, State.FAILED_CLOSE);
1955     }
1956   }
1957 
1958   /**
1959    * Set region to OFFLINE unless it is opening and forceNewPlan is false.
1960    */
1961   private RegionState forceRegionStateToOffline(
1962       final HRegionInfo region, final boolean forceNewPlan) {
1963     RegionState state = regionStates.getRegionState(region);
1964     if (state == null) {
1965       LOG.warn("Assigning but not in region states: " + region);
1966       state = regionStates.createRegionState(region);
1967     }
1968 
1969     ServerName sn = state.getServerName();
1970     if (forceNewPlan && LOG.isDebugEnabled()) {
1971       LOG.debug("Force region state offline " + state);
1972     }
1973 
1974     switch (state.getState()) {
1975     case OPEN:
1976     case OPENING:
1977     case PENDING_OPEN:
1978     case CLOSING:
1979     case PENDING_CLOSE:
1980       if (!forceNewPlan) {
1981         LOG.debug("Skip assigning " +
1982           region + ", it is already " + state);
1983         return null;
1984       }
1985     case FAILED_CLOSE:
1986     case FAILED_OPEN:
1987       unassign(region, state, -1, null, false, null);
1988       state = regionStates.getRegionState(region);
1989       if (state.isFailedClose()) {
1990         // If we can't close the region, we can't re-assign
1991         // it so as to avoid possible double assignment/data loss.
1992         LOG.info("Skip assigning " +
1993           region + ", we couldn't close it: " + state);
1994         return null;
1995       }
1996     case OFFLINE:
1997       // This region could have been open on this server
1998       // for a while. If the server is dead and not processed
1999       // yet, we can move on only if the meta shows the
2000       // region is not on this server actually, or on a server
2001       // not dead, or dead and processed already.
2002       // In case not using ZK, we don't need this check because
2003       // we have the latest info in memory, and the caller
2004       // will do another round checking any way.
2005       if (useZKForAssignment
2006           && regionStates.isServerDeadAndNotProcessed(sn)
2007           && wasRegionOnDeadServerByMeta(region, sn)) {
2008         if (!regionStates.isRegionInTransition(region)) {
2009           LOG.info("Updating the state to " + State.OFFLINE + " to allow to be reassigned by SSH");
2010           regionStates.updateRegionState(region, State.OFFLINE);
2011         }
2012         LOG.info("Skip assigning " + region.getRegionNameAsString()
2013             + ", it is on a dead but not processed yet server: " + sn);
2014         return null;
2015       }
2016     case CLOSED:
2017       break;
2018     default:
2019       LOG.error("Trying to assign region " + region
2020         + ", which is " + state);
2021       return null;
2022     }
2023     return state;
2024   }
2025 
2026   @SuppressWarnings("deprecation")
2027   protected boolean wasRegionOnDeadServerByMeta(
2028       final HRegionInfo region, final ServerName sn) {
2029     try {
2030       if (region.isMetaRegion()) {
2031         ServerName server = this.server.getMetaTableLocator().
2032           getMetaRegionLocation(this.server.getZooKeeper());
2033         return regionStates.isServerDeadAndNotProcessed(server);
2034       }
2035       while (!server.isStopped()) {
2036         try {
2037           this.server.getMetaTableLocator().waitMetaRegionLocation(server.getZooKeeper());
2038           Result r = MetaTableAccessor.getRegionResult(server.getConnection(),
2039             region.getRegionName());
2040           if (r == null || r.isEmpty()) return false;
2041           ServerName server = HRegionInfo.getServerName(r);
2042           return regionStates.isServerDeadAndNotProcessed(server);
2043         } catch (IOException ioe) {
2044           LOG.info("Received exception accessing hbase:meta during force assign "
2045             + region.getRegionNameAsString() + ", retrying", ioe);
2046         }
2047       }
2048     } catch (InterruptedException e) {
2049       Thread.currentThread().interrupt();
2050       LOG.info("Interrupted accessing hbase:meta", e);
2051     }
2052     // Call is interrupted or server is stopped.
2053     return regionStates.isServerDeadAndNotProcessed(sn);
2054   }
2055 
2056   /**
2057    * Caller must hold lock on the passed <code>state</code> object.
2058    * @param state
2059    * @param setOfflineInZK
2060    * @param forceNewPlan
2061    */
2062   private void assign(RegionState state,
2063       boolean setOfflineInZK, final boolean forceNewPlan) {
2064     long startTime = EnvironmentEdgeManager.currentTime();
2065     try {
2066       Configuration conf = server.getConfiguration();
2067       RegionState currentState = state;
2068       int versionOfOfflineNode = -1;
2069       RegionPlan plan = null;
2070       long maxWaitTime = -1;
2071       HRegionInfo region = state.getRegion();
2072       RegionOpeningState regionOpenState;
2073       Throwable previousException = null;
2074       for (int i = 1; i <= maximumAttempts; i++) {
2075         if (server.isStopped() || server.isAborted()) {
2076           LOG.info("Skip assigning " + region.getRegionNameAsString()
2077             + ", the server is stopped/aborted");
2078           return;
2079         }
2080 
2081         if (plan == null) { // Get a server for the region at first
2082           try {
2083             plan = getRegionPlan(region, forceNewPlan);
2084           } catch (HBaseIOException e) {
2085             LOG.warn("Failed to get region plan", e);
2086           }
2087         }
2088 
2089         if (plan == null) {
2090           LOG.warn("Unable to determine a plan to assign " + region);
2091 
2092           // For meta region, we have to keep retrying until succeeding
2093           if (region.isMetaRegion()) {
2094             if (i == maximumAttempts) {
2095               i = 0; // re-set attempt count to 0 for at least 1 retry
2096 
2097               LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
2098                 " after maximumAttempts (" + this.maximumAttempts +
2099                 "). Reset attempts count and continue retrying.");
2100             }
2101             waitForRetryingMetaAssignment();
2102             continue;
2103           }
2104 
2105           regionStates.updateRegionState(region, State.FAILED_OPEN);
2106           return;
2107         }
2108         if (setOfflineInZK && versionOfOfflineNode == -1) {
2109           LOG.info("Setting node as OFFLINED in ZooKeeper for region " + region);
2110           // get the version of the znode after setting it to OFFLINE.
2111           // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
2112           versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
2113           if (versionOfOfflineNode != -1) {
2114             if (isDisabledorDisablingRegionInRIT(region)) {
2115               return;
2116             }
2117             // In case of assignment from EnableTableHandler table state is ENABLING. Any how
2118             // EnableTableHandler will set ENABLED after assigning all the table regions. If we
2119             // try to set to ENABLED directly then client API may think table is enabled.
2120             // When we have a case such as all the regions are added directly into hbase:meta and we call
2121             // assignRegion then we need to make the table ENABLED. Hence in such case the table
2122             // will not be in ENABLING or ENABLED state.
2123             TableName tableName = region.getTable();
2124             if (!tableStateManager.isTableState(tableName,
2125               ZooKeeperProtos.Table.State.ENABLED, ZooKeeperProtos.Table.State.ENABLING)) {
2126               LOG.debug("Setting table " + tableName + " to ENABLED state.");
2127               setEnabledTable(tableName);
2128             }
2129           }
2130         }
2131         if (setOfflineInZK && versionOfOfflineNode == -1) {
2132           LOG.info("Unable to set offline in ZooKeeper to assign " + region);
2133           // Setting offline in ZK must have been failed due to ZK racing or some
2134           // exception which may make the server to abort. If it is ZK racing,
2135           // we should retry since we already reset the region state,
2136           // existing (re)assignment will fail anyway.
2137           if (!server.isAborted()) {
2138             continue;
2139           }
2140         }
2141         LOG.info("Assigning " + region.getRegionNameAsString() +
2142             " to " + plan.getDestination().toString());
2143         // Transition RegionState to PENDING_OPEN
2144         currentState = regionStates.updateRegionState(region,
2145           State.PENDING_OPEN, plan.getDestination());
2146 
2147         boolean needNewPlan;
2148         final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
2149             " to " + plan.getDestination();
2150         try {
2151           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
2152           if (this.shouldAssignRegionsWithFavoredNodes) {
2153             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
2154           }
2155           regionOpenState = serverManager.sendRegionOpen(
2156               plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
2157 
2158           if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
2159             // Failed opening this region, looping again on a new server.
2160             needNewPlan = true;
2161             LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
2162                 " trying to assign elsewhere instead; " +
2163                 "try=" + i + " of " + this.maximumAttempts);
2164           } else {
2165             // we're done
2166             if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
2167               processAlreadyOpenedRegion(region, plan.getDestination());
2168             }
2169             return;
2170           }
2171 
2172         } catch (Throwable t) {
2173           if (t instanceof RemoteException) {
2174             t = ((RemoteException) t).unwrapRemoteException();
2175           }
2176           previousException = t;
2177 
2178           // Should we wait a little before retrying? If the server is starting it's yes.
2179           // If the region is already in transition, it's yes as well: we want to be sure that
2180           //  the region will get opened but we don't want a double assignment.
2181           boolean hold = (t instanceof RegionAlreadyInTransitionException ||
2182               t instanceof ServerNotRunningYetException);
2183 
2184           // In case socket is timed out and the region server is still online,
2185           // the openRegion RPC could have been accepted by the server and
2186           // just the response didn't go through.  So we will retry to
2187           // open the region on the same server to avoid possible
2188           // double assignment.
2189           boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
2190               && this.serverManager.isServerOnline(plan.getDestination()));
2191 
2192 
2193           if (hold) {
2194             LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
2195               "try=" + i + " of " + this.maximumAttempts, t);
2196 
2197             if (maxWaitTime < 0) {
2198               if (t instanceof RegionAlreadyInTransitionException) {
2199                 maxWaitTime = EnvironmentEdgeManager.currentTime()
2200                   + this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
2201                     DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
2202               } else {
2203                 maxWaitTime = EnvironmentEdgeManager.currentTime()
2204                   + this.server.getConfiguration().getLong(
2205                     "hbase.regionserver.rpc.startup.waittime", 60000);
2206               }
2207             }
2208             try {
2209               needNewPlan = false;
2210               long now = EnvironmentEdgeManager.currentTime();
2211               if (now < maxWaitTime) {
2212                 LOG.debug("Server is not yet up or region is already in transition; "
2213                   + "waiting up to " + (maxWaitTime - now) + "ms", t);
2214                 Thread.sleep(100);
2215                 i--; // reset the try count
2216               } else if (!(t instanceof RegionAlreadyInTransitionException)) {
2217                 LOG.debug("Server is not up for a while; try a new one", t);
2218                 needNewPlan = true;
2219               }
2220             } catch (InterruptedException ie) {
2221               LOG.warn("Failed to assign "
2222                   + region.getRegionNameAsString() + " since interrupted", ie);
2223               regionStates.updateRegionState(region, State.FAILED_OPEN);
2224               Thread.currentThread().interrupt();
2225               return;
2226             }
2227           } else if (retry) {
2228             needNewPlan = false;
2229             i--; // we want to retry as many times as needed as long as the RS is not dead.
2230             LOG.warn(assignMsg + ", trying to assign to the same region server due ", t);
2231           } else {
2232             needNewPlan = true;
2233             LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
2234                 " try=" + i + " of " + this.maximumAttempts, t);
2235           }
2236         }
2237 
2238         if (i == this.maximumAttempts) {
2239           // For meta region, we have to keep retrying until succeeding
2240           if (region.isMetaRegion()) {
2241             i = 0; // re-set attempt count to 0 for at least 1 retry
2242             LOG.warn(assignMsg +
2243                 ", trying to assign a hbase:meta region reached to maximumAttempts (" +
2244                 this.maximumAttempts + ").  Reset attempt counts and continue retrying.");
2245             waitForRetryingMetaAssignment();
2246           }
2247           else {
2248             // Don't reset the region state or get a new plan any more.
2249             // This is the last try.
2250             continue;
2251           }
2252         }
2253 
2254         // If region opened on destination of present plan, reassigning to new
2255         // RS may cause double assignments. In case of RegionAlreadyInTransitionException
2256         // reassigning to same RS.
2257         if (needNewPlan) {
2258           // Force a new plan and reassign. Will return null if no servers.
2259           // The new plan could be the same as the existing plan since we don't
2260           // exclude the server of the original plan, which should not be
2261           // excluded since it could be the only server up now.
2262           RegionPlan newPlan = null;
2263           try {
2264             newPlan = getRegionPlan(region, true);
2265           } catch (HBaseIOException e) {
2266             LOG.warn("Failed to get region plan", e);
2267           }
2268           if (newPlan == null) {
2269             regionStates.updateRegionState(region, State.FAILED_OPEN);
2270             LOG.warn("Unable to find a viable location to assign region " +
2271                 region.getRegionNameAsString());
2272             return;
2273           }
2274 
2275           if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
2276             // Clean out plan we failed execute and one that doesn't look like it'll
2277             // succeed anyways; we need a new plan!
2278             // Transition back to OFFLINE
2279             LOG.info("Region assignment plan changed from " + plan.getDestination() + " to "
2280                 + newPlan.getDestination() + " server.");
2281             currentState = regionStates.updateRegionState(region, State.OFFLINE);
2282             versionOfOfflineNode = -1;
2283             if (useZKForAssignment) {
2284               setOfflineInZK = true;
2285             }
2286             plan = newPlan;
2287           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
2288               previousException instanceof FailedServerException) {
2289             try {
2290               LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
2291                 " to the same failed server.");
2292               Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
2293                 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
2294             } catch (InterruptedException ie) {
2295               LOG.warn("Failed to assign "
2296                   + region.getRegionNameAsString() + " since interrupted", ie);
2297               regionStates.updateRegionState(region, State.FAILED_OPEN);
2298               Thread.currentThread().interrupt();
2299               return;
2300             }
2301           }
2302         }
2303       }
2304       // Run out of attempts
2305       regionStates.updateRegionState(region, State.FAILED_OPEN);
2306     } finally {
2307       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
2308     }
2309   }
2310 
2311   private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
2312     // Remove region from in-memory transition and unassigned node from ZK
2313     // While trying to enable the table the regions of the table were
2314     // already enabled.
2315     LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2316       + " to " + sn);
2317     String encodedName = region.getEncodedName();
2318     
2319     //If use ZkForAssignment, region already Opened event should not be handled, 
2320     //leave it to zk event. See HBase-14407.
2321     if(useZKForAssignment){
2322       String node = ZKAssign.getNodeName(watcher, encodedName);
2323       Stat stat = new Stat();
2324       try {
2325         byte[] existingBytes = ZKUtil.getDataNoWatch(watcher, node, stat);
2326         if(existingBytes!=null){
2327           RegionTransition rt= RegionTransition.parseFrom(existingBytes);
2328           EventType et = rt.getEventType();
2329           if (et.equals(EventType.RS_ZK_REGION_OPENED)) {
2330             LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2331               + " and node in "+et+" state");
2332             return;
2333           }
2334         }
2335       } catch (KeeperException ke) {
2336         LOG.warn("Unexpected ZK exception getData " + node
2337           + " node for the region " + encodedName, ke);
2338       } catch (DeserializationException e) {
2339         LOG.warn("Get RegionTransition from zk deserialization failed! ", e);
2340       }
2341       
2342       deleteNodeInStates(encodedName, "offline", sn, EventType.M_ZK_REGION_OFFLINE);
2343     }
2344     
2345     regionStates.regionOnline(region, sn);
2346   }
2347 
2348   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
2349     if (this.tableStateManager.isTableState(region.getTable(),
2350         ZooKeeperProtos.Table.State.DISABLED,
2351         ZooKeeperProtos.Table.State.DISABLING) || replicasToClose.contains(region)) {
2352       LOG.info("Table " + region.getTable() + " is disabled or disabling;"
2353         + " skipping assign of " + region.getRegionNameAsString());
2354       offlineDisabledRegion(region);
2355       return true;
2356     }
2357     return false;
2358   }
2359 
2360   /**
2361    * Set region as OFFLINED up in zookeeper
2362    *
2363    * @param state
2364    * @return the version of the offline node if setting of the OFFLINE node was
2365    *         successful, -1 otherwise.
2366    */
2367   private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
2368     if (!state.isClosed() && !state.isOffline()) {
2369       String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE.";
2370       this.server.abort(msg, new IllegalStateException(msg));
2371       return -1;
2372     }
2373     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
2374     int versionOfOfflineNode;
2375     try {
2376       // get the version after setting the znode to OFFLINE
2377       versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
2378         state.getRegion(), destination);
2379       if (versionOfOfflineNode == -1) {
2380         LOG.warn("Attempted to create/force node into OFFLINE state before "
2381             + "completing assignment but failed to do so for " + state);
2382         return -1;
2383       }
2384     } catch (KeeperException e) {
2385       server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
2386       return -1;
2387     }
2388     return versionOfOfflineNode;
2389   }
2390 
2391   /**
2392    * @param region the region to assign
2393    * @return Plan for passed <code>region</code> (If none currently, it creates one or
2394    * if no servers to assign, it returns null).
2395    */
2396   private RegionPlan getRegionPlan(final HRegionInfo region,
2397       final boolean forceNewPlan)  throws HBaseIOException {
2398     return getRegionPlan(region, null, forceNewPlan);
2399   }
2400 
2401   /**
2402    * @param region the region to assign
2403    * @param serverToExclude Server to exclude (we know its bad). Pass null if
2404    * all servers are thought to be assignable.
2405    * @param forceNewPlan If true, then if an existing plan exists, a new plan
2406    * will be generated.
2407    * @return Plan for passed <code>region</code> (If none currently, it creates one or
2408    * if no servers to assign, it returns null).
2409    */
2410   private RegionPlan getRegionPlan(final HRegionInfo region,
2411       final ServerName serverToExclude, final boolean forceNewPlan) throws HBaseIOException {
2412     // Pickup existing plan or make a new one
2413     final String encodedName = region.getEncodedName();
2414     final List<ServerName> destServers =
2415       serverManager.createDestinationServersList(serverToExclude);
2416 
2417     if (destServers.isEmpty()){
2418       LOG.warn("Can't move " + encodedName +
2419         ", there is no destination server available.");
2420       return null;
2421     }
2422 
2423     RegionPlan randomPlan = null;
2424     boolean newPlan = false;
2425     RegionPlan existingPlan;
2426 
2427     synchronized (this.regionPlans) {
2428       existingPlan = this.regionPlans.get(encodedName);
2429 
2430       if (existingPlan != null && existingPlan.getDestination() != null) {
2431         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
2432           + " destination server is " + existingPlan.getDestination() +
2433             " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
2434       }
2435 
2436       if (forceNewPlan
2437           || existingPlan == null
2438           || existingPlan.getDestination() == null
2439           || !destServers.contains(existingPlan.getDestination())) {
2440         newPlan = true;
2441       }
2442     }
2443 
2444     if (newPlan) {
2445       ServerName destination = balancer.randomAssignment(region, destServers);
2446       if (destination == null) {
2447         LOG.warn("Can't find a destination for " + encodedName);
2448         return null;
2449       }
2450       synchronized (this.regionPlans) {
2451         randomPlan = new RegionPlan(region, null, destination);
2452         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
2453           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
2454           regions.add(region);
2455           try {
2456             processFavoredNodes(regions);
2457           } catch (IOException ie) {
2458             LOG.warn("Ignoring exception in processFavoredNodes " + ie);
2459           }
2460         }
2461         this.regionPlans.put(encodedName, randomPlan);
2462       }
2463       LOG.debug("No previous transition plan found (or ignoring " + "an existing plan) for "
2464           + region.getRegionNameAsString() + "; generated random plan=" + randomPlan + "; "
2465           + destServers.size() + " (online=" + serverManager.getOnlineServers().size()
2466           + ") available servers, forceNewPlan=" + forceNewPlan);
2467       return randomPlan;
2468     }
2469     LOG.debug("Using pre-existing plan for " +
2470       region.getRegionNameAsString() + "; plan=" + existingPlan);
2471     return existingPlan;
2472   }
2473 
2474   /**
2475    * Wait for some time before retrying meta table region assignment
2476    */
2477   private void waitForRetryingMetaAssignment() {
2478     try {
2479       Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
2480     } catch (InterruptedException e) {
2481       LOG.error("Got exception while waiting for hbase:meta assignment");
2482       Thread.currentThread().interrupt();
2483     }
2484   }
2485 
2486   /**
2487    * Unassigns the specified region.
2488    * <p>
2489    * Updates the RegionState and sends the CLOSE RPC unless region is being
2490    * split by regionserver; then the unassign fails (silently) because we
2491    * presume the region being unassigned no longer exists (its been split out
2492    * of existence). TODO: What to do if split fails and is rolled back and
2493    * parent is revivified?
2494    * <p>
2495    * If a RegionPlan is already set, it will remain.
2496    *
2497    * @param region server to be unassigned
2498    */
2499   public void unassign(HRegionInfo region) {
2500     unassign(region, false);
2501   }
2502 
2503 
2504   /**
2505    * Unassigns the specified region.
2506    * <p>
2507    * Updates the RegionState and sends the CLOSE RPC unless region is being
2508    * split by regionserver; then the unassign fails (silently) because we
2509    * presume the region being unassigned no longer exists (its been split out
2510    * of existence). TODO: What to do if split fails and is rolled back and
2511    * parent is revivified?
2512    * <p>
2513    * If a RegionPlan is already set, it will remain.
2514    *
2515    * @param region server to be unassigned
2516    * @param force if region should be closed even if already closing
2517    */
2518   public void unassign(HRegionInfo region, boolean force, ServerName dest) {
2519     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
2520     LOG.debug("Starting unassign of " + region.getRegionNameAsString()
2521       + " (offlining), current state: " + regionStates.getRegionState(region));
2522 
2523     String encodedName = region.getEncodedName();
2524     // Grab the state of this region and synchronize on it
2525     int versionOfClosingNode = -1;
2526     // We need a lock here as we're going to do a put later and we don't want multiple states
2527     //  creation
2528     ReentrantLock lock = locker.acquireLock(encodedName);
2529     RegionState state = regionStates.getRegionTransitionState(encodedName);
2530     boolean reassign = true;
2531     try {
2532       if (state == null) {
2533         // Region is not in transition.
2534         // We can unassign it only if it's not SPLIT/MERGED.
2535         state = regionStates.getRegionState(encodedName);
2536         if (state != null && state.isUnassignable()) {
2537           LOG.info("Attempting to unassign " + state + ", ignored");
2538           // Offline region will be reassigned below
2539           return;
2540         }
2541         // Create the znode in CLOSING state
2542         try {
2543           if (state == null || state.getServerName() == null) {
2544             // We don't know where the region is, offline it.
2545             // No need to send CLOSE RPC
2546             LOG.warn("Attempting to unassign a region not in RegionStates "
2547               + region.getRegionNameAsString() + ", offlined");
2548             regionOffline(region);
2549             return;
2550           }
2551           if (useZKForAssignment) {
2552             versionOfClosingNode = ZKAssign.createNodeClosing(
2553               watcher, region, state.getServerName());
2554             if (versionOfClosingNode == -1) {
2555               LOG.info("Attempting to unassign " +
2556                 region.getRegionNameAsString() + " but ZK closing node "
2557                 + "can't be created.");
2558               reassign = false; // not unassigned at all
2559               return;
2560             }
2561           }
2562         } catch (KeeperException e) {
2563           if (e instanceof NodeExistsException) {
2564             // Handle race between master initiated close and regionserver
2565             // orchestrated splitting. See if existing node is in a
2566             // SPLITTING or SPLIT state.  If so, the regionserver started
2567             // an op on node before we could get our CLOSING in.  Deal.
2568             NodeExistsException nee = (NodeExistsException)e;
2569             String path = nee.getPath();
2570             try {
2571               if (isSplitOrSplittingOrMergedOrMerging(path)) {
2572                 LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
2573                   "skipping unassign because region no longer exists -- its split or merge");
2574                 reassign = false; // no need to reassign for split/merged region
2575                 return;
2576               }
2577             } catch (KeeperException.NoNodeException ke) {
2578               LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
2579                 "; presuming split and that the region to unassign, " +
2580                 encodedName + ", no longer exists -- confirm", ke);
2581               return;
2582             } catch (KeeperException ke) {
2583               LOG.error("Unexpected zk state", ke);
2584             } catch (DeserializationException de) {
2585               LOG.error("Failed parse", de);
2586             }
2587           }
2588           // If we get here, don't understand whats going on -- abort.
2589           server.abort("Unexpected ZK exception creating node CLOSING", e);
2590           reassign = false; // heading out already
2591           return;
2592         }
2593         state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2594       } else if (state.isFailedOpen()) {
2595         // The region is not open yet
2596         regionOffline(region);
2597         return;
2598       } else if (force && state.isPendingCloseOrClosing()) {
2599         LOG.debug("Attempting to unassign " + region.getRegionNameAsString() +
2600           " which is already " + state.getState()  +
2601           " but forcing to send a CLOSE RPC again ");
2602         if (state.isFailedClose()) {
2603           state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2604         }
2605         state.updateTimestampToNow();
2606       } else {
2607         LOG.debug("Attempting to unassign " +
2608           region.getRegionNameAsString() + " but it is " +
2609           "already in transition (" + state.getState() + ", force=" + force + ")");
2610         return;
2611       }
2612 
2613       unassign(region, state, versionOfClosingNode, dest, useZKForAssignment, null);
2614     } finally {
2615       lock.unlock();
2616 
2617       // Region is expected to be reassigned afterwards
2618       if (!replicasToClose.contains(region) && reassign && regionStates.isRegionOffline(region)) {
2619         assign(region, true);
2620       }
2621     }
2622   }
2623 
2624   public void unassign(HRegionInfo region, boolean force){
2625      unassign(region, force, null);
2626   }
2627 
2628   /**
2629    * @param region regioninfo of znode to be deleted.
2630    */
2631   public void deleteClosingOrClosedNode(HRegionInfo region, ServerName sn) {
2632     String encodedName = region.getEncodedName();
2633     deleteNodeInStates(encodedName, "closing", sn, EventType.M_ZK_REGION_CLOSING,
2634       EventType.RS_ZK_REGION_CLOSED);
2635   }
2636 
2637   /**
2638    * @param path
2639    * @return True if znode is in SPLIT or SPLITTING or MERGED or MERGING state.
2640    * @throws KeeperException Can happen if the znode went away in meantime.
2641    * @throws DeserializationException
2642    */
2643   private boolean isSplitOrSplittingOrMergedOrMerging(final String path)
2644       throws KeeperException, DeserializationException {
2645     boolean result = false;
2646     // This may fail if the SPLIT or SPLITTING or MERGED or MERGING znode gets
2647     // cleaned up before we can get data from it.
2648     byte [] data = ZKAssign.getData(watcher, path);
2649     if (data == null) {
2650       LOG.info("Node " + path + " is gone");
2651       return false;
2652     }
2653     RegionTransition rt = RegionTransition.parseFrom(data);
2654     switch (rt.getEventType()) {
2655     case RS_ZK_REQUEST_REGION_SPLIT:
2656     case RS_ZK_REGION_SPLIT:
2657     case RS_ZK_REGION_SPLITTING:
2658     case RS_ZK_REQUEST_REGION_MERGE:
2659     case RS_ZK_REGION_MERGED:
2660     case RS_ZK_REGION_MERGING:
2661       result = true;
2662       break;
2663     default:
2664       LOG.info("Node " + path + " is in " + rt.getEventType());
2665       break;
2666     }
2667     return result;
2668   }
2669 
2670   /**
2671    * Used by unit tests. Return the number of regions opened so far in the life
2672    * of the master. Increases by one every time the master opens a region
2673    * @return the counter value of the number of regions opened so far
2674    */
2675   public int getNumRegionsOpened() {
2676     return numRegionsOpened.get();
2677   }
2678 
2679   /**
2680    * Waits until the specified region has completed assignment.
2681    * <p>
2682    * If the region is already assigned, returns immediately.  Otherwise, method
2683    * blocks until the region is assigned.
2684    * @param regionInfo region to wait on assignment for
2685    * @return true if the region is assigned false otherwise.
2686    * @throws InterruptedException
2687    */
2688   public boolean waitForAssignment(HRegionInfo regionInfo)
2689       throws InterruptedException {
2690     ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
2691     regionSet.add(regionInfo);
2692     return waitForAssignment(regionSet, true, Long.MAX_VALUE);
2693   }
2694 
2695   /**
2696    * Waits until the specified region has completed assignment, or the deadline is reached.
2697    */
2698   protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
2699       final boolean waitTillAllAssigned, final int reassigningRegions,
2700       final long minEndTime) throws InterruptedException {
2701     long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
2702     if (deadline < 0) { // Overflow
2703       deadline = Long.MAX_VALUE; // wait forever
2704     }
2705     return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
2706   }
2707 
2708   /**
2709    * Waits until the specified region has completed assignment, or the deadline is reached.
2710    * @param regionSet set of region to wait on. the set is modified and the assigned regions removed
2711    * @param waitTillAllAssigned true if we should wait all the regions to be assigned
2712    * @param deadline the timestamp after which the wait is aborted
2713    * @return true if all the regions are assigned false otherwise.
2714    * @throws InterruptedException
2715    */
2716   protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
2717       final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
2718     // We're not synchronizing on regionsInTransition now because we don't use any iterator.
2719     while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
2720       int failedOpenCount = 0;
2721       Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
2722       while (regionInfoIterator.hasNext()) {
2723         HRegionInfo hri = regionInfoIterator.next();
2724         if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
2725             State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
2726           regionInfoIterator.remove();
2727         } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
2728           failedOpenCount++;
2729         }
2730       }
2731       if (!waitTillAllAssigned) {
2732         // No need to wait, let assignment going on asynchronously
2733         break;
2734       }
2735       if (!regionSet.isEmpty()) {
2736         if (failedOpenCount == regionSet.size()) {
2737           // all the regions we are waiting had an error on open.
2738           break;
2739         }
2740         regionStates.waitForUpdate(100);
2741       }
2742     }
2743     return regionSet.isEmpty();
2744   }
2745 
2746   /**
2747    * Assigns the hbase:meta region or a replica.
2748    * <p>
2749    * Assumes that hbase:meta is currently closed and is not being actively served by
2750    * any RegionServer.
2751    * <p>
2752    * Forcibly unsets the current meta region location in ZooKeeper and assigns
2753    * hbase:meta to a random RegionServer.
2754    * @param hri TODO
2755    * @throws KeeperException
2756    */
2757   public void assignMeta(HRegionInfo hri) throws KeeperException {
2758     this.server.getMetaTableLocator().deleteMetaLocation(this.watcher, hri.getReplicaId());
2759     assign(hri, true);
2760   }
2761 
2762   /**
2763    * Assigns specified regions retaining assignments, if any.
2764    * <p>
2765    * This is a synchronous call and will return once every region has been
2766    * assigned.  If anything fails, an exception is thrown
2767    * @throws InterruptedException
2768    * @throws IOException
2769    */
2770   public void assign(Map<HRegionInfo, ServerName> regions)
2771         throws IOException, InterruptedException {
2772     if (regions == null || regions.isEmpty()) {
2773       return;
2774     }
2775     List<ServerName> servers = serverManager.createDestinationServersList();
2776     if (servers == null || servers.isEmpty()) {
2777       throw new IOException("Found no destination server to assign region(s)");
2778     }
2779 
2780     // Reuse existing assignment info
2781     Map<ServerName, List<HRegionInfo>> bulkPlan =
2782       balancer.retainAssignment(regions, servers);
2783     if (bulkPlan == null) {
2784       throw new IOException("Unable to determine a plan to assign region(s)");
2785     }
2786 
2787     assign(regions.size(), servers.size(),
2788       "retainAssignment=true", bulkPlan);
2789   }
2790 
2791   /**
2792    * Assigns specified regions round robin, if any.
2793    * <p>
2794    * This is a synchronous call and will return once every region has been
2795    * assigned.  If anything fails, an exception is thrown
2796    * @throws InterruptedException
2797    * @throws IOException
2798    */
2799   public void assign(List<HRegionInfo> regions)
2800         throws IOException, InterruptedException {
2801     if (regions == null || regions.isEmpty()) {
2802       return;
2803     }
2804 
2805     List<ServerName> servers = serverManager.createDestinationServersList();
2806     if (servers == null || servers.isEmpty()) {
2807       throw new IOException("Found no destination server to assign region(s)");
2808     }
2809 
2810     // Generate a round-robin bulk assignment plan
2811     Map<ServerName, List<HRegionInfo>> bulkPlan = balancer.roundRobinAssignment(regions, servers);
2812     if (bulkPlan == null) {
2813       throw new IOException("Unable to determine a plan to assign region(s)");
2814     }
2815 
2816     processFavoredNodes(regions);
2817     assign(regions.size(), servers.size(), "round-robin=true", bulkPlan);
2818   }
2819 
2820   private void assign(int regions, int totalServers,
2821       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
2822           throws InterruptedException, IOException {
2823 
2824     int servers = bulkPlan.size();
2825     if (servers == 1 || (regions < bulkAssignThresholdRegions
2826         && servers < bulkAssignThresholdServers)) {
2827 
2828       // Not use bulk assignment.  This could be more efficient in small
2829       // cluster, especially mini cluster for testing, so that tests won't time out
2830       if (LOG.isTraceEnabled()) {
2831         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
2832           " region(s) to " + servers + " server(s)");
2833       }
2834 
2835       // invoke assignment (async)
2836       ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
2837       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
2838         if (!assign(plan.getKey(), plan.getValue())) {
2839           for (HRegionInfo region: plan.getValue()) {
2840             if (!regionStates.isRegionOnline(region)) {
2841               invokeAssign(region);
2842               if (!region.getTable().isSystemTable()) {
2843                 userRegionSet.add(region);
2844               }
2845             }
2846           }
2847         }
2848       }
2849 
2850       // wait for assignment completion
2851       if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
2852             System.currentTimeMillis())) {
2853         LOG.debug("some user regions are still in transition: " + userRegionSet);
2854       }
2855     } else {
2856       LOG.info("Bulk assigning " + regions + " region(s) across "
2857         + totalServers + " server(s), " + message);
2858 
2859       // Use fixed count thread pool assigning.
2860       BulkAssigner ba = new GeneralBulkAssigner(
2861         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
2862       ba.bulkAssign();
2863       LOG.info("Bulk assigning done");
2864     }
2865   }
2866 
2867   /**
2868    * Assigns all user regions, if any exist.  Used during cluster startup.
2869    * <p>
2870    * This is a synchronous call and will return once every region has been
2871    * assigned.  If anything fails, an exception is thrown and the cluster
2872    * should be shutdown.
2873    * @throws InterruptedException
2874    * @throws IOException
2875    */
2876   private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
2877       throws IOException, InterruptedException {
2878     if (allRegions == null || allRegions.isEmpty()) return;
2879 
2880     // Determine what type of assignment to do on startup
2881     boolean retainAssignment = server.getConfiguration().
2882       getBoolean("hbase.master.startup.retainassign", true);
2883 
2884     Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
2885     if (retainAssignment) {
2886       assign(allRegions);
2887     } else {
2888       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
2889       assign(regions);
2890     }
2891 
2892     for (HRegionInfo hri : regionsFromMetaScan) {
2893       TableName tableName = hri.getTable();
2894       if (!tableStateManager.isTableState(tableName,
2895           ZooKeeperProtos.Table.State.ENABLED)) {
2896         setEnabledTable(tableName);
2897       }
2898     }
2899     // assign all the replicas that were not recorded in the meta
2900     assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, server));
2901   }
2902 
2903   /**
2904    * Get a list of replica regions that are:
2905    * not recorded in meta yet. We might not have recorded the locations
2906    * for the replicas since the replicas may not have been online yet, master restarted
2907    * in the middle of assigning, ZK erased, etc.
2908    * @param regionsRecordedInMeta the list of regions we know are recorded in meta
2909    * either as a default, or, as the location of a replica
2910    * @param master
2911    * @return list of replica regions
2912    * @throws IOException
2913    */
2914   public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
2915       Set<HRegionInfo> regionsRecordedInMeta, MasterServices master)throws IOException {
2916     List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
2917     for (HRegionInfo hri : regionsRecordedInMeta) {
2918       TableName table = hri.getTable();
2919       HTableDescriptor htd = master.getTableDescriptors().get(table);
2920       // look at the HTD for the replica count. That's the source of truth
2921       int desiredRegionReplication = htd.getRegionReplication();
2922       for (int i = 0; i < desiredRegionReplication; i++) {
2923         HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
2924         if (regionsRecordedInMeta.contains(replica)) continue;
2925         regionsNotRecordedInMeta.add(replica);
2926       }
2927     }
2928     return regionsNotRecordedInMeta;
2929   }
2930 
2931   /**
2932    * Wait until no regions in transition.
2933    * @param timeout How long to wait.
2934    * @return True if nothing in regions in transition.
2935    * @throws InterruptedException
2936    */
2937   boolean waitUntilNoRegionsInTransition(final long timeout)
2938       throws InterruptedException {
2939     // Blocks until there are no regions in transition. It is possible that
2940     // there
2941     // are regions in transition immediately after this returns but guarantees
2942     // that if it returns without an exception that there was a period of time
2943     // with no regions in transition from the point-of-view of the in-memory
2944     // state of the Master.
2945     final long endTime = System.currentTimeMillis() + timeout;
2946 
2947     while (!this.server.isStopped() && regionStates.isRegionsInTransition()
2948         && endTime > System.currentTimeMillis()) {
2949       regionStates.waitForUpdate(100);
2950     }
2951 
2952     return !regionStates.isRegionsInTransition();
2953   }
2954 
2955   /**
2956    * Rebuild the list of user regions and assignment information.
2957    * Updates regionstates with findings as we go through list of regions.
2958    * @return set of servers not online that hosted some regions according to a scan of hbase:meta
2959    * @throws IOException
2960    */
2961   Set<ServerName> rebuildUserRegions() throws
2962       IOException, KeeperException, CoordinatedStateException {
2963     Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
2964       ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.ENABLING);
2965 
2966     Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
2967       ZooKeeperProtos.Table.State.DISABLED,
2968       ZooKeeperProtos.Table.State.DISABLING,
2969       ZooKeeperProtos.Table.State.ENABLING);
2970 
2971     // Region assignment from META
2972     List<Result> results = MetaTableAccessor.fullScanOfMeta(server.getConnection());
2973     // Get any new but slow to checkin region server that joined the cluster
2974     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
2975     // Set of offline servers to be returned
2976     Set<ServerName> offlineServers = new HashSet<ServerName>();
2977     // Iterate regions in META
2978     for (Result result : results) {
2979       if (result == null && LOG.isDebugEnabled()){
2980         LOG.debug("null result from meta - ignoring but this is strange.");
2981         continue;
2982       }
2983       // keep a track of replicas to close. These were the replicas of the originally
2984       // unmerged regions. The master might have closed them before but it mightn't
2985       // maybe because it crashed.
2986       PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
2987       if (p.getFirst() != null && p.getSecond() != null) {
2988         int numReplicas = server.getTableDescriptors().get(p.getFirst().
2989             getTable()).getRegionReplication();
2990         for (HRegionInfo merge : p) {
2991           for (int i = 1; i < numReplicas; i++) {
2992             replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
2993           }
2994         }
2995       }
2996       RegionLocations rl =  MetaTableAccessor.getRegionLocations(result);
2997       if (rl == null) continue;
2998       HRegionLocation[] locations = rl.getRegionLocations();
2999       if (locations == null) continue;
3000       for (HRegionLocation hrl : locations) {
3001         if (hrl == null) continue;
3002         HRegionInfo regionInfo = hrl.getRegionInfo();
3003         if (regionInfo == null) continue;
3004         int replicaId = regionInfo.getReplicaId();
3005         State state = RegionStateStore.getRegionState(result, replicaId);
3006         // keep a track of replicas to close. These were the replicas of the split parents
3007         // from the previous life of the master. The master should have closed them before
3008         // but it couldn't maybe because it crashed
3009         if (replicaId == 0 && state.equals(State.SPLIT)) {
3010           for (HRegionLocation h : locations) {
3011             replicasToClose.add(h.getRegionInfo());
3012           }
3013         }
3014         ServerName lastHost = hrl.getServerName();
3015         ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
3016         if (tableStateManager.isTableState(regionInfo.getTable(),
3017              ZooKeeperProtos.Table.State.DISABLED)) {
3018           // force region to forget it hosts for disabled/disabling tables.
3019           // see HBASE-13326
3020           lastHost = null;
3021           regionLocation = null;
3022         }
3023         regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
3024         if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
3025           // Region is not open (either offline or in transition), skip
3026           continue;
3027         }
3028         TableName tableName = regionInfo.getTable();
3029         if (!onlineServers.contains(regionLocation)) {
3030           // Region is located on a server that isn't online
3031           offlineServers.add(regionLocation);
3032           if (useZKForAssignment) {
3033             regionStates.regionOffline(regionInfo);
3034           }
3035         } else if (!disabledOrEnablingTables.contains(tableName)) {
3036           // Region is being served and on an active server
3037           // add only if region not in disabled or enabling table
3038           regionStates.regionOnline(regionInfo, regionLocation);
3039           balancer.regionOnline(regionInfo, regionLocation);
3040         } else if (useZKForAssignment) {
3041           regionStates.regionOffline(regionInfo);
3042         }
3043         // need to enable the table if not disabled or disabling or enabling
3044         // this will be used in rolling restarts
3045         if (!disabledOrDisablingOrEnabling.contains(tableName)
3046           && !getTableStateManager().isTableState(tableName,
3047             ZooKeeperProtos.Table.State.ENABLED)) {
3048           setEnabledTable(tableName);
3049         }
3050       }
3051     }
3052     return offlineServers;
3053   }
3054 
3055   /**
3056    * Recover the tables that were not fully moved to DISABLED state. These
3057    * tables are in DISABLING state when the master restarted/switched.
3058    *
3059    * @throws KeeperException
3060    * @throws TableNotFoundException
3061    * @throws IOException
3062    */
3063   private void recoverTableInDisablingState()
3064       throws KeeperException, IOException, CoordinatedStateException {
3065     Set<TableName> disablingTables =
3066       tableStateManager.getTablesInStates(ZooKeeperProtos.Table.State.DISABLING);
3067     if (disablingTables.size() != 0) {
3068       for (TableName tableName : disablingTables) {
3069         // Recover by calling DisableTableHandler
3070         LOG.info("The table " + tableName
3071             + " is in DISABLING state.  Hence recovering by moving the table"
3072             + " to DISABLED state.");
3073         new DisableTableHandler(this.server, tableName,
3074             this, tableLockManager, true).prepare().process();
3075       }
3076     }
3077   }
3078 
3079   /**
3080    * Recover the tables that are not fully moved to ENABLED state. These tables
3081    * are in ENABLING state when the master restarted/switched
3082    *
3083    * @throws KeeperException
3084    * @throws org.apache.hadoop.hbase.TableNotFoundException
3085    * @throws IOException
3086    */
3087   private void recoverTableInEnablingState()
3088       throws KeeperException, IOException, CoordinatedStateException {
3089     Set<TableName> enablingTables = tableStateManager.
3090       getTablesInStates(ZooKeeperProtos.Table.State.ENABLING);
3091     if (enablingTables.size() != 0) {
3092       for (TableName tableName : enablingTables) {
3093         // Recover by calling EnableTableHandler
3094         LOG.info("The table " + tableName
3095             + " is in ENABLING state.  Hence recovering by moving the table"
3096             + " to ENABLED state.");
3097         // enableTable in sync way during master startup,
3098         // no need to invoke coprocessor
3099         EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
3100           this, tableLockManager, true);
3101         try {
3102           eth.prepare();
3103         } catch (TableNotFoundException e) {
3104           LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
3105           continue;
3106         }
3107         eth.process();
3108       }
3109     }
3110   }
3111 
3112   /**
3113    * Processes list of dead servers from result of hbase:meta scan and regions in RIT.
3114    * This is used for failover to recover the lost regions that belonged to
3115    * RegionServers which failed while there was no active master or are offline for whatever
3116    * reason and for regions that were in RIT.
3117    *
3118    * @param deadServers
3119    *          The list of dead servers which failed while there was no active master. Can be null.
3120    * @throws IOException
3121    * @throws KeeperException
3122    */
3123   private void processDeadServersAndRecoverLostRegions(Set<ServerName> deadServers)
3124   throws IOException, KeeperException {
3125     if (deadServers != null && !deadServers.isEmpty()) {
3126       for (ServerName serverName: deadServers) {
3127         if (!serverManager.isServerDead(serverName)) {
3128           serverManager.expireServer(serverName); // Let SSH do region re-assign
3129         }
3130       }
3131     }
3132 
3133     List<String> nodes = useZKForAssignment ?
3134       ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode)
3135       : ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
3136     if (nodes != null && !nodes.isEmpty()) {
3137       for (String encodedRegionName : nodes) {
3138         processRegionInTransition(encodedRegionName, null);
3139       }
3140     } else if (!useZKForAssignment) {
3141       processRegionInTransitionZkLess();
3142     }
3143   }
3144 
3145   void processRegionInTransitionZkLess() {
3146     // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
3147     // in case the RPC call is not sent out yet before the master was shut down
3148     // since we update the state before we send the RPC call. We can't update
3149     // the state after the RPC call. Otherwise, we don't know what's happened
3150     // to the region if the master dies right after the RPC call is out.
3151     Map<String, RegionState> rits = regionStates.getRegionsInTransition();
3152     for (RegionState regionState : rits.values()) {
3153       LOG.info("Processing " + regionState);
3154       ServerName serverName = regionState.getServerName();
3155       // Server could be null in case of FAILED_OPEN when master cannot find a region plan. In that
3156       // case, try assigning it here.
3157       if (serverName != null
3158           && !serverManager.getOnlineServers().containsKey(serverName)) {
3159         LOG.info("Server " + serverName + " isn't online. SSH will handle this");
3160         continue;
3161       }
3162       HRegionInfo regionInfo = regionState.getRegion();
3163       State state = regionState.getState();
3164 
3165       switch (state) {
3166       case CLOSED:
3167         invokeAssign(regionInfo);
3168         break;
3169       case PENDING_OPEN:
3170         retrySendRegionOpen(regionState);
3171         break;
3172       case PENDING_CLOSE:
3173         retrySendRegionClose(regionState);
3174         break;
3175       case FAILED_CLOSE:
3176       case FAILED_OPEN:
3177         invokeUnAssign(regionInfo);
3178         break;
3179       default:
3180         // No process for other states
3181       }
3182     }
3183   }
3184 
3185   /**
3186    * At master failover, for pending_open region, make sure
3187    * sendRegionOpen RPC call is sent to the target regionserver
3188    */
3189   private void retrySendRegionOpen(final RegionState regionState) {
3190     this.executorService.submit(
3191       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
3192         @Override
3193         public void process() throws IOException {
3194           HRegionInfo hri = regionState.getRegion();
3195           ServerName serverName = regionState.getServerName();
3196           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
3197           try {
3198             for (int i = 1; i <= maximumAttempts; i++) {
3199               if (!serverManager.isServerOnline(serverName)
3200                   || server.isStopped() || server.isAborted()) {
3201                 return; // No need any more
3202               }
3203               try {
3204                 if (!regionState.equals(regionStates.getRegionState(hri))) {
3205                   return; // Region is not in the expected state any more
3206                 }
3207                 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
3208                 if (shouldAssignRegionsWithFavoredNodes) {
3209                   favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
3210                 }
3211                 RegionOpeningState regionOpenState = serverManager.sendRegionOpen(
3212                   serverName, hri, -1, favoredNodes);
3213 
3214                 if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
3215                   // Failed opening this region, this means the target server didn't get
3216                   // the original region open RPC, so re-assign it with a new plan
3217                   LOG.debug("Got failed_opening in retry sendRegionOpen for "
3218                     + regionState + ", re-assign it");
3219                   invokeAssign(hri, true);
3220                 }
3221                 return; // Done.
3222               } catch (Throwable t) {
3223                 if (t instanceof RemoteException) {
3224                   t = ((RemoteException) t).unwrapRemoteException();
3225                 }
3226                 // In case SocketTimeoutException/FailedServerException, retry
3227                 if (t instanceof java.net.SocketTimeoutException
3228                     || t instanceof FailedServerException) {
3229                   Threads.sleep(100);
3230                   continue;
3231                 }
3232                 // For other exceptions, re-assign it
3233                 LOG.debug("Got exception in retry sendRegionOpen for "
3234                   + regionState + ", re-assign it", t);
3235                 invokeAssign(hri);
3236                 return; // Done.
3237               }
3238             }
3239           } finally {
3240             lock.unlock();
3241           }
3242         }
3243       });
3244   }
3245 
3246   /**
3247    * At master failover, for pending_close region, make sure
3248    * sendRegionClose RPC call is sent to the target regionserver
3249    */
3250   private void retrySendRegionClose(final RegionState regionState) {
3251     this.executorService.submit(
3252       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
3253         @Override
3254         public void process() throws IOException {
3255           HRegionInfo hri = regionState.getRegion();
3256           ServerName serverName = regionState.getServerName();
3257           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
3258           try {
3259             for (int i = 1; i <= maximumAttempts; i++) {
3260               if (!serverManager.isServerOnline(serverName)
3261                   || server.isStopped() || server.isAborted()) {
3262                 return; // No need any more
3263               }
3264               try {
3265                 if (!regionState.equals(regionStates.getRegionState(hri))) {
3266                   return; // Region is not in the expected state any more
3267                 }
3268                 if (!serverManager.sendRegionClose(serverName, hri, -1, null, false)) {
3269                   // This means the region is still on the target server
3270                   LOG.debug("Got false in retry sendRegionClose for "
3271                     + regionState + ", re-close it");
3272                   invokeUnAssign(hri);
3273                 }
3274                 return; // Done.
3275               } catch (Throwable t) {
3276                 if (t instanceof RemoteException) {
3277                   t = ((RemoteException) t).unwrapRemoteException();
3278                 }
3279                 // In case SocketTimeoutException/FailedServerException, retry
3280                 if (t instanceof java.net.SocketTimeoutException
3281                     || t instanceof FailedServerException) {
3282                   Threads.sleep(100);
3283                   continue;
3284                 }
3285                 if (!(t instanceof NotServingRegionException
3286                     || t instanceof RegionAlreadyInTransitionException)) {
3287                   // NotServingRegionException/RegionAlreadyInTransitionException
3288                   // means the target server got the original region close request.
3289                   // For other exceptions, re-close it
3290                   LOG.debug("Got exception in retry sendRegionClose for "
3291                     + regionState + ", re-close it", t);
3292                   invokeUnAssign(hri);
3293                 }
3294                 return; // Done.
3295               }
3296             }
3297           } finally {
3298             lock.unlock();
3299           }
3300         }
3301       });
3302   }
3303 
3304   /**
3305    * Set Regions in transitions metrics.
3306    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
3307    * This iterator is not fail fast, which may lead to stale read; but that's better than
3308    * creating a copy of the map for metrics computation, as this method will be invoked
3309    * on a frequent interval.
3310    */
3311   public void updateRegionsInTransitionMetrics() {
3312     long currentTime = System.currentTimeMillis();
3313     int totalRITs = 0;
3314     int totalRITsOverThreshold = 0;
3315     long oldestRITTime = 0;
3316     int ritThreshold = this.server.getConfiguration().
3317       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
3318     for (RegionState state: regionStates.getRegionsInTransition().values()) {
3319       totalRITs++;
3320       long ritTime = currentTime - state.getStamp();
3321       if (ritTime > ritThreshold) { // more than the threshold
3322         totalRITsOverThreshold++;
3323       }
3324       if (oldestRITTime < ritTime) {
3325         oldestRITTime = ritTime;
3326       }
3327     }
3328     if (this.metricsAssignmentManager != null) {
3329       this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
3330       this.metricsAssignmentManager.updateRITCount(totalRITs);
3331       this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
3332     }
3333   }
3334 
3335   /**
3336    * @param region Region whose plan we are to clear.
3337    */
3338   void clearRegionPlan(final HRegionInfo region) {
3339     synchronized (this.regionPlans) {
3340       this.regionPlans.remove(region.getEncodedName());
3341     }
3342   }
3343 
3344   /**
3345    * Wait on region to clear regions-in-transition.
3346    * @param hri Region to wait on.
3347    * @throws IOException
3348    */
3349   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
3350       throws IOException, InterruptedException {
3351     waitOnRegionToClearRegionsInTransition(hri, -1L);
3352   }
3353 
3354   /**
3355    * Wait on region to clear regions-in-transition or time out
3356    * @param hri
3357    * @param timeOut Milliseconds to wait for current region to be out of transition state.
3358    * @return True when a region clears regions-in-transition before timeout otherwise false
3359    * @throws InterruptedException
3360    */
3361   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
3362       throws InterruptedException {
3363     if (!regionStates.isRegionInTransition(hri)) return true;
3364     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
3365         + timeOut;
3366     // There is already a timeout monitor on regions in transition so I
3367     // should not have to have one here too?
3368     LOG.info("Waiting for " + hri.getEncodedName() +
3369         " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
3370     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
3371       regionStates.waitForUpdate(100);
3372       if (EnvironmentEdgeManager.currentTime() > end) {
3373         LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
3374         return false;
3375       }
3376     }
3377     if (this.server.isStopped()) {
3378       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
3379       return false;
3380     }
3381     return true;
3382   }
3383 
3384   void invokeAssign(HRegionInfo regionInfo) {
3385     invokeAssign(regionInfo, true);
3386   }
3387 
3388   void invokeAssign(HRegionInfo regionInfo, boolean newPlan) {
3389     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, newPlan));
3390   }
3391 
3392   void invokeUnAssign(HRegionInfo regionInfo) {
3393     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
3394   }
3395 
3396   public ServerHostRegion isCarryingMeta(ServerName serverName) {
3397     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
3398   }
3399 
3400   public ServerHostRegion isCarryingMetaReplica(ServerName serverName, int replicaId) {
3401     return isCarryingRegion(serverName,
3402         RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId));
3403   }
3404 
3405   public ServerHostRegion isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) {
3406     return isCarryingRegion(serverName, metaHri);
3407   }
3408 
3409   /**
3410    * Check if the shutdown server carries the specific region.
3411    * We have a bunch of places that store region location
3412    * Those values aren't consistent. There is a delay of notification.
3413    * The location from zookeeper unassigned node has the most recent data;
3414    * but the node could be deleted after the region is opened by AM.
3415    * The AM's info could be old when OpenedRegionHandler
3416    * processing hasn't finished yet when server shutdown occurs.
3417    * @return whether the serverName currently hosts the region
3418    */
3419   private ServerHostRegion isCarryingRegion(ServerName serverName, HRegionInfo hri) {
3420     RegionTransition rt = null;
3421     try {
3422       byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
3423       // This call can legitimately come by null
3424       rt = data == null? null: RegionTransition.parseFrom(data);
3425     } catch (KeeperException e) {
3426       server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
3427     } catch (DeserializationException e) {
3428       server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
3429     }
3430 
3431     ServerName addressFromZK = rt != null? rt.getServerName():  null;
3432     if (addressFromZK != null) {
3433       // if we get something from ZK, we will use the data
3434       boolean matchZK = addressFromZK.equals(serverName);
3435       LOG.debug("Checking region=" + hri.getRegionNameAsString() + ", zk server=" + addressFromZK +
3436         " current=" + serverName + ", matches=" + matchZK);
3437       return matchZK ? ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3438     }
3439 
3440     ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
3441     if (LOG.isDebugEnabled()) {
3442       LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
3443         " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
3444         " server being checked: " + serverName);
3445     }
3446     if (addressFromAM != null) {
3447       return addressFromAM.equals(serverName) ?
3448           ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3449     }
3450 
3451     if (hri.isMetaRegion() && RegionReplicaUtil.isDefaultReplica(hri)) {
3452       // For the Meta region (default replica), we can do one more check on MetaTableLocator
3453       final ServerName serverNameInZK =
3454           server.getMetaTableLocator().getMetaRegionLocation(this.server.getZooKeeper());
3455       if (LOG.isDebugEnabled()) {
3456         LOG.debug("Based on MetaTableLocator, the META region is on server=" +
3457           (serverNameInZK == null ? "null" : serverNameInZK) +
3458           " server being checked: " + serverName);
3459       }
3460       if (serverNameInZK != null) {
3461         return serverNameInZK.equals(serverName) ?
3462             ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3463       }
3464     }
3465 
3466     // Checked everywhere, if reaching here, we are unsure whether the server is carrying region.
3467     return ServerHostRegion.UNKNOWN;
3468   }
3469 
3470   /**
3471    * Clean out crashed server removing any assignments.
3472    * @param sn Server that went down.
3473    * @return list of regions in transition on this server
3474    */
3475   public List<HRegionInfo> cleanOutCrashedServerReferences(final ServerName sn) {
3476     // Clean out any existing assignment plans for this server
3477     synchronized (this.regionPlans) {
3478       for (Iterator <Map.Entry<String, RegionPlan>> i = this.regionPlans.entrySet().iterator();
3479           i.hasNext();) {
3480         Map.Entry<String, RegionPlan> e = i.next();
3481         ServerName otherSn = e.getValue().getDestination();
3482         // The name will be null if the region is planned for a random assign.
3483         if (otherSn != null && otherSn.equals(sn)) {
3484           // Use iterator's remove else we'll get CME
3485           i.remove();
3486         }
3487       }
3488     }
3489     List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn);
3490     for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
3491       HRegionInfo hri = it.next();
3492       String encodedName = hri.getEncodedName();
3493 
3494       // We need a lock on the region as we could update it
3495       Lock lock = locker.acquireLock(encodedName);
3496       try {
3497         RegionState regionState = regionStates.getRegionTransitionState(encodedName);
3498         if (regionState == null
3499             || (regionState.getServerName() != null && !regionState.isOnServer(sn))
3500             || !(regionState.isFailedClose() || regionState.isOffline()
3501               || regionState.isPendingOpenOrOpening())) {
3502           LOG.info("Skip " + regionState + " since it is not opening/failed_close"
3503             + " on the dead server any more: " + sn);
3504           it.remove();
3505         } else {
3506           try {
3507             // Delete the ZNode if exists
3508             ZKAssign.deleteNodeFailSilent(watcher, hri);
3509           } catch (KeeperException ke) {
3510             server.abort("Unexpected ZK exception deleting node " + hri, ke);
3511           }
3512           if (tableStateManager.isTableState(hri.getTable(),
3513               ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3514             regionStates.regionOffline(hri);
3515             it.remove();
3516             continue;
3517           }
3518           // Mark the region offline and assign it again by SSH
3519           regionStates.updateRegionState(hri, State.OFFLINE);
3520         }
3521       } finally {
3522         lock.unlock();
3523       }
3524     }
3525     return regions;
3526   }
3527 
3528   /**
3529    * @param plan Plan to execute.
3530    */
3531   public void balance(final RegionPlan plan) {
3532 
3533     HRegionInfo hri = plan.getRegionInfo();
3534     TableName tableName = hri.getTable();
3535     if (tableStateManager.isTableState(tableName,
3536       ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3537       LOG.info("Ignored moving region of disabling/disabled table "
3538         + tableName);
3539       return;
3540     }
3541 
3542     // Move the region only if it's assigned
3543     String encodedName = hri.getEncodedName();
3544     ReentrantLock lock = locker.acquireLock(encodedName);
3545     try {
3546       if (!regionStates.isRegionOnline(hri)) {
3547         RegionState state = regionStates.getRegionState(encodedName);
3548         LOG.info("Ignored moving region not assigned: " + hri + ", "
3549           + (state == null ? "not in region states" : state));
3550         return;
3551       }
3552       synchronized (this.regionPlans) {
3553         this.regionPlans.put(plan.getRegionName(), plan);
3554       }
3555       unassign(hri, false, plan.getDestination());
3556     } finally {
3557       lock.unlock();
3558     }
3559   }
3560 
3561   public void stop() {
3562     shutdown(); // Stop executor service, etc
3563   }
3564 
3565   /**
3566    * Shutdown the threadpool executor service
3567    */
3568   public void shutdown() {
3569     // It's an immediate shutdown, so we're clearing the remaining tasks.
3570     synchronized (zkEventWorkerWaitingList){
3571       zkEventWorkerWaitingList.clear();
3572     }
3573 
3574     // Shutdown the threadpool executor service
3575     threadPoolExecutorService.shutdownNow();
3576     zkEventWorkers.shutdownNow();
3577     regionStateStore.stop();
3578   }
3579 
3580   protected void setEnabledTable(TableName tableName) {
3581     try {
3582       this.tableStateManager.setTableState(tableName,
3583         ZooKeeperProtos.Table.State.ENABLED);
3584     } catch (CoordinatedStateException e) {
3585       // here we can abort as it is the start up flow
3586       String errorMsg = "Unable to ensure that the table " + tableName
3587           + " will be" + " enabled because of a ZooKeeper issue";
3588       LOG.error(errorMsg);
3589       this.server.abort(errorMsg, e);
3590     }
3591   }
3592 
3593   /**
3594    * Set region as OFFLINED up in zookeeper asynchronously.
3595    * @param state
3596    * @return True if we succeeded, false otherwise (State was incorrect or failed
3597    * updating zk).
3598    */
3599   private boolean asyncSetOfflineInZooKeeper(final RegionState state,
3600       final AsyncCallback.StringCallback cb, final ServerName destination) {
3601     if (!state.isClosed() && !state.isOffline()) {
3602       this.server.abort("Unexpected state trying to OFFLINE; " + state,
3603         new IllegalStateException());
3604       return false;
3605     }
3606     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
3607     try {
3608       ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
3609         destination, cb, state);
3610     } catch (KeeperException e) {
3611       if (e instanceof NodeExistsException) {
3612         LOG.warn("Node for " + state.getRegion() + " already exists");
3613       } else {
3614         server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
3615       }
3616       return false;
3617     }
3618     return true;
3619   }
3620 
3621   private boolean deleteNodeInStates(String encodedName,
3622       String desc, ServerName sn, EventType... types) {
3623     try {
3624       for (EventType et: types) {
3625         if (ZKAssign.deleteNode(watcher, encodedName, et, sn)) {
3626           return true;
3627         }
3628       }
3629       LOG.info("Failed to delete the " + desc + " node for "
3630         + encodedName + ". The node type may not match");
3631     } catch (NoNodeException e) {
3632       if (LOG.isDebugEnabled()) {
3633         LOG.debug("The " + desc + " node for " + encodedName + " already deleted");
3634       }
3635     } catch (KeeperException ke) {
3636       server.abort("Unexpected ZK exception deleting " + desc
3637         + " node for the region " + encodedName, ke);
3638     }
3639     return false;
3640   }
3641 
3642   private void deleteMergingNode(String encodedName, ServerName sn) {
3643     deleteNodeInStates(encodedName, "merging", sn, EventType.RS_ZK_REGION_MERGING,
3644       EventType.RS_ZK_REQUEST_REGION_MERGE, EventType.RS_ZK_REGION_MERGED);
3645   }
3646 
3647   private void deleteSplittingNode(String encodedName, ServerName sn) {
3648     deleteNodeInStates(encodedName, "splitting", sn, EventType.RS_ZK_REGION_SPLITTING,
3649       EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT);
3650   }
3651 
3652   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
3653       value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
3654       justification="Modification of Maps not ATOMIC!!!! FIX!!!")
3655   private void onRegionFailedOpen(
3656       final HRegionInfo hri, final ServerName sn) {
3657     String encodedName = hri.getEncodedName();
3658     // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION Worth fixing!!!
3659     AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
3660     if (failedOpenCount == null) {
3661       failedOpenCount = new AtomicInteger();
3662       // No need to use putIfAbsent, or extra synchronization since
3663       // this whole handleRegion block is locked on the encoded region
3664       // name, and failedOpenTracker is updated only in this block
3665       // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
3666       failedOpenTracker.put(encodedName, failedOpenCount);
3667     }
3668     if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion()) {
3669       // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
3670       regionStates.updateRegionState(hri, State.FAILED_OPEN);
3671       // remove the tracking info to save memory, also reset
3672       // the count for next open initiative
3673       failedOpenTracker.remove(encodedName);
3674     } else {
3675       if (hri.isMetaRegion() && failedOpenCount.get() >= maximumAttempts) {
3676         // Log a warning message if a meta region failedOpenCount exceeds maximumAttempts
3677         // so that we are aware of potential problem if it persists for a long time.
3678         LOG.warn("Failed to open the hbase:meta region " +
3679             hri.getRegionNameAsString() + " after" +
3680             failedOpenCount.get() + " retries. Continue retrying.");
3681       }
3682 
3683       // Handle this the same as if it were opened and then closed.
3684       RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
3685       if (regionState != null) {
3686         // When there are more than one region server a new RS is selected as the
3687         // destination and the same is updated in the region plan. (HBASE-5546)
3688         if (getTableStateManager().isTableState(hri.getTable(),
3689             ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
3690             replicasToClose.contains(hri)) {
3691           offlineDisabledRegion(hri);
3692           return;
3693         }
3694         // ZK Node is in CLOSED state, assign it.
3695          regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3696         // This below has to do w/ online enable/disable of a table
3697         removeClosedRegion(hri);
3698         try {
3699           getRegionPlan(hri, sn, true);
3700         } catch (HBaseIOException e) {
3701           LOG.warn("Failed to get region plan", e);
3702         }
3703         invokeAssign(hri, false);
3704       }
3705     }
3706   }
3707 
3708   private void onRegionOpen(final HRegionInfo hri, final ServerName sn, long openSeqNum) {
3709     regionOnline(hri, sn, openSeqNum);
3710     if (useZKForAssignment) {
3711       try {
3712         // Delete the ZNode if exists
3713         ZKAssign.deleteNodeFailSilent(watcher, hri);
3714       } catch (KeeperException ke) {
3715         server.abort("Unexpected ZK exception deleting node " + hri, ke);
3716       }
3717     }
3718 
3719     // reset the count, if any
3720     failedOpenTracker.remove(hri.getEncodedName());
3721     if (getTableStateManager().isTableState(hri.getTable(),
3722         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3723       invokeUnAssign(hri);
3724     }
3725   }
3726 
3727   private void onRegionClosed(final HRegionInfo hri) {
3728     if (getTableStateManager().isTableState(hri.getTable(),
3729         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
3730         replicasToClose.contains(hri)) {
3731       offlineDisabledRegion(hri);
3732       return;
3733     }
3734     regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3735     sendRegionClosedNotification(hri);
3736     // This below has to do w/ online enable/disable of a table
3737     removeClosedRegion(hri);
3738     invokeAssign(hri, false);
3739   }
3740 
3741   private String checkInStateForSplit(ServerName sn,
3742       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3743     final RegionState rs_p = regionStates.getRegionState(p);
3744     RegionState rs_a = regionStates.getRegionState(a);
3745     RegionState rs_b = regionStates.getRegionState(b);
3746     if (!(rs_p.isOpenOrSplittingOnServer(sn)
3747         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
3748         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
3749       return "Not in state good for split";
3750     }
3751     return "";
3752   }
3753 
3754   private String onRegionSplitReverted(ServerName sn,
3755       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3756     String s = checkInStateForSplit(sn, p, a, b);
3757     if (!org.apache.commons.lang.StringUtils.isEmpty(s)) {
3758       return s;
3759     }
3760 
3761     // Always bring the parent back online. Even if it's not offline
3762     // There's no harm in making it online again.
3763     regionOnline(p, sn);
3764 
3765     // Only offline the region if they are known to exist.
3766     RegionState regionStateA = regionStates.getRegionState(a);
3767     RegionState regionStateB = regionStates.getRegionState(b);
3768     if (regionStateA != null) {
3769       regionOffline(a);
3770     }
3771     if (regionStateB != null) {
3772       regionOffline(b);
3773     }
3774 
3775     if (getTableStateManager().isTableState(p.getTable(),
3776         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3777       invokeUnAssign(p);
3778     }
3779     return null;
3780   }
3781 
3782   private String onRegionSplit(ServerName sn, TransitionCode code,
3783       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3784     String s = checkInStateForSplit(sn, p, a, b);
3785     if (!org.apache.commons.lang.StringUtils.isEmpty(s)) {
3786       return s;
3787     }
3788 
3789     regionStates.updateRegionState(a, State.SPLITTING_NEW, sn);
3790     regionStates.updateRegionState(b, State.SPLITTING_NEW, sn);
3791     regionStates.updateRegionState(p, State.SPLITTING);
3792 
3793     if (code == TransitionCode.SPLIT) {
3794       if (TEST_SKIP_SPLIT_HANDLING) {
3795         return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
3796       }
3797       regionOffline(p, State.SPLIT);
3798       regionOnline(a, sn, 1);
3799       regionOnline(b, sn, 1);
3800 
3801       // User could disable the table before master knows the new region.
3802       if (getTableStateManager().isTableState(p.getTable(),
3803           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3804         invokeUnAssign(a);
3805         invokeUnAssign(b);
3806       } else {
3807         Callable<Object> splitReplicasCallable = new Callable<Object>() {
3808           @Override
3809           public Object call() {
3810             doSplittingOfReplicas(p, a, b);
3811             return null;
3812           }
3813         };
3814         threadPoolExecutorService.submit(splitReplicasCallable);
3815       }
3816     } else if (code == TransitionCode.SPLIT_PONR) {
3817       try {
3818         regionStates.splitRegion(p, a, b, sn);
3819       } catch (IOException ioe) {
3820         LOG.info("Failed to record split region " + p.getShortNameToLog());
3821         return "Failed to record the splitting in meta";
3822       }
3823     }
3824     return null;
3825   }
3826 
3827   private String onRegionMerge(ServerName sn, TransitionCode code,
3828       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3829     RegionState rs_p = regionStates.getRegionState(p);
3830     RegionState rs_a = regionStates.getRegionState(a);
3831     RegionState rs_b = regionStates.getRegionState(b);
3832     if (!(rs_a.isOpenOrMergingOnServer(sn) && rs_b.isOpenOrMergingOnServer(sn)
3833         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3834       return "Not in state good for merge";
3835     }
3836 
3837     regionStates.updateRegionState(a, State.MERGING);
3838     regionStates.updateRegionState(b, State.MERGING);
3839     regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3840 
3841     String encodedName = p.getEncodedName();
3842     if (code == TransitionCode.READY_TO_MERGE) {
3843       mergingRegions.put(encodedName,
3844         new PairOfSameType<HRegionInfo>(a, b));
3845     } else if (code == TransitionCode.MERGED) {
3846       mergingRegions.remove(encodedName);
3847       regionOffline(a, State.MERGED);
3848       regionOffline(b, State.MERGED);
3849       regionOnline(p, sn, 1);
3850 
3851       // User could disable the table before master knows the new region.
3852       if (getTableStateManager().isTableState(p.getTable(),
3853           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3854         invokeUnAssign(p);
3855       } else {
3856         Callable<Object> mergeReplicasCallable = new Callable<Object>() {
3857           @Override
3858           public Object call() {
3859             doMergingOfReplicas(p, a, b);
3860             return null;
3861           }
3862         };
3863         threadPoolExecutorService.submit(mergeReplicasCallable);
3864       }
3865     } else if (code == TransitionCode.MERGE_PONR) {
3866       try {
3867         regionStates.mergeRegions(p, a, b, sn);
3868       } catch (IOException ioe) {
3869         LOG.info("Failed to record merged region " + p.getShortNameToLog());
3870         return "Failed to record the merging in meta";
3871       }
3872     } else {
3873       mergingRegions.remove(encodedName);
3874       regionOnline(a, sn);
3875       regionOnline(b, sn);
3876       regionOffline(p);
3877 
3878       if (getTableStateManager().isTableState(p.getTable(),
3879           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3880         invokeUnAssign(a);
3881         invokeUnAssign(b);
3882       }
3883     }
3884     return null;
3885   }
3886 
3887   /**
3888    * A helper to handle region merging transition event.
3889    * It transitions merging regions to MERGING state.
3890    */
3891   private boolean handleRegionMerging(final RegionTransition rt, final String encodedName,
3892       final String prettyPrintedRegionName, final ServerName sn) {
3893     if (!serverManager.isServerOnline(sn)) {
3894       LOG.warn("Dropped merging! ServerName=" + sn + " unknown.");
3895       return false;
3896     }
3897     byte [] payloadOfMerging = rt.getPayload();
3898     List<HRegionInfo> mergingRegions;
3899     try {
3900       mergingRegions = HRegionInfo.parseDelimitedFrom(
3901         payloadOfMerging, 0, payloadOfMerging.length);
3902     } catch (IOException e) {
3903       LOG.error("Dropped merging! Failed reading "  + rt.getEventType()
3904         + " payload for " + prettyPrintedRegionName);
3905       return false;
3906     }
3907     assert mergingRegions.size() == 3;
3908     HRegionInfo p = mergingRegions.get(0);
3909     HRegionInfo hri_a = mergingRegions.get(1);
3910     HRegionInfo hri_b = mergingRegions.get(2);
3911 
3912     RegionState rs_p = regionStates.getRegionState(p);
3913     RegionState rs_a = regionStates.getRegionState(hri_a);
3914     RegionState rs_b = regionStates.getRegionState(hri_b);
3915 
3916     if (!((rs_a == null || rs_a.isOpenOrMergingOnServer(sn))
3917         && (rs_b == null || rs_b.isOpenOrMergingOnServer(sn))
3918         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3919       LOG.warn("Dropped merging! Not in state good for MERGING; rs_p="
3920         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
3921       return false;
3922     }
3923 
3924     EventType et = rt.getEventType();
3925     if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) {
3926       try {
3927         RegionMergeCoordination.RegionMergeDetails std =
3928             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3929                 .getRegionMergeCoordination().getDefaultDetails();
3930         ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3931             .getRegionMergeCoordination().processRegionMergeRequest(p, hri_a, hri_b, sn, std);
3932         if (((ZkRegionMergeCoordination.ZkRegionMergeDetails) std).getZnodeVersion() == -1) {
3933           byte[] data = ZKAssign.getData(watcher, encodedName);
3934          EventType currentType = null;
3935           if (data != null) {
3936             RegionTransition newRt = RegionTransition.parseFrom(data);
3937             currentType = newRt.getEventType();
3938           }
3939           if (currentType == null || (currentType != EventType.RS_ZK_REGION_MERGED
3940               && currentType != EventType.RS_ZK_REGION_MERGING)) {
3941             LOG.warn("Failed to transition pending_merge node "
3942               + encodedName + " to merging, it's now " + currentType);
3943             return false;
3944           }
3945         }
3946       } catch (Exception e) {
3947         LOG.warn("Failed to transition pending_merge node "
3948           + encodedName + " to merging", e);
3949         return false;
3950       }
3951     }
3952 
3953     synchronized (regionStates) {
3954       regionStates.updateRegionState(hri_a, State.MERGING);
3955       regionStates.updateRegionState(hri_b, State.MERGING);
3956       regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3957 
3958       if (et != EventType.RS_ZK_REGION_MERGED) {
3959         this.mergingRegions.put(encodedName,
3960           new PairOfSameType<HRegionInfo>(hri_a, hri_b));
3961       } else {
3962         this.mergingRegions.remove(encodedName);
3963         regionOffline(hri_a, State.MERGED);
3964         regionOffline(hri_b, State.MERGED);
3965         regionOnline(p, sn);
3966       }
3967     }
3968 
3969     if (et == EventType.RS_ZK_REGION_MERGED) {
3970       doMergingOfReplicas(p, hri_a, hri_b);
3971       LOG.debug("Handling MERGED event for " + encodedName + "; deleting node");
3972       // Remove region from ZK
3973       try {
3974         boolean successful = false;
3975         while (!successful) {
3976           // It's possible that the RS tickles in between the reading of the
3977           // znode and the deleting, so it's safe to retry.
3978           successful = ZKAssign.deleteNode(watcher, encodedName,
3979             EventType.RS_ZK_REGION_MERGED, sn);
3980         }
3981       } catch (KeeperException e) {
3982         if (e instanceof NoNodeException) {
3983           String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
3984           LOG.debug("The znode " + znodePath + " does not exist.  May be deleted already.");
3985         } else {
3986           server.abort("Error deleting MERGED node " + encodedName, e);
3987         }
3988       }
3989       LOG.info("Handled MERGED event; merged=" + p.getRegionNameAsString()
3990         + ", region_a=" + hri_a.getRegionNameAsString() + ", region_b="
3991         + hri_b.getRegionNameAsString() + ", on " + sn);
3992 
3993       // User could disable the table before master knows the new region.
3994       if (tableStateManager.isTableState(p.getTable(),
3995           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3996         unassign(p);
3997       }
3998     }
3999     return true;
4000   }
4001 
4002   /**
4003    * A helper to handle region splitting transition event.
4004    */
4005   private boolean handleRegionSplitting(final RegionTransition rt, final String encodedName,
4006       final String prettyPrintedRegionName, final ServerName sn) {
4007     if (!serverManager.isServerOnline(sn)) {
4008       LOG.warn("Dropped splitting! ServerName=" + sn + " unknown.");
4009       return false;
4010     }
4011     byte [] payloadOfSplitting = rt.getPayload();
4012     List<HRegionInfo> splittingRegions;
4013     try {
4014       splittingRegions = HRegionInfo.parseDelimitedFrom(
4015         payloadOfSplitting, 0, payloadOfSplitting.length);
4016     } catch (IOException e) {
4017       LOG.error("Dropped splitting! Failed reading " + rt.getEventType()
4018         + " payload for " + prettyPrintedRegionName);
4019       return false;
4020     }
4021     assert splittingRegions.size() == 2;
4022     HRegionInfo hri_a = splittingRegions.get(0);
4023     HRegionInfo hri_b = splittingRegions.get(1);
4024 
4025     RegionState rs_p = regionStates.getRegionState(encodedName);
4026     RegionState rs_a = regionStates.getRegionState(hri_a);
4027     RegionState rs_b = regionStates.getRegionState(hri_b);
4028 
4029     if (!((rs_p == null || rs_p.isOpenOrSplittingOnServer(sn))
4030         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
4031         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
4032       LOG.warn("Dropped splitting! Not in state good for SPLITTING; rs_p="
4033         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
4034       return false;
4035     }
4036 
4037     if (rs_p == null) {
4038       // Splitting region should be online
4039       rs_p = regionStates.updateRegionState(rt, State.OPEN);
4040       if (rs_p == null) {
4041         LOG.warn("Received splitting for region " + prettyPrintedRegionName
4042           + " from server " + sn + " but it doesn't exist anymore,"
4043           + " probably already processed its split");
4044         return false;
4045       }
4046       regionStates.regionOnline(rs_p.getRegion(), sn);
4047     }
4048 
4049     HRegionInfo p = rs_p.getRegion();
4050     EventType et = rt.getEventType();
4051     if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) {
4052       try {
4053         SplitTransactionDetails std =
4054             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
4055                 .getSplitTransactionCoordination().getDefaultDetails();
4056         if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
4057             .getSplitTransactionCoordination().processTransition(p, hri_a, hri_b, sn, std) == -1) {
4058           byte[] data = ZKAssign.getData(watcher, encodedName);
4059           EventType currentType = null;
4060           if (data != null) {
4061             RegionTransition newRt = RegionTransition.parseFrom(data);
4062             currentType = newRt.getEventType();
4063           }
4064           if (currentType == null
4065               || (currentType != EventType.RS_ZK_REGION_SPLIT && currentType != EventType.RS_ZK_REGION_SPLITTING)) {
4066             LOG.warn("Failed to transition pending_split node " + encodedName
4067                 + " to splitting, it's now " + currentType);
4068             return false;
4069           }
4070         }
4071       } catch (Exception e) {
4072         LOG.warn("Failed to transition pending_split node " + encodedName + " to splitting", e);
4073         return false;
4074       }
4075     }
4076 
4077     synchronized (regionStates) {
4078       splitRegions.put(p, new PairOfSameType<HRegionInfo>(hri_a, hri_b));
4079       regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn);
4080       regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn);
4081       regionStates.updateRegionState(rt, State.SPLITTING);
4082 
4083       // The below is for testing ONLY!  We can't do fault injection easily, so
4084       // resort to this kinda uglyness -- St.Ack 02/25/2011.
4085       if (TEST_SKIP_SPLIT_HANDLING) {
4086         LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set");
4087         return true; // return true so that the splitting node stays
4088       }
4089 
4090       if (et == EventType.RS_ZK_REGION_SPLIT) {
4091         regionOffline(p, State.SPLIT);
4092         regionOnline(hri_a, sn);
4093         regionOnline(hri_b, sn);
4094         splitRegions.remove(p);
4095       }
4096     }
4097 
4098     if (et == EventType.RS_ZK_REGION_SPLIT) {
4099       // split replicas
4100       doSplittingOfReplicas(rs_p.getRegion(), hri_a, hri_b);
4101       LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
4102       // Remove region from ZK
4103       try {
4104         boolean successful = false;
4105         while (!successful) {
4106           // It's possible that the RS tickles in between the reading of the
4107           // znode and the deleting, so it's safe to retry.
4108           successful = ZKAssign.deleteNode(watcher, encodedName,
4109             EventType.RS_ZK_REGION_SPLIT, sn);
4110         }
4111       } catch (KeeperException e) {
4112         if (e instanceof NoNodeException) {
4113           String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
4114           LOG.debug("The znode " + znodePath + " does not exist.  May be deleted already.");
4115         } else {
4116           server.abort("Error deleting SPLIT node " + encodedName, e);
4117         }
4118       }
4119       LOG.info("Handled SPLIT event; parent=" + p.getRegionNameAsString()
4120         + ", daughter a=" + hri_a.getRegionNameAsString() + ", daughter b="
4121         + hri_b.getRegionNameAsString() + ", on " + sn);
4122 
4123       // User could disable the table before master knows the new region.
4124       if (tableStateManager.isTableState(p.getTable(),
4125           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
4126         unassign(hri_a);
4127         unassign(hri_b);
4128       }
4129     }
4130     return true;
4131   }
4132 
4133   private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
4134       final HRegionInfo hri_b) {
4135     // Close replicas for the original unmerged regions. create/assign new replicas
4136     // for the merged parent.
4137     List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
4138     unmergedRegions.add(hri_a);
4139     unmergedRegions.add(hri_b);
4140     Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
4141     Collection<List<HRegionInfo>> c = map.values();
4142     for (List<HRegionInfo> l : c) {
4143       for (HRegionInfo h : l) {
4144         if (!RegionReplicaUtil.isDefaultReplica(h)) {
4145           LOG.debug("Unassigning un-merged replica " + h);
4146           unassign(h);
4147         }
4148       }
4149     }
4150     int numReplicas = 1;
4151     try {
4152       numReplicas = server.getTableDescriptors().get(mergedHri.getTable()).
4153           getRegionReplication();
4154     } catch (IOException e) {
4155       LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
4156           " due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
4157           "will not be done");
4158     }
4159     List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
4160     for (int i = 1; i < numReplicas; i++) {
4161       regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
4162     }
4163     try {
4164       assign(regions);
4165     } catch (IOException ioe) {
4166       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
4167                 ioe.getMessage());
4168     } catch (InterruptedException ie) {
4169       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri+ " because of " +
4170                 ie.getMessage());
4171     }
4172   }
4173 
4174   private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
4175       final HRegionInfo hri_b) {
4176     // create new regions for the replica, and assign them to match with the
4177     // current replica assignments. If replica1 of parent is assigned to RS1,
4178     // the replica1s of daughters will be on the same machine
4179     int numReplicas = 1;
4180     try {
4181       numReplicas = server.getTableDescriptors().get(parentHri.getTable()).
4182           getRegionReplication();
4183     } catch (IOException e) {
4184       LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
4185           " due to " + e.getMessage() + ". The assignment of daughter replicas " +
4186           "replicas will not be done");
4187     }
4188     // unassign the old replicas
4189     List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
4190     parentRegion.add(parentHri);
4191     Map<ServerName, List<HRegionInfo>> currentAssign =
4192         regionStates.getRegionAssignments(parentRegion);
4193     Collection<List<HRegionInfo>> c = currentAssign.values();
4194     for (List<HRegionInfo> l : c) {
4195       for (HRegionInfo h : l) {
4196         if (!RegionReplicaUtil.isDefaultReplica(h)) {
4197           LOG.debug("Unassigning parent's replica " + h);
4198           unassign(h);
4199         }
4200       }
4201     }
4202     // assign daughter replicas
4203     Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
4204     for (int i = 1; i < numReplicas; i++) {
4205       prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
4206       prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
4207     }
4208     try {
4209       assign(map);
4210     } catch (IOException e) {
4211       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
4212     } catch (InterruptedException e) {
4213       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
4214     }
4215   }
4216 
4217   private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
4218       int replicaId, Map<HRegionInfo, ServerName> map) {
4219     HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
4220     HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
4221         replicaId);
4222     LOG.debug("Created replica region for daughter " + daughterReplica);
4223     ServerName sn;
4224     if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
4225       map.put(daughterReplica, sn);
4226     } else {
4227       List<ServerName> servers = serverManager.getOnlineServersList();
4228       sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
4229       map.put(daughterReplica, sn);
4230     }
4231   }
4232 
4233   public Set<HRegionInfo> getReplicasToClose() {
4234     return replicasToClose;
4235   }
4236 
4237   /**
4238    * A region is offline.  The new state should be the specified one,
4239    * if not null.  If the specified state is null, the new state is Offline.
4240    * The specified state can be Split/Merged/Offline/null only.
4241    */
4242   private void regionOffline(final HRegionInfo regionInfo, final State state) {
4243     regionStates.regionOffline(regionInfo, state);
4244     removeClosedRegion(regionInfo);
4245     // remove the region plan as well just in case.
4246     clearRegionPlan(regionInfo);
4247     balancer.regionOffline(regionInfo);
4248 
4249     // Tell our listeners that a region was closed
4250     sendRegionClosedNotification(regionInfo);
4251     // also note that all the replicas of the primary should be closed
4252     if (state != null && state.equals(State.SPLIT)) {
4253       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
4254       c.add(regionInfo);
4255       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
4256       Collection<List<HRegionInfo>> allReplicas = map.values();
4257       for (List<HRegionInfo> list : allReplicas) {
4258         replicasToClose.addAll(list);
4259       }
4260     }
4261     else if (state != null && state.equals(State.MERGED)) {
4262       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
4263       c.add(regionInfo);
4264       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
4265       Collection<List<HRegionInfo>> allReplicas = map.values();
4266       for (List<HRegionInfo> list : allReplicas) {
4267         replicasToClose.addAll(list);
4268       }
4269     }
4270   }
4271 
4272   private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
4273       final ServerName serverName) {
4274     if (!this.listeners.isEmpty()) {
4275       for (AssignmentListener listener : this.listeners) {
4276         listener.regionOpened(regionInfo, serverName);
4277       }
4278     }
4279   }
4280 
4281   private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
4282     if (!this.listeners.isEmpty()) {
4283       for (AssignmentListener listener : this.listeners) {
4284         listener.regionClosed(regionInfo);
4285       }
4286     }
4287   }
4288 
4289   /**
4290    * Try to update some region states. If the state machine prevents
4291    * such update, an error message is returned to explain the reason.
4292    *
4293    * It's expected that in each transition there should have just one
4294    * region for opening/closing, 3 regions for splitting/merging.
4295    * These regions should be on the server that requested the change.
4296    *
4297    * Region state machine. Only these transitions
4298    * are expected to be triggered by a region server.
4299    *
4300    * On the state transition:
4301    *  (1) Open/Close should be initiated by master
4302    *      (a) Master sets the region to pending_open/pending_close
4303    *        in memory and hbase:meta after sending the request
4304    *        to the region server
4305    *      (b) Region server reports back to the master
4306    *        after open/close is done (either success/failure)
4307    *      (c) If region server has problem to report the status
4308    *        to master, it must be because the master is down or some
4309    *        temporary network issue. Otherwise, the region server should
4310    *        abort since it must be a bug. If the master is not accessible,
4311    *        the region server should keep trying until the server is
4312    *        stopped or till the status is reported to the (new) master
4313    *      (d) If region server dies in the middle of opening/closing
4314    *        a region, SSH picks it up and finishes it
4315    *      (e) If master dies in the middle, the new master recovers
4316    *        the state during initialization from hbase:meta. Region server
4317    *        can report any transition that has not been reported to
4318    *        the previous active master yet
4319    *  (2) Split/merge is initiated by region servers
4320    *      (a) To split a region, a region server sends a request
4321    *        to master to try to set a region to splitting, together with
4322    *        two daughters (to be created) to splitting new. If approved
4323    *        by the master, the splitting can then move ahead
4324    *      (b) To merge two regions, a region server sends a request to
4325    *        master to try to set the new merged region (to be created) to
4326    *        merging_new, together with two regions (to be merged) to merging.
4327    *        If it is ok with the master, the merge can then move ahead
4328    *      (c) Once the splitting/merging is done, the region server
4329    *        reports the status back to the master either success/failure.
4330    *      (d) Other scenarios should be handled similarly as for
4331    *        region open/close
4332    */
4333   protected String onRegionTransition(final ServerName serverName,
4334       final RegionStateTransition transition) {
4335     TransitionCode code = transition.getTransitionCode();
4336     HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
4337     RegionState current = regionStates.getRegionState(hri);
4338     if (LOG.isDebugEnabled()) {
4339       LOG.debug("Got transition " + code + " for "
4340         + (current != null ? current.toString() : hri.getShortNameToLog())
4341         + " from " + serverName);
4342     }
4343     String errorMsg = null;
4344     switch (code) {
4345     case OPENED:
4346       if (current != null && current.isOpened() && current.isOnServer(serverName)) {
4347         LOG.info("Region " + hri.getShortNameToLog() + " is already " + current.getState() + " on "
4348             + serverName);
4349         break;
4350       }
4351     case FAILED_OPEN:
4352       if (current == null
4353           || !current.isPendingOpenOrOpeningOnServer(serverName)) {
4354         errorMsg = hri.getShortNameToLog()
4355           + " is not pending open on " + serverName;
4356       } else if (code == TransitionCode.FAILED_OPEN) {
4357         onRegionFailedOpen(hri, serverName);
4358       } else {
4359         long openSeqNum = HConstants.NO_SEQNUM;
4360         if (transition.hasOpenSeqNum()) {
4361           openSeqNum = transition.getOpenSeqNum();
4362         }
4363         if (openSeqNum < 0) {
4364           errorMsg = "Newly opened region has invalid open seq num " + openSeqNum;
4365         } else {
4366           onRegionOpen(hri, serverName, openSeqNum);
4367         }
4368       }
4369       break;
4370 
4371     case CLOSED:
4372       if (current == null
4373           || !current.isPendingCloseOrClosingOnServer(serverName)) {
4374         errorMsg = hri.getShortNameToLog()
4375           + " is not pending close on " + serverName;
4376       } else {
4377         onRegionClosed(hri);
4378       }
4379       break;
4380 
4381     case READY_TO_SPLIT:
4382       try {
4383         regionStateListener.onRegionSplit(hri);
4384       } catch (IOException exp) {
4385         errorMsg = StringUtils.stringifyException(exp);
4386       }
4387       break;
4388     case SPLIT_PONR:
4389     case SPLIT:
4390       errorMsg =
4391       onRegionSplit(serverName, code, hri, HRegionInfo.convert(transition.getRegionInfo(1)),
4392         HRegionInfo.convert(transition.getRegionInfo(2)));
4393       break;
4394 
4395     case SPLIT_REVERTED:
4396       errorMsg =
4397           onRegionSplitReverted(serverName, hri,
4398             HRegionInfo.convert(transition.getRegionInfo(1)),
4399             HRegionInfo.convert(transition.getRegionInfo(2)));
4400       if (org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
4401         try {
4402           regionStateListener.onRegionSplitReverted(hri);
4403         } catch (IOException exp) {
4404           LOG.warn(StringUtils.stringifyException(exp));
4405         }
4406       }
4407       break;
4408     case READY_TO_MERGE:
4409     case MERGE_PONR:
4410     case MERGED:
4411     case MERGE_REVERTED:
4412       errorMsg = onRegionMerge(serverName, code, hri,
4413         HRegionInfo.convert(transition.getRegionInfo(1)),
4414         HRegionInfo.convert(transition.getRegionInfo(2)));
4415       if (code == TransitionCode.MERGED && org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
4416         try {
4417           regionStateListener.onRegionMerged(hri);
4418         } catch (IOException exp) {
4419           errorMsg = StringUtils.stringifyException(exp);
4420         }
4421       }
4422       break;
4423 
4424     default:
4425       errorMsg = "Unexpected transition code " + code;
4426     }
4427     if (errorMsg != null) {
4428       LOG.error("Failed to transtion region from " + current + " to "
4429         + code + " by " + serverName + ": " + errorMsg);
4430     }
4431     return errorMsg;
4432   }
4433 
4434   /**
4435    * @return Instance of load balancer
4436    */
4437   public LoadBalancer getBalancer() {
4438     return this.balancer;
4439   }
4440 
4441   public Map<ServerName, List<HRegionInfo>>
4442     getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
4443     return getRegionStates().getRegionAssignments(infos);
4444   }
4445 
4446   void setRegionStateListener(RegionStateListener listener) {
4447     this.regionStateListener = listener;
4448   }
4449 }