1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.HashMap;
28  import java.util.HashSet;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.NavigableMap;
33  import java.util.Random;
34  import java.util.Set;
35  import java.util.TreeMap;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.ConcurrentHashMap;
38  import java.util.concurrent.CopyOnWriteArrayList;
39  import java.util.concurrent.ThreadFactory;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  import java.util.concurrent.atomic.AtomicInteger;
43  import java.util.concurrent.locks.Lock;
44  import java.util.concurrent.locks.ReentrantLock;
45  
46  import org.apache.commons.logging.Log;
47  import org.apache.commons.logging.LogFactory;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.fs.FileSystem;
50  import org.apache.hadoop.fs.Path;
51  import org.apache.hadoop.hbase.CoordinatedStateException;
52  import org.apache.hadoop.hbase.HBaseIOException;
53  import org.apache.hadoop.hbase.HConstants;
54  import org.apache.hadoop.hbase.HRegionInfo;
55  import org.apache.hadoop.hbase.HRegionLocation;
56  import org.apache.hadoop.hbase.HTableDescriptor;
57  import org.apache.hadoop.hbase.MetaTableAccessor;
58  import org.apache.hadoop.hbase.NotServingRegionException;
59  import org.apache.hadoop.hbase.RegionLocations;
60  import org.apache.hadoop.hbase.RegionStateListener;
61  import org.apache.hadoop.hbase.RegionTransition;
62  import org.apache.hadoop.hbase.ServerName;
63  import org.apache.hadoop.hbase.TableName;
64  import org.apache.hadoop.hbase.TableNotFoundException;
65  import org.apache.hadoop.hbase.TableStateManager;
66  import org.apache.hadoop.hbase.classification.InterfaceAudience;
67  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
68  import org.apache.hadoop.hbase.client.Result;
69  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
70  import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
71  import org.apache.hadoop.hbase.coordination.RegionMergeCoordination;
72  import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
73  import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
74  import org.apache.hadoop.hbase.coordination.ZkRegionMergeCoordination;
75  import org.apache.hadoop.hbase.exceptions.DeserializationException;
76  import org.apache.hadoop.hbase.executor.EventHandler;
77  import org.apache.hadoop.hbase.executor.EventType;
78  import org.apache.hadoop.hbase.executor.ExecutorService;
79  import org.apache.hadoop.hbase.ipc.FailedServerException;
80  import org.apache.hadoop.hbase.ipc.RpcClient;
81  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
82  import org.apache.hadoop.hbase.master.RegionState.State;
83  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
84  import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
85  import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
86  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
87  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
88  import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
89  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
90  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
91  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
92  import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
93  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
94  import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
95  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
96  import org.apache.hadoop.hbase.util.ConfigUtil;
97  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
98  import org.apache.hadoop.hbase.util.FSUtils;
99  import org.apache.hadoop.hbase.util.KeyLocker;
100 import org.apache.hadoop.hbase.util.Pair;
101 import org.apache.hadoop.hbase.util.PairOfSameType;
102 import org.apache.hadoop.hbase.util.Threads;
103 import org.apache.hadoop.hbase.util.Triple;
104 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
105 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
106 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
107 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
108 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
109 import org.apache.hadoop.ipc.RemoteException;
110 import org.apache.hadoop.util.StringUtils;
111 import org.apache.zookeeper.AsyncCallback;
112 import org.apache.zookeeper.KeeperException;
113 import org.apache.zookeeper.KeeperException.NoNodeException;
114 import org.apache.zookeeper.KeeperException.NodeExistsException;
115 import org.apache.zookeeper.data.Stat;
116 
117 import com.google.common.annotations.VisibleForTesting;
118 import com.google.common.collect.LinkedHashMultimap;
119 
120 /**
121  * Manages and performs region assignment.
122  * <p>
123  * Monitors ZooKeeper for events related to regions in transition.
124  * <p>
125  * Handles existing regions in transition during master failover.
126  */
127 @InterfaceAudience.Private
128 public class AssignmentManager extends ZooKeeperListener {
129   private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
130 
131   public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(HConstants.HBCK_CODE_NAME,
132       -1, -1L);
133 
134   static final String ALREADY_IN_TRANSITION_WAITTIME
135     = "hbase.assignment.already.intransition.waittime";
136   static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute
137 
138   protected final MasterServices server;
139 
140   private ServerManager serverManager;
141 
142   private boolean shouldAssignRegionsWithFavoredNodes;
143 
144   private LoadBalancer balancer;
145 
146   private final MetricsAssignmentManager metricsAssignmentManager;
147 
148   private final TableLockManager tableLockManager;
149 
150   private AtomicInteger numRegionsOpened = new AtomicInteger(0);
151 
152   final private KeyLocker<String> locker = new KeyLocker<String>();
153 
154   Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
155 
156   /**
157    * Map of regions to reopen after the schema of a table is changed. Key -
158    * encoded region name, value - HRegionInfo
159    */
160   private final Map <String, HRegionInfo> regionsToReopen;
161 
162   /*
163    * Maximum times we recurse an assignment/unassignment.
164    * See below in {@link #assign()} and {@link #unassign()}.
165    */
166   private final int maximumAttempts;
167 
168   /**
169    * Map from the region to be created (encoded name) to the pair of regions being merged.
170    */
171   private final Map<String, PairOfSameType<HRegionInfo>> mergingRegions
172     = new HashMap<String, PairOfSameType<HRegionInfo>>();
173 
174   private final Map<HRegionInfo, PairOfSameType<HRegionInfo>> splitRegions
175   = new HashMap<HRegionInfo, PairOfSameType<HRegionInfo>>();
176 
177   /**
178    * The sleep time before retrying an hbase:meta assignment that failed because no
179    * region plan was available or the region plan was bad.
180    */
181   private final long sleepTimeBeforeRetryingMetaAssignment;
182 
183   /** Plans for region movement. Key is the encoded version of a region name*/
184   // TODO: When do plans get cleaned out?  Ever? In server open and in server
185   // shutdown processing -- St.Ack
186   // All access to this Map must be synchronized.
187   final NavigableMap<String, RegionPlan> regionPlans =
188     new TreeMap<String, RegionPlan>();
189 
190   private final TableStateManager tableStateManager;
191 
192   private final ExecutorService executorService;
193 
194   // For unit tests, keep track of calls to ClosedRegionHandler
195   private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = null;
196 
197   // For unit tests, keep track of calls to OpenedRegionHandler
198   private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null;
199 
200   //Thread pool executor service for timeout monitor
201   private java.util.concurrent.ExecutorService threadPoolExecutorService;
202 
203   // A bunch of ZK events workers. Each is a single thread executor service
204   private final java.util.concurrent.ExecutorService zkEventWorkers;
205 
206   private List<EventType> ignoreStatesRSOffline = Arrays.asList(
207       EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);
208 
209   private final RegionStates regionStates;
210 
211   // The threshold to use bulk assigning. Bulk assignment is used
212   // only if assigning at least this many regions to at least this
213   // many servers. If assigning fewer regions to fewer servers,
214   // bulk assigning may not be as efficient.
215   private final int bulkAssignThresholdRegions;
216   private final int bulkAssignThresholdServers;
217   private final int bulkPerRegionOpenTimeGuesstimate;
218 
219   // Should bulk assignment wait till all regions are assigned,
220   // or until it times out?  This is useful to measure bulk assignment
221   // performance, but not needed in most use cases.
222   private final boolean bulkAssignWaitTillAllAssigned;
223 
224   /**
225    * Indicator that AssignmentManager has recovered the region states so
226    * that ServerShutdownHandler can be fully enabled and re-assign regions
227    * of dead servers. This ensures that when re-assignment happens, AssignmentManager
228    * has proper region states.
229    *
230    * Protected to ease testing.
231    */
232   protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
233 
234   /**
235    * A map to track the number of times in a row a region has failed to open.
236    * So that we don't try to open a region forever if the failure is
237    * unrecoverable.  We don't put this information in region states
238    * because we don't expect this to happen frequently; we don't
239    * want to copy this information over during each state transition either.
240    */
241   private final ConcurrentHashMap<String, AtomicInteger>
242     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
243 
244   // A flag to indicate if we are using ZK for region assignment
245   private final boolean useZKForAssignment;
246 
247   // In case not using ZK for region assignment, region states
248   // are persisted in meta with a state store
249   private final RegionStateStore regionStateStore;
250 
251   /**
252    * For testing only!  Set to true to skip handling of split and merge.
253    */
254   private static boolean TEST_SKIP_SPLIT_HANDLING = false;
255   private static boolean TEST_SKIP_MERGE_HANDLING = false;
256 
257   /** Listeners that are called on assignment events. */
258   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
259 
260   private RegionStateListener regionStateListener;
261 
262   public enum ServerHostRegion {
263     NOT_HOSTING_REGION, HOSTING_REGION, UNKNOWN,
264   }
265 
266   /**
267    * Constructs a new assignment manager.
268    *
269    * @param server instance of HMaster this AM is running inside
270    * @param serverManager serverManager for associated HMaster
271    * @param balancer implementation of {@link LoadBalancer}
272    * @param service Executor service
273    * @param metricsMaster metrics manager
274    * @param tableLockManager TableLock manager
275    * @throws KeeperException
276    * @throws IOException
277    */
278   public AssignmentManager(MasterServices server, ServerManager serverManager,
279       final LoadBalancer balancer,
280       final ExecutorService service, MetricsMaster metricsMaster,
281       final TableLockManager tableLockManager) throws KeeperException,
282         IOException, CoordinatedStateException {
283     super(server.getZooKeeper());
284     this.server = server;
285     this.serverManager = serverManager;
286     this.executorService = service;
287     this.regionStateStore = new RegionStateStore(server);
288     this.regionsToReopen = Collections.synchronizedMap
289                            (new HashMap<String, HRegionInfo> ());
290     Configuration conf = server.getConfiguration();
291     // Only read favored nodes if using the favored nodes load balancer.
292     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
293            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
294            FavoredNodeLoadBalancer.class);
295     try {
296       if (server.getCoordinatedStateManager() != null) {
297         this.tableStateManager = server.getCoordinatedStateManager().getTableStateManager();
298       } else {
299         this.tableStateManager = null;
300       }
301     } catch (InterruptedException e) {
302       throw new InterruptedIOException();
303     }
304     // This is the max attempts, not retries, so it should be at least 1.
305     this.maximumAttempts = Math.max(1,
306       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
307     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
308         "hbase.meta.assignment.retry.sleeptime", 1000l);
309     this.balancer = balancer;
310     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
311     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
312       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
313     this.regionStates = new RegionStates(
314       server, tableStateManager, serverManager, regionStateStore);
315 
316     this.bulkAssignWaitTillAllAssigned =
317       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
318     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
319     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
320     this.bulkPerRegionOpenTimeGuesstimate =
321       conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
322 
323     int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
324     ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
325     zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
326             TimeUnit.SECONDS, threadFactory);
327     this.tableLockManager = tableLockManager;
328 
329     this.metricsAssignmentManager = new MetricsAssignmentManager();
330     useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
331   }
332 
333   MetricsAssignmentManager getAssignmentManagerMetrics() {
334     return this.metricsAssignmentManager;
335   }
336 
337   /**
338    * Add the listener to the notification list.
339    * @param listener The AssignmentListener to register
340    */
341   public void registerListener(final AssignmentListener listener) {
342     this.listeners.add(listener);
343   }
344 
345   /**
346    * Remove the listener from the notification list.
347    * @param listener The AssignmentListener to unregister
348    */
349   public boolean unregisterListener(final AssignmentListener listener) {
350     return this.listeners.remove(listener);
351   }
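  // A minimal usage sketch (illustrative only, not part of this file): a component interested
  // in assignment events would typically register a listener on startup and unregister it on
  // shutdown. "myListener" is a hypothetical AssignmentListener implementation.
  //
  //   assignmentManager.registerListener(myListener);
  //   try {
  //     // ... react to assignment events delivered to myListener ...
  //   } finally {
  //     assignmentManager.unregisterListener(myListener);
  //   }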
352 
353   /**
354    * @return Instance of ZKTableStateManager.
355    */
356   public TableStateManager getTableStateManager() {
357     // These are 'expensive' to make, involving a trip to the zk ensemble, so
358     // allow sharing.
359     return this.tableStateManager;
360   }
361 
362   /**
363    * This SHOULD not be public. It is public now
364    * because of some unit tests.
365    *
366    * TODO: make it package private and keep RegionStates in the master package
367    */
368   public RegionStates getRegionStates() {
369     return regionStates;
370   }
371 
372   /**
373    * Used in some tests to mock up region state in meta
374    */
375   @VisibleForTesting
376   RegionStateStore getRegionStateStore() {
377     return regionStateStore;
378   }
379 
380   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
381     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
382   }
383 
384   /**
385    * Add a regionPlan for the specified region.
386    * @param encodedName
387    * @param plan
388    */
389   public void addPlan(String encodedName, RegionPlan plan) {
390     synchronized (regionPlans) {
391       regionPlans.put(encodedName, plan);
392     }
393   }
394 
395   /**
396    * Add a map of region plans.
397    */
398   public void addPlans(Map<String, RegionPlan> plans) {
399     synchronized (regionPlans) {
400       regionPlans.putAll(plans);
401     }
402   }
403 
404   /**
405    * Set the list of regions that will be reopened
406    * because of an update in table schema
407    *
408    * @param regions
409    *          list of regions that should be tracked for reopen
410    */
411   public void setRegionsToReopen(List <HRegionInfo> regions) {
412     for(HRegionInfo hri : regions) {
413       regionsToReopen.put(hri.getEncodedName(), hri);
414     }
415   }
416 
417   /**
418    * Used by the client to identify if all regions have the schema updates
419    *
420    * @param tableName
421    * @return Pair indicating the status of the alter command
422    * @throws IOException
423    */
424   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
425       throws IOException {
426     List<HRegionInfo> hris;
427     if (TableName.META_TABLE_NAME.equals(tableName)) {
428       hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
429     } else {
430       hris = MetaTableAccessor.getTableRegions(server.getZooKeeper(),
431         server.getConnection(), tableName, true);
432     }
433 
434     Integer pending = 0;
435     for (HRegionInfo hri : hris) {
436       String name = hri.getEncodedName();
437       // no lock concurrent access ok: sequential consistency respected.
438       if (regionsToReopen.containsKey(name)
439           || regionStates.isRegionInTransition(name)) {
440         pending++;
441       }
442     }
443     return new Pair<Integer, Integer>(pending, hris.size());
444   }
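  // A minimal usage sketch (illustrative only, not part of this file) of interpreting the
  // returned Pair: the first element is the number of regions still pending (to be reopened
  // or in transition), the second is the total region count of the table. "am" is assumed to
  // reference this AssignmentManager.
  //
  //   Pair<Integer, Integer> status = am.getReopenStatus(tableName);
  //   int pending = status.getFirst();
  //   int total = status.getSecond();
  //   boolean alterDone = (pending == 0);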
445 
446   /**
447    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
448    * the failover cleanup before re-assigning regions of dead servers, so that
449    * when re-assignment happens, AssignmentManager has proper region states.
450    */
451   public boolean isFailoverCleanupDone() {
452     return failoverCleanupDone.get();
453   }
454 
455   /**
456    * To avoid racing with AM, external entities may need to lock a region,
457    * for example, when SSH checks what regions to skip re-assigning.
458    */
459   public Lock acquireRegionLock(final String encodedName) {
460     return locker.acquireLock(encodedName);
461   }
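  // A minimal usage sketch (illustrative only, not part of this file): external callers such
  // as SSH should release the returned lock in a finally block so a failure cannot leave the
  // region locked. "am" and "hri" are assumed references to this AssignmentManager and the
  // HRegionInfo of interest.
  //
  //   Lock lock = am.acquireRegionLock(hri.getEncodedName());
  //   try {
  //     // ... inspect or update state for this region without racing the AM ...
  //   } finally {
  //     lock.unlock();
  //   }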
462 
463   /**
464    * Now that failover cleanup is completed, notify the server manager to
465    * process queued-up dead servers, if any.
466    */
467   void failoverCleanupDone() {
468     failoverCleanupDone.set(true);
469     serverManager.processQueuedDeadServers();
470   }
471 
472   /**
473    * Called on startup.
474    * Figures whether this is a fresh cluster start or we are joining an extant running cluster.
475    * @throws IOException
476    * @throws KeeperException
477    * @throws InterruptedException
478    * @throws CoordinatedStateException
479    */
480   void joinCluster() throws IOException,
481       KeeperException, InterruptedException, CoordinatedStateException {
482     long startTime = System.currentTimeMillis();
483     // Concurrency note: In the below the accesses on regionsInTransition are
484     // outside of a synchronization block where usually all accesses to RIT are
485     // synchronized.  The presumption is that in this case it is safe since this
486     // method is being played by a single thread on startup.
487 
488     // TODO: Regions that have a null location and are not in regionsInTransitions
489     // need to be handled.
490 
491     // Scan hbase:meta to build list of existing regions, servers, and assignment
492     // Returns servers that have not checked in (assumed dead) to which some regions
493     // were assigned (according to meta)
494     Set<ServerName> deadServers = rebuildUserRegions();
495 
496     // This method will assign all user regions if this is a clean server startup; otherwise
497     // it will reconstruct master state and clean up any leftovers from the previous master process.
498     boolean failover = processDeadServersAndRegionsInTransition(deadServers);
499 
500     if (!useZKForAssignment) {
501       // No longer using ZK for assignment, so remove the ZNode
502       ZKUtil.deleteNodeRecursively(watcher, watcher.assignmentZNode);
503     }
504     recoverTableInDisablingState();
505     recoverTableInEnablingState();
506     LOG.info("Joined the cluster in " + (System.currentTimeMillis()
507       - startTime) + "ms, failover=" + failover);
508   }
509 
510   /**
511    * Processes all regions that are in transition in zookeeper and also
512    * processes the list of dead servers.
513    * Used by a master joining a cluster.  If we figure this is a clean cluster
514    * startup, will assign all user regions.
515    * @param deadServers Set of servers that are probably legitimately offline and were carrying
516    * regions according to a scan of hbase:meta. Can be null.
517    * @throws KeeperException
518    * @throws IOException
519    * @throws InterruptedException
520    */
521   boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
522   throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
523     List<String> nodes = ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
524 
525     if (useZKForAssignment && nodes == null) {
526       String errorMessage = "Failed to get the children from ZK";
527       server.abort(errorMessage, new IOException(errorMessage));
528       return true; // Doesn't matter in this case
529     }
530 
531     boolean failover = !serverManager.getDeadServers().isEmpty();
532     if (failover) {
533       // This may not be a failover actually, especially if meta is on this master.
534       if (LOG.isDebugEnabled()) {
535         LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
536       }
537     } else {
538       // If any one region except meta is assigned, it's a failover.
539       Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
540       for (Map.Entry<HRegionInfo, ServerName> en:
541           regionStates.getRegionAssignments().entrySet()) {
542         HRegionInfo hri = en.getKey();
543         if (!hri.isMetaTable()
544             && onlineServers.contains(en.getValue())) {
545           LOG.debug("Found " + hri + " out on cluster");
546           failover = true;
547           break;
548         }
549       }
550       if (!failover && nodes != null) {
551         // If any one region except meta is in transition, it's a failover.
552         for (String encodedName: nodes) {
553           RegionState regionState = regionStates.getRegionState(encodedName);
554           if (regionState != null && !regionState.getRegion().isMetaRegion()) {
555             LOG.debug("Found " + regionState + " in RITs");
556             failover = true;
557             break;
558           }
559         }
560       }
561     }
562     if (!failover && !useZKForAssignment) {
563       // If any region except meta is in transition on a live server, it's a failover.
564       Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
565       if (!regionsInTransition.isEmpty()) {
566         Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
567         for (RegionState regionState: regionsInTransition.values()) {
568           ServerName serverName = regionState.getServerName();
569           if (!regionState.getRegion().isMetaRegion()
570               && serverName != null && onlineServers.contains(serverName)) {
571             LOG.debug("Found " + regionState + " in RITs");
572             failover = true;
573             break;
574           }
575         }
576       }
577     }
578     if (!failover) {
579       // If we get here, we have a full cluster restart. It is a failover only
580       // if some WALs are not split yet. For meta WALs, they should have
581       // been split already, if any. We can walk through those queued dead servers;
582       // if none of them have any WALs, this restart should be considered a clean one.
583       Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
584       if (!queuedDeadServers.isEmpty()) {
585         Configuration conf = server.getConfiguration();
586         Path rootdir = FSUtils.getRootDir(conf);
587         FileSystem fs = rootdir.getFileSystem(conf);
588         for (ServerName serverName: queuedDeadServers) {
589           // In the case of a clean exit, the shutdown handler would have presplit any WALs and
590           // removed empty directories.
591           Path logDir = new Path(rootdir,
592               DefaultWALProvider.getWALDirectoryName(serverName.toString()));
593           Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
594           if (fs.exists(logDir) || fs.exists(splitDir)) {
595             LOG.debug("Found queued dead server " + serverName);
596             failover = true;
597             break;
598           }
599         }
600         if (!failover) {
601           // We figured that it's not a failover, so no need to
602           // work on these re-queued dead servers any more.
603           LOG.info("AM figured that it's not a failover and cleaned up "
604             + queuedDeadServers.size() + " queued dead servers");
605           serverManager.removeRequeuedDeadServers();
606         }
607       }
608     }
609 
610     Set<TableName> disabledOrDisablingOrEnabling = null;
611     Map<HRegionInfo, ServerName> allRegions = null;
612 
613     if (!failover) {
614       disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
615         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING,
616         ZooKeeperProtos.Table.State.ENABLING);
617 
618       // Clean re/start, mark all user regions closed before reassignment
619       allRegions = regionStates.closeAllUserRegions(
620         disabledOrDisablingOrEnabling);
621     }
622 
623     // Now region states are restored
624     regionStateStore.start();
625 
626     // If we found user regions out on the cluster, it's a failover.
627     if (failover) {
628       LOG.info("Found regions out on cluster or in RIT; presuming failover");
629       // Process list of dead servers and regions in RIT.
630       // See HBASE-4580 for more information.
631       processDeadServersAndRecoverLostRegions(deadServers);
632     }
633 
634     if (!failover && useZKForAssignment) {
635       // Cleanup any existing ZK nodes and start watching
636       ZKAssign.deleteAllNodes(watcher);
637       ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
638         this.watcher.assignmentZNode);
639     }
640 
641     // Now we can safely claim failover cleanup completed and enable
642     // ServerShutdownHandler for further processing. The nodes (below)
643     // in transition, if any, are for regions not related to those
644     // dead servers at all, and can be done in parallel to SSH.
645     failoverCleanupDone();
646     if (!failover) {
647       // Fresh cluster startup.
648       LOG.info("Clean cluster startup. Assigning user regions");
649       assignAllUserRegions(allRegions);
650     }
651     // unassign replicas of the split parents and the merged regions
652     // the daughter replicas are opened in assignAllUserRegions if they were
653     // not already opened.
654     for (HRegionInfo h : replicasToClose) {
655       unassign(h);
656     }
657     replicasToClose.clear();
658     return failover;
659   }
660 
661   /**
662    * If the region is up in zk in transition, then do fixup, then block and wait until
663    * the region is assigned and out of transition.  Used on startup for
664    * catalog regions.
665    * @param hri Region to look for.
666    * @return True if we processed a region in transition, else false if the region
667    * was not up in zk in transition.
668    * @throws InterruptedException
669    * @throws KeeperException
670    * @throws IOException
671    */
672   boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
673       throws InterruptedException, KeeperException, IOException {
674     String encodedRegionName = hri.getEncodedName();
675     if (!processRegionInTransition(encodedRegionName, hri)) {
676       return false; // The region is not in transition
677     }
678     LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedRegionName));
679     while (!this.server.isStopped() &&
680         this.regionStates.isRegionInTransition(encodedRegionName)) {
681       RegionState state = this.regionStates.getRegionTransitionState(encodedRegionName);
682       if (state == null || !serverManager.isServerOnline(state.getServerName())) {
683         // The region is not in transition, or not in transition on an online
684         // server. Doesn't help to block here any more. Caller needs to
685         // verify the region is actually assigned.
686         break;
687       }
688       this.regionStates.waitForUpdate(100);
689     }
690     return true;
691   }
692 
693   /**
694    * Process failover of new master for region <code>encodedRegionName</code>
695    * up in zookeeper.
696    * @param encodedRegionName Region to process failover for.
697    * @param regionInfo If null we'll go get it from meta table.
698    * @return True if we processed <code>regionInfo</code> as a RIT.
699    * @throws KeeperException
700    * @throws IOException
701    */
702   boolean processRegionInTransition(final String encodedRegionName,
703       final HRegionInfo regionInfo) throws KeeperException, IOException {
704     // We need a lock here to ensure that we will not put the same region twice
705     // It has no reason to be a lock shared with the other operations.
706     // We can do the lock on the region only, instead of a global lock: what we want to ensure
707     // is that we don't have two threads working on the same region.
708     Lock lock = locker.acquireLock(encodedRegionName);
709     try {
710       Stat stat = new Stat();
711       byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
712       if (data == null) return false;
713       RegionTransition rt;
714       try {
715         rt = RegionTransition.parseFrom(data);
716       } catch (DeserializationException e) {
717         LOG.warn("Failed parse znode data", e);
718         return false;
719       }
720       HRegionInfo hri = regionInfo;
721       if (hri == null) {
722         // The region info is not passed in. We will try to find the region
723         // from region states map/meta based on the encoded region name. But we
724         // may not be able to find it. This is valid for online merge, where
725         // the region may not have been created yet if the merge is not completed.
726         // Therefore, it is not in meta at master recovery time.
727         hri = regionStates.getRegionInfo(rt.getRegionName());
728         EventType et = rt.getEventType();
729         if (hri == null && et != EventType.RS_ZK_REGION_MERGING
730             && et != EventType.RS_ZK_REQUEST_REGION_MERGE) {
731           LOG.warn("Couldn't find the region in recovering " + rt);
732           return false;
733         }
734       }
735 
736       // TODO: This code is tied to ZK anyway, so for now leaving it as is,
737       // will refactor when the whole region assignment is abstracted from ZK
738       BaseCoordinatedStateManager cp =
739         (BaseCoordinatedStateManager) this.server.getCoordinatedStateManager();
740       OpenRegionCoordination openRegionCoordination = cp.getOpenRegionCoordination();
741 
742       ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
743         new ZkOpenRegionCoordination.ZkOpenRegionDetails();
744       zkOrd.setVersion(stat.getVersion());
745       zkOrd.setServerName(cp.getServer().getServerName());
746 
747       return processRegionsInTransition(
748         rt, hri, openRegionCoordination, zkOrd);
749     } finally {
750       lock.unlock();
751     }
752   }
753 
754   /**
755    * This call is invoked only when (1) the master assigns meta, or
756    * (2) during failover-mode startup, when processing zk assignment nodes.
757    * The locker is set in the caller. It returns true if the region
758    * is in transition for sure, false otherwise.
759    *
760    * It should be private but it is used by some tests too.
761    */
762   boolean processRegionsInTransition(
763       final RegionTransition rt, final HRegionInfo regionInfo,
764       OpenRegionCoordination coordination,
765       final OpenRegionCoordination.OpenRegionDetails ord) throws KeeperException {
766     EventType et = rt.getEventType();
767     // Get ServerName.  Cannot be null.
768     final ServerName sn = rt.getServerName();
769     final byte[] regionName = rt.getRegionName();
770     final String encodedName = HRegionInfo.encodeRegionName(regionName);
771     final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
772     LOG.info("Processing " + prettyPrintedRegionName + " in state: " + et);
773 
774     if (regionStates.isRegionInTransition(encodedName)
775         && (regionInfo.isMetaRegion() || !useZKForAssignment)) {
776       LOG.info("Processed region " + prettyPrintedRegionName + " in state: "
777         + et + ", does nothing since the region is already in transition "
778         + regionStates.getRegionTransitionState(encodedName));
779       // Just return
780       return true;
781     }
782     if (!serverManager.isServerOnline(sn)) {
783       // It was transitioning on a dead server, so it's closed now.
784       // Force to OFFLINE and put it in transition, but do not assign it
785       // since log splitting for the dead server is not done yet.
786       LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() +
787         " was on deadserver; forcing offline");
788       if (regionStates.isRegionOnline(regionInfo)) {
789         // Meta could still show the region is assigned to the previous
790         // server. If that server is online, when we reload the meta, the
791         // region is put back online, so we need to offline it.
792         regionStates.regionOffline(regionInfo);
793         sendRegionClosedNotification(regionInfo);
794       }
795       // Put it back in transition so that SSH can re-assign it
796       regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);
797 
798       if (regionInfo.isMetaRegion()) {
799         // If it's meta region, reset the meta location.
800         // So that master knows the right meta region server.
801         MetaTableLocator.setMetaLocation(watcher, sn, State.OPEN);
802       } else {
803         // Whether the previous server is online or offline,
804         // we need to reset the last region server of the region.
805         regionStates.setLastRegionServerOfRegion(sn, encodedName);
806         // Make sure we know the server is dead.
807         if (!serverManager.isServerDead(sn)) {
808           serverManager.expireServer(sn);
809         }
810       }
811       return false;
812     }
813     switch (et) {
814       case M_ZK_REGION_CLOSING:
815         // Insert into RIT & resend the query to the region server: maybe the previous master
816         // died before sending the query the first time.
817         final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING);
818         this.executorService.submit(
819           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
820             @Override
821             public void process() throws IOException {
822               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
823               try {
824                 final int expectedVersion = ((ZkOpenRegionCoordination.ZkOpenRegionDetails) ord)
825                   .getVersion();
826                 unassign(regionInfo, rsClosing, expectedVersion, null, useZKForAssignment, null);
827                 if (regionStates.isRegionOffline(regionInfo)) {
828                   assign(regionInfo, true);
829                 }
830               } finally {
831                 lock.unlock();
832               }
833             }
834           });
835         break;
836 
837       case RS_ZK_REGION_CLOSED:
838       case RS_ZK_REGION_FAILED_OPEN:
839         // Region is closed, insert into RIT and handle it
840         regionStates.setRegionStateTOCLOSED(regionInfo, sn);
841         if (!replicasToClose.contains(regionInfo)) {
842           invokeAssign(regionInfo);
843         } else {
844           offlineDisabledRegion(regionInfo);
845         }
846         break;
847 
848       case M_ZK_REGION_OFFLINE:
849         // Insert in RIT and resend to the regionserver
850         regionStates.updateRegionState(rt, State.OFFLINE);
851         final RegionState rsOffline = regionStates.getRegionState(regionInfo);
852         this.executorService.submit(
853           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
854             @Override
855             public void process() throws IOException {
856               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
857               try {
858                 RegionPlan plan = new RegionPlan(regionInfo, null, sn);
859                 addPlan(encodedName, plan);
860                 assign(rsOffline, true, false);
861               } finally {
862                 lock.unlock();
863               }
864             }
865           });
866         break;
867 
868       case RS_ZK_REGION_OPENING:
869         regionStates.updateRegionState(rt, State.OPENING);
870         break;
871 
872       case RS_ZK_REGION_OPENED:
873         // Region is opened, insert into RIT and handle it
874         // This could be done asynchronously, we would need then to acquire the lock in the
875         //  handler.
876         regionStates.updateRegionState(rt, State.OPEN);
877         new OpenedRegionHandler(server, this, regionInfo, coordination, ord).process();
878         break;
879       case RS_ZK_REQUEST_REGION_SPLIT:
880       case RS_ZK_REGION_SPLITTING:
881       case RS_ZK_REGION_SPLIT:
882         // Splitting region should be online. We could have skipped it during
883         // user region rebuilding since we may consider the split is completed.
884         // Put it in SPLITTING state to avoid complications.
885         regionStates.regionOnline(regionInfo, sn);
886         regionStates.updateRegionState(rt, State.SPLITTING);
887         if (!handleRegionSplitting(
888             rt, encodedName, prettyPrintedRegionName, sn)) {
889           deleteSplittingNode(encodedName, sn);
890         }
891         break;
892       case RS_ZK_REQUEST_REGION_MERGE:
893       case RS_ZK_REGION_MERGING:
894       case RS_ZK_REGION_MERGED:
895         if (!handleRegionMerging(
896             rt, encodedName, prettyPrintedRegionName, sn)) {
897           deleteMergingNode(encodedName, sn);
898         }
899         break;
900       default:
901         throw new IllegalStateException("Received region in state:" + et + " is not valid.");
902     }
903     LOG.info("Processed region " + prettyPrintedRegionName + " in state "
904       + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ")
905       + "server: " + sn);
906     return true;
907   }
908 
909   /**
910    * When a region is closed, it should be removed from the regionsToReopen
911    * @param hri HRegionInfo of the region which was closed
912    */
913   public void removeClosedRegion(HRegionInfo hri) {
914     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
915       LOG.debug("Removed region from reopening regions because it was closed");
916     }
917   }
918 
919   /**
920    * Handles various states an unassigned node can be in.
921    * <p>
922    * Method is called when a state change is suspected for an unassigned node.
923    * <p>
924    * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
925    * yet).
926    * @param rt region transition
927    * @param coordination coordination for opening region
928    * @param ord details about opening region
929    */
930   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
931       value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
932       justification="Needs work; says access to ConcurrentHashMaps not ATOMIC!!!")
933   void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination,
934                     OpenRegionCoordination.OpenRegionDetails ord) {
935     if (rt == null) {
936       LOG.warn("Unexpected NULL input for RegionTransition rt");
937       return;
938     }
939     final ServerName sn = rt.getServerName();
940     // Check if this is a special HBCK transition
941     if (sn.equals(HBCK_CODE_SERVERNAME)) {
942       handleHBCK(rt);
943       return;
944     }
945     final long createTime = rt.getCreateTime();
946     final byte[] regionName = rt.getRegionName();
947     String encodedName = HRegionInfo.encodeRegionName(regionName);
948     String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
949     // Verify this is a known server
950     if (!serverManager.isServerOnline(sn)
951       && !ignoreStatesRSOffline.contains(rt.getEventType())) {
952       LOG.warn("Attempted to handle region transition for server but " +
953         "it is not online: " + prettyPrintedRegionName + ", " + rt);
954       return;
955     }
956 
957     RegionState regionState =
958       regionStates.getRegionState(encodedName);
959     long startTime = System.currentTimeMillis();
960     if (LOG.isDebugEnabled()) {
961       boolean lateEvent = createTime < (startTime - 15000);
962       LOG.debug("Handling " + rt.getEventType() +
963         ", server=" + sn + ", region=" +
964         (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
965         (lateEvent ? ", which is more than 15 seconds late" : "") +
966         ", current_state=" + regionState);
967     }
968     // We don't do anything for this event,
969     // so separate it out, no need to lock/unlock anything
970     if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
971       return;
972     }
973 
974     // We need a lock on the region as we could update it
975     Lock lock = locker.acquireLock(encodedName);
976     try {
977       RegionState latestState =
978         regionStates.getRegionState(encodedName);
979       if ((regionState == null && latestState != null)
980           || (regionState != null && latestState == null)
981           || (regionState != null && latestState != null
982             && latestState.getState() != regionState.getState())) {
983         LOG.warn("Region state changed from " + regionState + " to "
984           + latestState + ", while acquiring lock");
985       }
986       long waitedTime = System.currentTimeMillis() - startTime;
987       if (waitedTime > 5000) {
988         LOG.warn("Took " + waitedTime + "ms to acquire the lock");
989       }
990       regionState = latestState;
991       switch (rt.getEventType()) {
992       case RS_ZK_REQUEST_REGION_SPLIT:
993       case RS_ZK_REGION_SPLITTING:
994       case RS_ZK_REGION_SPLIT:
995         if (!handleRegionSplitting(
996             rt, encodedName, prettyPrintedRegionName, sn)) {
997           deleteSplittingNode(encodedName, sn);
998         }
999         break;
1000 
1001       case RS_ZK_REQUEST_REGION_MERGE:
1002       case RS_ZK_REGION_MERGING:
1003       case RS_ZK_REGION_MERGED:
1004         // Merged region is a new region, we can't find it in the region states now.
1005         // However, the two merging regions are not new. They should be in state for merging.
1006         if (!handleRegionMerging(
1007             rt, encodedName, prettyPrintedRegionName, sn)) {
1008           deleteMergingNode(encodedName, sn);
1009         }
1010         break;
1011 
1012       case M_ZK_REGION_CLOSING:
1013         // Should see CLOSING after we have asked it to CLOSE or additional
1014         // times after already being in state of CLOSING
1015         if (regionState == null
1016             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
1017           LOG.warn("Received CLOSING for " + prettyPrintedRegionName
1018             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
1019             + regionStates.getRegionState(encodedName));
1020           return;
1021         }
1022         // Transition to CLOSING (or update stamp if already CLOSING)
1023         regionStates.updateRegionState(rt, State.CLOSING);
1024         break;
1025 
1026       case RS_ZK_REGION_CLOSED:
1027         // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
1028         if (regionState == null
1029             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
1030           LOG.warn("Received CLOSED for " + prettyPrintedRegionName
1031             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
1032             + regionStates.getRegionState(encodedName));
1033           return;
1034         }
1035         // Handle CLOSED by assigning elsewhere or stopping if a disable
1036         // If we got here all is good.  Need to update RegionState -- else
1037         // what follows will fail because not in expected state.
1038         new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1039         updateClosedRegionHandlerTracker(regionState.getRegion());
1040         break;
1041 
1042         case RS_ZK_REGION_FAILED_OPEN:
1043           if (regionState == null
1044               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1045             LOG.warn("Received FAILED_OPEN for " + prettyPrintedRegionName
1046               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1047               + regionStates.getRegionState(encodedName));
1048             return;
1049           }
1050           AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
1051           if (failedOpenCount == null) {
1052             failedOpenCount = new AtomicInteger();
1053             // No need to use putIfAbsent, or extra synchronization since
1054             // this whole handleRegion block is locked on the encoded region
1055             // name, and failedOpenTracker is updated only in this block
1056             // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
1057             failedOpenTracker.put(encodedName, failedOpenCount);
1058           }
1059           if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
1060             // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
1061             regionStates.updateRegionState(rt, State.FAILED_OPEN);
1062             // remove the tracking info to save memory, also reset
1063             // the count for next open initiative
1064             failedOpenTracker.remove(encodedName);
1065           } else {
1066             // Handle this the same as if it were opened and then closed.
1067             regionState = regionStates.setRegionStateTOCLOSED(rt.getRegionName(), sn);
1068             if (regionState != null) {
1069               // When there are more than one region server a new RS is selected as the
1070               // destination and the same is updated in the regionplan. (HBASE-5546)
1071               try {
1072                 getRegionPlan(regionState.getRegion(), sn, true);
1073                 new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1074               } catch (HBaseIOException e) {
1075                 LOG.warn("Failed to get region plan", e);
1076               }
1077             }
1078           }
1079           break;
1080 
1081         case RS_ZK_REGION_OPENING:
1082           // Should see OPENING after we have asked it to OPEN or additional
1083           // times after already being in state of OPENING
1084           if (regionState == null
1085               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1086             LOG.warn("Received OPENING for " + prettyPrintedRegionName
1087               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1088               + regionStates.getRegionState(encodedName));
1089             return;
1090           }
1091           // Transition to OPENING (or update stamp if already OPENING)
1092           regionStates.updateRegionState(rt, State.OPENING);
1093           break;
1094 
1095         case RS_ZK_REGION_OPENED:
1096           // Should see OPENED after OPENING but possible after PENDING_OPEN.
1097           if (regionState == null
1098               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1099             LOG.warn("Received OPENED for " + prettyPrintedRegionName
1100               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1101               + regionStates.getRegionState(encodedName));
1102 
1103             if (regionState != null) {
1104               if(regionState.isOpened() && regionState.getServerName().equals(sn)) {
1105                 // If this region was opened on this RS before, we don't have to unassign it. It won't
1106                 // cause a double assignment. One possible scenario of what happened is HBASE-17275.
1107                 failedOpenTracker.remove(encodedName); // reset the count, if any
1108                 new OpenedRegionHandler(
1109                     server, this, regionState.getRegion(), coordination, ord).process();
1110                 updateOpenedRegionHandlerTracker(regionState.getRegion());
1111               } else {
1112                 // Close it without updating the internal region states,
1113                 // so as not to create double assignments in unlucky scenarios
1114                 // mentioned in OpenRegionHandler#process
1115                 unassign(regionState.getRegion(), null, -1, null, false, sn);
1116               }
1117             }
1118             return;
1119           }
1120           // Handle OPENED by removing from transition and deleting the zk node
1121           regionState =
1122               regionStates.transitionOpenFromPendingOpenOrOpeningOnServer(rt,regionState, sn);
1123           if (regionState != null) {
1124             failedOpenTracker.remove(encodedName); // reset the count, if any
1125             new OpenedRegionHandler(
1126               server, this, regionState.getRegion(), coordination, ord).process();
1127             updateOpenedRegionHandlerTracker(regionState.getRegion());
1128           }
1129           break;
1130 
1131         default:
1132           throw new IllegalStateException("Received event is not valid.");
1133       }
1134     } finally {
1135       lock.unlock();
1136     }
1137   }
1138 
1139   //For unit tests only
1140   boolean wasClosedHandlerCalled(HRegionInfo hri) {
1141     AtomicBoolean b = closedRegionHandlerCalled.get(hri);
1142     // compareAndSet to be sure that unit tests don't see stale values. This means
1143     // we will return true exactly once, unless the handler code resets this value
1144     // to true again.
1145     return b == null ? false : b.compareAndSet(true, false);
1146   }
1147 
1148   //For unit tests only
1149   boolean wasOpenedHandlerCalled(HRegionInfo hri) {
1150     AtomicBoolean b = openedRegionHandlerCalled.get(hri);
1151     // compareAndSet to be sure that unit tests don't see stale values. This means
1152     // we will return true exactly once, unless the handler code resets this value
1153     // to true again.
1154     return b == null ? false : b.compareAndSet(true, false);
1155   }
1156 
1157   //For unit tests only
1158   void initializeHandlerTrackers() {
1159     closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1160     openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1161   }
1162 
1163   void updateClosedRegionHandlerTracker(HRegionInfo hri) {
1164     if (closedRegionHandlerCalled != null) { //only for unit tests this is true
1165       closedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1166     }
1167   }
1168 
1169   void updateOpenedRegionHandlerTracker(HRegionInfo hri) {
1170     if (openedRegionHandlerCalled != null) { //only for unit tests this is true
1171       openedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1172     }
1173   }
1174 
1175   // TODO: processFavoredNodes might throw an exception, e.g., if the
1176   // meta could not be contacted/updated. We need to see how seriously to treat
1177   // this problem. Should we fail the current assignment? We should be able
1178   // to recover from this problem eventually (if the meta couldn't be updated
1179   // things should work normally and eventually get fixed up).
1180   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
1181     if (!shouldAssignRegionsWithFavoredNodes) return;
1182     // The AM gets the favored nodes info for each region and updates the meta
1183     // table with that info
1184     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
1185         new HashMap<HRegionInfo, List<ServerName>>();
1186     for (HRegionInfo region : regions) {
1187       regionToFavoredNodes.put(region,
1188           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
1189     }
1190     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
1191       this.server.getConnection());
1192   }
1193 
1194   /**
1195    * Handle a ZK unassigned node transition triggered by HBCK repair tool.
1196    * <p>
1197    * This is handled in a separate code path because it breaks the normal rules.
1198    * @param rt
1199    */
1200   @SuppressWarnings("deprecation")
1201   private void handleHBCK(RegionTransition rt) {
1202     String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
1203     LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
1204       ", server=" + rt.getServerName() + ", region=" +
1205       HRegionInfo.prettyPrint(encodedName));
1206     RegionState regionState = regionStates.getRegionTransitionState(encodedName);
1207     switch (rt.getEventType()) {
1208       case M_ZK_REGION_OFFLINE:
1209         HRegionInfo regionInfo;
1210         if (regionState != null) {
1211           regionInfo = regionState.getRegion();
1212         } else {
1213           try {
1214             byte [] name = rt.getRegionName();
1215             Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
1216               this.server.getConnection(), name);
1217             regionInfo = p.getFirst();
1218           } catch (IOException e) {
1219             LOG.info("Exception reading hbase:meta doing HBCK repair operation", e);
1220             return;
1221           }
1222         }
1223         LOG.info("HBCK repair is triggering assignment of region=" +
1224             regionInfo.getRegionNameAsString());
1225         // trigger assign, node is already in OFFLINE so don't need to update ZK
1226         assign(regionInfo, false);
1227         break;
1228 
1229       default:
1230         LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
1231         break;
1232     }
1233 
1234   }
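  // Illustrative note, not part of the original file: HBCK repair creates an unassigned znode
  // in the M_ZK_REGION_OFFLINE state for a region it wants force-assigned; that is the only
  // event type acted on above, and any other transition arriving through this path is logged
  // and ignored.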
1235 
1236   // ZooKeeper events
1237 
1238   /**
1239    * New unassigned node has been created.
1240    *
1241    * <p>This happens when an RS begins the OPENING or CLOSING of a region by
1242    * creating an unassigned node.
1243    *
1244    * <p>When this happens we must:
1245    * <ol>
1246    *   <li>Watch the node for further events</li>
1247    *   <li>Read and handle the state in the node</li>
1248    * </ol>
1249    */
1250   @Override
1251   public void nodeCreated(String path) {
1252     handleAssignmentEvent(path);
1253   }
1254 
1255   /**
1256    * Existing unassigned node has had data changed.
1257    *
1258    * <p>This happens when an RS transitions from OFFLINE to OPENING, or between
1259    * OPENING/OPENED and CLOSING/CLOSED.
1260    *
1261    * <p>When this happens we must:
1262    * <ol>
1263    *   <li>Watch the node for further events</li>
1264    *   <li>Read and handle the state in the node</li>
1265    * </ol>
1266    */
1267   @Override
1268   public void nodeDataChanged(String path) {
1269     handleAssignmentEvent(path);
1270   }
1271 
1272 
1273   // We  don't want to have two events on the same region managed simultaneously.
1274   // For this reason, we need to wait if an event on the same region is currently in progress.
1275   // So we track the region names of the events in progress, and we keep a waiting list.
1276   private final Set<String> regionsInProgress = new HashSet<String>();
1277   // In a LinkedHashMultimap, the put order is kept when we retrieve the collection back. We need
1278   //  this as we want the events to be managed in the same order as we received them.
1279   private final LinkedHashMultimap <String, RegionRunnable>
1280       zkEventWorkerWaitingList = LinkedHashMultimap.create();
1281 
1282   /**
1283    * A specific runnable that works only on a region.
1284    */
1285   private interface RegionRunnable extends Runnable{
1286     /**
1287      * @return - the name of the region it works on.
1288      */
1289     String getRegionName();
1290   }
1291 
1292   /**
1293    * Submit a task, ensuring that there is only one task at a time working on a given region.
1294    * Order is respected.
1295    */
1296   protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {
1297 
1298     synchronized (regionsInProgress) {
1299       // If there is already a task for this region, we add it to the
1300       //  waiting list and return.
1301       if (regionsInProgress.contains(regRunnable.getRegionName())) {
1302         synchronized (zkEventWorkerWaitingList){
1303           zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
1304         }
1305         return;
1306       }
1307 
1308       // No event in progress on this region => we can submit a new task immediately.
1309       regionsInProgress.add(regRunnable.getRegionName());
1310       zkEventWorkers.submit(new Runnable() {
1311         @Override
1312         public void run() {
1313           try {
1314             regRunnable.run();
1315           } finally {
1316             // now that we have finished, let's see if there is an event for the same region in the
1317             //  waiting list. If it's the case, we can now submit it to the pool.
1318             synchronized (regionsInProgress) {
1319               regionsInProgress.remove(regRunnable.getRegionName());
1320               synchronized (zkEventWorkerWaitingList) {
1321                 java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
1322                     regRunnable.getRegionName());
1323                 if (!waiting.isEmpty()) {
1324                   // We want the first object only. The only way to get it is through an iterator.
1325                   RegionRunnable toSubmit = waiting.iterator().next();
1326                   zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
1327                   zkEventWorkersSubmit(toSubmit);
1328                 }
1329               }
1330             }
1331           }
1332         }
1333       });
1334     }
1335   }
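  // Illustrative sketch, not part of the original class: zkEventWorkersSubmit() serializes
  // work per region while still allowing different regions to be processed concurrently on
  // the zkEventWorkers pool. Assuming a hypothetical helper runnableFor(name) that returns a
  // RegionRunnable whose getRegionName() is the given name:
  //
  //   zkEventWorkersSubmit(runnableFor("region-A")); // runs immediately
  //   zkEventWorkersSubmit(runnableFor("region-A")); // parked in zkEventWorkerWaitingList
  //                                                  // until the first region-A task finishes
  //   zkEventWorkersSubmit(runnableFor("region-B")); // may run in parallel with region-A work
  //
  // The LinkedHashMultimap keeps the waiting tasks in submission order, so events for a given
  // region are replayed in the order they were received.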
1336 
1337   @Override
1338   public void nodeDeleted(final String path) {
1339     if (path.startsWith(watcher.assignmentZNode)) {
1340       final String regionName = ZKAssign.getRegionName(watcher, path);
1341       zkEventWorkersSubmit(new RegionRunnable() {
1342         @Override
1343         public String getRegionName() {
1344           return regionName;
1345         }
1346 
1347         @Override
1348         public void run() {
1349           Lock lock = locker.acquireLock(regionName);
1350           try {
1351             RegionState rs = regionStates.getRegionTransitionState(regionName);
1352             if (rs == null) {
1353               rs = regionStates.getRegionState(regionName);
1354               if (rs == null || !rs.isMergingNew()) {
1355                 // MergingNew is an offline state
1356                 return;
1357               }
1358             }
1359 
1360             HRegionInfo regionInfo = rs.getRegion();
1361             String regionNameStr = regionInfo.getRegionNameAsString();
1362             LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);
1363 
1364             boolean disabled = getTableStateManager().isTableState(regionInfo.getTable(),
1365                 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
1366 
1367             ServerName serverName = rs.getServerName();
1368             if (serverManager.isServerOnline(serverName)) {
1369               if (rs.isOnServer(serverName) && (rs.isOpened() || rs.isSplitting())) {
1370                 synchronized (regionStates) {
1371                   regionOnline(regionInfo, serverName);
1372                   if (rs.isSplitting() && splitRegions.containsKey(regionInfo)) {
1373                     // Check if the daughter regions are still there; if they are present, offline
1374                     // them, as it's the case of a rollback.
1375                     HRegionInfo hri_a = splitRegions.get(regionInfo).getFirst();
1376                     HRegionInfo hri_b = splitRegions.get(regionInfo).getSecond();
1377                     if (!regionStates.isRegionInTransition(hri_a.getEncodedName())) {
1378                       LOG.warn("Split daughter region not in transition " + hri_a);
1379                     }
1380                     if (!regionStates.isRegionInTransition(hri_b.getEncodedName())) {
1381                       LOG.warn("Split daughter region not in transition " + hri_b);
1382                     }
1383                     regionOffline(hri_a);
1384                     regionOffline(hri_b);
1385                     splitRegions.remove(regionInfo);
1386                   }
1387                   if (disabled) {
1388                     // if server is offline, there is no harm in unassigning again
1389                     LOG.info("Opened " + regionNameStr
1390                         + " but this table is disabled, triggering close of region");
1391                     unassign(regionInfo);
1392                   }
1393                 }
1394               } else if (rs.isMergingNew()) {
1395                 synchronized (regionStates) {
1396                   String p = regionInfo.getEncodedName();
1397                   PairOfSameType<HRegionInfo> regions = mergingRegions.get(p);
1398                   if (regions != null) {
1399                     onlineMergingRegion(disabled, regions.getFirst(), serverName);
1400                     onlineMergingRegion(disabled, regions.getSecond(), serverName);
1401                   }
1402                 }
1403               }
1404             }
1405           } finally {
1406             lock.unlock();
1407           }
1408         }
1409 
1410         private void onlineMergingRegion(boolean disabled,
1411             final HRegionInfo hri, final ServerName serverName) {
1412           RegionState regionState = regionStates.getRegionState(hri);
1413           if (regionState != null && regionState.isMerging()
1414               && regionState.isOnServer(serverName)) {
1415             regionOnline(regionState.getRegion(), serverName);
1416             if (disabled) {
1417               unassign(hri);
1418             }
1419           }
1420         }
1421       });
1422     }
1423   }
1424 
1425   /**
1426    * New unassigned node has been created.
1427    *
1428    * <p>This happens when an RS begins the OPENING, SPLITTING or CLOSING of a
1429    * region by creating a znode.
1430    *
1431    * <p>When this happens we must:
1432    * <ol>
1433    *   <li>Watch the node for further children changed events</li>
1434    *   <li>Watch all new children for changed events</li>
1435    * </ol>
1436    */
1437   @Override
1438   public void nodeChildrenChanged(String path) {
1439     if (path.equals(watcher.assignmentZNode)) {
1440       zkEventWorkers.submit(new Runnable() {
1441         @Override
1442         public void run() {
1443           try {
1444             // Just make sure we see the changes for the new znodes
1445             List<String> children =
1446               ZKUtil.listChildrenAndWatchForNewChildren(
1447                 watcher, watcher.assignmentZNode);
1448             if (children != null) {
1449               Stat stat = new Stat();
1450               for (String child : children) {
1451                 // if region is in transition, we already have a watch
1452                 // on it, so no need to watch it again. As far as we know for now,
1453                 // this is needed to watch splitting nodes only.
1454                 if (!regionStates.isRegionInTransition(child)) {
1455                   ZKAssign.getDataAndWatch(watcher, child, stat);
1456                 }
1457               }
1458             }
1459           } catch (KeeperException e) {
1460             server.abort("Unexpected ZK exception reading unassigned children", e);
1461           }
1462         }
1463       });
1464     }
1465   }
1466 
1467 
1468   /**
1469    * Marks the region as online.  Removes it from regions in transition and
1470    * updates the in-memory assignment information.
1471    * <p>
1472    * Used when a region has been successfully opened on a region server.
1473    * @param regionInfo
1474    * @param sn
1475    */
1476   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
1477     regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
1478   }
1479 
1480   void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
1481     numRegionsOpened.incrementAndGet();
1482     regionStates.regionOnline(regionInfo, sn, openSeqNum);
1483 
1484     // Remove plan if one.
1485     clearRegionPlan(regionInfo);
1486     balancer.regionOnline(regionInfo, sn);
1487 
1488     // Tell our listeners that a region was opened
1489     sendRegionOpenedNotification(regionInfo, sn);
1490   }
1491 
1492   /**
1493    * Pass the assignment event to a worker for processing.
1494    * Each worker is a single thread executor service.  The reason
1495    * for just one thread is to make sure all events for a given
1496    * region are processed in order.
1497    *
1498    * @param path
1499    */
1500   private void handleAssignmentEvent(final String path) {
1501     if (path.startsWith(watcher.assignmentZNode)) {
1502       final String regionName = ZKAssign.getRegionName(watcher, path);
1503 
1504       zkEventWorkersSubmit(new RegionRunnable() {
1505         @Override
1506         public String getRegionName() {
1507           return regionName;
1508         }
1509 
1510         @Override
1511         public void run() {
1512           try {
1513             Stat stat = new Stat();
1514             byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
1515             if (data == null) return;
1516 
1517             RegionTransition rt = RegionTransition.parseFrom(data);
1518 
1519             // TODO: This code is tied to ZK anyway, so for now leaving it as is,
1520             // will refactor when whole region assignment will be abstracted from ZK
1521             BaseCoordinatedStateManager csm =
1522               (BaseCoordinatedStateManager) server.getCoordinatedStateManager();
1523             OpenRegionCoordination openRegionCoordination = csm.getOpenRegionCoordination();
1524 
1525             ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
1526               new ZkOpenRegionCoordination.ZkOpenRegionDetails();
1527             zkOrd.setVersion(stat.getVersion());
1528             zkOrd.setServerName(csm.getServer().getServerName());
1529 
1530             handleRegion(rt, openRegionCoordination, zkOrd);
1531           } catch (KeeperException e) {
1532             server.abort("Unexpected ZK exception reading unassigned node data", e);
1533           } catch (DeserializationException e) {
1534             server.abort("Unexpected exception deserializing node data", e);
1535           }
1536         }
1537       });
1538     }
1539   }
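  // Illustrative summary, not part of the original file: a ZK watcher callback
  // (nodeCreated/nodeDataChanged) hands the znode path to handleAssignmentEvent(), which
  // re-reads the node with a fresh watch, parses the RegionTransition, wraps the znode
  // version and server name in ZkOpenRegionDetails, and finally calls handleRegion() on a
  // per-region worker so events for the same region are applied in order.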
1540 
1541   /**
1542    * Marks the region as offline.  Removes it from regions in transition and
1543    * removes in-memory assignment information.
1544    * <p>
1545    * Used when a region has been closed and should remain closed.
1546    * @param regionInfo
1547    */
1548   public void regionOffline(final HRegionInfo regionInfo) {
1549     if (regionStates.isRegionInState(regionInfo, State.MERGED, State.SPLIT)) {
1550       LOG.info("Try to offline region " + regionInfo.getEncodedName() +
1551           ", which is at state " + regionStates.getRegionState(regionInfo).getState() + ", skip");
1552       return;
1553     }
1554     regionOffline(regionInfo, null);
1555   }
1556 
1557   public void offlineDisabledRegion(HRegionInfo regionInfo) {
1558     if (useZKForAssignment) {
1559       // Disabling so should not be reassigned, just delete the CLOSED node
1560       LOG.debug("Table being disabled so deleting ZK node and removing from " +
1561         "regions in transition, skipping assignment of region " +
1562           regionInfo.getRegionNameAsString());
1563       String encodedName = regionInfo.getEncodedName();
1564       deleteNodeInStates(encodedName, "closed", null,
1565         EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
1566     }
1567     replicasToClose.remove(regionInfo);
1568     regionOffline(regionInfo);
1569   }
1570 
1571   // Assignment methods
1572 
1573   /**
1574    * Assigns the specified region.
1575    * <p>
1576    * If a RegionPlan is available with a valid destination then it will be used
1577    * to determine which server the region is assigned to.  If no RegionPlan is
1578    * available, region will be assigned to a random available server.
1579    * <p>
1580    * Updates the RegionState and sends the OPEN RPC.
1581    * <p>
1582    * This will only succeed if the region is in transition (in a CLOSED or
1583    * OFFLINE state) or not in transition at all (checked in memory, not in zk),
1584    * and, of course, the chosen server is up and running (it may have just crashed!).  If the
1585    * in-memory checks pass, the zk node is forced to OFFLINE before assigning.
1586    *
1587    * @param region the region to be assigned
1588    * @param setOfflineInZK whether ZK node should be created/transitioned to an
1589    *                       OFFLINE state before assigning the region
1590    */
1591   public void assign(HRegionInfo region, boolean setOfflineInZK) {
1592     assign(region, setOfflineInZK, false);
1593   }
1594 
1595   /**
1596    * Use care with forceNewPlan. It could cause double assignment.
1597    */
1598   @VisibleForTesting
1599   public void assign(HRegionInfo region,
1600       boolean setOfflineInZK, boolean forceNewPlan) {
1601     if (isDisabledorDisablingRegionInRIT(region)) {
1602       return;
1603     }
1604     String encodedName = region.getEncodedName();
1605     Lock lock = locker.acquireLock(encodedName);
1606     try {
1607       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
1608       if (state != null) {
1609         if (regionStates.wasRegionOnDeadServer(encodedName)) {
1610           LOG.info("Skip assigning " + region.getRegionNameAsString()
1611             + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
1612             + " is dead but not processed yet");
1613           return;
1614         }
1615         assign(state, setOfflineInZK && useZKForAssignment, forceNewPlan);
1616       }
1617     } finally {
1618       lock.unlock();
1619     }
1620   }
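  // Illustrative usage sketch, not part of the original class, assuming "am" is this
  // AssignmentManager and "hri" an HRegionInfo that is currently CLOSED or OFFLINE:
  //
  //   am.assign(hri, true);          // force the znode to OFFLINE (when ZK is used), then
  //                                  // send the OPEN RPC to the planned server
  //   am.assign(hri, true, true);    // same, but generate a fresh RegionPlan first; use with
  //                                  // care, forceNewPlan can cause double assignment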
1621 
1622   /**
1623    * Bulk assign regions to <code>destination</code>.
1624    * @param destination
1625    * @param regions Regions to assign.
1626    * @return true if successful
1627    */
1628   boolean assign(final ServerName destination, final List<HRegionInfo> regions)
1629     throws InterruptedException {
1630     long startTime = EnvironmentEdgeManager.currentTime();
1631     try {
1632       int regionCount = regions.size();
1633       if (regionCount == 0) {
1634         return true;
1635       }
1636       LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
1637       Set<String> encodedNames = new HashSet<String>(regionCount);
1638       for (HRegionInfo region : regions) {
1639         encodedNames.add(region.getEncodedName());
1640       }
1641 
1642       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
1643       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
1644       try {
1645         AtomicInteger counter = new AtomicInteger(0);
1646         Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
1647         OfflineCallback cb = new OfflineCallback(
1648           watcher, destination, counter, offlineNodesVersions);
1649         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
1650         List<RegionState> states = new ArrayList<RegionState>(regions.size());
1651         for (HRegionInfo region : regions) {
1652           String encodedName = region.getEncodedName();
1653           if (!isDisabledorDisablingRegionInRIT(region)) {
1654             RegionState state = forceRegionStateToOffline(region, false);
1655             boolean onDeadServer = false;
1656             if (state != null) {
1657               if (regionStates.wasRegionOnDeadServer(encodedName)) {
1658                 LOG.info("Skip assigning " + region.getRegionNameAsString()
1659                   + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
1660                   + " is dead but not processed yet");
1661                 onDeadServer = true;
1662               } else if (!useZKForAssignment
1663                   || asyncSetOfflineInZooKeeper(state, cb, destination)) {
1664                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
1665                 plans.put(encodedName, plan);
1666                 states.add(state);
1667                 continue;
1668               }
1669             }
1670             // Reassign if the region wasn't on a dead server
1671             if (!onDeadServer) {
1672               LOG.info("failed to force region state to offline or "
1673                 + "failed to set it offline in ZK, will reassign later: " + region);
1674               failedToOpenRegions.add(region); // assign individually later
1675             }
1676           }
1677           // Release the lock, this region is excluded from bulk assign because
1678           // we can't update its state, or set its znode to offline.
1679           Lock lock = locks.remove(encodedName);
1680           lock.unlock();
1681         }
1682 
1683         if (useZKForAssignment) {
1684           // Wait until all unassigned nodes have been put up and watchers set.
1685           int total = states.size();
1686           for (int oldCounter = 0; !server.isStopped();) {
1687             int count = counter.get();
1688             if (oldCounter != count) {
1689               LOG.debug(destination.toString() + " unassigned znodes=" + count +
1690                 " of total=" + total + "; oldCounter=" + oldCounter);
1691               oldCounter = count;
1692             }
1693             if (count >= total) break;
1694             Thread.sleep(5);
1695           }
1696         }
1697 
1698         if (server.isStopped()) {
1699           return false;
1700         }
1701 
1702         // Add region plans, so we can updateTimers when one region is opened so
1703         // that unnecessary timeout on RIT is reduced.
1704         this.addPlans(plans);
1705 
1706         List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
1707           new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
1708         for (RegionState state: states) {
1709           HRegionInfo region = state.getRegion();
1710           String encodedRegionName = region.getEncodedName();
1711           Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
1712           if (useZKForAssignment && (nodeVersion == null || nodeVersion == -1)) {
1713             LOG.warn("failed to offline in zookeeper: " + region);
1714             failedToOpenRegions.add(region); // assign individually later
1715             Lock lock = locks.remove(encodedRegionName);
1716             lock.unlock();
1717           } else {
1718             regionStates.updateRegionState(
1719               region, State.PENDING_OPEN, destination);
1720             List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1721             if (this.shouldAssignRegionsWithFavoredNodes) {
1722               favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1723             }
1724             regionOpenInfos.add(new Triple<HRegionInfo, Integer,  List<ServerName>>(
1725               region, nodeVersion, favoredNodes));
1726           }
1727         }
1728 
1729         // Move on to open regions.
1730         try {
1731           // Send OPEN RPC. If it fails on a IOE or RemoteException,
1732           // regions will be assigned individually.
1733           long maxWaitTime = System.currentTimeMillis() +
1734             this.server.getConfiguration().
1735               getLong("hbase.regionserver.rpc.startup.waittime", 60000);
1736           for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
1737             try {
1738               // regionOpenInfos is empty if all regions are in failedToOpenRegions list
1739               if (regionOpenInfos.isEmpty()) {
1740                 break;
1741               }
1742               List<RegionOpeningState> regionOpeningStateList = serverManager
1743                 .sendRegionOpen(destination, regionOpenInfos);
1744               if (regionOpeningStateList == null) {
1745                 // Failed getting RPC connection to this server
1746                 return false;
1747               }
1748               for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
1749                 RegionOpeningState openingState = regionOpeningStateList.get(k);
1750                 if (openingState != RegionOpeningState.OPENED) {
1751                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
1752                   if (openingState == RegionOpeningState.ALREADY_OPENED) {
1753                     processAlreadyOpenedRegion(region, destination);
1754                   } else if (openingState == RegionOpeningState.FAILED_OPENING) {
1755                     // Failed opening this region, reassign it later
1756                     failedToOpenRegions.add(region);
1757                   } else {
1758                     LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
1759                       + openingState + " in assigning region " + region);
1760                   }
1761                 }
1762               }
1763               break;
1764             } catch (IOException e) {
1765               if (e instanceof RemoteException) {
1766                 e = ((RemoteException)e).unwrapRemoteException();
1767               }
1768               if (e instanceof RegionServerStoppedException) {
1769                 LOG.warn("The region server was shut down", e);
1770                 // No need to retry, the region server is a goner.
1771                 return false;
1772               } else if (e instanceof ServerNotRunningYetException) {
1773                 long now = System.currentTimeMillis();
1774                 if (now < maxWaitTime) {
1775                   LOG.debug("Server is not yet up; waiting up to " +
1776                     (maxWaitTime - now) + "ms", e);
1777                   Thread.sleep(100);
1778                   i--; // reset the try count
1779                   continue;
1780                 }
1781               } else if (e instanceof java.net.SocketTimeoutException
1782                   && this.serverManager.isServerOnline(destination)) {
1783                 // In case socket is timed out and the region server is still online,
1784                 // the openRegion RPC could have been accepted by the server and
1785                 // just the response didn't go through.  So we will retry to
1786                 // open the region on the same server.
1787                 if (LOG.isDebugEnabled()) {
1788                   LOG.debug("Bulk assigner openRegion() to " + destination
1789                     + " has timed out, but the regions might"
1790                     + " already be opened on it.", e);
1791                 }
1792                 // wait and reset the re-try count, server might be just busy.
1793                 Thread.sleep(100);
1794                 i--;
1795                 continue;
1796               }
1797               throw e;
1798             }
1799           }
1800         } catch (IOException e) {
1801           // Can be a socket timeout, EOF, NoRouteToHost, etc
1802           LOG.info("Unable to communicate with " + destination
1803             + " in order to assign regions, ", e);
1804           return false;
1805         }
1806       } finally {
1807         for (Lock lock : locks.values()) {
1808           lock.unlock();
1809         }
1810       }
1811 
1812       if (!failedToOpenRegions.isEmpty()) {
1813         for (HRegionInfo region : failedToOpenRegions) {
1814           if (!regionStates.isRegionOnline(region)) {
1815             invokeAssign(region);
1816           }
1817         }
1818       }
1819 
1820       // wait for assignment completion
1821       ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
1822       for (HRegionInfo region: regions) {
1823         if (!region.getTable().isSystemTable()) {
1824           userRegionSet.add(region);
1825         }
1826       }
1827       if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
1828             System.currentTimeMillis())) {
1829         LOG.debug("some user regions are still in transition: " + userRegionSet);
1830       }
1831       LOG.debug("Bulk assigning done for " + destination);
1832       return true;
1833     } finally {
1834       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
1835     }
1836   }
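  // Illustrative usage sketch, not part of the original class: a bulk assigner holding a
  // destination server and a list of regions (the names below are hypothetical) might call:
  //
  //   boolean done = am.assign(destinationServerName, regionBatch);
  //   if (!done) {
  //     // Could not talk to the destination (or the master is stopping); the caller typically
  //     // retries with a new plan. Regions that failed individually were already handed back
  //     // to invokeAssign() above for later single-region assignment.
  //   }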
1837 
1838   /**
1839    * Send CLOSE RPC if the server is online, otherwise, offline the region.
1840    *
1841    * The RPC will be sent only to the region server found in the region state
1842    * if it is passed in; otherwise, to the src server specified. If the region
1843    * state is not specified, we don't update the region state at all; instead
1844    * we just send the RPC call. This is useful for some cleanup without
1845    * messing with the region states (see handleRegion, for an example of the
1846    * region-opened-on-an-unexpected-server scenario).
1847    */
1848   private void unassign(final HRegionInfo region,
1849       final RegionState state, final int versionOfClosingNode,
1850       final ServerName dest, final boolean transitionInZK,
1851       final ServerName src) {
1852     ServerName server = src;
1853     if (state != null) {
1854       server = state.getServerName();
1855     }
1856     long maxWaitTime = -1;
1857     for (int i = 1; i <= this.maximumAttempts; i++) {
1858       if (this.server.isStopped() || this.server.isAborted()) {
1859         LOG.debug("Server stopped/aborted; skipping unassign of " + region);
1860         return;
1861       }
1862       // ClosedRegionHandler can remove the server from this.regions
1863       if (!serverManager.isServerOnline(server)) {
1864         LOG.debug("Offline " + region.getRegionNameAsString()
1865           + ", no need to unassign since it's on a dead server: " + server);
1866         if (transitionInZK) {
1867           // delete the node. if no node exists need not bother.
1868           deleteClosingOrClosedNode(region, server);
1869         }
1870         if (state != null) {
1871           regionOffline(region);
1872         }
1873         return;
1874       }
1875       try {
1876         // Send CLOSE RPC
1877         if (serverManager.sendRegionClose(server, region,
1878           versionOfClosingNode, dest, transitionInZK)) {
1879           LOG.debug("Sent CLOSE to " + server + " for region " +
1880             region.getRegionNameAsString());
1881           if (useZKForAssignment && !transitionInZK && state != null) {
1882             // Retry to make sure the region is
1883             // closed so as to avoid double assignment.
1884             unassign(region, state, versionOfClosingNode,
1885               dest, transitionInZK, src);
1886           }
1887           return;
1888         }
1889         // This never happens. Currently regionserver close always returns true.
1890         // TODO: this can now happen (0.96) if there is an exception in a coprocessor
1891         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
1892           region.getRegionNameAsString());
1893       } catch (Throwable t) {
1894         long sleepTime = 0;
1895         Configuration conf = this.server.getConfiguration();
1896         if (t instanceof RemoteException) {
1897           t = ((RemoteException)t).unwrapRemoteException();
1898         }
1899         boolean logRetries = true;
1900         if (t instanceof RegionServerAbortedException
1901             || t instanceof RegionServerStoppedException
1902             || t instanceof ServerNotRunningYetException) {
1903           // RS is aborting or stopping, we cannot offline the region since the region may need
1904           // to do WAL recovery. Until we see the RS expiration, we should retry.
1905           sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1906             RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1907 
1908         } else if (t instanceof NotServingRegionException) {
1909           LOG.debug("Offline " + region.getRegionNameAsString()
1910             + ", it's not any more on " + server, t);
1911           if (transitionInZK) {
1912             deleteClosingOrClosedNode(region, server);
1913           }
1914           if (state != null) {
1915             regionOffline(region);
1916           }
1917           return;
1918         } else if ((t instanceof FailedServerException) || (state != null &&
1919             t instanceof RegionAlreadyInTransitionException)) {
1920           if(t instanceof FailedServerException) {
1921             sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1922                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1923           } else {
1924             // RS is already processing this region, only need to update the timestamp
1925             LOG.debug("Updating the timestamp for " + state);
1926             state.updateTimestampToNow();
1927             if (maxWaitTime < 0) {
1928               maxWaitTime =
1929                   EnvironmentEdgeManager.currentTime()
1930                       + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
1931                         DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
1932             }
1933             long now = EnvironmentEdgeManager.currentTime();
1934             if (now < maxWaitTime) {
1935               LOG.debug("Region is already in transition; "
1936                 + "waiting up to " + (maxWaitTime - now) + "ms", t);
1937               sleepTime = 100;
1938               i--; // reset the try count
1939               logRetries = false;
1940             }
1941           }
1942         }
1943 
1944         try {
1945           if (sleepTime > 0) {
1946             Thread.sleep(sleepTime);
1947           }
1948         } catch (InterruptedException ie) {
1949           LOG.warn("Failed to unassign "
1950             + region.getRegionNameAsString() + " since interrupted", ie);
1951           Thread.currentThread().interrupt();
1952           if (state != null) {
1953             regionStates.updateRegionState(region, State.FAILED_CLOSE);
1954           }
1955           return;
1956         }
1957 
1958         if (logRetries) {
1959           LOG.info("Server " + server + " returned " + t + " for "
1960             + region.getRegionNameAsString() + ", try=" + i
1961             + " of " + this.maximumAttempts, t);
1962           // Presume retry or server will expire.
1963         }
1964       }
1965     }
1966     // Run out of attempts
1967     if (state != null) {
1968       regionStates.updateRegionState(region, State.FAILED_CLOSE);
1969     }
1970   }
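  // Illustrative note, not part of the original file: the retry policy above distinguishes
  // three classes of failures. Server-side unavailability (RegionServerAborted/Stopped,
  // ServerNotRunningYet, FailedServerException) sleeps roughly the failed-server expiry and
  // retries; NotServingRegionException is treated as "already closed" and the region is simply
  // offlined; RegionAlreadyInTransitionException only refreshes the state timestamp and keeps
  // retrying until ALREADY_IN_TRANSITION_WAITTIME elapses. After maximumAttempts the region is
  // left in FAILED_CLOSE.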
1971 
1972   /**
1973    * Set region to OFFLINE unless it is opening and forceNewPlan is false.
1974    */
1975   private RegionState forceRegionStateToOffline(
1976       final HRegionInfo region, final boolean forceNewPlan) {
1977     RegionState state = regionStates.getRegionState(region);
1978     if (state == null) {
1979       LOG.warn("Assigning but not in region states: " + region);
1980       state = regionStates.createRegionState(region);
1981     }
1982 
1983     ServerName sn = state.getServerName();
1984     if (forceNewPlan && LOG.isDebugEnabled()) {
1985       LOG.debug("Force region state offline " + state);
1986     }
1987 
1988     switch (state.getState()) {
1989     case OPEN:
1990     case OPENING:
1991     case PENDING_OPEN:
1992     case CLOSING:
1993     case PENDING_CLOSE:
1994       if (!forceNewPlan) {
1995         LOG.debug("Skip assigning " +
1996           region + ", it is already " + state);
1997         return null;
1998       }
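    // Intentional fall-through (explanatory comment, not in the original source): when
    // forceNewPlan is set, a region in one of the states above is first unassigned in the
    // FAILED_CLOSE/FAILED_OPEN branch below before it may be re-assigned.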
1999     case FAILED_CLOSE:
2000     case FAILED_OPEN:
2001       unassign(region, state, -1, null, false, null);
2002       state = regionStates.getRegionState(region);
2003       if (state.isFailedClose()) {
2004         // If we can't close the region, we can't re-assign
2005         // it so as to avoid possible double assignment/data loss.
2006         LOG.info("Skip assigning " +
2007           region + ", we couldn't close it: " + state);
2008         return null;
2009       }
2010     case OFFLINE:
2011       // This region could have been open on this server
2012       // for a while. If the server is dead and not processed
2013       // yet, we can move on only if meta shows that the
2014       // region is actually not on this server, or is on a live
2015       // server, or is on a dead server that was already processed.
2016       // In case we are not using ZK, we don't need this check because
2017       // we have the latest info in memory, and the caller
2018       // will do another round of checking anyway.
2019       if (useZKForAssignment
2020           && regionStates.isServerDeadAndNotProcessed(sn)
2021           && wasRegionOnDeadServerByMeta(region, sn)) {
2022         if (!regionStates.isRegionInTransition(region)) {
2023           LOG.info("Updating the state to " + State.OFFLINE + " to allow to be reassigned by SSH");
2024           regionStates.updateRegionState(region, State.OFFLINE);
2025         }
2026         LOG.info("Skip assigning " + region.getRegionNameAsString()
2027             + ", it is on a dead but not processed yet server: " + sn);
2028         return null;
2029       }
2030     case CLOSED:
2031       break;
2032     default:
2033       LOG.error("Trying to assign region " + region
2034         + ", which is " + state);
2035       return null;
2036     }
2037     return state;
2038   }
2039 
2040   @SuppressWarnings("deprecation")
2041   protected boolean wasRegionOnDeadServerByMeta(
2042       final HRegionInfo region, final ServerName sn) {
2043     try {
2044       if (region.isMetaRegion()) {
2045         ServerName server = this.server.getMetaTableLocator().
2046           getMetaRegionLocation(this.server.getZooKeeper());
2047         return regionStates.isServerDeadAndNotProcessed(server);
2048       }
2049       while (!server.isStopped()) {
2050         try {
2051           this.server.getMetaTableLocator().waitMetaRegionLocation(server.getZooKeeper());
2052           Result r = MetaTableAccessor.getRegionResult(server.getConnection(),
2053             region.getRegionName());
2054           if (r == null || r.isEmpty()) return false;
2055           ServerName server = HRegionInfo.getServerName(r);
2056           return regionStates.isServerDeadAndNotProcessed(server);
2057         } catch (IOException ioe) {
2058           LOG.info("Received exception accessing hbase:meta during force assign "
2059             + region.getRegionNameAsString() + ", retrying", ioe);
2060         }
2061       }
2062     } catch (InterruptedException e) {
2063       Thread.currentThread().interrupt();
2064       LOG.info("Interrupted accessing hbase:meta", e);
2065     }
2066     // Call is interrupted or server is stopped.
2067     return regionStates.isServerDeadAndNotProcessed(sn);
2068   }
2069 
2070   /**
2071    * Caller must hold lock on the passed <code>state</code> object.
2072    * @param state
2073    * @param setOfflineInZK
2074    * @param forceNewPlan
2075    */
2076   public void assign(RegionState state,
2077       boolean setOfflineInZK, final boolean forceNewPlan) {
2078     long startTime = EnvironmentEdgeManager.currentTime();
2079     try {
2080       Configuration conf = server.getConfiguration();
2081       RegionState currentState = state;
2082       int versionOfOfflineNode = -1;
2083       RegionPlan plan = null;
2084       long maxWaitTime = -1;
2085       HRegionInfo region = state.getRegion();
2086       RegionOpeningState regionOpenState;
2087       Throwable previousException = null;
2088       for (int i = 1; i <= maximumAttempts; i++) {
2089         if (server.isStopped() || server.isAborted()) {
2090           LOG.info("Skip assigning " + region.getRegionNameAsString()
2091             + ", the server is stopped/aborted");
2092           return;
2093         }
2094 
2095         if (plan == null) { // Get a server for the region at first
2096           try {
2097             plan = getRegionPlan(region, forceNewPlan);
2098           } catch (HBaseIOException e) {
2099             LOG.warn("Failed to get region plan", e);
2100           }
2101         }
2102 
2103         if (plan == null) {
2104           LOG.warn("Unable to determine a plan to assign " + region);
2105 
2106           // For meta region, we have to keep retrying until succeeding
2107           if (region.isMetaRegion()) {
2108             if (i == maximumAttempts) {
2109               i = 0; // re-set attempt count to 0 for at least 1 retry
2110 
2111               LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
2112                 " after maximumAttempts (" + this.maximumAttempts +
2113                 "). Reset attempts count and continue retrying.");
2114             }
2115             waitForRetryingMetaAssignment();
2116             continue;
2117           }
2118 
2119           regionStates.updateRegionState(region, State.FAILED_OPEN);
2120           return;
2121         }
2122         if (setOfflineInZK && versionOfOfflineNode == -1) {
2123           LOG.info("Setting node as OFFLINED in ZooKeeper for region " + region);
2124           // get the version of the znode after setting it to OFFLINE.
2125           // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
2126           versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
2127           if (versionOfOfflineNode != -1) {
2128             if (isDisabledorDisablingRegionInRIT(region)) {
2129               return;
2130             }
2131             // In case of assignment from EnableTableHandler, the table state is ENABLING. Anyhow,
2132             // EnableTableHandler will set ENABLED after assigning all the table regions. If we
2133             // try to set it to ENABLED directly then the client API may think the table is enabled.
2134             // When we have a case such as all the regions being added directly into hbase:meta and we
2135             // call assignRegion, then we need to make the table ENABLED. In such a case the table
2136             // will not be in ENABLING or ENABLED state.
2137             TableName tableName = region.getTable();
2138             if (!tableStateManager.isTableState(tableName,
2139               ZooKeeperProtos.Table.State.ENABLED, ZooKeeperProtos.Table.State.ENABLING)) {
2140               LOG.debug("Setting table " + tableName + " to ENABLED state.");
2141               setEnabledTable(tableName);
2142             }
2143           }
2144         }
2145         if (setOfflineInZK && versionOfOfflineNode == -1) {
2146           LOG.info("Unable to set offline in ZooKeeper to assign " + region);
2147           // Setting offline in ZK must have been failed due to ZK racing or some
2148           // exception which may make the server to abort. If it is ZK racing,
2149           // we should retry since we already reset the region state,
2150           // existing (re)assignment will fail anyway.
2151           if (!server.isAborted()) {
2152             continue;
2153           }
2154         }
2155         LOG.info("Assigning " + region.getRegionNameAsString() +
2156             " to " + plan.getDestination().toString());
2157         // Transition RegionState to PENDING_OPEN
2158         currentState = regionStates.updateRegionState(region,
2159           State.PENDING_OPEN, plan.getDestination());
2160 
2161         boolean needNewPlan;
2162         final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
2163             " to " + plan.getDestination();
2164         try {
2165           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
2166           if (this.shouldAssignRegionsWithFavoredNodes) {
2167             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
2168           }
2169           regionOpenState = serverManager.sendRegionOpen(
2170               plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
2171 
2172           if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
2173             // Failed opening this region, looping again on a new server.
2174             needNewPlan = true;
2175             LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
2176                 "trying to assign elsewhere instead; " +
2177                 "try=" + i + " of " + this.maximumAttempts);
2178           } else {
2179             // we're done
2180             if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
2181               processAlreadyOpenedRegion(region, plan.getDestination());
2182             }
2183             return;
2184           }
2185 
2186         } catch (Throwable t) {
2187           if (t instanceof RemoteException) {
2188             t = ((RemoteException) t).unwrapRemoteException();
2189           }
2190           previousException = t;
2191 
2192           // Should we wait a little before retrying? If the server is starting, yes.
2193           // If the region is already in transition, yes as well: we want to be sure that
2194           //  the region will get opened but we don't want a double assignment.
2195           boolean hold = (t instanceof RegionAlreadyInTransitionException ||
2196               t instanceof ServerNotRunningYetException);
2197 
2198           // In case socket is timed out and the region server is still online,
2199           // the openRegion RPC could have been accepted by the server and
2200           // just the response didn't go through.  So we will retry to
2201           // open the region on the same server to avoid possible
2202           // double assignment.
2203           boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
2204               && this.serverManager.isServerOnline(plan.getDestination()));
2205 
2206 
2207           if (hold) {
2208             LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
2209               "try=" + i + " of " + this.maximumAttempts, t);
2210 
2211             if (maxWaitTime < 0) {
2212               if (t instanceof RegionAlreadyInTransitionException) {
2213                 maxWaitTime = EnvironmentEdgeManager.currentTime()
2214                   + this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
2215                     DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
2216               } else {
2217                 maxWaitTime = EnvironmentEdgeManager.currentTime()
2218                   + this.server.getConfiguration().getLong(
2219                     "hbase.regionserver.rpc.startup.waittime", 60000);
2220               }
2221             }
2222             try {
2223               needNewPlan = false;
2224               long now = EnvironmentEdgeManager.currentTime();
2225               if (now < maxWaitTime) {
2226                 LOG.debug("Server is not yet up or region is already in transition; "
2227                   + "waiting up to " + (maxWaitTime - now) + "ms", t);
2228                 Thread.sleep(100);
2229                 i--; // reset the try count
2230               } else if (!(t instanceof RegionAlreadyInTransitionException)) {
2231                 LOG.debug("Server is not up for a while; try a new one", t);
2232                 needNewPlan = true;
2233               }
2234             } catch (InterruptedException ie) {
2235               LOG.warn("Failed to assign "
2236                   + region.getRegionNameAsString() + " since interrupted", ie);
2237               regionStates.updateRegionState(region, State.FAILED_OPEN);
2238               Thread.currentThread().interrupt();
2239               return;
2240             }
2241           } else if (retry) {
2242             needNewPlan = false;
2243             i--; // we want to retry as many times as needed as long as the RS is not dead.
2244             LOG.warn(assignMsg + ", trying to assign to the same region server again", t);
2245           } else {
2246             needNewPlan = true;
2247             LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
2248                 " try=" + i + " of " + this.maximumAttempts, t);
2249           }
2250         }
2251 
2252         if (i == this.maximumAttempts) {
2253           // For meta region, we have to keep retrying until succeeding
2254           if (region.isMetaRegion()) {
2255             i = 0; // re-set attempt count to 0 for at least 1 retry
2256             LOG.warn(assignMsg +
2257                 ", trying to assign a hbase:meta region reached maximumAttempts (" +
2258                 this.maximumAttempts + "). Reset attempt count and continue retrying.");
2259             waitForRetryingMetaAssignment();
2260           }
2261           else {
2262             // Don't reset the region state or get a new plan any more.
2263             // This is the last try.
2264             continue;
2265           }
2266         }
2267 
2268         // If region opened on destination of present plan, reassigning to new
2269         // RS may cause double assignments. In case of RegionAlreadyInTransitionException
2270         // reassigning to same RS.
2271         if (needNewPlan) {
2272           // Force a new plan and reassign. Will return null if no servers.
2273           // The new plan could be the same as the existing plan since we don't
2274           // exclude the server of the original plan, which should not be
2275           // excluded since it could be the only server up now.
2276           RegionPlan newPlan = null;
2277           try {
2278             newPlan = getRegionPlan(region, true);
2279           } catch (HBaseIOException e) {
2280             LOG.warn("Failed to get region plan", e);
2281           }
2282           if (newPlan == null) {
2283             regionStates.updateRegionState(region, State.FAILED_OPEN);
2284             LOG.warn("Unable to find a viable location to assign region " +
2285                 region.getRegionNameAsString());
2286             return;
2287           }
2288 
2289           if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
2290             // Clean out the plan we failed to execute and one that doesn't look like it'll
2291             // succeed anyway; we need a new plan!
2292             // Transition back to OFFLINE
2293             LOG.info("Region assignment plan changed from " + plan.getDestination() + " to "
2294                 + newPlan.getDestination() + " server.");
2295             currentState = regionStates.updateRegionState(region, State.OFFLINE);
2296             versionOfOfflineNode = -1;
2297             if (useZKForAssignment) {
2298               setOfflineInZK = true;
2299             }
2300             plan = newPlan;
2301           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
2302               previousException instanceof FailedServerException) {
2303             try {
2304               LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
2305                 " to the same failed server.");
2306               Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
2307                 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
2308             } catch (InterruptedException ie) {
2309               LOG.warn("Failed to assign "
2310                   + region.getRegionNameAsString() + " since interrupted", ie);
2311               regionStates.updateRegionState(region, State.FAILED_OPEN);
2312               Thread.currentThread().interrupt();
2313               return;
2314             }
2315           }
2316         }
2317       }
2318       // Run out of attempts
2319       regionStates.updateRegionState(region, State.FAILED_OPEN);
2320     } finally {
2321       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
2322     }
2323   }
2324 
2325   private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
2326     // Remove region from in-memory transition and unassigned node from ZK
2327     // While trying to enable the table the regions of the table were
2328     // already enabled.
2329     LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2330       + " to " + sn);
2331     String encodedName = region.getEncodedName();
2332     
2333     // If using ZK for assignment, the region-already-opened event should not be handled here;
2334     // leave it to the zk event. See HBASE-14407.
2335     if (useZKForAssignment) {
2336       String node = ZKAssign.getNodeName(watcher, encodedName);
2337       Stat stat = new Stat();
2338       try {
2339         byte[] existingBytes = ZKUtil.getDataNoWatch(watcher, node, stat);
2340         if (existingBytes != null) {
2341           RegionTransition rt = RegionTransition.parseFrom(existingBytes);
2342           EventType et = rt.getEventType();
2343           if (et.equals(EventType.RS_ZK_REGION_OPENED)) {
2344             LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2345               + " and node in "+et+" state");
2346             return;
2347           }
2348         }
2349       } catch (KeeperException ke) {
2350         LOG.warn("Unexpected ZK exception getting data from node " + node
2351           + " for region " + encodedName, ke);
2352       } catch (DeserializationException e) {
2353         LOG.warn("Failed to deserialize RegionTransition read from zk", e);
2354       }
2355       
2356       deleteNodeInStates(encodedName, "offline", sn, EventType.M_ZK_REGION_OFFLINE);
2357     }
2358     
2359     regionStates.regionOnline(region, sn);
2360   }
2361 
2362   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
2363     if (this.tableStateManager.isTableState(region.getTable(),
2364         ZooKeeperProtos.Table.State.DISABLED,
2365         ZooKeeperProtos.Table.State.DISABLING) || replicasToClose.contains(region)) {
2366       LOG.info("Table " + region.getTable() + " is disabled or disabling;"
2367         + " skipping assign of " + region.getRegionNameAsString());
2368       offlineDisabledRegion(region);
2369       return true;
2370     }
2371     return false;
2372   }
2373 
2374   /**
2375    * Set region as OFFLINED up in zookeeper
2376    *
2377    * @param state
2378    * @return the version of the offline node if setting of the OFFLINE node was
2379    *         successful, -1 otherwise.
2380    */
2381   private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
2382     if (!state.isClosed() && !state.isOffline()) {
2383       String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE.";
2384       this.server.abort(msg, new IllegalStateException(msg));
2385       return -1;
2386     }
2387     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
2388     int versionOfOfflineNode;
2389     try {
2390       // get the version after setting the znode to OFFLINE
2391       versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
2392         state.getRegion(), destination);
2393       if (versionOfOfflineNode == -1) {
2394         LOG.warn("Attempted to create/force node into OFFLINE state before "
2395             + "completing assignment but failed to do so for " + state);
2396         return -1;
2397       }
2398     } catch (KeeperException e) {
2399       server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
2400       return -1;
2401     }
2402     return versionOfOfflineNode;
2403   }
2404 
2405   /**
2406    * @param region the region to assign
2407    * @return Plan for passed <code>region</code> (If none currently, it creates one or
2408    * if no servers to assign, it returns null).
2409    */
2410   private RegionPlan getRegionPlan(final HRegionInfo region,
2411       final boolean forceNewPlan)  throws HBaseIOException {
2412     return getRegionPlan(region, null, forceNewPlan);
2413   }
2414 
2415   /**
2416    * @param region the region to assign
2417    * @param serverToExclude Server to exclude (we know it's bad). Pass null if
2418    * all servers are thought to be assignable.
2419    * @param forceNewPlan If true, then if an existing plan exists, a new plan
2420    * will be generated.
2421    * @return Plan for passed <code>region</code> (If none currently, it creates one or
2422    * if no servers to assign, it returns null).
2423    */
2424   private RegionPlan getRegionPlan(final HRegionInfo region,
2425       final ServerName serverToExclude, final boolean forceNewPlan) throws HBaseIOException {
2426     // Pickup existing plan or make a new one
2427     final String encodedName = region.getEncodedName();
2428     final List<ServerName> destServers =
2429       serverManager.createDestinationServersList(serverToExclude);
2430 
2431     if (destServers.isEmpty()){
2432       LOG.warn("Can't move " + encodedName +
2433         ", there is no destination server available.");
2434       return null;
2435     }
2436 
2437     RegionPlan randomPlan = null;
2438     boolean newPlan = false;
2439     RegionPlan existingPlan;
2440 
2441     synchronized (this.regionPlans) {
2442       existingPlan = this.regionPlans.get(encodedName);
2443 
2444       if (existingPlan != null && existingPlan.getDestination() != null) {
2445         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
2446           + " destination server is " + existingPlan.getDestination() +
2447             " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
2448       }
2449 
2450       if (forceNewPlan
2451           || existingPlan == null
2452           || existingPlan.getDestination() == null
2453           || !destServers.contains(existingPlan.getDestination())) {
2454         newPlan = true;
2455       }
2456     }
2457 
2458     if (newPlan) {
2459       ServerName destination = balancer.randomAssignment(region, destServers);
2460       if (destination == null) {
2461         LOG.warn("Can't find a destination for " + encodedName);
2462         return null;
2463       }
2464       synchronized (this.regionPlans) {
2465         randomPlan = new RegionPlan(region, null, destination);
2466         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
2467           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
2468           regions.add(region);
2469           try {
2470             processFavoredNodes(regions);
2471           } catch (IOException ie) {
2472             LOG.warn("Ignoring exception in processFavoredNodes " + ie);
2473           }
2474         }
2475         this.regionPlans.put(encodedName, randomPlan);
2476       }
2477       LOG.debug("No previous transition plan found (or ignoring " + "an existing plan) for "
2478           + region.getRegionNameAsString() + "; generated random plan=" + randomPlan + "; "
2479           + destServers.size() + " (online=" + serverManager.getOnlineServers().size()
2480           + ") available servers, forceNewPlan=" + forceNewPlan);
2481       return randomPlan;
2482     }
2483     LOG.debug("Using pre-existing plan for " +
2484       region.getRegionNameAsString() + "; plan=" + existingPlan);
2485     return existingPlan;
2486   }
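  // Plan-selection sketch (illustrative; "hri" is a hypothetical region): an
  // existing plan is reused only if its destination is still an acceptable
  // server and forceNewPlan is false; otherwise the balancer picks a random
  // destination and the new plan is cached in regionPlans.
  //
  //   RegionPlan plan = getRegionPlan(hri, false);
  //   if (plan == null) {
  //     // no destination servers available; callers typically retry later
  //   }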
2487 
2488   /**
2489    * Wait for some time before retrying meta table region assignment
2490    */
2491   private void waitForRetryingMetaAssignment() {
2492     try {
2493       Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
2494     } catch (InterruptedException e) {
2495       LOG.error("Got exception while waiting for hbase:meta assignment", e);
2496       Thread.currentThread().interrupt();
2497     }
2498   }
2499 
2500   /**
2501    * Unassigns the specified region.
2502    * <p>
2503    * Updates the RegionState and sends the CLOSE RPC unless the region is being
2504    * split by the regionserver; then the unassign fails (silently) because we
2505    * presume the region being unassigned no longer exists (it's been split out
2506    * of existence). TODO: What to do if split fails and is rolled back and
2507    * parent is revivified?
2508    * <p>
2509    * If a RegionPlan is already set, it will remain.
2510    *
2511    * @param region region to be unassigned
2512    */
2513   public void unassign(HRegionInfo region) {
2514     unassign(region, false);
2515   }
2516 
2517 
2518   /**
2519    * Unassigns the specified region.
2520    * <p>
2521    * Updates the RegionState and sends the CLOSE RPC unless the region is being
2522    * split by the regionserver; then the unassign fails (silently) because we
2523    * presume the region being unassigned no longer exists (it's been split out
2524    * of existence). TODO: What to do if split fails and is rolled back and
2525    * parent is revivified?
2526    * <p>
2527    * If a RegionPlan is already set, it will remain.
2528    *
2529    * @param region region to be unassigned
2530    * @param force if region should be closed even if already closing
2531    */
2532   public void unassign(HRegionInfo region, boolean force, ServerName dest) {
2533     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
2534     LOG.debug("Starting unassign of " + region.getRegionNameAsString()
2535       + " (offlining), current state: " + regionStates.getRegionState(region));
2536 
2537     String encodedName = region.getEncodedName();
2538     // Grab the state of this region and synchronize on it
2539     int versionOfClosingNode = -1;
2540     // We need a lock here as we're going to do a put later and we don't want
2541     // multiple states to be created concurrently
2542     ReentrantLock lock = locker.acquireLock(encodedName);
2543     RegionState state = regionStates.getRegionTransitionState(encodedName);
2544     boolean reassign = true;
2545     try {
2546       if (state == null) {
2547         // Region is not in transition.
2548         // We can unassign it only if it's not SPLIT/MERGED.
2549         state = regionStates.getRegionState(encodedName);
2550         if (state != null && state.isUnassignable()) {
2551           LOG.info("Attempting to unassign " + state + ", ignored");
2552           // Offline region will be reassigned below
2553           return;
2554         }
2555         // Create the znode in CLOSING state
2556         try {
2557           if (state == null || state.getServerName() == null) {
2558             // We don't know where the region is, offline it.
2559             // No need to send CLOSE RPC
2560             LOG.warn("Attempting to unassign a region not in RegionStates "
2561               + region.getRegionNameAsString() + ", offlined");
2562             regionOffline(region);
2563             return;
2564           }
2565           if (useZKForAssignment) {
2566             versionOfClosingNode = ZKAssign.createNodeClosing(
2567               watcher, region, state.getServerName());
2568             if (versionOfClosingNode == -1) {
2569               LOG.info("Attempting to unassign " +
2570                 region.getRegionNameAsString() + " but ZK closing node "
2571                 + "can't be created.");
2572               reassign = false; // not unassigned at all
2573               return;
2574             }
2575           }
2576         } catch (KeeperException e) {
2577           if (e instanceof NodeExistsException) {
2578             // Handle race between master initiated close and regionserver
2579             // orchestrated splitting. See if existing node is in a
2580             // SPLITTING or SPLIT state.  If so, the regionserver started
2581             // an op on node before we could get our CLOSING in.  Deal.
2582             NodeExistsException nee = (NodeExistsException)e;
2583             String path = nee.getPath();
2584             try {
2585               if (isSplitOrSplittingOrMergedOrMerging(path)) {
2586                 LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
2587                   "skipping unassign because region no longer exists -- it's split or merged");
2588                 reassign = false; // no need to reassign for split/merged region
2589                 return;
2590               }
2591             } catch (KeeperException.NoNodeException ke) {
2592               LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
2593                 "; presuming split and that the region to unassign, " +
2594                 encodedName + ", no longer exists -- confirm", ke);
2595               return;
2596             } catch (KeeperException ke) {
2597               LOG.error("Unexpected zk state", ke);
2598             } catch (DeserializationException de) {
2599               LOG.error("Failed parse", de);
2600             }
2601           }
2602           // If we get here, we don't understand what's going on -- abort.
2603           server.abort("Unexpected ZK exception creating node CLOSING", e);
2604           reassign = false; // heading out already
2605           return;
2606         }
2607         state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2608       } else if (state.isFailedOpen()) {
2609         // The region is not open yet
2610         regionOffline(region);
2611         return;
2612       } else if (force && state.isPendingCloseOrClosing()) {
2613         LOG.debug("Attempting to unassign " + region.getRegionNameAsString() +
2614           " which is already " + state.getState()  +
2615           " but forcing to send a CLOSE RPC again ");
2616         if (state.isFailedClose()) {
2617           state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2618         }
2619         state.updateTimestampToNow();
2620       } else {
2621         LOG.debug("Attempting to unassign " +
2622           region.getRegionNameAsString() + " but it is " +
2623           "already in transition (" + state.getState() + ", force=" + force + ")");
2624         return;
2625       }
2626 
2627       unassign(region, state, versionOfClosingNode, dest, useZKForAssignment, null);
2628     } finally {
2629       lock.unlock();
2630 
2631       // Region is expected to be reassigned afterwards
2632       if (!replicasToClose.contains(region) && reassign && regionStates.isRegionOffline(region)) {
2633         assign(region, true);
2634       }
2635     }
2636   }
2637 
2638   public void unassign(HRegionInfo region, boolean force){
2639      unassign(region, force, null);
2640   }
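  // Typical unassign calls (illustrative sketch; "hri" and "targetServer" are
  // hypothetical names):
  //
  //   unassign(hri);                       // close; reassigned if it ends up offline
  //   unassign(hri, true);                 // force another CLOSE RPC even if already closing
  //   unassign(hri, false, targetServer);  // close with an intended destination, as balance() does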
2641 
2642   /**
2643    * @param region region whose closing/closed znode is to be deleted.
2644    */
2645   public void deleteClosingOrClosedNode(HRegionInfo region, ServerName sn) {
2646     String encodedName = region.getEncodedName();
2647     deleteNodeInStates(encodedName, "closing", sn, EventType.M_ZK_REGION_CLOSING,
2648       EventType.RS_ZK_REGION_CLOSED);
2649   }
2650 
2651   /**
2652    * @param path znode path to check
2653    * @return True if znode is in SPLIT or SPLITTING or MERGED or MERGING state.
2654    * @throws KeeperException Can happen if the znode went away in meantime.
2655    * @throws DeserializationException
2656    */
2657   private boolean isSplitOrSplittingOrMergedOrMerging(final String path)
2658       throws KeeperException, DeserializationException {
2659     boolean result = false;
2660     // This may fail if the SPLIT or SPLITTING or MERGED or MERGING znode gets
2661     // cleaned up before we can get data from it.
2662     byte [] data = ZKAssign.getData(watcher, path);
2663     if (data == null) {
2664       LOG.info("Node " + path + " is gone");
2665       return false;
2666     }
2667     RegionTransition rt = RegionTransition.parseFrom(data);
2668     switch (rt.getEventType()) {
2669     case RS_ZK_REQUEST_REGION_SPLIT:
2670     case RS_ZK_REGION_SPLIT:
2671     case RS_ZK_REGION_SPLITTING:
2672     case RS_ZK_REQUEST_REGION_MERGE:
2673     case RS_ZK_REGION_MERGED:
2674     case RS_ZK_REGION_MERGING:
2675       result = true;
2676       break;
2677     default:
2678       LOG.info("Node " + path + " is in " + rt.getEventType());
2679       break;
2680     }
2681     return result;
2682   }
2683 
2684   /**
2685    * Used by unit tests. Return the number of regions opened so far in the life
2686    * of the master. Increases by one every time the master opens a region
2687    * @return the counter value of the number of regions opened so far
2688    */
2689   public int getNumRegionsOpened() {
2690     return numRegionsOpened.get();
2691   }
2692 
2693   /**
2694    * Waits until the specified region has completed assignment.
2695    * <p>
2696    * If the region is already assigned, returns immediately.  Otherwise, method
2697    * blocks until the region is assigned.
2698    * @param regionInfo region to wait on assignment for
2699    * @return true if the region is assigned false otherwise.
2700    * @throws InterruptedException
2701    */
2702   public boolean waitForAssignment(HRegionInfo regionInfo)
2703       throws InterruptedException {
2704     ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
2705     regionSet.add(regionInfo);
2706     return waitForAssignment(regionSet, true, Long.MAX_VALUE);
2707   }
2708 
2709   /**
2710    * Waits until the specified region has completed assignment, or the deadline is reached.
2711    */
2712   protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
2713       final boolean waitTillAllAssigned, final int reassigningRegions,
2714       final long minEndTime) throws InterruptedException {
2715     long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
2716     if (deadline < 0) { // Overflow
2717       deadline = Long.MAX_VALUE; // wait forever
2718     }
2719     return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
2720   }
2721 
2722   /**
2723    * Waits until the specified region has completed assignment, or the deadline is reached.
2724    * @param regionSet set of regions to wait on. The set is modified and the assigned regions removed
2725    * @param waitTillAllAssigned true if we should wait for all the regions to be assigned
2726    * @param deadline the timestamp after which the wait is aborted
2727    * @return true if all the regions are assigned false otherwise.
2728    * @throws InterruptedException
2729    */
2730   protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
2731       final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
2732     // We're not synchronizing on regionsInTransition now because we don't use any iterator.
2733     while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
2734       int failedOpenCount = 0;
2735       Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
2736       while (regionInfoIterator.hasNext()) {
2737         HRegionInfo hri = regionInfoIterator.next();
2738         if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
2739             State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
2740           regionInfoIterator.remove();
2741         } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
2742           failedOpenCount++;
2743         }
2744       }
2745       if (!waitTillAllAssigned) {
2746         // No need to wait, let assignment go on asynchronously
2747         break;
2748       }
2749       if (!regionSet.isEmpty()) {
2750         if (failedOpenCount == regionSet.size()) {
2751           // all the regions we are waiting on had an error on open.
2752           break;
2753         }
2754         regionStates.waitForUpdate(100);
2755       }
2756     }
2757     return regionSet.isEmpty();
2758   }
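  // Usage sketch (illustrative; "regions" and "deadline" are hypothetical):
  // the passed collection is drained of assigned regions as they come online.
  //
  //   ArrayList<HRegionInfo> pending = new ArrayList<HRegionInfo>(regions);
  //   boolean allAssigned = waitForAssignment(pending, true, deadline);
  //   // on return, "pending" holds only the regions that are still unassigned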
2759 
2760   /**
2761    * Assigns the hbase:meta region or a replica.
2762    * <p>
2763    * Assumes that hbase:meta is currently closed and is not being actively served by
2764    * any RegionServer.
2765    * <p>
2766    * Forcibly unsets the current meta region location in ZooKeeper and assigns
2767    * hbase:meta to a random RegionServer.
2768    * @param hri TODO
2769    * @throws KeeperException
2770    */
2771   public void assignMeta(HRegionInfo hri) throws KeeperException {
2772     this.server.getMetaTableLocator().deleteMetaLocation(this.watcher, hri.getReplicaId());
2773     assign(hri, true);
2774   }
2775 
2776   /**
2777    * Assigns specified regions retaining assignments, if any.
2778    * <p>
2779    * This is a synchronous call and will return once every region has been
2780    * assigned.  If anything fails, an exception is thrown
2781    * @throws InterruptedException
2782    * @throws IOException
2783    */
2784   public void assign(Map<HRegionInfo, ServerName> regions)
2785         throws IOException, InterruptedException {
2786     if (regions == null || regions.isEmpty()) {
2787       return;
2788     }
2789     List<ServerName> servers = serverManager.createDestinationServersList();
2790     if (servers == null || servers.isEmpty()) {
2791       throw new IOException("Found no destination server to assign region(s)");
2792     }
2793 
2794     // Reuse existing assignment info
2795     Map<ServerName, List<HRegionInfo>> bulkPlan =
2796       balancer.retainAssignment(regions, servers);
2797     if (bulkPlan == null) {
2798       throw new IOException("Unable to determine a plan to assign region(s)");
2799     }
2800 
2801     assign(regions.size(), servers.size(),
2802       "retainAssignment=true", bulkPlan);
2803   }
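  // Usage sketch (illustrative; "hri" and "lastLocation" are hypothetical):
  //
  //   Map<HRegionInfo, ServerName> toAssign = new HashMap<HRegionInfo, ServerName>();
  //   toAssign.put(hri, lastLocation);  // last known location, used to retain assignment
  //   assign(toAssign);                 // blocks until the region(s) are assigned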
2804 
2805   /**
2806    * Assigns specified regions round robin, if any.
2807    * <p>
2808    * This is a synchronous call and will return once every region has been
2809    * assigned.  If anything fails, an exception is thrown
2810    * @throws InterruptedException
2811    * @throws IOException
2812    */
2813   public void assign(List<HRegionInfo> regions)
2814         throws IOException, InterruptedException {
2815     if (regions == null || regions.isEmpty()) {
2816       return;
2817     }
2818 
2819     List<ServerName> servers = serverManager.createDestinationServersList();
2820     if (servers == null || servers.isEmpty()) {
2821       throw new IOException("Found no destination server to assign region(s)");
2822     }
2823 
2824     // Generate a round-robin bulk assignment plan
2825     Map<ServerName, List<HRegionInfo>> bulkPlan = balancer.roundRobinAssignment(regions, servers);
2826     if (bulkPlan == null) {
2827       throw new IOException("Unable to determine a plan to assign region(s)");
2828     }
2829 
2830     processFavoredNodes(regions);
2831     assign(regions.size(), servers.size(), "round-robin=true", bulkPlan);
2832   }
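  // Usage sketch (illustrative): round-robin assignment ignores prior locations,
  // so it is the variant used when no previous assignments should be retained.
  //
  //   assign(new ArrayList<HRegionInfo>(regionsToPlace));  // "regionsToPlace" is hypothetical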
2833 
2834   private void assign(int regions, int totalServers,
2835       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
2836           throws InterruptedException, IOException {
2837 
2838     int servers = bulkPlan.size();
2839     if (servers == 1 || (regions < bulkAssignThresholdRegions
2840         && servers < bulkAssignThresholdServers)) {
2841 
2842       // Don't use bulk assignment.  This can be more efficient in a small
2843       // cluster, especially a mini cluster for testing, so that tests won't time out
2844       if (LOG.isTraceEnabled()) {
2845         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
2846           " region(s) to " + servers + " server(s)");
2847       }
2848 
2849       // invoke assignment (async)
2850       ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
2851       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
2852         if (!assign(plan.getKey(), plan.getValue())) {
2853           for (HRegionInfo region: plan.getValue()) {
2854             if (!regionStates.isRegionOnline(region)) {
2855               invokeAssign(region);
2856               if (!region.getTable().isSystemTable()) {
2857                 userRegionSet.add(region);
2858               }
2859             }
2860           }
2861         }
2862       }
2863 
2864       // wait for assignment completion
2865       if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
2866             System.currentTimeMillis())) {
2867         LOG.debug("some user regions are still in transition: " + userRegionSet);
2868       }
2869     } else {
2870       LOG.info("Bulk assigning " + regions + " region(s) across "
2871         + totalServers + " server(s), " + message);
2872 
2873       // Use fixed count thread pool assigning.
2874       BulkAssigner ba = new GeneralBulkAssigner(
2875         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
2876       ba.bulkAssign();
2877       LOG.info("Bulk assigning done");
2878     }
2879   }
2880 
2881   /**
2882    * Assigns all user regions, if any exist.  Used during cluster startup.
2883    * <p>
2884    * This is a synchronous call and will return once every region has been
2885    * assigned.  If anything fails, an exception is thrown and the cluster
2886    * should be shutdown.
2887    * @throws InterruptedException
2888    * @throws IOException
2889    */
2890   private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
2891       throws IOException, InterruptedException {
2892     if (allRegions == null || allRegions.isEmpty()) return;
2893 
2894     // Determine what type of assignment to do on startup
2895     boolean retainAssignment = server.getConfiguration().
2896       getBoolean("hbase.master.startup.retainassign", true);
2897 
2898     Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
2899     if (retainAssignment) {
2900       assign(allRegions);
2901     } else {
2902       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
2903       assign(regions);
2904     }
2905 
2906     for (HRegionInfo hri : regionsFromMetaScan) {
2907       TableName tableName = hri.getTable();
2908       if (!tableStateManager.isTableState(tableName,
2909           ZooKeeperProtos.Table.State.ENABLED)) {
2910         setEnabledTable(tableName);
2911       }
2912     }
2913     // assign all the replicas that were not recorded in the meta
2914     assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, server));
2915   }
2916 
2917   /**
2918    * Get a list of replica regions that are
2919    * not recorded in meta yet. We might not have recorded the locations
2920    * of the replicas since the replicas may not have been online yet, the master may have
2921    * restarted in the middle of assigning, ZK may have been erased, etc.
2922    * @param regionsRecordedInMeta the list of regions we know are recorded in meta
2923    * either as a default, or, as the location of a replica
2924    * @param master
2925    * @return list of replica regions
2926    * @throws IOException
2927    */
2928   public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
2929       Set<HRegionInfo> regionsRecordedInMeta, MasterServices master)throws IOException {
2930     List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
2931     for (HRegionInfo hri : regionsRecordedInMeta) {
2932       TableName table = hri.getTable();
2933       HTableDescriptor htd = master.getTableDescriptors().get(table);
2934       // look at the HTD for the replica count. That's the source of truth
2935       int desiredRegionReplication = htd.getRegionReplication();
2936       for (int i = 0; i < desiredRegionReplication; i++) {
2937         HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
2938         if (regionsRecordedInMeta.contains(replica)) continue;
2939         regionsNotRecordedInMeta.add(replica);
2940       }
2941     }
2942     return regionsNotRecordedInMeta;
2943   }
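  // Replica-id sketch (illustrative; "hri" is a hypothetical default region):
  // replica 0 is the default region and replicas 1..N-1 are derived from it,
  // so a table with region replication 3 yields two extra HRegionInfo
  // instances per default region.
  //
  //   HRegionInfo replica1 = RegionReplicaUtil.getRegionInfoForReplica(hri, 1);
  //   HRegionInfo replica2 = RegionReplicaUtil.getRegionInfoForReplica(hri, 2);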
2944 
2945   /**
2946    * Wait until no regions in transition.
2947    * @param timeout How long to wait.
2948    * @return True if nothing in regions in transition.
2949    * @throws InterruptedException
2950    */
2951   boolean waitUntilNoRegionsInTransition(final long timeout)
2952       throws InterruptedException {
2953     // Blocks until there are no regions in transition. It is possible that
2954     // there are regions in transition immediately after this call returns,
2955     // but it guarantees that if it returns without an exception, there was
2956     // a period of time with no regions in transition from the point-of-view
2957     // of the in-memory state of the Master. The check is re-evaluated every
2958     // 100 ms until the timeout elapses or the master stops.
2959     final long endTime = System.currentTimeMillis() + timeout;
2960 
2961     while (!this.server.isStopped() && regionStates.isRegionsInTransition()
2962         && endTime > System.currentTimeMillis()) {
2963       regionStates.waitForUpdate(100);
2964     }
2965 
2966     return !regionStates.isRegionsInTransition();
2967   }
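  // Usage sketch (illustrative; "timeoutMs" is a hypothetical value):
  //
  //   if (!waitUntilNoRegionsInTransition(timeoutMs)) {
  //     // timed out (or the master is stopping) with regions still in transition
  //   }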
2968 
2969   /**
2970    * Rebuild the list of user regions and assignment information.
2971    * Updates regionstates with findings as we go through list of regions.
2972    * @return set of servers not online that hosted some regions according to a scan of hbase:meta
2973    * @throws IOException
2974    */
2975   Set<ServerName> rebuildUserRegions() throws
2976       IOException, KeeperException, CoordinatedStateException {
2977     Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
2978       ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.ENABLING);
2979 
2980     Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
2981       ZooKeeperProtos.Table.State.DISABLED,
2982       ZooKeeperProtos.Table.State.DISABLING,
2983       ZooKeeperProtos.Table.State.ENABLING);
2984 
2985     // Region assignment from META
2986     List<Result> results = MetaTableAccessor.fullScanOfMeta(server.getConnection());
2987     // Get any new but slow to checkin region server that joined the cluster
2988     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
2989     // Set of offline servers to be returned
2990     Set<ServerName> offlineServers = new HashSet<ServerName>();
2991     // Iterate regions in META
2992     for (Result result : results) {
2993       if (result == null) {
2994         LOG.debug("null result from meta - ignoring but this is strange.");
2995         continue;
2996       }
2997       // Keep track of replicas to close. These were the replicas of the originally
2998       // unmerged regions. The master might have closed them before, but it might
2999       // not have, e.g. because it crashed.
3000       PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
3001       if (p.getFirst() != null && p.getSecond() != null) {
3002         int numReplicas = server.getTableDescriptors().get(p.getFirst().
3003             getTable()).getRegionReplication();
3004         for (HRegionInfo merge : p) {
3005           for (int i = 1; i < numReplicas; i++) {
3006             replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
3007           }
3008         }
3009       }
3010       RegionLocations rl =  MetaTableAccessor.getRegionLocations(result);
3011       if (rl == null) continue;
3012       HRegionLocation[] locations = rl.getRegionLocations();
3013       if (locations == null) continue;
3014       for (HRegionLocation hrl : locations) {
3015         if (hrl == null) continue;
3016         HRegionInfo regionInfo = hrl.getRegionInfo();
3017         if (regionInfo == null) continue;
3018         int replicaId = regionInfo.getReplicaId();
3019         State state = RegionStateStore.getRegionState(result, replicaId);
3020         // Keep track of replicas to close. These were the replicas of the split parents
3021         // from the previous life of the master. The master should have closed them before,
3022         // but it might not have, e.g. because it crashed.
3023         if (replicaId == 0 && state.equals(State.SPLIT)) {
3024           for (HRegionLocation h : locations) {
3025             replicasToClose.add(h.getRegionInfo());
3026           }
3027         }
3028         ServerName lastHost = hrl.getServerName();
3029         ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
3030         if (tableStateManager.isTableState(regionInfo.getTable(),
3031              ZooKeeperProtos.Table.State.DISABLED)) {
3032           // force the region to forget its hosts for disabled/disabling tables.
3033           // see HBASE-13326
3034           lastHost = null;
3035           regionLocation = null;
3036         }
3037         regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
3038         if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
3039           // Region is not open (either offline or in transition), skip
3040           continue;
3041         }
3042         TableName tableName = regionInfo.getTable();
3043         if (!onlineServers.contains(regionLocation)) {
3044           // Region is located on a server that isn't online
3045           offlineServers.add(regionLocation);
3046           if (useZKForAssignment) {
3047             regionStates.regionOffline(regionInfo);
3048           }
3049         } else if (!disabledOrEnablingTables.contains(tableName)) {
3050           // Region is being served and on an active server
3051           // add only if region not in disabled or enabling table
3052           regionStates.regionOnline(regionInfo, regionLocation);
3053           balancer.regionOnline(regionInfo, regionLocation);
3054         } else if (useZKForAssignment) {
3055           regionStates.regionOffline(regionInfo);
3056         }
3057         // need to enable the table if not disabled or disabling or enabling
3058         // this will be used in rolling restarts
3059         if (!disabledOrDisablingOrEnabling.contains(tableName)
3060           && !getTableStateManager().isTableState(tableName,
3061             ZooKeeperProtos.Table.State.ENABLED)) {
3062           setEnabledTable(tableName);
3063         }
3064       }
3065     }
3066     return offlineServers;
3067   }
3068 
3069   /**
3070    * Recover the tables that were not fully moved to DISABLED state. These
3071    * tables are in DISABLING state when the master restarted/switched.
3072    *
3073    * @throws KeeperException
3074    * @throws TableNotFoundException
3075    * @throws IOException
3076    */
3077   private void recoverTableInDisablingState()
3078       throws KeeperException, IOException, CoordinatedStateException {
3079     Set<TableName> disablingTables =
3080       tableStateManager.getTablesInStates(ZooKeeperProtos.Table.State.DISABLING);
3081     if (disablingTables.size() != 0) {
3082       for (TableName tableName : disablingTables) {
3083         // Recover by calling DisableTableHandler
3084         LOG.info("The table " + tableName
3085             + " is in DISABLING state.  Hence recovering by moving the table"
3086             + " to DISABLED state.");
3087         new DisableTableHandler(this.server, tableName,
3088             this, tableLockManager, true).prepare().process();
3089       }
3090     }
3091   }
3092 
3093   /**
3094    * Recover the tables that are not fully moved to ENABLED state. These tables
3095    * are in ENABLING state when the master restarted/switched
3096    *
3097    * @throws KeeperException
3098    * @throws org.apache.hadoop.hbase.TableNotFoundException
3099    * @throws IOException
3100    */
3101   private void recoverTableInEnablingState()
3102       throws KeeperException, IOException, CoordinatedStateException {
3103     Set<TableName> enablingTables = tableStateManager.
3104       getTablesInStates(ZooKeeperProtos.Table.State.ENABLING);
3105     if (enablingTables.size() != 0) {
3106       for (TableName tableName : enablingTables) {
3107         // Recover by calling EnableTableHandler
3108         LOG.info("The table " + tableName
3109             + " is in ENABLING state.  Hence recovering by moving the table"
3110             + " to ENABLED state.");
3111         // enableTable in sync way during master startup,
3112         // no need to invoke coprocessor
3113         EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
3114           this, tableLockManager, true);
3115         try {
3116           eth.prepare();
3117         } catch (TableNotFoundException e) {
3118           LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
3119           continue;
3120         }
3121         eth.process();
3122       }
3123     }
3124   }
3125 
3126   /**
3127    * Processes list of dead servers from result of hbase:meta scan and regions in RIT.
3128    * This is used at failover to recover the lost regions that belonged to
3129    * RegionServers which failed while there was no active master or which are offline
3130    * for whatever reason, and for regions that were in RIT.
3131    *
3132    * @param deadServers
3133    *          The list of dead servers which failed while there was no active master. Can be null.
3134    * @throws IOException
3135    * @throws KeeperException
3136    */
3137   private void processDeadServersAndRecoverLostRegions(Set<ServerName> deadServers)
3138   throws IOException, KeeperException {
3139     if (deadServers != null && !deadServers.isEmpty()) {
3140       for (ServerName serverName: deadServers) {
3141         if (!serverManager.isServerDead(serverName)) {
3142           serverManager.expireServer(serverName); // Let SSH do region re-assign
3143         }
3144       }
3145     }
3146 
3147     List<String> nodes = useZKForAssignment ?
3148       ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode)
3149       : ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
3150     if (nodes != null && !nodes.isEmpty()) {
3151       for (String encodedRegionName : nodes) {
3152         processRegionInTransition(encodedRegionName, null);
3153       }
3154     } else if (!useZKForAssignment) {
3155       processRegionInTransitionZkLess();
3156     }
3157   }
3158 
3159   void processRegionInTransitionZkLess() {
3160     // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
3161     // in case the RPC call is not sent out yet before the master was shut down
3162     // since we update the state before we send the RPC call. We can't update
3163     // the state after the RPC call. Otherwise, we don't know what's happened
3164     // to the region if the master dies right after the RPC call is out.
3165     Map<String, RegionState> rits = regionStates.getRegionsInTransition();
3166     for (RegionState regionState : rits.values()) {
3167       LOG.info("Processing " + regionState);
3168       ServerName serverName = regionState.getServerName();
3169       // Server could be null in case of FAILED_OPEN when master cannot find a region plan. In that
3170       // case, try assigning it here.
3171       if (serverName != null
3172           && !serverManager.getOnlineServers().containsKey(serverName)) {
3173         LOG.info("Server " + serverName + " isn't online. SSH will handle this");
3174         continue;
3175       }
3176       HRegionInfo regionInfo = regionState.getRegion();
3177       State state = regionState.getState();
3178 
3179       switch (state) {
3180       case CLOSED:
3181         invokeAssign(regionInfo);
3182         break;
3183       case PENDING_OPEN:
3184         retrySendRegionOpen(regionState);
3185         break;
3186       case PENDING_CLOSE:
3187         retrySendRegionClose(regionState);
3188         break;
3189       case FAILED_CLOSE:
3190       case FAILED_OPEN:
3191         invokeUnAssign(regionInfo);
3192         break;
3193       default:
3194         // No process for other states
3195       }
3196     }
3197   }
3198 
3199   /**
3200    * At master failover, for pending_open region, make sure
3201    * sendRegionOpen RPC call is sent to the target regionserver
3202    */
3203   private void retrySendRegionOpen(final RegionState regionState) {
3204     this.executorService.submit(
3205       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
3206         @Override
3207         public void process() throws IOException {
3208           HRegionInfo hri = regionState.getRegion();
3209           ServerName serverName = regionState.getServerName();
3210           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
3211           try {
3212             for (int i = 1; i <= maximumAttempts; i++) {
3213               if (!serverManager.isServerOnline(serverName)
3214                   || server.isStopped() || server.isAborted()) {
3215                 return; // No need any more
3216               }
3217               try {
3218                 if (!regionState.equals(regionStates.getRegionState(hri))) {
3219                   return; // Region is not in the expected state any more
3220                 }
3221                 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
3222                 if (shouldAssignRegionsWithFavoredNodes) {
3223                   favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
3224                 }
3225                 RegionOpeningState regionOpenState = serverManager.sendRegionOpen(
3226                   serverName, hri, -1, favoredNodes);
3227 
3228                 if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
3229                   // Failed opening this region, this means the target server didn't get
3230                   // the original region open RPC, so re-assign it with a new plan
3231                   LOG.debug("Got failed_opening in retry sendRegionOpen for "
3232                     + regionState + ", re-assign it");
3233                   invokeAssign(hri, true);
3234                 }
3235                 return; // Done.
3236               } catch (Throwable t) {
3237                 if (t instanceof RemoteException) {
3238                   t = ((RemoteException) t).unwrapRemoteException();
3239                 }
3240                 // In case SocketTimeoutException/FailedServerException, retry
3241                 if (t instanceof java.net.SocketTimeoutException
3242                     || t instanceof FailedServerException) {
3243                   Threads.sleep(100);
3244                   continue;
3245                 }
3246                 // For other exceptions, re-assign it
3247                 LOG.debug("Got exception in retry sendRegionOpen for "
3248                   + regionState + ", re-assign it", t);
3249                 invokeAssign(hri);
3250                 return; // Done.
3251               }
3252             }
3253           } finally {
3254             lock.unlock();
3255           }
3256         }
3257       });
3258   }
3259 
3260   /**
3261    * At master failover, for pending_close region, make sure
3262    * sendRegionClose RPC call is sent to the target regionserver
3263    */
3264   private void retrySendRegionClose(final RegionState regionState) {
3265     this.executorService.submit(
3266       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
3267         @Override
3268         public void process() throws IOException {
3269           HRegionInfo hri = regionState.getRegion();
3270           ServerName serverName = regionState.getServerName();
3271           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
3272           try {
3273             for (int i = 1; i <= maximumAttempts; i++) {
3274               if (!serverManager.isServerOnline(serverName)
3275                   || server.isStopped() || server.isAborted()) {
3276                 return; // No need any more
3277               }
3278               try {
3279                 if (!regionState.equals(regionStates.getRegionState(hri))) {
3280                   return; // Region is not in the expected state any more
3281                 }
3282                 if (!serverManager.sendRegionClose(serverName, hri, -1, null, false)) {
3283                   // This means the region is still on the target server
3284                   LOG.debug("Got false in retry sendRegionClose for "
3285                     + regionState + ", re-close it");
3286                   invokeUnAssign(hri);
3287                 }
3288                 return; // Done.
3289               } catch (Throwable t) {
3290                 if (t instanceof RemoteException) {
3291                   t = ((RemoteException) t).unwrapRemoteException();
3292                 }
3293                 // In case SocketTimeoutException/FailedServerException, retry
3294                 if (t instanceof java.net.SocketTimeoutException
3295                     || t instanceof FailedServerException) {
3296                   Threads.sleep(100);
3297                   continue;
3298                 }
3299                 if (!(t instanceof NotServingRegionException
3300                     || t instanceof RegionAlreadyInTransitionException)) {
3301                   // NotServingRegionException/RegionAlreadyInTransitionException
3302                   // means the target server got the original region close request.
3303                   // For other exceptions, re-close it
3304                   LOG.debug("Got exception in retry sendRegionClose for "
3305                     + regionState + ", re-close it", t);
3306                   invokeUnAssign(hri);
3307                 }
3308                 return; // Done.
3309               }
3310             }
3311           } finally {
3312             lock.unlock();
3313           }
3314         }
3315       });
3316   }
3317 
3318   /**
3319    * Set regions-in-transition metrics.
3320    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
3321    * This iterator is not fail-fast, which may lead to stale reads; but that's better than
3322    * creating a copy of the map for metrics computation, as this method will be invoked
3323    * on a frequent interval.
3324    */
3325   public void updateRegionsInTransitionMetrics() {
3326     long currentTime = System.currentTimeMillis();
3327     int totalRITs = 0;
3328     int totalRITsOverThreshold = 0;
3329     long oldestRITTime = 0;
3330     int ritThreshold = this.server.getConfiguration().
3331       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
3332     for (RegionState state: regionStates.getRegionsInTransition().values()) {
3333       totalRITs++;
3334       long ritTime = currentTime - state.getStamp();
3335       if (ritTime > ritThreshold) { // more than the threshold
3336         totalRITsOverThreshold++;
3337       }
3338       if (oldestRITTime < ritTime) {
3339         oldestRITTime = ritTime;
3340       }
3341     }
3342     if (this.metricsAssignmentManager != null) {
3343       this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
3344       this.metricsAssignmentManager.updateRITCount(totalRITs);
3345       this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
3346     }
3347   }
3348 
3349   /**
3350    * @param region Region whose plan we are to clear.
3351    */
3352   void clearRegionPlan(final HRegionInfo region) {
3353     synchronized (this.regionPlans) {
3354       this.regionPlans.remove(region.getEncodedName());
3355     }
3356   }
3357 
3358   /**
3359    * Wait on region to clear regions-in-transition.
3360    * @param hri Region to wait on.
3361    * @throws IOException
3362    */
3363   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
3364       throws IOException, InterruptedException {
3365     waitOnRegionToClearRegionsInTransition(hri, -1L);
3366   }
3367 
3368   /**
3369    * Wait on region to clear regions-in-transition or time out
3370    * @param hri
3371    * @param timeOut Milliseconds to wait for current region to be out of transition state.
3372    * @return True when a region clears regions-in-transition before timeout otherwise false
3373    * @throws InterruptedException
3374    */
3375   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
3376       throws InterruptedException {
3377     if (!regionStates.isRegionInTransition(hri)) return true;
3378     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
3379         + timeOut;
3380     // There is already a timeout monitor on regions in transition so I
3381     // should not have to have one here too?
3382     LOG.info("Waiting for " + hri.getEncodedName() +
3383         " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
3384     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
3385       regionStates.waitForUpdate(100);
3386       if (EnvironmentEdgeManager.currentTime() > end) {
3387         LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
3388         return false;
3389       }
3390     }
3391     if (this.server.isStopped()) {
3392       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
3393       return false;
3394     }
3395     return true;
3396   }
3397 
3398   void invokeAssign(HRegionInfo regionInfo) {
3399     invokeAssign(regionInfo, true);
3400   }
3401 
3402   void invokeAssign(HRegionInfo regionInfo, boolean newPlan) {
3403     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, newPlan));
3404   }
3405 
3406   void invokeUnAssign(HRegionInfo regionInfo) {
3407     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
3408   }
3409 
3410   public ServerHostRegion isCarryingMeta(ServerName serverName) {
3411     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
3412   }
3413 
3414   public ServerHostRegion isCarryingMetaReplica(ServerName serverName, int replicaId) {
3415     return isCarryingRegion(serverName,
3416         RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId));
3417   }
3418 
3419   public ServerHostRegion isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) {
3420     return isCarryingRegion(serverName, metaHri);
3421   }
3422 
3423   /**
3424    * Check if the shutdown server carries the specific region.
3425    * We have a bunch of places that store the region location.
3426    * Those values aren't consistent; there is a delay of notification.
3427    * The location from the zookeeper unassigned node has the most recent data,
3428    * but the node could be deleted after the region is opened by the AM.
3429    * The AM's info could be stale if OpenedRegionHandler
3430    * processing hasn't finished yet when the server shutdown occurs.
3431    * @return whether the serverName currently hosts the region
3432    */
3433   private ServerHostRegion isCarryingRegion(ServerName serverName, HRegionInfo hri) {
3434     RegionTransition rt = null;
3435     try {
3436       byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
3437       // This call can legitimately come by null
3438       rt = data == null? null: RegionTransition.parseFrom(data);
3439     } catch (KeeperException e) {
3440       server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
3441     } catch (DeserializationException e) {
3442       server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
3443     }
3444 
3445     ServerName addressFromZK = rt != null? rt.getServerName():  null;
3446     if (addressFromZK != null) {
3447       // if we get something from ZK, we will use the data
3448       boolean matchZK = addressFromZK.equals(serverName);
3449       LOG.debug("Checking region=" + hri.getRegionNameAsString() + ", zk server=" + addressFromZK +
3450         " current=" + serverName + ", matches=" + matchZK);
3451       return matchZK ? ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3452     }
3453 
3454     ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
3455     if (LOG.isDebugEnabled()) {
3456       LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
3457         " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
3458         " server being checked: " + serverName);
3459     }
3460     if (addressFromAM != null) {
3461       return addressFromAM.equals(serverName) ?
3462           ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3463     }
3464 
3465     if (hri.isMetaRegion() && RegionReplicaUtil.isDefaultReplica(hri)) {
3466       // For the Meta region (default replica), we can do one more check on MetaTableLocator
3467       final ServerName serverNameInZK =
3468           server.getMetaTableLocator().getMetaRegionLocation(this.server.getZooKeeper());
3469       if (LOG.isDebugEnabled()) {
3470         LOG.debug("Based on MetaTableLocator, the META region is on server=" +
3471           (serverNameInZK == null ? "null" : serverNameInZK) +
3472           " server being checked: " + serverName);
3473       }
3474       if (serverNameInZK != null) {
3475         return serverNameInZK.equals(serverName) ?
3476             ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3477       }
3478     }
3479 
3480     // Checked everywhere; if we reach here, we are unsure whether the server is carrying the region.
3481     return ServerHostRegion.UNKNOWN;
3482   }
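  // Usage sketch (illustrative; "crashedServer" is hypothetical): the result is
  // three-valued because the sources consulted (ZK, the AM's in-memory state,
  // MetaTableLocator) can disagree or be missing.
  //
  //   if (isCarryingMeta(crashedServer) == ServerHostRegion.HOSTING_REGION) {
  //     // the crashed server was hosting hbase:meta
  //   }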
3483 
3484   /**
3485    * Clean out crashed server removing any assignments.
3486    * @param sn Server that went down.
3487    * @return list of regions in transition on this server
3488    */
3489   public List<HRegionInfo> cleanOutCrashedServerReferences(final ServerName sn) {
3490     // Clean out any existing assignment plans for this server
3491     synchronized (this.regionPlans) {
3492       for (Iterator <Map.Entry<String, RegionPlan>> i = this.regionPlans.entrySet().iterator();
3493           i.hasNext();) {
3494         Map.Entry<String, RegionPlan> e = i.next();
3495         ServerName otherSn = e.getValue().getDestination();
3496         // The name will be null if the region is planned for a random assign.
3497         if (otherSn != null && otherSn.equals(sn)) {
3498           // Use iterator's remove else we'll get CME
3499           i.remove();
3500         }
3501       }
3502     }
3503     List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn);
3504     for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
3505       HRegionInfo hri = it.next();
3506       String encodedName = hri.getEncodedName();
3507 
3508       // We need a lock on the region as we could update it
3509       Lock lock = locker.acquireLock(encodedName);
3510       try {
3511         RegionState regionState = regionStates.getRegionTransitionState(encodedName);
3512         if (regionState == null
3513             || (regionState.getServerName() != null && !regionState.isOnServer(sn))
3514             || !(regionState.isFailedClose() || regionState.isOffline()
3515               || regionState.isPendingOpenOrOpening())) {
3516           LOG.info("Skip " + regionState + " since it is not opening/failed_close"
3517             + " on the dead server any more: " + sn);
3518           it.remove();
3519         } else {
3520           try {
3521             // Delete the ZNode if exists
3522             ZKAssign.deleteNodeFailSilent(watcher, hri);
3523           } catch (KeeperException ke) {
3524             server.abort("Unexpected ZK exception deleting node " + hri, ke);
3525           }
3526           if (tableStateManager.isTableState(hri.getTable(),
3527               ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3528             regionStates.regionOffline(hri);
3529             it.remove();
3530             continue;
3531           }
3532           // Mark the region offline and assign it again by SSH
3533           regionStates.updateRegionState(hri, State.OFFLINE);
3534         }
3535       } finally {
3536         lock.unlock();
3537       }
3538     }
3539     return regions;
3540   }
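  // Usage sketch (illustrative; "crashedServer" is hypothetical): server-shutdown
  // handling clears the dead server's plans and gets back the regions that were
  // in transition on it, so they can be re-assigned elsewhere.
  //
  //   List<HRegionInfo> ritOnDeadServer = cleanOutCrashedServerReferences(crashedServer);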
3541 
3542   /**
3543    * @param plan Plan to execute.
3544    */
3545   public void balance(final RegionPlan plan) {
3546 
3547     HRegionInfo hri = plan.getRegionInfo();
3548     TableName tableName = hri.getTable();
3549     if (tableStateManager.isTableState(tableName,
3550       ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3551       LOG.info("Ignored moving region of disabling/disabled table "
3552         + tableName);
3553       return;
3554     }
3555 
3556     // Move the region only if it's assigned
3557     String encodedName = hri.getEncodedName();
3558     ReentrantLock lock = locker.acquireLock(encodedName);
3559     try {
3560       if (!regionStates.isRegionOnline(hri)) {
3561         RegionState state = regionStates.getRegionState(encodedName);
3562         LOG.info("Ignored moving region not assigned: " + hri + ", "
3563           + (state == null ? "not in region states" : state));
3564         return;
3565       }
3566       synchronized (this.regionPlans) {
3567         this.regionPlans.put(plan.getRegionName(), plan);
3568       }
3569       unassign(hri, false, plan.getDestination());
3570     } finally {
3571       lock.unlock();
3572     }
3573   }
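  // Usage sketch (illustrative; "hri", "source" and "target" are hypothetical):
  //
  //   balance(new RegionPlan(hri, source, target));
  //   // the region is unassigned with "target" as the planned destination and
  //   // re-opened there once the close completes, unless its table is disabled/disabling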
3574 
3575   public void stop() {
3576     shutdown(); // Stop executor service, etc
3577   }
3578 
3579   /**
3580    * Shutdown the threadpool executor service
3581    */
3582   public void shutdown() {
3583     // It's an immediate shutdown, so we're clearing the remaining tasks.
3584     synchronized (zkEventWorkerWaitingList){
3585       zkEventWorkerWaitingList.clear();
3586     }
3587 
3588     // Shutdown the threadpool executor service
3589     threadPoolExecutorService.shutdownNow();
3590     zkEventWorkers.shutdownNow();
3591     regionStateStore.stop();
3592   }
3593 
3594   protected void setEnabledTable(TableName tableName) {
3595     try {
3596       this.tableStateManager.setTableState(tableName,
3597         ZooKeeperProtos.Table.State.ENABLED);
3598     } catch (CoordinatedStateException e) {
3599       // here we can abort as it is the start up flow
3600       String errorMsg = "Unable to ensure that the table " + tableName
3601           + " will be" + " enabled because of a ZooKeeper issue";
3602       LOG.error(errorMsg);
3603       this.server.abort(errorMsg, e);
3604     }
3605   }
3606 
3607   /**
3608    * Set region as OFFLINED up in zookeeper asynchronously.
3609    * @param state
3610    * @return True if we succeeded, false otherwise (State was incorrect or failed
3611    * updating zk).
3612    */
3613   private boolean asyncSetOfflineInZooKeeper(final RegionState state,
3614       final AsyncCallback.StringCallback cb, final ServerName destination) {
3615     if (!state.isClosed() && !state.isOffline()) {
3616       this.server.abort("Unexpected state trying to OFFLINE; " + state,
3617         new IllegalStateException());
3618       return false;
3619     }
3620     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
3621     try {
3622       ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
3623         destination, cb, state);
3624     } catch (KeeperException e) {
3625       if (e instanceof NodeExistsException) {
3626         LOG.warn("Node for " + state.getRegion() + " already exists");
3627       } else {
3628         server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
3629       }
3630       return false;
3631     }
3632     return true;
3633   }
3634 
3635   private boolean deleteNodeInStates(String encodedName,
3636       String desc, ServerName sn, EventType... types) {
3637     try {
3638       for (EventType et: types) {
3639         if (ZKAssign.deleteNode(watcher, encodedName, et, sn)) {
3640           return true;
3641         }
3642       }
3643       LOG.info("Failed to delete the " + desc + " node for "
3644         + encodedName + ". The node type may not match");
3645     } catch (NoNodeException e) {
3646       if (LOG.isDebugEnabled()) {
3647         LOG.debug("The " + desc + " node for " + encodedName + " already deleted");
3648       }
3649     } catch (KeeperException ke) {
3650       server.abort("Unexpected ZK exception deleting " + desc
3651         + " node for the region " + encodedName, ke);
3652     }
3653     return false;
3654   }
3655 
3656   private void deleteMergingNode(String encodedName, ServerName sn) {
3657     deleteNodeInStates(encodedName, "merging", sn, EventType.RS_ZK_REGION_MERGING,
3658       EventType.RS_ZK_REQUEST_REGION_MERGE, EventType.RS_ZK_REGION_MERGED);
3659   }
3660 
3661   private void deleteSplittingNode(String encodedName, ServerName sn) {
3662     deleteNodeInStates(encodedName, "splitting", sn, EventType.RS_ZK_REGION_SPLITTING,
3663       EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT);
3664   }
3665 
3666   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
3667       value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
3668       justification="Modification of Maps not ATOMIC!!!! FIX!!!")
3669   private void onRegionFailedOpen(
3670       final HRegionInfo hri, final ServerName sn) {
3671     String encodedName = hri.getEncodedName();
3672     // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION Worth fixing!!!
3673     AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
3674     if (failedOpenCount == null) {
3675       failedOpenCount = new AtomicInteger();
3676       // No need to use putIfAbsent, or extra synchronization since
3677       // this whole handleRegion block is locked on the encoded region
3678       // name, and failedOpenTracker is updated only in this block
3679       // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
3680       failedOpenTracker.put(encodedName, failedOpenCount);
3681     }
3682     if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion()) {
3683       // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION
3684       regionStates.updateRegionState(hri, State.FAILED_OPEN);
3685       // remove the tracking info to save memory, also reset
3686       // the count for next open initiative
3687       failedOpenTracker.remove(encodedName);
3688     } else {
3689       if (hri.isMetaRegion() && failedOpenCount.get() >= maximumAttempts) {
3690         // Log a warning message if a meta region failedOpenCount exceeds maximumAttempts
3691         // so that we are aware of a potential problem if it persists for a long time.
3692         LOG.warn("Failed to open the hbase:meta region " +
3693             hri.getRegionNameAsString() + " after " +
3694             failedOpenCount.get() + " retries. Continue retrying.");
3695       }
3696 
3697       // Handle this the same as if it were opened and then closed.
3698       RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
3699       if (regionState != null) {
3700         // When there is more than one region server, a new RS is selected as the
3701         // destination and the region plan is updated accordingly. (HBASE-5546)
3702         if (getTableStateManager().isTableState(hri.getTable(),
3703             ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
3704             replicasToClose.contains(hri)) {
3705           offlineDisabledRegion(hri);
3706           return;
3707         }
3708         // ZK Node is in CLOSED state, assign it.
3709         regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3710         // This below has to do w/ online enable/disable of a table
3711         removeClosedRegion(hri);
3712         try {
3713           getRegionPlan(hri, sn, true);
3714         } catch (HBaseIOException e) {
3715           LOG.warn("Failed to get region plan", e);
3716         }
3717         invokeAssign(hri, false);
3718       }
3719     }
3720   }
3721 
3722   private void onRegionOpen(final HRegionInfo hri, final ServerName sn, long openSeqNum) {
3723     regionOnline(hri, sn, openSeqNum);
3724     if (useZKForAssignment) {
3725       try {
3726         // Delete the ZNode if exists
3727         ZKAssign.deleteNodeFailSilent(watcher, hri);
3728       } catch (KeeperException ke) {
3729         server.abort("Unexpected ZK exception deleting node " + hri, ke);
3730       }
3731     }
3732 
3733     // reset the count, if any
3734     failedOpenTracker.remove(hri.getEncodedName());
3735     if (getTableStateManager().isTableState(hri.getTable(),
3736         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3737       invokeUnAssign(hri);
3738     }
3739   }
3740 
3741   private void onRegionClosed(final HRegionInfo hri) {
3742     if (getTableStateManager().isTableState(hri.getTable(),
3743         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
3744         replicasToClose.contains(hri)) {
3745       offlineDisabledRegion(hri);
3746       return;
3747     }
3748     regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3749     sendRegionClosedNotification(hri);
3750     // This below has to do w/ online enable/disable of a table
3751     removeClosedRegion(hri);
3752     invokeAssign(hri, false);
3753   }
3754 
3755   private String checkInStateForSplit(ServerName sn,
3756       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3757     final RegionState rs_p = regionStates.getRegionState(p);
3758     RegionState rs_a = regionStates.getRegionState(a);
3759     RegionState rs_b = regionStates.getRegionState(b);
3760     if (!(rs_p.isOpenOrSplittingOnServer(sn)
3761         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
3762         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
3763       return "Not in state good for split";
3764     }
3765     return "";
3766   }
3767 
3768   private String onRegionSplitReverted(ServerName sn,
3769       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3770     String s = checkInStateForSplit(sn, p, a, b);
3771     if (!org.apache.commons.lang.StringUtils.isEmpty(s)) {
3772       return s;
3773     }
3774 
3775     // Always bring the parent back online. Even if it's not offline,
3776     // there's no harm in making it online again.
3777     regionOnline(p, sn);
3778 
3779     // Only offline the daughter regions if they are known to exist.
3780     RegionState regionStateA = regionStates.getRegionState(a);
3781     RegionState regionStateB = regionStates.getRegionState(b);
3782     if (regionStateA != null) {
3783       regionOffline(a);
3784     }
3785     if (regionStateB != null) {
3786       regionOffline(b);
3787     }
3788 
3789     if (getTableStateManager().isTableState(p.getTable(),
3790         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3791       invokeUnAssign(p);
3792     }
3793     return null;
3794   }
3795 
3796   private String onRegionSplit(ServerName sn, TransitionCode code,
3797       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3798     String s = checkInStateForSplit(sn, p, a, b);
3799     if (!org.apache.commons.lang.StringUtils.isEmpty(s)) {
3800       return s;
3801     }
3802 
3803     regionStates.updateRegionState(a, State.SPLITTING_NEW, sn);
3804     regionStates.updateRegionState(b, State.SPLITTING_NEW, sn);
3805     regionStates.updateRegionState(p, State.SPLITTING);
3806 
3807     if (code == TransitionCode.SPLIT) {
3808       if (TEST_SKIP_SPLIT_HANDLING) {
3809         return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
3810       }
3811       regionOffline(p, State.SPLIT);
3812       regionOnline(a, sn, 1);
3813       regionOnline(b, sn, 1);
3814 
3815       // User could disable the table before master knows the new region.
3816       if (getTableStateManager().isTableState(p.getTable(),
3817           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3818         invokeUnAssign(a);
3819         invokeUnAssign(b);
3820       } else {
3821         Callable<Object> splitReplicasCallable = new Callable<Object>() {
3822           @Override
3823           public Object call() {
3824             doSplittingOfReplicas(p, a, b);
3825             return null;
3826           }
3827         };
3828         threadPoolExecutorService.submit(splitReplicasCallable);
3829       }
3830     } else if (code == TransitionCode.SPLIT_PONR) {
3831       try {
3832         regionStates.splitRegion(p, a, b, sn);
3833       } catch (IOException ioe) {
3834         LOG.info("Failed to record split region " + p.getShortNameToLog());
3835         return "Failed to record the splitting in meta";
3836       }
3837     }
3838     return null;
3839   }
3840 
3841   private String onRegionMerge(ServerName sn, TransitionCode code,
3842       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3843     RegionState rs_p = regionStates.getRegionState(p);
3844     RegionState rs_a = regionStates.getRegionState(a);
3845     RegionState rs_b = regionStates.getRegionState(b);
3846     if (!(rs_a.isOpenOrMergingOnServer(sn) && rs_b.isOpenOrMergingOnServer(sn)
3847         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3848       return "Not in state good for merge";
3849     }
3850 
3851     regionStates.updateRegionState(a, State.MERGING);
3852     regionStates.updateRegionState(b, State.MERGING);
3853     regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3854 
3855     String encodedName = p.getEncodedName();
3856     if (code == TransitionCode.READY_TO_MERGE) {
3857       mergingRegions.put(encodedName,
3858         new PairOfSameType<HRegionInfo>(a, b));
3859     } else if (code == TransitionCode.MERGED) {
3860 
3861       if (TEST_SKIP_MERGE_HANDLING) {
3862         return "Skipping merge message, TEST_SKIP_MERGE_HANDLING is set for merge parent: " + p;
3863       }
3864 
3865       mergingRegions.remove(encodedName);
3866       regionOffline(a, State.MERGED);
3867       regionOffline(b, State.MERGED);
3868       regionOnline(p, sn, 1);
3869 
3870       // User could disable the table before master knows the new region.
3871       if (getTableStateManager().isTableState(p.getTable(),
3872           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3873         invokeUnAssign(p);
3874       } else {
3875         Callable<Object> mergeReplicasCallable = new Callable<Object>() {
3876           @Override
3877           public Object call() {
3878             doMergingOfReplicas(p, a, b);
3879             return null;
3880           }
3881         };
3882         threadPoolExecutorService.submit(mergeReplicasCallable);
3883       }
3884     } else if (code == TransitionCode.MERGE_PONR) {
3885       try {
3886         regionStates.mergeRegions(p, a, b, sn);
3887       } catch (IOException ioe) {
3888         LOG.info("Failed to record merged region " + p.getShortNameToLog());
3889         return "Failed to record the merging in meta";
3890       }
3891     }
3892     return null;
3893   }
3894 
3895   private String onRegionMergeReverted(ServerName sn, TransitionCode code,
3896       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3897     RegionState rs_p = regionStates.getRegionState(p);
3898     String encodedName = p.getEncodedName();
3899     mergingRegions.remove(encodedName);
3900 
3901     // Always bring the children back online. Even if they are not offline,
3902     // there's no harm in making them online again.
3903     regionOnline(a, sn);
3904     regionOnline(b, sn);
3905 
3906     // Only offline the merging region if it is known to exist.
3907     if (rs_p != null) {
3908       regionOffline(p);
3909     }
3910 
3911     if (getTableStateManager().isTableState(p.getTable(),
3912         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3913       invokeUnAssign(a);
3914       invokeUnAssign(b);
3915     }
3916 
3917     return null;
3918   }
3919 
3920   /**
3921    * A helper to handle region merging transition event.
3922    * It transitions merging regions to MERGING state.
3923    */
3924   private boolean handleRegionMerging(final RegionTransition rt, final String encodedName,
3925       final String prettyPrintedRegionName, final ServerName sn) {
3926     if (!serverManager.isServerOnline(sn)) {
3927       LOG.warn("Dropped merging! ServerName=" + sn + " unknown.");
3928       return false;
3929     }
3930     byte [] payloadOfMerging = rt.getPayload();
3931     List<HRegionInfo> mergingRegions;
3932     try {
3933       mergingRegions = HRegionInfo.parseDelimitedFrom(
3934         payloadOfMerging, 0, payloadOfMerging.length);
3935     } catch (IOException e) {
3936       LOG.error("Dropped merging! Failed reading " + rt.getEventType()
3937         + " payload for " + prettyPrintedRegionName);
3938       return false;
3939     }
3940     assert mergingRegions.size() == 3;
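         // The merge payload lists the merged parent first, then the two regions
         // being merged (region a and region b).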
3941     HRegionInfo p = mergingRegions.get(0);
3942     HRegionInfo hri_a = mergingRegions.get(1);
3943     HRegionInfo hri_b = mergingRegions.get(2);
3944 
3945     RegionState rs_p = regionStates.getRegionState(p);
3946     RegionState rs_a = regionStates.getRegionState(hri_a);
3947     RegionState rs_b = regionStates.getRegionState(hri_b);
3948 
3949     if (!((rs_a == null || rs_a.isOpenOrMergingOnServer(sn))
3950         && (rs_b == null || rs_b.isOpenOrMergingOnServer(sn))
3951         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3952       LOG.warn("Dropped merging! Not in state good for MERGING; rs_p="
3953         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
3954       return false;
3955     }
3956 
3957     EventType et = rt.getEventType();
3958     if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) {
3959       try {
3960         RegionMergeCoordination.RegionMergeDetails std =
3961             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3962                 .getRegionMergeCoordination().getDefaultDetails();
3963         ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3964             .getRegionMergeCoordination().processRegionMergeRequest(p, hri_a, hri_b, sn, std);
3965         if (((ZkRegionMergeCoordination.ZkRegionMergeDetails) std).getZnodeVersion() == -1) {
3966           byte[] data = ZKAssign.getData(watcher, encodedName);
3967           EventType currentType = null;
3968           if (data != null) {
3969             RegionTransition newRt = RegionTransition.parseFrom(data);
3970             currentType = newRt.getEventType();
3971           }
3972           if (currentType == null || (currentType != EventType.RS_ZK_REGION_MERGED
3973               && currentType != EventType.RS_ZK_REGION_MERGING)) {
3974             LOG.warn("Failed to transition pending_merge node "
3975               + encodedName + " to merging, it's now " + currentType);
3976             return false;
3977           }
3978         }
3979       } catch (Exception e) {
3980         LOG.warn("Failed to transition pending_merge node "
3981           + encodedName + " to merging", e);
3982         return false;
3983       }
3984     }
3985 
3986     synchronized (regionStates) {
3987       regionStates.updateRegionState(hri_a, State.MERGING);
3988       regionStates.updateRegionState(hri_b, State.MERGING);
3989       regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3990 
3991       if (TEST_SKIP_MERGE_HANDLING) {
3992         LOG.warn("Skipping merge message, TEST_SKIP_MERGE_HANDLING is set for merge parent: " + p);
3993         return true; // return true so that the merging node stays
3994       }
3995 
3996       if (et != EventType.RS_ZK_REGION_MERGED) {
3997         this.mergingRegions.put(encodedName,
3998           new PairOfSameType<HRegionInfo>(hri_a, hri_b));
3999       } else {
4000         this.mergingRegions.remove(encodedName);
4001         regionOffline(hri_a, State.MERGED);
4002         regionOffline(hri_b, State.MERGED);
4003         regionOnline(p, sn);
4004       }
4005     }
4006 
4007     if (et == EventType.RS_ZK_REGION_MERGED) {
4008       doMergingOfReplicas(p, hri_a, hri_b);
4009       LOG.debug("Handling MERGED event for " + encodedName + "; deleting node");
4010       // Remove region from ZK
4011       try {
4012         boolean successful = false;
4013         while (!successful) {
4014           // It's possible that the RS touches the znode in between our reading
4015           // of it and the deleting, so it's safe to retry.
4016           successful = ZKAssign.deleteNode(watcher, encodedName,
4017             EventType.RS_ZK_REGION_MERGED, sn);
4018         }
4019       } catch (KeeperException e) {
4020         if (e instanceof NoNodeException) {
4021           String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
4022           LOG.debug("The znode " + znodePath + " does not exist.  May be deleted already.");
4023         } else {
4024           server.abort("Error deleting MERGED node " + encodedName, e);
4025         }
4026       }
4027       LOG.info("Handled MERGED event; merged=" + p.getRegionNameAsString()
4028         + ", region_a=" + hri_a.getRegionNameAsString() + ", region_b="
4029         + hri_b.getRegionNameAsString() + ", on " + sn);
4030 
4031       // User could disable the table before master knows the new region.
4032       if (tableStateManager.isTableState(p.getTable(),
4033           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
4034         unassign(p);
4035       }
4036     }
4037     return true;
4038   }
4039 
4040   /**
4041    * A helper to handle region splitting transition event.
4042    */
4043   private boolean handleRegionSplitting(final RegionTransition rt, final String encodedName,
4044       final String prettyPrintedRegionName, final ServerName sn) {
4045     if (!serverManager.isServerOnline(sn)) {
4046       LOG.warn("Dropped splitting! ServerName=" + sn + " unknown.");
4047       return false;
4048     }
4049     byte [] payloadOfSplitting = rt.getPayload();
4050     List<HRegionInfo> splittingRegions;
4051     try {
4052       splittingRegions = HRegionInfo.parseDelimitedFrom(
4053         payloadOfSplitting, 0, payloadOfSplitting.length);
4054     } catch (IOException e) {
4055       LOG.error("Dropped splitting! Failed reading " + rt.getEventType()
4056         + " payload for " + prettyPrintedRegionName);
4057       return false;
4058     }
4059     assert splittingRegions.size() == 2;
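         // The split payload carries only the two daughter regions; the parent
         // region is looked up below via the transition's encoded region name.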
4060     HRegionInfo hri_a = splittingRegions.get(0);
4061     HRegionInfo hri_b = splittingRegions.get(1);
4062 
4063     RegionState rs_p = regionStates.getRegionState(encodedName);
4064     RegionState rs_a = regionStates.getRegionState(hri_a);
4065     RegionState rs_b = regionStates.getRegionState(hri_b);
4066 
4067     if (!((rs_p == null || rs_p.isOpenOrSplittingOnServer(sn))
4068         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
4069         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
4070       LOG.warn("Dropped splitting! Not in state good for SPLITTING; rs_p="
4071         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
4072       return false;
4073     }
4074 
4075     if (rs_p == null) {
4076       // Splitting region should be online
4077       rs_p = regionStates.updateRegionState(rt, State.OPEN);
4078       if (rs_p == null) {
4079         LOG.warn("Received splitting for region " + prettyPrintedRegionName
4080           + " from server " + sn + " but it doesn't exist anymore,"
4081           + " probably already processed its split");
4082         return false;
4083       }
4084       regionStates.regionOnline(rs_p.getRegion(), sn);
4085     }
4086 
4087     HRegionInfo p = rs_p.getRegion();
4088     EventType et = rt.getEventType();
4089     if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) {
4090       try {
4091         SplitTransactionDetails std =
4092             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
4093                 .getSplitTransactionCoordination().getDefaultDetails();
4094         if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
4095             .getSplitTransactionCoordination().processTransition(p, hri_a, hri_b, sn, std) == -1) {
4096           byte[] data = ZKAssign.getData(watcher, encodedName);
4097           EventType currentType = null;
4098           if (data != null) {
4099             RegionTransition newRt = RegionTransition.parseFrom(data);
4100             currentType = newRt.getEventType();
4101           }
4102           if (currentType == null || (currentType != EventType.RS_ZK_REGION_SPLIT
4103               && currentType != EventType.RS_ZK_REGION_SPLITTING)) {
4104             LOG.warn("Failed to transition pending_split node " + encodedName
4105                 + " to splitting, it's now " + currentType);
4106             return false;
4107           }
4108         }
4109       } catch (Exception e) {
4110         LOG.warn("Failed to transition pending_split node " + encodedName + " to splitting", e);
4111         return false;
4112       }
4113     }
4114 
4115     synchronized (regionStates) {
4116       splitRegions.put(p, new PairOfSameType<HRegionInfo>(hri_a, hri_b));
4117       regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn);
4118       regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn);
4119       regionStates.updateRegionState(rt, State.SPLITTING);
4120 
4121       // The below is for testing ONLY!  We can't do fault injection easily, so
4122       // resort to this kind of ugliness -- St.Ack 02/25/2011.
4123       if (TEST_SKIP_SPLIT_HANDLING) {
4124         LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set");
4125         return true; // return true so that the splitting node stays
4126       }
4127 
4128       if (et == EventType.RS_ZK_REGION_SPLIT) {
4129         regionOffline(p, State.SPLIT);
4130         regionOnline(hri_a, sn);
4131         regionOnline(hri_b, sn);
4132         splitRegions.remove(p);
4133       }
4134     }
4135 
4136     if (et == EventType.RS_ZK_REGION_SPLIT) {
4137       // split replicas
4138       doSplittingOfReplicas(rs_p.getRegion(), hri_a, hri_b);
4139       LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
4140       // Remove region from ZK
4141       try {
4142         boolean successful = false;
4143         while (!successful) {
4144           // It's possible that the RS touches the znode in between our reading
4145           // of it and the deleting, so it's safe to retry.
4146           successful = ZKAssign.deleteNode(watcher, encodedName,
4147             EventType.RS_ZK_REGION_SPLIT, sn);
4148         }
4149       } catch (KeeperException e) {
4150         if (e instanceof NoNodeException) {
4151           String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
4152           LOG.debug("The znode " + znodePath + " does not exist.  May be deleted already.");
4153         } else {
4154           server.abort("Error deleting SPLIT node " + encodedName, e);
4155         }
4156       }
4157       LOG.info("Handled SPLIT event; parent=" + p.getRegionNameAsString()
4158         + ", daughter a=" + hri_a.getRegionNameAsString() + ", daughter b="
4159         + hri_b.getRegionNameAsString() + ", on " + sn);
4160 
4161       // User could disable the table before master knows the new region.
4162       if (tableStateManager.isTableState(p.getTable(),
4163           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
4164         unassign(hri_a);
4165         unassign(hri_b);
4166       }
4167     }
4168     return true;
4169   }
4170 
4171   private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
4172       final HRegionInfo hri_b) {
4173     // Close replicas for the original unmerged regions. create/assign new replicas
4174     // for the merged parent.
4175     List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
4176     unmergedRegions.add(hri_a);
4177     unmergedRegions.add(hri_b);
4178     Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
4179     Collection<List<HRegionInfo>> c = map.values();
4180     for (List<HRegionInfo> l : c) {
4181       for (HRegionInfo h : l) {
4182         if (!RegionReplicaUtil.isDefaultReplica(h)) {
4183           LOG.debug("Unassigning un-merged replica " + h);
4184           unassign(h);
4185         }
4186       }
4187     }
4188     int numReplicas = 1;
4189     try {
4190       numReplicas = server.getTableDescriptors().get(mergedHri.getTable()).
4191           getRegionReplication();
4192     } catch (IOException e) {
4193       LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
4194           " due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
4195           "will not be done");
4196     }
4197     List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
4198     for (int i = 1; i < numReplicas; i++) {
4199       regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
4200     }
4201     try {
4202       assign(regions);
4203     } catch (IOException ioe) {
4204       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
4205                 ioe.getMessage());
4206     } catch (InterruptedException ie) {
4207       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
4208                 ie.getMessage());
4209     }
4210   }
4211 
4212   private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
4213       final HRegionInfo hri_b) {
4214     // create new regions for the replica, and assign them to match with the
4215     // current replica assignments. If replica1 of parent is assigned to RS1,
4216     // the replica1s of daughters will be on the same machine
4217     int numReplicas = 1;
4218     try {
4219       numReplicas = server.getTableDescriptors().get(parentHri.getTable()).
4220           getRegionReplication();
4221     } catch (IOException e) {
4222       LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
4223           " due to " + e.getMessage() + ". The assignment of daughter replicas " +
4224           "will not be done");
4225     }
4226     // unassign the old replicas
4227     List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
4228     parentRegion.add(parentHri);
4229     Map<ServerName, List<HRegionInfo>> currentAssign =
4230         regionStates.getRegionAssignments(parentRegion);
4231     Collection<List<HRegionInfo>> c = currentAssign.values();
4232     for (List<HRegionInfo> l : c) {
4233       for (HRegionInfo h : l) {
4234         if (!RegionReplicaUtil.isDefaultReplica(h)) {
4235           LOG.debug("Unassigning parent's replica " + h);
4236           unassign(h);
4237         }
4238       }
4239     }
4240     // assign daughter replicas
4241     Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
4242     for (int i = 1; i < numReplicas; i++) {
4243       prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
4244       prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
4245     }
4246     try {
4247       assign(map);
4248     } catch (IOException e) {
4249       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
4250     } catch (InterruptedException e) {
4251       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
4252     }
4253   }
4254 
4255   private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
4256       int replicaId, Map<HRegionInfo, ServerName> map) {
4257     HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
4258     HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
4259         replicaId);
4260     LOG.debug("Created replica region for daughter " + daughterReplica);
4261     ServerName sn;
4262     if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
4263       map.put(daughterReplica, sn);
4264     } else {
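           // The parent replica has no known location; fall back to a random
           // online region server for the daughter replica.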
4265       List<ServerName> servers = serverManager.getOnlineServersList();
4266       sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
4267       map.put(daughterReplica, sn);
4268     }
4269   }
4270 
4271   public Set<HRegionInfo> getReplicasToClose() {
4272     return replicasToClose;
4273   }
4274 
4275   /**
4276    * Marks a region offline.  The new state is the specified one, if not null;
4277    * if the specified state is null, the new state is Offline.
4278    * The specified state may only be Split, Merged, Offline or null.
4279    */
4280   private void regionOffline(final HRegionInfo regionInfo, final State state) {
4281     regionStates.regionOffline(regionInfo, state);
4282     removeClosedRegion(regionInfo);
4283     // remove the region plan as well just in case.
4284     clearRegionPlan(regionInfo);
4285     balancer.regionOffline(regionInfo);
4286 
4287     // Tell our listeners that a region was closed
4288     sendRegionClosedNotification(regionInfo);
4289     // also note that all the replicas of the primary should be closed
4290     if (state != null && state.equals(State.SPLIT)) {
4291       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
4292       c.add(regionInfo);
4293       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
4294       Collection<List<HRegionInfo>> allReplicas = map.values();
4295       for (List<HRegionInfo> list : allReplicas) {
4296         replicasToClose.addAll(list);
4297       }
4298     }
4299     else if (state != null && state.equals(State.MERGED)) {
4300       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
4301       c.add(regionInfo);
4302       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
4303       Collection<List<HRegionInfo>> allReplicas = map.values();
4304       for (List<HRegionInfo> list : allReplicas) {
4305         replicasToClose.addAll(list);
4306       }
4307     }
4308   }
4309 
4310   private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
4311       final ServerName serverName) {
4312     if (!this.listeners.isEmpty()) {
4313       for (AssignmentListener listener : this.listeners) {
4314         listener.regionOpened(regionInfo, serverName);
4315       }
4316     }
4317   }
4318 
4319   private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
4320     if (!this.listeners.isEmpty()) {
4321       for (AssignmentListener listener : this.listeners) {
4322         listener.regionClosed(regionInfo);
4323       }
4324     }
4325   }
4326 
4327   /**
4328    * Try to update some region states. If the state machine prevents
4329    * such an update, an error message is returned to explain the reason.
4330    *
4331    * Each transition is expected to carry just one region for
4332    * opening/closing, and three regions for splitting/merging.
4333    * These regions should be on the server that requested the change.
4334    *
4335    * Region state machine. Only these transitions
4336    * are expected to be triggered by a region server.
4337    *
4338    * On the state transition:
4339    *  (1) Open/Close should be initiated by master
4340    *      (a) Master sets the region to pending_open/pending_close
4341    *        in memory and hbase:meta after sending the request
4342    *        to the region server
4343    *      (b) Region server reports back to the master
4344    *        after open/close is done (either success/failure)
4345    *      (c) If the region server fails to report the status to the
4346    *        master, it must be because the master is down or there is a
4347    *        temporary network issue. Otherwise, the region server should
4348    *        abort since it must be a bug. If the master is not accessible,
4349    *        the region server should keep trying until the server is
4350    *        stopped or until the status is reported to the (new) master
4351    *      (d) If region server dies in the middle of opening/closing
4352    *        a region, SSH picks it up and finishes it
4353    *      (e) If master dies in the middle, the new master recovers
4354    *        the state during initialization from hbase:meta. Region server
4355    *        can report any transition that has not been reported to
4356    *        the previous active master yet
4357    *  (2) Split/merge is initiated by region servers
4358    *      (a) To split a region, a region server sends a request
4359    *        to master to try to set a region to splitting, together with
4360    *        two daughters (to be created) to splitting new. If approved
4361    *        by the master, the splitting can then move ahead
4362    *      (b) To merge two regions, a region server sends a request to
4363    *        master to try to set the new merged region (to be created) to
4364    *        merging_new, together with two regions (to be merged) to merging.
4365    *        If it is ok with the master, the merge can then move ahead
4366    *      (c) Once the splitting/merging is done, the region server
4367    *        reports the status (success or failure) back to the master.
4368    *      (d) Other scenarios should be handled in the same way as for
4369    *        region open/close
4370    */
4371   protected String onRegionTransition(final ServerName serverName,
4372       final RegionStateTransition transition) {
4373     TransitionCode code = transition.getTransitionCode();
4374     HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
4375     RegionState current = regionStates.getRegionState(hri);
4376     if (LOG.isDebugEnabled()) {
4377       LOG.debug("Got transition " + code + " for "
4378         + (current != null ? current.toString() : hri.getShortNameToLog())
4379         + " from " + serverName);
4380     }
4381     String errorMsg = null;
4382     switch (code) {
4383     case OPENED:
4384       if (current != null && current.isOpened() && current.isOnServer(serverName)) {
4385         LOG.info("Region " + hri.getShortNameToLog() + " is already " + current.getState() + " on "
4386             + serverName);
4387         break;
4388       }
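           // Deliberate fall-through: a not-yet-recorded OPENED report shares the
           // pending-open validation and open-seq-num handling with FAILED_OPEN below.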
4389     case FAILED_OPEN:
4390       if (current == null
4391           || !current.isPendingOpenOrOpeningOnServer(serverName)) {
4392         errorMsg = hri.getShortNameToLog()
4393           + " is not pending open on " + serverName;
4394       } else if (code == TransitionCode.FAILED_OPEN) {
4395         onRegionFailedOpen(hri, serverName);
4396       } else {
4397         long openSeqNum = HConstants.NO_SEQNUM;
4398         if (transition.hasOpenSeqNum()) {
4399           openSeqNum = transition.getOpenSeqNum();
4400         }
4401         if (openSeqNum < 0) {
4402           errorMsg = "Newly opened region has invalid open seq num " + openSeqNum;
4403         } else {
4404           onRegionOpen(hri, serverName, openSeqNum);
4405         }
4406       }
4407       break;
4408 
4409     case CLOSED:
4410       if (current == null
4411           || !current.isPendingCloseOrClosingOnServer(serverName)) {
4412         errorMsg = hri.getShortNameToLog()
4413           + " is not pending close on " + serverName;
4414       } else {
4415         onRegionClosed(hri);
4416       }
4417       break;
4418 
4419     case READY_TO_SPLIT:
4420       try {
4421         regionStateListener.onRegionSplit(hri);
4422       } catch (IOException exp) {
4423         errorMsg = StringUtils.stringifyException(exp);
4424       }
4425       break;
4426     case SPLIT_PONR:
4427     case SPLIT:
4428       errorMsg = onRegionSplit(serverName, code, hri,
4429         HRegionInfo.convert(transition.getRegionInfo(1)),
4430         HRegionInfo.convert(transition.getRegionInfo(2)));
4431       break;
4432 
4433     case SPLIT_REVERTED:
4434       errorMsg =
4435           onRegionSplitReverted(serverName, hri,
4436             HRegionInfo.convert(transition.getRegionInfo(1)),
4437             HRegionInfo.convert(transition.getRegionInfo(2)));
4438       if (org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
4439         try {
4440           regionStateListener.onRegionSplitReverted(hri);
4441         } catch (IOException exp) {
4442           LOG.warn(StringUtils.stringifyException(exp));
4443         }
4444       }
4445       break;
4446     case READY_TO_MERGE:
4447     case MERGE_PONR:
4448     case MERGED:
4449       errorMsg = onRegionMerge(serverName, code, hri,
4450         HRegionInfo.convert(transition.getRegionInfo(1)),
4451         HRegionInfo.convert(transition.getRegionInfo(2)));
4452       if (code == TransitionCode.MERGED && org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
4453         try {
4454           regionStateListener.onRegionMerged(hri);
4455         } catch (IOException exp) {
4456           errorMsg = StringUtils.stringifyException(exp);
4457         }
4458       }
4459       break;
4460     case MERGE_REVERTED:
4461       errorMsg = onRegionMergeReverted(serverName, code, hri,
4462         HRegionInfo.convert(transition.getRegionInfo(1)),
4463         HRegionInfo.convert(transition.getRegionInfo(2)));
4464       break;
4465 
4466     default:
4467       errorMsg = "Unexpected transition code " + code;
4468     }
4469     if (errorMsg != null) {
4470       LOG.error("Failed to transition region from " + current + " to "
4471         + code + " by " + serverName + ": " + errorMsg);
4472     }
4473     return errorMsg;
4474   }
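
       // Illustrative sketch (not part of the original source): a region server
       // reports a transition roughly like the following, assuming the standard
       // protobuf builder for RegionStateTransition; hri, openSeqNum, serverName
       // and am are placeholders.
       //
       //   RegionStateTransition transition = RegionStateTransition.newBuilder()
       //       .setTransitionCode(TransitionCode.OPENED)
       //       .addRegionInfo(HRegionInfo.convert(hri))
       //       .setOpenSeqNum(openSeqNum)
       //       .build();
       //   String error = am.onRegionTransition(serverName, transition);
       //
       // A null return means the state machine accepted the update; otherwise the
       // returned message explains why it was rejected.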
4475 
4476   /**
4477    * @return Instance of load balancer
4478    */
4479   public LoadBalancer getBalancer() {
4480     return this.balancer;
4481   }
4482 
4483   public Map<ServerName, List<HRegionInfo>>
4484     getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
4485     return getRegionStates().getRegionAssignments(infos);
4486   }
4487 
4488   void setRegionStateListener(RegionStateListener listener) {
4489     this.regionStateListener = listener;
4490   }
4491 
4492   /*
4493    * This is only used for unit-testing split failures.
4494    */
4495   @VisibleForTesting
4496   public static void setTestSkipSplitHandling(boolean skipSplitHandling) {
4497     TEST_SKIP_SPLIT_HANDLING = skipSplitHandling;
4498   }
4499 
4500   /*
4501    * This is only used for unit-testing merge failures.
4502    */
4503   @VisibleForTesting
4504   public static void setTestSkipMergeHandling(boolean skipMergeHandling) {
4505     TEST_SKIP_MERGE_HANDLING = skipMergeHandling;
4506   }
4507 }