1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.HashMap;
28  import java.util.HashSet;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.NavigableMap;
33  import java.util.Random;
34  import java.util.Set;
35  import java.util.TreeMap;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.ConcurrentHashMap;
38  import java.util.concurrent.CopyOnWriteArrayList;
39  import java.util.concurrent.ThreadFactory;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  import java.util.concurrent.atomic.AtomicInteger;
43  import java.util.concurrent.locks.Lock;
44  import java.util.concurrent.locks.ReentrantLock;
45  
46  import org.apache.commons.logging.Log;
47  import org.apache.commons.logging.LogFactory;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.fs.FileSystem;
50  import org.apache.hadoop.fs.Path;
51  import org.apache.hadoop.hbase.CoordinatedStateException;
52  import org.apache.hadoop.hbase.HBaseIOException;
53  import org.apache.hadoop.hbase.HConstants;
54  import org.apache.hadoop.hbase.HRegionInfo;
55  import org.apache.hadoop.hbase.HRegionLocation;
56  import org.apache.hadoop.hbase.HTableDescriptor;
57  import org.apache.hadoop.hbase.MetaTableAccessor;
58  import org.apache.hadoop.hbase.NotServingRegionException;
59  import org.apache.hadoop.hbase.RegionLocations;
60  import org.apache.hadoop.hbase.RegionTransition;
61  import org.apache.hadoop.hbase.Server;
62  import org.apache.hadoop.hbase.ServerName;
63  import org.apache.hadoop.hbase.TableName;
64  import org.apache.hadoop.hbase.TableNotFoundException;
65  import org.apache.hadoop.hbase.TableStateManager;
66  import org.apache.hadoop.hbase.classification.InterfaceAudience;
67  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
68  import org.apache.hadoop.hbase.client.Result;
69  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
70  import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
71  import org.apache.hadoop.hbase.coordination.RegionMergeCoordination;
72  import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
73  import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
74  import org.apache.hadoop.hbase.coordination.ZkRegionMergeCoordination;
75  import org.apache.hadoop.hbase.exceptions.DeserializationException;
76  import org.apache.hadoop.hbase.executor.EventHandler;
77  import org.apache.hadoop.hbase.executor.EventType;
78  import org.apache.hadoop.hbase.executor.ExecutorService;
79  import org.apache.hadoop.hbase.ipc.FailedServerException;
80  import org.apache.hadoop.hbase.ipc.RpcClient;
81  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
82  import org.apache.hadoop.hbase.master.RegionState.State;
83  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
84  import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
85  import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
86  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
87  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
88  import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
89  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
90  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
91  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
92  import org.apache.hadoop.hbase.quotas.RegionStateListener;
93  import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
94  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
95  import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
96  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
97  import org.apache.hadoop.hbase.util.ConfigUtil;
98  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
99  import org.apache.hadoop.hbase.util.FSUtils;
100 import org.apache.hadoop.hbase.util.KeyLocker;
101 import org.apache.hadoop.hbase.util.Pair;
102 import org.apache.hadoop.hbase.util.PairOfSameType;
103 import org.apache.hadoop.hbase.util.Threads;
104 import org.apache.hadoop.hbase.util.Triple;
105 import org.apache.hadoop.hbase.wal.DefaultWALProvider;
106 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
107 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
108 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
109 import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
110 import org.apache.hadoop.ipc.RemoteException;
111 import org.apache.hadoop.util.StringUtils;
112 import org.apache.zookeeper.AsyncCallback;
113 import org.apache.zookeeper.KeeperException;
114 import org.apache.zookeeper.KeeperException.NoNodeException;
115 import org.apache.zookeeper.KeeperException.NodeExistsException;
116 import org.apache.zookeeper.data.Stat;
117 
118 import com.google.common.annotations.VisibleForTesting;
119 import com.google.common.collect.LinkedHashMultimap;
120 
121 /**
122  * Manages and performs region assignment.
123  * <p>
124  * Monitors ZooKeeper for events related to regions in transition.
125  * <p>
126  * Handles existing regions in transition during master failover.
127  */
128 @InterfaceAudience.Private
129 public class AssignmentManager extends ZooKeeperListener {
130   private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
131 
132   public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(HConstants.HBCK_CODE_NAME,
133       -1, -1L);
134 
135   static final String ALREADY_IN_TRANSITION_WAITTIME
136     = "hbase.assignment.already.intransition.waittime";
137   static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute
138 
139   protected final Server server;
140 
141   private ServerManager serverManager;
142 
143   private boolean shouldAssignRegionsWithFavoredNodes;
144 
145   private LoadBalancer balancer;
146 
147   private final MetricsAssignmentManager metricsAssignmentManager;
148 
149   private final TableLockManager tableLockManager;
150 
151   private AtomicInteger numRegionsOpened = new AtomicInteger(0);
152 
153   final private KeyLocker<String> locker = new KeyLocker<String>();
154 
155   Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
156 
157   /**
158    * Map of regions to reopen after the schema of a table is changed. Key -
159    * encoded region name, value - HRegionInfo
160    */
161   private final Map <String, HRegionInfo> regionsToReopen;
162 
163   /*
164    * Maximum times we recurse an assignment/unassignment.
165    * See below in {@link #assign()} and {@link #unassign()}.
166    */
167   private final int maximumAttempts;
168 
169   /**
170    * Map from the merged region to be created, keyed by its encoded name, to the pair of regions being merged.
171    */
172   private final Map<String, PairOfSameType<HRegionInfo>> mergingRegions
173     = new HashMap<String, PairOfSameType<HRegionInfo>>();
174 
175   private final Map<HRegionInfo, PairOfSameType<HRegionInfo>> splitRegions
176   = new HashMap<HRegionInfo, PairOfSameType<HRegionInfo>>();
177 
178   /**
179    * The sleep time before retrying an hbase:meta assignment that failed because no region
180    * plan was available or the region plan was bad.
181    */
182   private final long sleepTimeBeforeRetryingMetaAssignment;
183 
184   /** Plans for region movement. Key is the encoded version of a region name*/
185   // TODO: When do plans get cleaned out?  Ever? In server open and in server
186   // shutdown processing -- St.Ack
187   // All access to this Map must be synchronized.
188   final NavigableMap<String, RegionPlan> regionPlans =
189     new TreeMap<String, RegionPlan>();
190 
191   private final TableStateManager tableStateManager;
192 
193   private final ExecutorService executorService;
194 
195   // For unit tests, keep track of calls to ClosedRegionHandler
196   private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = null;
197 
198   // For unit tests, keep track of calls to OpenedRegionHandler
199   private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null;
200 
201   //Thread pool executor service for timeout monitor
202   private java.util.concurrent.ExecutorService threadPoolExecutorService;
203 
204   // A bunch of ZK events workers. Each is a single thread executor service
205   private final java.util.concurrent.ExecutorService zkEventWorkers;
206 
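      // These events are still handled even if the reporting region server is no longer
      // online, since in either case the region is not open on that server anyway.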
207   private List<EventType> ignoreStatesRSOffline = Arrays.asList(
208       EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);
209 
210   private final RegionStates regionStates;
211 
212   // The thresholds for using bulk assignment. Bulk assignment is used
213   // only if assigning at least this many regions to at least this
214   // many servers. If assigning fewer regions to fewer servers,
215   // bulk assigning may not be as efficient.
216   private final int bulkAssignThresholdRegions;
217   private final int bulkAssignThresholdServers;
218   private final int bulkPerRegionOpenTimeGuesstimate;
219 
220   // Should bulk assignment wait till all regions are assigned,
221   // or until it times out?  This is useful to measure bulk assignment
222   // performance, but not needed in most use cases.
223   private final boolean bulkAssignWaitTillAllAssigned;
224 
225   /**
226    * Indicator that AssignmentManager has recovered the region states so
227    * that ServerShutdownHandler can be fully enabled and re-assign regions
228    * of dead servers, so that when re-assignment happens, AssignmentManager
229    * has proper region states.
230    *
231    * Protected to ease testing.
232    */
233   protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
234 
235   /**
236    * A map to track the number of times a region fails to open in a row,
237    * so that we don't try to open a region forever if the failure is
238    * unrecoverable.  We don't put this information in region states
239    * because we don't expect this to happen frequently; we don't
240    * want to copy this information over during each state transition either.
241    */
242   private final ConcurrentHashMap<String, AtomicInteger>
243     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
244 
245   // A flag to indicate if we are using ZK for region assignment
246   private final boolean useZKForAssignment;
247 
248   // In case not using ZK for region assignment, region states
249   // are persisted in meta with a state store
250   private final RegionStateStore regionStateStore;
251 
252   /**
253    * For testing only!  Set to true to skip handling of split.
254    */
255   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
256   public static boolean TEST_SKIP_SPLIT_HANDLING = false;
257 
258   /** Listeners that are called on assignment events. */
259   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
260 
261   private RegionStateListener regionStateListener;
262 
263   public enum ServerHostRegion {
264     NOT_HOSTING_REGION, HOSTING_REGION, UNKNOWN,
265   }
266 
267   /**
268    * Constructs a new assignment manager.
269    *
270    * @param server instance of HMaster this AM is running inside
271    * @param serverManager serverManager for associated HMaster
272    * @param balancer implementation of {@link LoadBalancer}
273    * @param service Executor service
274    * @param metricsMaster metrics manager
275    * @param tableLockManager TableLock manager
276    * @throws KeeperException
277    * @throws IOException
278    */
279   public AssignmentManager(Server server, ServerManager serverManager,
280       final LoadBalancer balancer,
281       final ExecutorService service, MetricsMaster metricsMaster,
282       final TableLockManager tableLockManager) throws KeeperException,
283         IOException, CoordinatedStateException {
284     super(server.getZooKeeper());
285     this.server = server;
286     this.serverManager = serverManager;
287     this.executorService = service;
288     this.regionStateStore = new RegionStateStore(server);
289     this.regionsToReopen = Collections.synchronizedMap
290                            (new HashMap<String, HRegionInfo> ());
291     Configuration conf = server.getConfiguration();
292     // Only read favored nodes if using the favored nodes load balancer.
293     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
294            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
295            FavoredNodeLoadBalancer.class);
296     try {
297       if (server.getCoordinatedStateManager() != null) {
298         this.tableStateManager = server.getCoordinatedStateManager().getTableStateManager();
299       } else {
300         this.tableStateManager = null;
301       }
302     } catch (InterruptedException e) {
303       throw new InterruptedIOException();
304     }
305     // This is the max attempts, not retries, so it should be at least 1.
306     this.maximumAttempts = Math.max(1,
307       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
308     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
309         "hbase.meta.assignment.retry.sleeptime", 1000l);
310     this.balancer = balancer;
311     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
312     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
313       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
314     this.regionStates = new RegionStates(
315       server, tableStateManager, serverManager, regionStateStore);
316 
317     this.bulkAssignWaitTillAllAssigned =
318       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
319     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
320     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
321     this.bulkPerRegionOpenTimeGuesstimate =
322       conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
323 
324     int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
325     ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
326     zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
327             TimeUnit.SECONDS, threadFactory);
328     this.tableLockManager = tableLockManager;
329 
330     this.metricsAssignmentManager = new MetricsAssignmentManager();
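        // Decided once at construction time; the rest of this class branches on this flag
        // when choosing between ZK-based and meta-persisted region state transitions.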
331     useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
332   }
333 
334   /**
335    * Add the listener to the notification list.
336    * @param listener The AssignmentListener to register
337    */
338   public void registerListener(final AssignmentListener listener) {
339     this.listeners.add(listener);
340   }
341 
342   /**
343    * Remove the listener from the notification list.
344    * @param listener The AssignmentListener to unregister
345    */
346   public boolean unregisterListener(final AssignmentListener listener) {
347     return this.listeners.remove(listener);
348   }
349 
350   /**
351    * @return Instance of ZKTableStateManager.
352    */
353   public TableStateManager getTableStateManager() {
354     // These are 'expensive' to make, involving a trip to the zk ensemble, so allow
355     // sharing.
356     return this.tableStateManager;
357   }
358 
359   /**
360    * This SHOULD not be public. It is public now
361    * because of some unit tests.
362    *
363    * TODO: make it package private and keep RegionStates in the master package
364    */
365   public RegionStates getRegionStates() {
366     return regionStates;
367   }
368 
369   /**
370    * Used in some tests to mock up region state in meta
371    */
372   @VisibleForTesting
373   RegionStateStore getRegionStateStore() {
374     return regionStateStore;
375   }
376 
377   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
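        // The plan has no source server and its destination is the server currently hosting
        // the region, so the region is expected to be reopened in place.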
378     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
379   }
380 
381   /**
382    * Add a regionPlan for the specified region.
383    * @param encodedName
384    * @param plan
385    */
386   public void addPlan(String encodedName, RegionPlan plan) {
387     synchronized (regionPlans) {
388       regionPlans.put(encodedName, plan);
389     }
390   }
391 
392   /**
393    * Add a map of region plans.
394    */
395   public void addPlans(Map<String, RegionPlan> plans) {
396     synchronized (regionPlans) {
397       regionPlans.putAll(plans);
398     }
399   }
400 
401   /**
402    * Set the list of regions that will be reopened
403    * because of an update in table schema
404    *
405    * @param regions
406    *          list of regions that should be tracked for reopen
407    */
408   public void setRegionsToReopen(List <HRegionInfo> regions) {
409     for(HRegionInfo hri : regions) {
410       regionsToReopen.put(hri.getEncodedName(), hri);
411     }
412   }
413 
414   /**
415    * Used by the client to identify if all regions have the schema updates
416    *
417    * @param tableName
418    * @return Pair indicating the status of the alter command
419    * @throws IOException
420    */
421   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
422       throws IOException {
423     List<HRegionInfo> hris;
424     if (TableName.META_TABLE_NAME.equals(tableName)) {
425       hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
426     } else {
427       hris = MetaTableAccessor.getTableRegions(server.getZooKeeper(),
428         server.getConnection(), tableName, true);
429     }
430 
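        // A region counts as pending if it is still queued for reopen or is currently in transition.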
431     Integer pending = 0;
432     for (HRegionInfo hri : hris) {
433       String name = hri.getEncodedName();
434       // no lock concurrent access ok: sequential consistency respected.
435       if (regionsToReopen.containsKey(name)
436           || regionStates.isRegionInTransition(name)) {
437         pending++;
438       }
439     }
440     return new Pair<Integer, Integer>(pending, hris.size());
441   }
442 
443   /**
444    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
445    * the failover cleanup before re-assigning regions of dead servers. So that
446    * when re-assignment happens, AssignmentManager has proper region states.
447    */
448   public boolean isFailoverCleanupDone() {
449     return failoverCleanupDone.get();
450   }
451 
452   /**
453    * To avoid racing with AM, external entities may need to lock a region,
454    * for example, when SSH checks what regions to skip re-assigning.
455    */
456   public Lock acquireRegionLock(final String encodedName) {
457     return locker.acquireLock(encodedName);
458   }
459 
460   /**
461    * Now, failover cleanup is completed. Notify server manager to
462    * process queued-up dead servers, if any.
463    */
464   void failoverCleanupDone() {
465     failoverCleanupDone.set(true);
466     serverManager.processQueuedDeadServers();
467   }
468 
469   /**
470    * Called on startup.
471    * Figures out whether this is a fresh cluster start or we are joining an extant running cluster.
472    * @throws IOException
473    * @throws KeeperException
474    * @throws InterruptedException
475    * @throws CoordinatedStateException
476    */
477   void joinCluster() throws IOException,
478       KeeperException, InterruptedException, CoordinatedStateException {
479     long startTime = System.currentTimeMillis();
480     // Concurrency note: In the below the accesses on regionsInTransition are
481     // outside of a synchronization block where usually all accesses to RIT are
482     // synchronized.  The presumption is that in this case it is safe since this
483     // method is being played by a single thread on startup.
484 
485     // TODO: Regions that have a null location and are not in regionsInTransitions
486     // need to be handled.
487 
488     // Scan hbase:meta to build list of existing regions, servers, and assignment
489     // Returns servers that have not checked in (assumed dead) but to which some regions
490     // were assigned (according to the meta)
491     Set<ServerName> deadServers = rebuildUserRegions();
492 
493     // This method will assign all user regions if a clean server startup or
494     // it will reconstruct master state and cleanup any leftovers from
495     // previous master process.
496     boolean failover = processDeadServersAndRegionsInTransition(deadServers);
497 
498     if (!useZKForAssignment) {
499       // Not use ZK for assignment any more, remove the ZNode
500       ZKUtil.deleteNodeRecursively(watcher, watcher.assignmentZNode);
501     }
502     recoverTableInDisablingState();
503     recoverTableInEnablingState();
504     LOG.info("Joined the cluster in " + (System.currentTimeMillis()
505       - startTime) + "ms, failover=" + failover);
506   }
507 
508   /**
509    * Processes all regions that are in transition in zookeeper and also
510    * processes the list of dead servers by scanning the META.
511    * Used by the master when joining a cluster.  If we figure this is a clean cluster
512    * startup, will assign all user regions.
513    * @param deadServers
514    *          Map of dead servers and their regions. Can be null.
515    * @throws KeeperException
516    * @throws IOException
517    * @throws InterruptedException
518    */
519   boolean processDeadServersAndRegionsInTransition(
520       final Set<ServerName> deadServers) throws KeeperException,
521         IOException, InterruptedException, CoordinatedStateException {
522     List<String> nodes = ZKUtil.listChildrenNoWatch(watcher,
523       watcher.assignmentZNode);
524 
525     if (useZKForAssignment && nodes == null) {
526       String errorMessage = "Failed to get the children from ZK";
527       server.abort(errorMessage, new IOException(errorMessage));
528       return true; // Doesn't matter in this case
529     }
530 
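        // Failover detection proceeds in stages below: known dead servers, user regions already
        // assigned on live servers, user regions in transition, and finally unsplit WALs left by
        // queued dead servers.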
531     boolean failover = !serverManager.getDeadServers().isEmpty();
532     if (failover) {
533       // This may not be a failover actually, especially if meta is on this master.
534       if (LOG.isDebugEnabled()) {
535         LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
536       }
537     } else {
538       // If any one region except meta is assigned, it's a failover.
539       Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
540       for (Map.Entry<HRegionInfo, ServerName> en:
541           regionStates.getRegionAssignments().entrySet()) {
542         HRegionInfo hri = en.getKey();
543         if (!hri.isMetaTable()
544             && onlineServers.contains(en.getValue())) {
545           LOG.debug("Found " + hri + " out on cluster");
546           failover = true;
547           break;
548         }
549       }
550       if (!failover && nodes != null) {
551         // If any one region except meta is in transition, it's a failover.
552         for (String encodedName: nodes) {
553           RegionState regionState = regionStates.getRegionState(encodedName);
554           if (regionState != null && !regionState.getRegion().isMetaRegion()) {
555             LOG.debug("Found " + regionState + " in RITs");
556             failover = true;
557             break;
558           }
559         }
560       }
561     }
562     if (!failover && !useZKForAssignment) {
563       // If any region except meta is in transition on a live server, it's a failover.
564       Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
565       if (!regionsInTransition.isEmpty()) {
566         Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
567         for (RegionState regionState: regionsInTransition.values()) {
568           ServerName serverName = regionState.getServerName();
569           if (!regionState.getRegion().isMetaRegion()
570               && serverName != null && onlineServers.contains(serverName)) {
571             LOG.debug("Found " + regionState + " in RITs");
572             failover = true;
573             break;
574           }
575         }
576       }
577     }
578     if (!failover) {
579       // If we get here, we have a full cluster restart. It is a failover only
580       // if there are some WALs that are not split yet. Meta WALs, if any, should have
581       // been split already. We can walk through those queued dead servers;
582       // if they don't have any WALs, this restart should be considered a clean one
583       Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
584       if (!queuedDeadServers.isEmpty()) {
585         Configuration conf = server.getConfiguration();
586         Path rootdir = FSUtils.getRootDir(conf);
587         FileSystem fs = rootdir.getFileSystem(conf);
588         for (ServerName serverName: queuedDeadServers) {
589           // In the case of a clean exit, the shutdown handler would have presplit any WALs and
590           // removed empty directories.
591           Path logDir = new Path(rootdir,
592               DefaultWALProvider.getWALDirectoryName(serverName.toString()));
593           Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
594           if (fs.exists(logDir) || fs.exists(splitDir)) {
595             LOG.debug("Found queued dead server " + serverName);
596             failover = true;
597             break;
598           }
599         }
600         if (!failover) {
601           // We figured that it's not a failover, so no need to
602           // work on these re-queued dead servers any more.
603           LOG.info("AM figured that it's not a failover and cleaned up "
604             + queuedDeadServers.size() + " queued dead servers");
605           serverManager.removeRequeuedDeadServers();
606         }
607       }
608     }
609 
610     Set<TableName> disabledOrDisablingOrEnabling = null;
611     Map<HRegionInfo, ServerName> allRegions = null;
612 
613     if (!failover) {
614       disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
615         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING,
616         ZooKeeperProtos.Table.State.ENABLING);
617 
618       // Clean re/start, mark all user regions closed before reassignment
619       allRegions = regionStates.closeAllUserRegions(
620         disabledOrDisablingOrEnabling);
621     }
622 
623     // Now region states are restored
624     regionStateStore.start();
625 
626     // If we found user regions out on cluster, it's a failover.
627     if (failover) {
628       LOG.info("Found regions out on cluster or in RIT; presuming failover");
629       // Process list of dead servers and regions in RIT.
630       // See HBASE-4580 for more information.
631       processDeadServersAndRecoverLostRegions(deadServers);
632     }
633 
634     if (!failover && useZKForAssignment) {
635       // Cleanup any existing ZK nodes and start watching
636       ZKAssign.deleteAllNodes(watcher);
637       ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
638         this.watcher.assignmentZNode);
639     }
640 
641     // Now we can safely claim failover cleanup completed and enable
642     // ServerShutdownHandler for further processing. The nodes (below)
643     // in transition, if any, are for regions not related to those
644     // dead servers at all, and can be done in parallel to SSH.
645     failoverCleanupDone();
646     if (!failover) {
647       // Fresh cluster startup.
648       LOG.info("Clean cluster startup. Assigning user regions");
649       assignAllUserRegions(allRegions);
650     }
651     // unassign replicas of the split parents and the merged regions
652     // the daughter replicas are opened in assignAllUserRegions if it was
653     // not already opened.
654     for (HRegionInfo h : replicasToClose) {
655       unassign(h);
656     }
657     replicasToClose.clear();
658     return failover;
659   }
660 
661   /**
662    * If the region is in transition up in zk, then do fixup and block and wait until
663    * the region is assigned and out of transition.  Used on startup for
664    * catalog regions.
665    * @param hri Region to look for.
666    * @return True if we processed a region in transition else false if region
667    * was not up in zk in transition.
668    * @throws InterruptedException
669    * @throws KeeperException
670    * @throws IOException
671    */
672   boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
673       throws InterruptedException, KeeperException, IOException {
674     String encodedRegionName = hri.getEncodedName();
675     if (!processRegionInTransition(encodedRegionName, hri)) {
676       return false; // The region is not in transition
677     }
678     LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedRegionName));
679     while (!this.server.isStopped() &&
680         this.regionStates.isRegionInTransition(encodedRegionName)) {
681       RegionState state = this.regionStates.getRegionTransitionState(encodedRegionName);
682       if (state == null || !serverManager.isServerOnline(state.getServerName())) {
683         // The region is not in transition, or not in transition on an online
684         // server. Doesn't help to block here any more. Caller needs to
685         // verify the region is actually assigned.
686         break;
687       }
688       this.regionStates.waitForUpdate(100);
689     }
690     return true;
691   }
692 
693   /**
694    * Process failover of new master for region <code>encodedRegionName</code>
695    * up in zookeeper.
696    * @param encodedRegionName Region to process failover for.
697    * @param regionInfo If null we'll go get it from meta table.
698    * @return True if we processed <code>regionInfo</code> as a RIT.
699    * @throws KeeperException
700    * @throws IOException
701    */
702   boolean processRegionInTransition(final String encodedRegionName,
703       final HRegionInfo regionInfo) throws KeeperException, IOException {
704     // We need a lock here to ensure that we will not put the same region twice
705     // It has no reason to be a lock shared with the other operations.
706     // We can do the lock on the region only, instead of a global lock: what we want to ensure
707     // is that we don't have two threads working on the same region.
708     Lock lock = locker.acquireLock(encodedRegionName);
709     try {
710       Stat stat = new Stat();
711       byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
712       if (data == null) return false;
713       RegionTransition rt;
714       try {
715         rt = RegionTransition.parseFrom(data);
716       } catch (DeserializationException e) {
717         LOG.warn("Failed parse znode data", e);
718         return false;
719       }
720       HRegionInfo hri = regionInfo;
721       if (hri == null) {
722         // The region info is not passed in. We will try to find the region
723         // from region states map/meta based on the encoded region name. But we
724         // may not be able to find it. This is valid for online merge: the new
725         // region may not have been created yet if the merge is not completed.
726         // Therefore, it is not in meta at master recovery time.
727         hri = regionStates.getRegionInfo(rt.getRegionName());
728         EventType et = rt.getEventType();
729         if (hri == null && et != EventType.RS_ZK_REGION_MERGING
730             && et != EventType.RS_ZK_REQUEST_REGION_MERGE) {
731           LOG.warn("Couldn't find the region in recovering " + rt);
732           return false;
733         }
734       }
735 
736       // TODO: This code is tied to ZK anyway, so for now leaving it as is,
737       // will refactor when whole region assignment will be abstracted from ZK
738       BaseCoordinatedStateManager cp =
739         (BaseCoordinatedStateManager) this.server.getCoordinatedStateManager();
740       OpenRegionCoordination openRegionCoordination = cp.getOpenRegionCoordination();
741 
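          // Record the current znode version (used as the expected version on later
          // transitions) and this master's server name for the open region coordination.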
742       ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
743         new ZkOpenRegionCoordination.ZkOpenRegionDetails();
744       zkOrd.setVersion(stat.getVersion());
745       zkOrd.setServerName(cp.getServer().getServerName());
746 
747       return processRegionsInTransition(
748         rt, hri, openRegionCoordination, zkOrd);
749     } finally {
750       lock.unlock();
751     }
752   }
753 
754   /**
755    * This call is invoked only (1) when the master assigns meta;
756    * (2) during failover mode startup, while processing zk assignment nodes.
757    * The locker is set in the caller. It returns true if the region
758    * is in transition for sure, false otherwise.
759    *
760    * It should be private but it is used by some tests too.
761    */
762   boolean processRegionsInTransition(
763       final RegionTransition rt, final HRegionInfo regionInfo,
764       OpenRegionCoordination coordination,
765       final OpenRegionCoordination.OpenRegionDetails ord) throws KeeperException {
766     EventType et = rt.getEventType();
767     // Get ServerName.  Cannot be null.
768     final ServerName sn = rt.getServerName();
769     final byte[] regionName = rt.getRegionName();
770     final String encodedName = HRegionInfo.encodeRegionName(regionName);
771     final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
772     LOG.info("Processing " + prettyPrintedRegionName + " in state: " + et);
773 
774     if (regionStates.isRegionInTransition(encodedName)
775         && (regionInfo.isMetaRegion() || !useZKForAssignment)) {
776       LOG.info("Processed region " + prettyPrintedRegionName + " in state: "
777         + et + ", does nothing since the region is already in transition "
778         + regionStates.getRegionTransitionState(encodedName));
779       // Just return
780       return true;
781     }
782     if (!serverManager.isServerOnline(sn)) {
783       // It was transitioning on a dead server, so it's closed now.
784       // Force to OFFLINE and put it in transition, but not assign it
785       // since log splitting for the dead server is not done yet.
786       LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() +
787         " was on deadserver; forcing offline");
788       if (regionStates.isRegionOnline(regionInfo)) {
789         // Meta could still show the region is assigned to the previous
790         // server. If that server is online, when we reload the meta, the
791         // region is put back online, so we need to offline it.
792         regionStates.regionOffline(regionInfo);
793         sendRegionClosedNotification(regionInfo);
794       }
795       // Put it back in transition so that SSH can re-assign it
796       regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);
797 
798       if (regionInfo.isMetaRegion()) {
799         // If it's meta region, reset the meta location.
800         // So that master knows the right meta region server.
801         MetaTableLocator.setMetaLocation(watcher, sn, State.OPEN);
802       } else {
803         // Whether the previous server is online or offline,
804         // we need to reset the last region server of the region.
805         regionStates.setLastRegionServerOfRegion(sn, encodedName);
806         // Make sure we know the server is dead.
807         if (!serverManager.isServerDead(sn)) {
808           serverManager.expireServer(sn);
809         }
810       }
811       return false;
812     }
813     switch (et) {
814       case M_ZK_REGION_CLOSING:
815         // Insert into RIT & resend the query to the region server: maybe the previous master
816         // died before sending the query the first time.
817         final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING);
818         this.executorService.submit(
819           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
820             @Override
821             public void process() throws IOException {
822               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
823               try {
824                 final int expectedVersion = ((ZkOpenRegionCoordination.ZkOpenRegionDetails) ord)
825                   .getVersion();
826                 unassign(regionInfo, rsClosing, expectedVersion, null, useZKForAssignment, null);
827                 if (regionStates.isRegionOffline(regionInfo)) {
828                   assign(regionInfo, true);
829                 }
830               } finally {
831                 lock.unlock();
832               }
833             }
834           });
835         break;
836 
837       case RS_ZK_REGION_CLOSED:
838       case RS_ZK_REGION_FAILED_OPEN:
839         // Region is closed, insert into RIT and handle it
840         regionStates.setLastRegionServerOfRegion(sn, encodedName);
841         regionStates.updateRegionState(regionInfo, State.CLOSED, sn);
842         if (!replicasToClose.contains(regionInfo)) {
843           invokeAssign(regionInfo);
844         } else {
845           offlineDisabledRegion(regionInfo);
846         }
847         break;
848 
849       case M_ZK_REGION_OFFLINE:
850         // Insert in RIT and resend to the regionserver
851         regionStates.updateRegionState(rt, State.PENDING_OPEN);
852         final RegionState rsOffline = regionStates.getRegionState(regionInfo);
853         this.executorService.submit(
854           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
855             @Override
856             public void process() throws IOException {
857               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
858               try {
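                    // Create a plan targeting the server named in the znode, then re-trigger
                    // the assignment using that plan.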
859                 RegionPlan plan = new RegionPlan(regionInfo, null, sn);
860                 addPlan(encodedName, plan);
861                 assign(rsOffline, false, false);
862               } finally {
863                 lock.unlock();
864               }
865             }
866           });
867         break;
868 
869       case RS_ZK_REGION_OPENING:
870         regionStates.updateRegionState(rt, State.OPENING);
871         break;
872 
873       case RS_ZK_REGION_OPENED:
874         // Region is opened, insert into RIT and handle it
875         // This could be done asynchronously, we would need then to acquire the lock in the
876         //  handler.
877         regionStates.updateRegionState(rt, State.OPEN);
878         new OpenedRegionHandler(server, this, regionInfo, coordination, ord).process();
879         break;
880       case RS_ZK_REQUEST_REGION_SPLIT:
881       case RS_ZK_REGION_SPLITTING:
882       case RS_ZK_REGION_SPLIT:
883         // Splitting region should be online. We could have skipped it during
884         // user region rebuilding since we may have considered the split completed.
885         // Put it in SPLITTING state to avoid complications.
886         regionStates.regionOnline(regionInfo, sn);
887         regionStates.updateRegionState(rt, State.SPLITTING);
888         if (!handleRegionSplitting(
889             rt, encodedName, prettyPrintedRegionName, sn)) {
890           deleteSplittingNode(encodedName, sn);
891         }
892         break;
893       case RS_ZK_REQUEST_REGION_MERGE:
894       case RS_ZK_REGION_MERGING:
895       case RS_ZK_REGION_MERGED:
896         if (!handleRegionMerging(
897             rt, encodedName, prettyPrintedRegionName, sn)) {
898           deleteMergingNode(encodedName, sn);
899         }
900         break;
901       default:
902         throw new IllegalStateException("Received region in state:" + et + " is not valid.");
903     }
904     LOG.info("Processed region " + prettyPrintedRegionName + " in state "
905       + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ")
906       + "server: " + sn);
907     return true;
908   }
909 
910   /**
911    * When a region is closed, it should be removed from the regionsToReopen
912    * @param hri HRegionInfo of the region which was closed
913    */
914   public void removeClosedRegion(HRegionInfo hri) {
915     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
916       LOG.debug("Removed region from reopening regions because it was closed");
917     }
918   }
919 
920   /**
921    * Handles various states an unassigned node can be in.
922    * <p>
923    * Method is called when a state change is suspected for an unassigned node.
924    * <p>
925    * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
926    * yet).
927    * @param rt region transition
928    * @param coordination coordination for opening region
929    * @param ord details about opening region
930    */
931   void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination,
932                     OpenRegionCoordination.OpenRegionDetails ord) {
933     if (rt == null) {
934       LOG.warn("Unexpected NULL input for RegionTransition rt");
935       return;
936     }
937     final ServerName sn = rt.getServerName();
938     // Check if this is a special HBCK transition
939     if (sn.equals(HBCK_CODE_SERVERNAME)) {
940       handleHBCK(rt);
941       return;
942     }
943     final long createTime = rt.getCreateTime();
944     final byte[] regionName = rt.getRegionName();
945     String encodedName = HRegionInfo.encodeRegionName(regionName);
946     String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
947     // Verify this is a known server
948     if (!serverManager.isServerOnline(sn)
949       && !ignoreStatesRSOffline.contains(rt.getEventType())) {
950       LOG.warn("Attempted to handle region transition for server but " +
951         "it is not online: " + prettyPrintedRegionName + ", " + rt);
952       return;
953     }
954 
955     RegionState regionState =
956       regionStates.getRegionState(encodedName);
957     long startTime = System.currentTimeMillis();
958     if (LOG.isDebugEnabled()) {
959       boolean lateEvent = createTime < (startTime - 15000);
960       LOG.debug("Handling " + rt.getEventType() +
961         ", server=" + sn + ", region=" +
962         (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
963         (lateEvent ? ", which is more than 15 seconds late" : "") +
964         ", current_state=" + regionState);
965     }
966     // We don't do anything for this event,
967     // so separate it out, no need to lock/unlock anything
968     if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
969       return;
970     }
971 
972     // We need a lock on the region as we could update it
973     Lock lock = locker.acquireLock(encodedName);
974     try {
975       RegionState latestState =
976         regionStates.getRegionState(encodedName);
977       if ((regionState == null && latestState != null)
978           || (regionState != null && latestState == null)
979           || (regionState != null && latestState != null
980             && latestState.getState() != regionState.getState())) {
981         LOG.warn("Region state changed from " + regionState + " to "
982           + latestState + ", while acquiring lock");
983       }
984       long waitedTime = System.currentTimeMillis() - startTime;
985       if (waitedTime > 5000) {
986         LOG.warn("Took " + waitedTime + "ms to acquire the lock");
987       }
988       regionState = latestState;
989       switch (rt.getEventType()) {
990       case RS_ZK_REQUEST_REGION_SPLIT:
991       case RS_ZK_REGION_SPLITTING:
992       case RS_ZK_REGION_SPLIT:
993         if (!handleRegionSplitting(
994             rt, encodedName, prettyPrintedRegionName, sn)) {
995           deleteSplittingNode(encodedName, sn);
996         }
997         break;
998 
999       case RS_ZK_REQUEST_REGION_MERGE:
1000       case RS_ZK_REGION_MERGING:
1001       case RS_ZK_REGION_MERGED:
1002         // Merged region is a new region, we can't find it in the region states now.
1003         // However, the two merging regions are not new. They should be in state for merging.
1004         if (!handleRegionMerging(
1005             rt, encodedName, prettyPrintedRegionName, sn)) {
1006           deleteMergingNode(encodedName, sn);
1007         }
1008         break;
1009 
1010       case M_ZK_REGION_CLOSING:
1011         // Should see CLOSING after we have asked it to CLOSE or additional
1012         // times after already being in state of CLOSING
1013         if (regionState == null
1014             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
1015           LOG.warn("Received CLOSING for " + prettyPrintedRegionName
1016             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
1017             + regionStates.getRegionState(encodedName));
1018           return;
1019         }
1020         // Transition to CLOSING (or update stamp if already CLOSING)
1021         regionStates.updateRegionState(rt, State.CLOSING);
1022         break;
1023 
1024       case RS_ZK_REGION_CLOSED:
1025         // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
1026         if (regionState == null
1027             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
1028           LOG.warn("Received CLOSED for " + prettyPrintedRegionName
1029             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
1030             + regionStates.getRegionState(encodedName));
1031           return;
1032         }
1033         // Handle CLOSED by assigning elsewhere, or stopping if the table is disabled
1034         // If we got here all is good.  Need to update RegionState -- else
1035         // what follows will fail because not in expected state.
1036         new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1037         updateClosedRegionHandlerTracker(regionState.getRegion());
1038         break;
1039 
1040         case RS_ZK_REGION_FAILED_OPEN:
1041           if (regionState == null
1042               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1043             LOG.warn("Received FAILED_OPEN for " + prettyPrintedRegionName
1044               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1045               + regionStates.getRegionState(encodedName));
1046             return;
1047           }
1048           AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
1049           if (failedOpenCount == null) {
1050             failedOpenCount = new AtomicInteger();
1051             // No need to use putIfAbsent, or extra synchronization since
1052             // this whole handleRegion block is locked on the encoded region
1053             // name, and failedOpenTracker is updated only in this block
1054             failedOpenTracker.put(encodedName, failedOpenCount);
1055           }
1056           if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
1057             regionStates.updateRegionState(rt, State.FAILED_OPEN);
1058             // remove the tracking info to save memory, also reset
1059             // the count for next open initiative
1060             failedOpenTracker.remove(encodedName);
1061           } else {
1062             // Handle this the same as if it were opened and then closed.
1063             regionState = regionStates.updateRegionState(rt, State.CLOSED);
1064             if (regionState != null) {
1065               // When there is more than one region server, a new RS is selected as the
1066               // destination and the region plan is updated accordingly. (HBASE-5546)
1067               try {
1068                 getRegionPlan(regionState.getRegion(), sn, true);
1069                 new ClosedRegionHandler(server, this, regionState.getRegion()).process();
1070               } catch (HBaseIOException e) {
1071                 LOG.warn("Failed to get region plan", e);
1072               }
1073             }
1074           }
1075           break;
1076 
1077         case RS_ZK_REGION_OPENING:
1078           // Should see OPENING after we have asked it to OPEN or additional
1079           // times after already being in state of OPENING
1080           if (regionState == null
1081               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1082             LOG.warn("Received OPENING for " + prettyPrintedRegionName
1083               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1084               + regionStates.getRegionState(encodedName));
1085             return;
1086           }
1087           // Transition to OPENING (or update stamp if already OPENING)
1088           regionStates.updateRegionState(rt, State.OPENING);
1089           break;
1090 
1091         case RS_ZK_REGION_OPENED:
1092           // Should see OPENED after OPENING but possible after PENDING_OPEN.
1093           if (regionState == null
1094               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
1095             LOG.warn("Received OPENED for " + prettyPrintedRegionName
1096               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
1097               + regionStates.getRegionState(encodedName));
1098 
1099             if (regionState != null) {
1100               // Close it without updating the internal region states,
1101               // so as not to create double assignments in unlucky scenarios
1102               // mentioned in OpenRegionHandler#process
1103               unassign(regionState.getRegion(), null, -1, null, false, sn);
1104             }
1105             return;
1106           }
1107           // Handle OPENED by removing from transition and deleting the zk node
1108           regionState =
1109               regionStates.transitionOpenFromPendingOpenOrOpeningOnServer(rt,regionState, sn);
1110           if (regionState != null) {
1111             failedOpenTracker.remove(encodedName); // reset the count, if any
1112             new OpenedRegionHandler(
1113               server, this, regionState.getRegion(), coordination, ord).process();
1114             updateOpenedRegionHandlerTracker(regionState.getRegion());
1115           }
1116           break;
1117 
1118         default:
1119           throw new IllegalStateException("Received event is not valid.");
1120       }
1121     } finally {
1122       lock.unlock();
1123     }
1124   }
1125 
1126   //For unit tests only
1127   boolean wasClosedHandlerCalled(HRegionInfo hri) {
1128     AtomicBoolean b = closedRegionHandlerCalled.get(hri);
1129     // compareAndSet to be sure that unit tests don't see stale values. This means
1130     // we will return true exactly once, unless the handler code resets this value
1131     // to true again.
1132     return b == null ? false : b.compareAndSet(true, false);
1133   }
1134 
1135   //For unit tests only
1136   boolean wasOpenedHandlerCalled(HRegionInfo hri) {
1137     AtomicBoolean b = openedRegionHandlerCalled.get(hri);
1138     // compareAndSet to be sure that unit tests don't see stale values. This means
1139     // we will return true exactly once, unless the handler code resets this value
1140     // to true again.
1141     return b == null ? false : b.compareAndSet(true, false);
1142   }
1143 
1144   //For unit tests only
1145   void initializeHandlerTrackers() {
1146     closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1147     openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
1148   }
1149 
1150   void updateClosedRegionHandlerTracker(HRegionInfo hri) {
1151     if (closedRegionHandlerCalled != null) { //only for unit tests this is true
1152       closedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1153     }
1154   }
1155 
1156   void updateOpenedRegionHandlerTracker(HRegionInfo hri) {
1157     if (openedRegionHandlerCalled != null) { //only for unit tests this is true
1158       openedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
1159     }
1160   }
1161 
1162   // TODO: processFavoredNodes might throw an exception, e.g., if the
1163   // meta could not be contacted/updated. We need to decide how seriously to treat
1164   // this problem; should we fail the current assignment? We should be able
1165   // to recover from this problem eventually (if the meta couldn't be updated
1166   // things should work normally and eventually get fixed up).
1167   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
1168     if (!shouldAssignRegionsWithFavoredNodes) return;
1169     // The AM gets the favored nodes info for each region and updates the meta
1170     // table with that info
1171     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
1172         new HashMap<HRegionInfo, List<ServerName>>();
1173     for (HRegionInfo region : regions) {
1174       regionToFavoredNodes.put(region,
1175           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
1176     }
1177     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
1178       this.server.getConnection());
1179   }
1180 
1181   /**
1182    * Handle a ZK unassigned node transition triggered by HBCK repair tool.
1183    * <p>
1184    * This is handled in a separate code path because it breaks the normal rules.
1185    * @param rt
1186    */
1187   @SuppressWarnings("deprecation")
1188   private void handleHBCK(RegionTransition rt) {
1189     String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
1190     LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
1191       ", server=" + rt.getServerName() + ", region=" +
1192       HRegionInfo.prettyPrint(encodedName));
1193     RegionState regionState = regionStates.getRegionTransitionState(encodedName);
1194     switch (rt.getEventType()) {
1195       case M_ZK_REGION_OFFLINE:
1196         HRegionInfo regionInfo;
1197         if (regionState != null) {
1198           regionInfo = regionState.getRegion();
1199         } else {
1200           try {
1201             byte [] name = rt.getRegionName();
1202             Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
1203               this.server.getConnection(), name);
1204             regionInfo = p.getFirst();
1205           } catch (IOException e) {
1206             LOG.info("Exception reading hbase:meta doing HBCK repair operation", e);
1207             return;
1208           }
1209         }
1210         LOG.info("HBCK repair is triggering assignment of region=" +
1211             regionInfo.getRegionNameAsString());
1212         // trigger assign, node is already in OFFLINE so don't need to update ZK
1213         assign(regionInfo, false);
1214         break;
1215 
1216       default:
1217         LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
1218         break;
1219     }
1220 
1221   }
1222 
1223   // ZooKeeper events
1224 
1225   /**
1226    * New unassigned node has been created.
1227    *
1228    * <p>This happens when an RS begins the OPENING or CLOSING of a region by
1229    * creating an unassigned node.
1230    *
1231    * <p>When this happens we must:
1232    * <ol>
1233    *   <li>Watch the node for further events</li>
1234    *   <li>Read and handle the state in the node</li>
1235    * </ol>
1236    */
1237   @Override
1238   public void nodeCreated(String path) {
1239     handleAssignmentEvent(path);
1240   }
1241 
1242   /**
1243    * Existing unassigned node has had data changed.
1244    *
1245    * <p>This happens when an RS transitions from OFFLINE to OPENING, or between
1246    * OPENING/OPENED and CLOSING/CLOSED.
1247    *
1248    * <p>When this happens we must:
1249    * <ol>
1250    *   <li>Watch the node for further events</li>
1251    *   <li>Read and handle the state in the node</li>
1252    * </ol>
1253    */
1254   @Override
1255   public void nodeDataChanged(String path) {
1256     handleAssignmentEvent(path);
1257   }
1258 
1259 
1260   // We don't want to have two events on the same region managed simultaneously.
1261   // For this reason, we need to wait if an event on the same region is currently in progress.
1262   // So we track the region names of the events in progress, and we keep a waiting list.
1263   private final Set<String> regionsInProgress = new HashSet<String>();
1264   // In a LinkedHashMultimap, the put order is kept when we retrieve the collection back. We need
1265   //  this as we want the events to be managed in the same order as we received them.
1266   private final LinkedHashMultimap <String, RegionRunnable>
1267       zkEventWorkerWaitingList = LinkedHashMultimap.create();
1268 
1269   /**
1270    * A specific runnable that works only on a region.
1271    */
1272   private interface RegionRunnable extends Runnable{
1273     /**
1274      * @return - the name of the region it works on.
1275      */
1276     String getRegionName();
1277   }
1278 
1279   /**
1280    * Submit a task, ensuring that there is only one task at a time working on a given region.
1281    * Submission order is respected.
1282    */
1283   protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {
1284 
1285     synchronized (regionsInProgress) {
1286       // If there is already a task for this region, we add it to the
1287       //  waiting list and return.
1288       if (regionsInProgress.contains(regRunnable.getRegionName())) {
1289         synchronized (zkEventWorkerWaitingList){
1290           zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
1291         }
1292         return;
1293       }
1294 
1295       // No event in progress on this region => we can submit a new task immediately.
1296       regionsInProgress.add(regRunnable.getRegionName());
1297       zkEventWorkers.submit(new Runnable() {
1298         @Override
1299         public void run() {
1300           try {
1301             regRunnable.run();
1302           } finally {
1303             // now that we have finished, let's see if there is an event for the same region in the
1304             //  waiting list. If it's the case, we can now submit it to the pool.
1305             synchronized (regionsInProgress) {
1306               regionsInProgress.remove(regRunnable.getRegionName());
1307               synchronized (zkEventWorkerWaitingList) {
1308                 java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
1309                     regRunnable.getRegionName());
1310                 if (!waiting.isEmpty()) {
1311                   // We want the first object only. The only way to get it is through an iterator.
1312                   RegionRunnable toSubmit = waiting.iterator().next();
1313                   zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
1314                   zkEventWorkersSubmit(toSubmit);
1315                 }
1316               }
1317             }
1318           }
1319         }
1320       });
1321     }
1322   }
1323 
1324   @Override
1325   public void nodeDeleted(final String path) {
1326     if (path.startsWith(watcher.assignmentZNode)) {
1327       final String regionName = ZKAssign.getRegionName(watcher, path);
1328       zkEventWorkersSubmit(new RegionRunnable() {
1329         @Override
1330         public String getRegionName() {
1331           return regionName;
1332         }
1333 
1334         @Override
1335         public void run() {
1336           Lock lock = locker.acquireLock(regionName);
1337           try {
1338             RegionState rs = regionStates.getRegionTransitionState(regionName);
1339             if (rs == null) {
1340               rs = regionStates.getRegionState(regionName);
1341               if (rs == null || !rs.isMergingNew()) {
1342                 // MergingNew is an offline state
1343                 return;
1344               }
1345             }
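                 // At this point rs is either a region in transition, or the offline
                 // MergingNew state recovered just above.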
1346 
1347             HRegionInfo regionInfo = rs.getRegion();
1348             String regionNameStr = regionInfo.getRegionNameAsString();
1349             LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);
1350 
1351             boolean disabled = getTableStateManager().isTableState(regionInfo.getTable(),
1352                 ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
1353 
1354             ServerName serverName = rs.getServerName();
1355             if (serverManager.isServerOnline(serverName)) {
1356               if (rs.isOnServer(serverName) && (rs.isOpened() || rs.isSplitting())) {
1357                 synchronized (regionStates) {
1358                   regionOnline(regionInfo, serverName);
1359                   if (rs.isSplitting() && splitRegions.containsKey(regionInfo)) {
1360                     // Check if the daughter regions are still there; if they are present,
1361                     // offline them since this is the rollback case.
1362                     HRegionInfo hri_a = splitRegions.get(regionInfo).getFirst();
1363                     HRegionInfo hri_b = splitRegions.get(regionInfo).getSecond();
1364                     if (!regionStates.isRegionInTransition(hri_a.getEncodedName())) {
1365                       LOG.warn("Split daughter region not in transition " + hri_a);
1366                     }
1367                     if (!regionStates.isRegionInTransition(hri_b.getEncodedName())) {
1368                       LOG.warn("Split daughter region not in transition" + hri_b);
1369                     }
1370                     regionOffline(hri_a);
1371                     regionOffline(hri_b);
1372                     splitRegions.remove(regionInfo);
1373                   }
1374                   if (disabled) {
1375                     // if server is offline, no hurt to unassign again
1376                     LOG.info("Opened " + regionNameStr
1377                         + "but this table is disabled, triggering close of region");
1378                     unassign(regionInfo);
1379                   }
1380                 }
1381               } else if (rs.isMergingNew()) {
1382                 synchronized (regionStates) {
1383                   String p = regionInfo.getEncodedName();
1384                   PairOfSameType<HRegionInfo> regions = mergingRegions.get(p);
1385                   if (regions != null) {
1386                     onlineMergingRegion(disabled, regions.getFirst(), serverName);
1387                     onlineMergingRegion(disabled, regions.getSecond(), serverName);
1388                   }
1389                 }
1390               }
1391             }
1392           } finally {
1393             lock.unlock();
1394           }
1395         }
1396 
1397         private void onlineMergingRegion(boolean disabled,
1398             final HRegionInfo hri, final ServerName serverName) {
1399           RegionState regionState = regionStates.getRegionState(hri);
1400           if (regionState != null && regionState.isMerging()
1401               && regionState.isOnServer(serverName)) {
1402             regionOnline(regionState.getRegion(), serverName);
1403             if (disabled) {
1404               unassign(hri);
1405             }
1406           }
1407         }
1408       });
1409     }
1410   }
1411 
1412   /**
1413    * New unassigned node has been created.
1414    *
1415    * <p>This happens when an RS begins the OPENING, SPLITTING or CLOSING of a
1416    * region by creating a znode.
1417    *
1418    * <p>When this happens we must:
1419    * <ol>
1420    *   <li>Watch the node for further children changed events</li>
1421    *   <li>Watch all new children for changed events</li>
1422    * </ol>
1423    */
1424   @Override
1425   public void nodeChildrenChanged(String path) {
1426     if (path.equals(watcher.assignmentZNode)) {
1427       zkEventWorkers.submit(new Runnable() {
1428         @Override
1429         public void run() {
1430           try {
1431             // Just make sure we see the changes for the new znodes
1432             List<String> children =
1433               ZKUtil.listChildrenAndWatchForNewChildren(
1434                 watcher, watcher.assignmentZNode);
1435             if (children != null) {
1436               Stat stat = new Stat();
1437               for (String child : children) {
1438                 // if region is in transition, we already have a watch
1439                 // on it, so no need to watch it again. So, as far as I know, for now
1440                 // this is needed to watch splitting nodes only.
1441                 if (!regionStates.isRegionInTransition(child)) {
1442                   ZKAssign.getDataAndWatch(watcher, child, stat);
1443                 }
1444               }
1445             }
1446           } catch (KeeperException e) {
1447             server.abort("Unexpected ZK exception reading unassigned children", e);
1448           }
1449         }
1450       });
1451     }
1452   }
1453 
1454 
1455   /**
1456    * Marks the region as online.  Removes it from regions in transition and
1457    * updates the in-memory assignment information.
1458    * <p>
1459    * Used when a region has been successfully opened on a region server.
1460    * @param regionInfo region that has been opened
1461    * @param sn server the region was opened on
1462    */
1463   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
1464     regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
1465   }
1466 
1467   void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
1468     numRegionsOpened.incrementAndGet();
1469     regionStates.regionOnline(regionInfo, sn, openSeqNum);
1470 
1471     // Remove plan if one.
1472     clearRegionPlan(regionInfo);
1473     balancer.regionOnline(regionInfo, sn);
1474 
1475     // Tell our listeners that a region was opened
1476     sendRegionOpenedNotification(regionInfo, sn);
1477   }
1478 
1479   /**
1480    * Pass the assignment event to a worker for processing.
1481    * Each worker is a single thread executor service.  The reason
1482    * for just one thread is to make sure all events for a given
1483    * region are processed in order.
1484    *
1485    * @param path
1486    */
1487   private void handleAssignmentEvent(final String path) {
1488     if (path.startsWith(watcher.assignmentZNode)) {
1489       final String regionName = ZKAssign.getRegionName(watcher, path);
1490 
1491       zkEventWorkersSubmit(new RegionRunnable() {
1492         @Override
1493         public String getRegionName() {
1494           return regionName;
1495         }
1496 
1497         @Override
1498         public void run() {
1499           try {
1500             Stat stat = new Stat();
1501             byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
1502             if (data == null) return;
1503 
1504             RegionTransition rt = RegionTransition.parseFrom(data);
1505 
1506             // TODO: This code is tied to ZK anyway, so for now leaving it as is,
1507             // will refactor when whole region assignment will be abstracted from ZK
1508             BaseCoordinatedStateManager csm =
1509               (BaseCoordinatedStateManager) server.getCoordinatedStateManager();
1510             OpenRegionCoordination openRegionCoordination = csm.getOpenRegionCoordination();
1511 
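                 // The znode version from the Stat is recorded below; the open-region
                 // coordination uses it to detect concurrent updates to this unassigned node.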
1512             ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
1513               new ZkOpenRegionCoordination.ZkOpenRegionDetails();
1514             zkOrd.setVersion(stat.getVersion());
1515             zkOrd.setServerName(csm.getServer().getServerName());
1516 
1517             handleRegion(rt, openRegionCoordination, zkOrd);
1518           } catch (KeeperException e) {
1519             server.abort("Unexpected ZK exception reading unassigned node data", e);
1520           } catch (DeserializationException e) {
1521             server.abort("Unexpected exception deserializing node data", e);
1522           }
1523         }
1524       });
1525     }
1526   }
1527 
1528   /**
1529    * Marks the region as offline.  Removes it from regions in transition and
1530    * removes in-memory assignment information.
1531    * <p>
1532    * Used when a region has been closed and should remain closed.
1533    * @param regionInfo
1534    */
1535   public void regionOffline(final HRegionInfo regionInfo) {
1536     regionOffline(regionInfo, null);
1537   }
1538 
1539   public void offlineDisabledRegion(HRegionInfo regionInfo) {
1540     if (useZKForAssignment) {
1541       // Disabling so should not be reassigned, just delete the CLOSED node
1542       LOG.debug("Table being disabled so deleting ZK node and removing from " +
1543         "regions in transition, skipping assignment of region " +
1544           regionInfo.getRegionNameAsString());
1545       String encodedName = regionInfo.getEncodedName();
1546       deleteNodeInStates(encodedName, "closed", null,
1547         EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
1548     }
1549     replicasToClose.remove(regionInfo);
1550     regionOffline(regionInfo);
1551   }
1552 
1553   // Assignment methods
1554 
1555   /**
1556    * Assigns the specified region.
1557    * <p>
1558    * If a RegionPlan is available with a valid destination then it will be used
1559    * to determine what server region is assigned to.  If no RegionPlan is
1560    * available, region will be assigned to a random available server.
1561    * <p>
1562    * Updates the RegionState and sends the OPEN RPC.
1563    * <p>
1564    * This will only succeed if the region is in transition and in a CLOSED or
1565    * OFFLINE state, or not in transition at all (in-memory state, not ZK), and,
1566    * of course, the chosen server is up and running (it may have just crashed!).
1567    * If the in-memory checks pass, the zk node is forced to OFFLINE before assigning.
1568    *
1569    * @param region region to be assigned
1570    * @param setOfflineInZK whether ZK node should be created/transitioned to an
1571    *                       OFFLINE state before assigning the region
1572    */
1573   public void assign(HRegionInfo region, boolean setOfflineInZK) {
1574     assign(region, setOfflineInZK, false);
1575   }
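       // Usage sketch: the HBCK path above calls assign(regionInfo, false) because the
       // unassigned znode is already OFFLINE, while normal assignment paths pass true
       // so the znode is forced to OFFLINE before the OPEN RPC is sent.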
1576 
1577   /**
1578    * Use care with forceNewPlan. It could cause double assignment.
1579    */
1580   public void assign(HRegionInfo region,
1581       boolean setOfflineInZK, boolean forceNewPlan) {
1582     if (isDisabledorDisablingRegionInRIT(region)) {
1583       return;
1584     }
1585     String encodedName = region.getEncodedName();
1586     Lock lock = locker.acquireLock(encodedName);
1587     try {
1588       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
1589       if (state != null) {
1590         if (regionStates.wasRegionOnDeadServer(encodedName)) {
1591           LOG.info("Skip assigning " + region.getRegionNameAsString()
1592             + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
1593             + " is dead but not processed yet");
1594           return;
1595         }
1596         assign(state, setOfflineInZK && useZKForAssignment, forceNewPlan);
1597       }
1598     } finally {
1599       lock.unlock();
1600     }
1601   }
1602 
1603   /**
1604    * Bulk assign regions to <code>destination</code>.
1605    * @param destination
1606    * @param regions Regions to assign.
1607    * @return true if successful
1608    */
1609   boolean assign(final ServerName destination, final List<HRegionInfo> regions)
1610     throws InterruptedException {
1611     long startTime = EnvironmentEdgeManager.currentTime();
1612     try {
1613       int regionCount = regions.size();
1614       if (regionCount == 0) {
1615         return true;
1616       }
1617       LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
1618       Set<String> encodedNames = new HashSet<String>(regionCount);
1619       for (HRegionInfo region : regions) {
1620         encodedNames.add(region.getEncodedName());
1621       }
1622 
1623       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
1624       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
1625       try {
1626         AtomicInteger counter = new AtomicInteger(0);
1627         Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
1628         OfflineCallback cb = new OfflineCallback(
1629           watcher, destination, counter, offlineNodesVersions);
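             // The callback is expected to bump 'counter' and record each region's
             // OFFLINE znode version in 'offlineNodesVersions' as the asynchronous ZK
             // operations complete; both are consumed further below.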
1630         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
1631         List<RegionState> states = new ArrayList<RegionState>(regions.size());
1632         for (HRegionInfo region : regions) {
1633           String encodedName = region.getEncodedName();
1634           if (!isDisabledorDisablingRegionInRIT(region)) {
1635             RegionState state = forceRegionStateToOffline(region, false);
1636             boolean onDeadServer = false;
1637             if (state != null) {
1638               if (regionStates.wasRegionOnDeadServer(encodedName)) {
1639                 LOG.info("Skip assigning " + region.getRegionNameAsString()
1640                   + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
1641                   + " is dead but not processed yet");
1642                 onDeadServer = true;
1643               } else if (!useZKForAssignment
1644                   || asyncSetOfflineInZooKeeper(state, cb, destination)) {
1645                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
1646                 plans.put(encodedName, plan);
1647                 states.add(state);
1648                 continue;
1649               }
1650             }
1651             // Reassign if the region wasn't on a dead server
1652             if (!onDeadServer) {
1653               LOG.info("failed to force region state to offline or "
1654                 + "failed to set it offline in ZK, will reassign later: " + region);
1655               failedToOpenRegions.add(region); // assign individually later
1656             }
1657           }
1658           // Release the lock, this region is excluded from bulk assign because
1659           // we can't update its state, or set its znode to offline.
1660           Lock lock = locks.remove(encodedName);
1661           lock.unlock();
1662         }
1663 
1664         if (useZKForAssignment) {
1665           // Wait until all unassigned nodes have been put up and watchers set.
1666           int total = states.size();
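               // Poll the callback counter until an OFFLINE znode has been set up for
               // every region in 'states', or the master is stopping.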
1667           for (int oldCounter = 0; !server.isStopped();) {
1668             int count = counter.get();
1669             if (oldCounter != count) {
1670               LOG.debug(destination.toString() + " unassigned znodes=" + count +
1671                 " of total=" + total + "; oldCounter=" + oldCounter);
1672               oldCounter = count;
1673             }
1674             if (count >= total) break;
1675             Thread.sleep(5);
1676           }
1677         }
1678 
1679         if (server.isStopped()) {
1680           return false;
1681         }
1682 
1683         // Add region plans, so we can updateTimers when one region is opened so
1684         // that unnecessary timeout on RIT is reduced.
1685         this.addPlans(plans);
1686 
1687         List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
1688           new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
1689         for (RegionState state: states) {
1690           HRegionInfo region = state.getRegion();
1691           String encodedRegionName = region.getEncodedName();
1692           Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
1693           if (useZKForAssignment && (nodeVersion == null || nodeVersion == -1)) {
1694             LOG.warn("failed to offline in zookeeper: " + region);
1695             failedToOpenRegions.add(region); // assign individually later
1696             Lock lock = locks.remove(encodedRegionName);
1697             lock.unlock();
1698           } else {
1699             regionStates.updateRegionState(
1700               region, State.PENDING_OPEN, destination);
1701             List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1702             if (this.shouldAssignRegionsWithFavoredNodes) {
1703               favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1704             }
1705             regionOpenInfos.add(new Triple<HRegionInfo, Integer,  List<ServerName>>(
1706               region, nodeVersion, favoredNodes));
1707           }
1708         }
1709 
1710         // Move on to open regions.
1711         try {
1712           // Send OPEN RPC. If it fails on a IOE or RemoteException,
1713           // regions will be assigned individually.
1714           long maxWaitTime = System.currentTimeMillis() +
1715             this.server.getConfiguration().
1716               getLong("hbase.regionserver.rpc.startup.waittime", 60000);
1717           for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
1718             try {
1719               // regionOpenInfos is empty if all regions are in failedToOpenRegions list
1720               if (regionOpenInfos.isEmpty()) {
1721                 break;
1722               }
1723               List<RegionOpeningState> regionOpeningStateList = serverManager
1724                 .sendRegionOpen(destination, regionOpenInfos);
1725               if (regionOpeningStateList == null) {
1726                 // Failed getting RPC connection to this server
1727                 return false;
1728               }
1729               for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
1730                 RegionOpeningState openingState = regionOpeningStateList.get(k);
1731                 if (openingState != RegionOpeningState.OPENED) {
1732                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
1733                   if (openingState == RegionOpeningState.ALREADY_OPENED) {
1734                     processAlreadyOpenedRegion(region, destination);
1735                   } else if (openingState == RegionOpeningState.FAILED_OPENING) {
1736                     // Failed opening this region, reassign it later
1737                     failedToOpenRegions.add(region);
1738                   } else {
1739                     LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
1740                       + openingState + " in assigning region " + region);
1741                   }
1742                 }
1743               }
1744               break;
1745             } catch (IOException e) {
1746               if (e instanceof RemoteException) {
1747                 e = ((RemoteException)e).unwrapRemoteException();
1748               }
1749               if (e instanceof RegionServerStoppedException) {
1750                 LOG.warn("The region server was shut down, ", e);
1751                 // No need to retry, the region server is a goner.
1752                 return false;
1753               } else if (e instanceof ServerNotRunningYetException) {
1754                 long now = System.currentTimeMillis();
1755                 if (now < maxWaitTime) {
1756                   LOG.debug("Server is not yet up; waiting up to " +
1757                     (maxWaitTime - now) + "ms", e);
1758                   Thread.sleep(100);
1759                   i--; // reset the try count
1760                   continue;
1761                 }
1762               } else if (e instanceof java.net.SocketTimeoutException
1763                   && this.serverManager.isServerOnline(destination)) {
1764                 // In case socket is timed out and the region server is still online,
1765                 // the openRegion RPC could have been accepted by the server and
1766                 // just the response didn't go through.  So we will retry to
1767                 // open the region on the same server.
1768                 if (LOG.isDebugEnabled()) {
1769                   LOG.debug("Bulk assigner openRegion() to " + destination
1770                     + " has timed out, but the regions might"
1771                     + " already be opened on it.", e);
1772                 }
1773                 // wait and reset the re-try count, server might be just busy.
1774                 Thread.sleep(100);
1775                 i--;
1776                 continue;
1777               }
1778               throw e;
1779             }
1780           }
1781         } catch (IOException e) {
1782           // Can be a socket timeout, EOF, NoRouteToHost, etc
1783           LOG.info("Unable to communicate with " + destination
1784             + " in order to assign regions, ", e);
1785           return false;
1786         }
1787       } finally {
1788         for (Lock lock : locks.values()) {
1789           lock.unlock();
1790         }
1791       }
1792 
1793       if (!failedToOpenRegions.isEmpty()) {
1794         for (HRegionInfo region : failedToOpenRegions) {
1795           if (!regionStates.isRegionOnline(region)) {
1796             invokeAssign(region);
1797           }
1798         }
1799       }
1800 
1801       // wait for assignment completion
1802       ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
1803       for (HRegionInfo region: regions) {
1804         if (!region.getTable().isSystemTable()) {
1805           userRegionSet.add(region);
1806         }
1807       }
1808       if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
1809             System.currentTimeMillis())) {
1810         LOG.debug("some user regions are still in transition: " + userRegionSet);
1811       }
1812       LOG.debug("Bulk assigning done for " + destination);
1813       return true;
1814     } finally {
1815       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
1816     }
1817   }
1818 
1819   /**
1820    * Send CLOSE RPC if the server is online, otherwise, offline the region.
1821    *
1822    * The RPC will be sent only to the region server found in the region state
1823    * if it is passed in; otherwise, to the src server specified. If the region
1824    * state is not specified, we don't update the region state at all; instead
1825    * we just send the RPC call. This is useful for some cleanup without
1826    * messing with the region states (see handleRegion, on the "region opened
1827    * on an unexpected server" scenario, for an example).
1828    */
1829   private void unassign(final HRegionInfo region,
1830       final RegionState state, final int versionOfClosingNode,
1831       final ServerName dest, final boolean transitionInZK,
1832       final ServerName src) {
1833     ServerName server = src;
1834     if (state != null) {
1835       server = state.getServerName();
1836     }
1837     long maxWaitTime = -1;
1838     for (int i = 1; i <= this.maximumAttempts; i++) {
1839       if (this.server.isStopped() || this.server.isAborted()) {
1840         LOG.debug("Server stopped/aborted; skipping unassign of " + region);
1841         return;
1842       }
1843       // ClosedRegionHandler can remove the server from this.regions
1844       if (!serverManager.isServerOnline(server)) {
1845         LOG.debug("Offline " + region.getRegionNameAsString()
1846           + ", no need to unassign since it's on a dead server: " + server);
1847         if (transitionInZK) {
1848           // delete the node. if no node exists need not bother.
1849           deleteClosingOrClosedNode(region, server);
1850         }
1851         if (state != null) {
1852           regionOffline(region);
1853         }
1854         return;
1855       }
1856       try {
1857         // Send CLOSE RPC
1858         if (serverManager.sendRegionClose(server, region,
1859           versionOfClosingNode, dest, transitionInZK)) {
1860           LOG.debug("Sent CLOSE to " + server + " for region " +
1861             region.getRegionNameAsString());
1862           if (useZKForAssignment && !transitionInZK && state != null) {
1863             // Retry to make sure the region is
1864             // closed so as to avoid double assignment.
1865             unassign(region, state, versionOfClosingNode,
1866               dest, transitionInZK, src);
1867           }
1868           return;
1869         }
1870         // This never happens. Currently a regionserver close always returns true.
1871         // TODO: this can now happen (0.96) if there is an exception in a coprocessor.
1872         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
1873           region.getRegionNameAsString());
1874       } catch (Throwable t) {
1875         long sleepTime = 0;
1876         Configuration conf = this.server.getConfiguration();
1877         if (t instanceof RemoteException) {
1878           t = ((RemoteException)t).unwrapRemoteException();
1879         }
1880         boolean logRetries = true;
1881         if (t instanceof RegionServerAbortedException
1882             || t instanceof RegionServerStoppedException
1883             || t instanceof ServerNotRunningYetException) {
1884           // RS is aborting or stopping, we cannot offline the region since the region may need
1885           // to do WAL recovery. Until we see the RS expiration, we should retry.
1886           sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1887             RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1888 
1889         } else if (t instanceof NotServingRegionException) {
1890           LOG.debug("Offline " + region.getRegionNameAsString()
1891             + ", it's not any more on " + server, t);
1892           if (transitionInZK) {
1893             deleteClosingOrClosedNode(region, server);
1894           }
1895           if (state != null) {
1896             regionOffline(region);
1897           }
1898           return;
1899         } else if ((t instanceof FailedServerException) || (state != null &&
1900             t instanceof RegionAlreadyInTransitionException)) {
1901           if(t instanceof FailedServerException) {
1902             sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1903                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1904           } else {
1905             // RS is already processing this region, only need to update the timestamp
1906             LOG.debug("update " + state + " the timestamp.");
1907             state.updateTimestampToNow();
1908             if (maxWaitTime < 0) {
1909               maxWaitTime =
1910                   EnvironmentEdgeManager.currentTime()
1911                       + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
1912                         DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
1913             }
1914             long now = EnvironmentEdgeManager.currentTime();
1915             if (now < maxWaitTime) {
1916               LOG.debug("Region is already in transition; "
1917                 + "waiting up to " + (maxWaitTime - now) + "ms", t);
1918               sleepTime = 100;
1919               i--; // reset the try count
1920               logRetries = false;
1921             }
1922           }
1923         }
1924 
1925         try {
1926           if (sleepTime > 0) {
1927             Thread.sleep(sleepTime);
1928           }
1929         } catch (InterruptedException ie) {
1930           LOG.warn("Failed to unassign "
1931             + region.getRegionNameAsString() + " since interrupted", ie);
1932           Thread.currentThread().interrupt();
1933           if (state != null) {
1934             regionStates.updateRegionState(region, State.FAILED_CLOSE);
1935           }
1936           return;
1937         }
1938 
1939         if (logRetries) {
1940           LOG.info("Server " + server + " returned " + t + " for "
1941             + region.getRegionNameAsString() + ", try=" + i
1942             + " of " + this.maximumAttempts, t);
1943           // Presume retry or server will expire.
1944         }
1945       }
1946     }
1947     // Run out of attempts
1948     if (state != null) {
1949       regionStates.updateRegionState(region, State.FAILED_CLOSE);
1950     }
1951   }
1952 
1953   /**
1954    * Set region to OFFLINE unless it is opening and forceNewPlan is false.
1955    */
1956   private RegionState forceRegionStateToOffline(
1957       final HRegionInfo region, final boolean forceNewPlan) {
1958     RegionState state = regionStates.getRegionState(region);
1959     if (state == null) {
1960       LOG.warn("Assigning but not in region states: " + region);
1961       state = regionStates.createRegionState(region);
1962     }
1963 
1964     ServerName sn = state.getServerName();
1965     if (forceNewPlan && LOG.isDebugEnabled()) {
1966       LOG.debug("Force region state offline " + state);
1967     }
1968 
1969     switch (state.getState()) {
1970     case OPEN:
1971     case OPENING:
1972     case PENDING_OPEN:
1973     case CLOSING:
1974     case PENDING_CLOSE:
1975       if (!forceNewPlan) {
1976         LOG.debug("Skip assigning " +
1977           region + ", it is already " + state);
1978         return null;
1979       }
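           // Intentional fall-through when forceNewPlan is set: first try to close
           // the region via the FAILED_CLOSE/FAILED_OPEN handling below.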
1980     case FAILED_CLOSE:
1981     case FAILED_OPEN:
1982       unassign(region, state, -1, null, false, null);
1983       state = regionStates.getRegionState(region);
1984       if (state.isFailedClose()) {
1985         // If we can't close the region, we can't re-assign
1986         // it so as to avoid possible double assignment/data loss.
1987         LOG.info("Skip assigning " +
1988           region + ", we couldn't close it: " + state);
1989         return null;
1990       }
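           // Intentional fall-through to the OFFLINE checks below.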
1991     case OFFLINE:
1992       // This region could have been open on this server
1993       // for a while. If the server is dead and not processed
1994       // yet, we can move on only if the meta shows the
1995       // region is not on this server actually, or on a server
1996       // not dead, or dead and processed already.
1997       // In case not using ZK, we don't need this check because
1998       // we have the latest info in memory, and the caller
1999       // will do another round checking any way.
2000       if (useZKForAssignment
2001           && regionStates.isServerDeadAndNotProcessed(sn)
2002           && wasRegionOnDeadServerByMeta(region, sn)) {
2003         if (!regionStates.isRegionInTransition(region)) {
2004           LOG.info("Updating the state to " + State.OFFLINE + " to allow to be reassigned by SSH");
2005           regionStates.updateRegionState(region, State.OFFLINE);
2006         }
2007         LOG.info("Skip assigning " + region.getRegionNameAsString()
2008             + ", it is on a dead but not processed yet server: " + sn);
2009         return null;
2010       }
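           // Intentional fall-through: OFFLINE and CLOSED regions are assignable.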
2011     case CLOSED:
2012       break;
2013     default:
2014       LOG.error("Trying to assign region " + region
2015         + ", which is " + state);
2016       return null;
2017     }
2018     return state;
2019   }
2020 
2021   @SuppressWarnings("deprecation")
2022   protected boolean wasRegionOnDeadServerByMeta(
2023       final HRegionInfo region, final ServerName sn) {
2024     try {
2025       if (region.isMetaRegion()) {
2026         ServerName server = this.server.getMetaTableLocator().
2027           getMetaRegionLocation(this.server.getZooKeeper());
2028         return regionStates.isServerDeadAndNotProcessed(server);
2029       }
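           // For a user region, read its current assignment from hbase:meta,
           // retrying on IO errors until the master is stopped or interrupted.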
2030       while (!server.isStopped()) {
2031         try {
2032           this.server.getMetaTableLocator().waitMetaRegionLocation(server.getZooKeeper());
2033           Result r = MetaTableAccessor.getRegionResult(server.getConnection(),
2034             region.getRegionName());
2035           if (r == null || r.isEmpty()) return false;
2036           ServerName server = HRegionInfo.getServerName(r);
2037           return regionStates.isServerDeadAndNotProcessed(server);
2038         } catch (IOException ioe) {
2039           LOG.info("Received exception accessing hbase:meta during force assign "
2040             + region.getRegionNameAsString() + ", retrying", ioe);
2041         }
2042       }
2043     } catch (InterruptedException e) {
2044       Thread.currentThread().interrupt();
2045       LOG.info("Interrupted accessing hbase:meta", e);
2046     }
2047     // Call is interrupted or server is stopped.
2048     return regionStates.isServerDeadAndNotProcessed(sn);
2049   }
2050 
2051   /**
2052    * Caller must hold lock on the passed <code>state</code> object.
2053    * @param state
2054    * @param setOfflineInZK
2055    * @param forceNewPlan
2056    */
2057   private void assign(RegionState state,
2058       boolean setOfflineInZK, final boolean forceNewPlan) {
2059     long startTime = EnvironmentEdgeManager.currentTime();
2060     try {
2061       Configuration conf = server.getConfiguration();
2062       RegionState currentState = state;
2063       int versionOfOfflineNode = -1;
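           // -1 means no OFFLINE znode version has been obtained yet for this attempt.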
2064       RegionPlan plan = null;
2065       long maxWaitTime = -1;
2066       HRegionInfo region = state.getRegion();
2067       RegionOpeningState regionOpenState;
2068       Throwable previousException = null;
2069       for (int i = 1; i <= maximumAttempts; i++) {
2070         if (server.isStopped() || server.isAborted()) {
2071           LOG.info("Skip assigning " + region.getRegionNameAsString()
2072             + ", the server is stopped/aborted");
2073           return;
2074         }
2075 
2076         if (plan == null) { // Get a server for the region at first
2077           try {
2078             plan = getRegionPlan(region, forceNewPlan);
2079           } catch (HBaseIOException e) {
2080             LOG.warn("Failed to get region plan", e);
2081           }
2082         }
2083 
2084         if (plan == null) {
2085           LOG.warn("Unable to determine a plan to assign " + region);
2086 
2087           // For meta region, we have to keep retrying until succeeding
2088           if (region.isMetaRegion()) {
2089             if (i == maximumAttempts) {
2090               i = 0; // re-set attempt count to 0 for at least 1 retry
2091 
2092               LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
2093                 " after maximumAttempts (" + this.maximumAttempts +
2094                 "). Reset attempts count and continue retrying.");
2095             }
2096             waitForRetryingMetaAssignment();
2097             continue;
2098           }
2099 
2100           regionStates.updateRegionState(region, State.FAILED_OPEN);
2101           return;
2102         }
2103         if (setOfflineInZK && versionOfOfflineNode == -1) {
2104           LOG.info("Setting node as OFFLINED in ZooKeeper for region " + region);
2105           // get the version of the znode after setting it to OFFLINE.
2106           // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
2107           versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
2108           if (versionOfOfflineNode != -1) {
2109             if (isDisabledorDisablingRegionInRIT(region)) {
2110               return;
2111             }
2112             // In case of assignment from EnableTableHandler table state is ENABLING. Any how
2113             // EnableTableHandler will set ENABLED after assigning all the table regions. If we
2114             // try to set to ENABLED directly then client API may think table is enabled.
2115             // When we have a case such as all the regions are added directly into hbase:meta and we call
2116             // assignRegion then we need to make the table ENABLED. Hence in such case the table
2117             // will not be in ENABLING or ENABLED state.
2118             TableName tableName = region.getTable();
2119             if (!tableStateManager.isTableState(tableName,
2120               ZooKeeperProtos.Table.State.ENABLED, ZooKeeperProtos.Table.State.ENABLING)) {
2121               LOG.debug("Setting table " + tableName + " to ENABLED state.");
2122               setEnabledTable(tableName);
2123             }
2124           }
2125         }
2126         if (setOfflineInZK && versionOfOfflineNode == -1) {
2127           LOG.info("Unable to set offline in ZooKeeper to assign " + region);
2128           // Setting offline in ZK must have been failed due to ZK racing or some
2129           // exception which may make the server to abort. If it is ZK racing,
2130           // we should retry since we already reset the region state,
2131           // existing (re)assignment will fail anyway.
2132           if (!server.isAborted()) {
2133             continue;
2134           }
2135         }
2136         LOG.info("Assigning " + region.getRegionNameAsString() +
2137             " to " + plan.getDestination().toString());
2138         // Transition RegionState to PENDING_OPEN
2139         currentState = regionStates.updateRegionState(region,
2140           State.PENDING_OPEN, plan.getDestination());
2141 
2142         boolean needNewPlan;
2143         final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
2144             " to " + plan.getDestination();
2145         try {
2146           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
2147           if (this.shouldAssignRegionsWithFavoredNodes) {
2148             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
2149           }
2150           regionOpenState = serverManager.sendRegionOpen(
2151               plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
2152 
2153           if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
2154             // Failed opening this region, looping again on a new server.
2155             needNewPlan = true;
2156             LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
2157                 " trying to assign elsewhere instead; " +
2158                 "try=" + i + " of " + this.maximumAttempts);
2159           } else {
2160             // we're done
2161             if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
2162               processAlreadyOpenedRegion(region, plan.getDestination());
2163             }
2164             return;
2165           }
2166 
2167         } catch (Throwable t) {
2168           if (t instanceof RemoteException) {
2169             t = ((RemoteException) t).unwrapRemoteException();
2170           }
2171           previousException = t;
2172 
2173           // Should we wait a little before retrying? If the server is starting it's yes.
2174           // If the region is already in transition, it's yes as well: we want to be sure that
2175           //  the region will get opened but we don't want a double assignment.
2176           boolean hold = (t instanceof RegionAlreadyInTransitionException ||
2177               t instanceof ServerNotRunningYetException);
2178 
2179           // In case socket is timed out and the region server is still online,
2180           // the openRegion RPC could have been accepted by the server and
2181           // just the response didn't go through.  So we will retry to
2182           // open the region on the same server to avoid possible
2183           // double assignment.
2184           boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
2185               && this.serverManager.isServerOnline(plan.getDestination()));
2186 
2187 
2188           if (hold) {
2189             LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
2190               "try=" + i + " of " + this.maximumAttempts, t);
2191 
2192             if (maxWaitTime < 0) {
2193               if (t instanceof RegionAlreadyInTransitionException) {
2194                 maxWaitTime = EnvironmentEdgeManager.currentTime()
2195                   + this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
2196                     DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
2197               } else {
2198                 maxWaitTime = EnvironmentEdgeManager.currentTime()
2199                   + this.server.getConfiguration().getLong(
2200                     "hbase.regionserver.rpc.startup.waittime", 60000);
2201               }
2202             }
2203             try {
2204               needNewPlan = false;
2205               long now = EnvironmentEdgeManager.currentTime();
2206               if (now < maxWaitTime) {
2207                 LOG.debug("Server is not yet up or region is already in transition; "
2208                   + "waiting up to " + (maxWaitTime - now) + "ms", t);
2209                 Thread.sleep(100);
2210                 i--; // reset the try count
2211               } else if (!(t instanceof RegionAlreadyInTransitionException)) {
2212                 LOG.debug("Server is not up for a while; try a new one", t);
2213                 needNewPlan = true;
2214               }
2215             } catch (InterruptedException ie) {
2216               LOG.warn("Failed to assign "
2217                   + region.getRegionNameAsString() + " since interrupted", ie);
2218               regionStates.updateRegionState(region, State.FAILED_OPEN);
2219               Thread.currentThread().interrupt();
2220               return;
2221             }
2222           } else if (retry) {
2223             needNewPlan = false;
2224             i--; // we want to retry as many times as needed as long as the RS is not dead.
2225             LOG.warn(assignMsg + ", trying to assign to the same region server due ", t);
2226           } else {
2227             needNewPlan = true;
2228             LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
2229                 " try=" + i + " of " + this.maximumAttempts, t);
2230           }
2231         }
2232 
2233         if (i == this.maximumAttempts) {
2234           // For meta region, we have to keep retrying until succeeding
2235           if (region.isMetaRegion()) {
2236             i = 0; // re-set attempt count to 0 for at least 1 retry
2237             LOG.warn(assignMsg +
2238                 ", assigning the hbase:meta region has reached maximumAttempts (" +
2239                 this.maximumAttempts + "). Resetting the attempt count and continuing to retry.");
2240             waitForRetryingMetaAssignment();
2241           }
2242           else {
2243             // Don't reset the region state or get a new plan any more.
2244             // This is the last try.
2245             continue;
2246           }
2247         }
2248 
2249         // If region opened on destination of present plan, reassigning to new
2250         // RS may cause double assignments. In case of RegionAlreadyInTransitionException
2251         // reassigning to same RS.
2252         if (needNewPlan) {
2253           // Force a new plan and reassign. Will return null if no servers.
2254           // The new plan could be the same as the existing plan since we don't
2255           // exclude the server of the original plan, which should not be
2256           // excluded since it could be the only server up now.
2257           RegionPlan newPlan = null;
2258           try {
2259             newPlan = getRegionPlan(region, true);
2260           } catch (HBaseIOException e) {
2261             LOG.warn("Failed to get region plan", e);
2262           }
2263           if (newPlan == null) {
2264             regionStates.updateRegionState(region, State.FAILED_OPEN);
2265             LOG.warn("Unable to find a viable location to assign region " +
2266                 region.getRegionNameAsString());
2267             return;
2268           }
2269 
2270           if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
2271             // Clean out plan we failed execute and one that doesn't look like it'll
2272             // succeed anyways; we need a new plan!
2273             // Transition back to OFFLINE
2274             LOG.info("Region assignment plan changed from " + plan.getDestination() + " to "
2275                 + newPlan.getDestination() + " server.");
2276             currentState = regionStates.updateRegionState(region, State.OFFLINE);
2277             versionOfOfflineNode = -1;
2278             if (useZKForAssignment) {
2279               setOfflineInZK = true;
2280             }
2281             plan = newPlan;
2282           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
2283               previousException instanceof FailedServerException) {
2284             try {
2285               LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
2286                 " to the same failed server.");
2287               Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
2288                 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
2289             } catch (InterruptedException ie) {
2290               LOG.warn("Failed to assign "
2291                   + region.getRegionNameAsString() + " since interrupted", ie);
2292               regionStates.updateRegionState(region, State.FAILED_OPEN);
2293               Thread.currentThread().interrupt();
2294               return;
2295             }
2296           }
2297         }
2298       }
2299       // Run out of attempts
2300       regionStates.updateRegionState(region, State.FAILED_OPEN);
2301     } finally {
2302       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
2303     }
2304   }
2305 
2306   private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
2307     // Remove region from in-memory transition and unassigned node from ZK
2308     // While trying to enable the table the regions of the table were
2309     // already enabled.
2310     LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2311       + " to " + sn);
2312     String encodedName = region.getEncodedName();
2313 
2314     // If using ZK for assignment, an already-OPENED region event should not be
2315     // handled here; leave it to the ZK event handler. See HBASE-14407.
2316     if (useZKForAssignment) {
2317       String node = ZKAssign.getNodeName(watcher, encodedName);
2318       Stat stat = new Stat();
2319       try {
2320         byte[] existingBytes = ZKUtil.getDataNoWatch(watcher, node, stat);
2321         if (existingBytes != null) {
2322           RegionTransition rt = RegionTransition.parseFrom(existingBytes);
2323           EventType et = rt.getEventType();
2324           if (et.equals(EventType.RS_ZK_REGION_OPENED)) {
2325             LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2326               + " and node in " + et + " state");
2327             return;
2328           }
2329         }
2330       } catch (KeeperException ke) {
2331         LOG.warn("Unexpected ZK exception getData " + node
2332           + " node for the region " + encodedName, ke);
2333       } catch (DeserializationException e) {
2334         LOG.warn("Failed to deserialize RegionTransition from zk", e);
2335       }
2336 
2337       deleteNodeInStates(encodedName, "offline", sn, EventType.M_ZK_REGION_OFFLINE);
2338     }
2339 
2340     regionStates.regionOnline(region, sn);
2341   }
2342 
2343   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
2344     if (this.tableStateManager.isTableState(region.getTable(),
2345         ZooKeeperProtos.Table.State.DISABLED,
2346         ZooKeeperProtos.Table.State.DISABLING) || replicasToClose.contains(region)) {
2347       LOG.info("Table " + region.getTable() + " is disabled or disabling;"
2348         + " skipping assign of " + region.getRegionNameAsString());
2349       offlineDisabledRegion(region);
2350       return true;
2351     }
2352     return false;
2353   }
2354 
2355   /**
2356    * Set region as OFFLINED up in zookeeper
2357    *
2358    * @param state
2359    * @return the version of the offline node if setting of the OFFLINE node was
2360    *         successful, -1 otherwise.
2361    */
2362   private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
2363     if (!state.isClosed() && !state.isOffline()) {
2364       String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE.";
2365       this.server.abort(msg, new IllegalStateException(msg));
2366       return -1;
2367     }
2368     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
2369     int versionOfOfflineNode;
2370     try {
2371       // get the version after setting the znode to OFFLINE
2372       versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
2373         state.getRegion(), destination);
2374       if (versionOfOfflineNode == -1) {
2375         LOG.warn("Attempted to create/force node into OFFLINE state before "
2376             + "completing assignment but failed to do so for " + state);
2377         return -1;
2378       }
2379     } catch (KeeperException e) {
2380       server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
2381       return -1;
2382     }
2383     return versionOfOfflineNode;
2384   }
2385 
2386   /**
2387    * @param region the region to assign
2388    * @return Plan for passed <code>region</code> (If none currently, it creates one or
2389    * if no servers to assign, it returns null).
2390    */
2391   private RegionPlan getRegionPlan(final HRegionInfo region,
2392       final boolean forceNewPlan)  throws HBaseIOException {
2393     return getRegionPlan(region, null, forceNewPlan);
2394   }
2395 
2396   /**
2397    * @param region the region to assign
2398    * @param serverToExclude Server to exclude (we know its bad). Pass null if
2399    * all servers are thought to be assignable.
2400    * @param forceNewPlan If true, then if an existing plan exists, a new plan
2401    * will be generated.
2402    * @return Plan for passed <code>region</code> (If none currently, it creates one or
2403    * if no servers to assign, it returns null).
2404    */
2405   private RegionPlan getRegionPlan(final HRegionInfo region,
2406       final ServerName serverToExclude, final boolean forceNewPlan) throws HBaseIOException {
2407     // Pickup existing plan or make a new one
2408     final String encodedName = region.getEncodedName();
2409     final List<ServerName> destServers =
2410       serverManager.createDestinationServersList(serverToExclude);
2411 
2412     if (destServers.isEmpty()){
2413       LOG.warn("Can't move " + encodedName +
2414         ", there is no destination server available.");
2415       return null;
2416     }
2417 
2418     RegionPlan randomPlan = null;
2419     boolean newPlan = false;
2420     RegionPlan existingPlan;
2421 
2422     synchronized (this.regionPlans) {
2423       existingPlan = this.regionPlans.get(encodedName);
2424 
2425       if (existingPlan != null && existingPlan.getDestination() != null) {
2426         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
2427           + " destination server is " + existingPlan.getDestination() +
2428             " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
2429       }
2430 
2431       if (forceNewPlan
2432           || existingPlan == null
2433           || existingPlan.getDestination() == null
2434           || !destServers.contains(existingPlan.getDestination())) {
2435         newPlan = true;
2436       }
2437     }
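         // No usable existing plan: ask the balancer for a random destination,
         // refresh favored nodes for non-meta tables, and record the new plan.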
2438 
2439     if (newPlan) {
2440       ServerName destination = balancer.randomAssignment(region, destServers);
2441       if (destination == null) {
2442         LOG.warn("Can't find a destination for " + encodedName);
2443         return null;
2444       }
2445       synchronized (this.regionPlans) {
2446         randomPlan = new RegionPlan(region, null, destination);
2447         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
2448           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
2449           regions.add(region);
2450           try {
2451             processFavoredNodes(regions);
2452           } catch (IOException ie) {
2453             LOG.warn("Ignoring exception in processFavoredNodes " + ie);
2454           }
2455         }
2456         this.regionPlans.put(encodedName, randomPlan);
2457       }
2458       LOG.debug("No previous transition plan found (or ignoring " + "an existing plan) for "
2459           + region.getRegionNameAsString() + "; generated random plan=" + randomPlan + "; "
2460           + destServers.size() + " (online=" + serverManager.getOnlineServers().size()
2461           + ") available servers, forceNewPlan=" + forceNewPlan);
2462       return randomPlan;
2463     }
2464     LOG.debug("Using pre-existing plan for " +
2465       region.getRegionNameAsString() + "; plan=" + existingPlan);
2466     return existingPlan;
2467   }
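       // Illustrative usage sketch of the plan lookup above -- a minimal sketch, assuming it
       // runs inside this class with 'region' being the HRegionInfo to place:
       //
       //   RegionPlan plan = getRegionPlan(region, null, false); // may throw HBaseIOException
       //   if (plan == null) {
       //     // no destination server available; the caller must retry the assignment later
       //   } else {
       //     ServerName dest = plan.getDestination(); // target for the OPEN RPC
       //   }
       //
       // Passing forceNewPlan=true discards any cached plan and asks the balancer for a new
       // random assignment instead.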
2468 
2469   /**
2470    * Wait for some time before retrying meta table region assignment
2471    */
2472   private void waitForRetryingMetaAssignment() {
2473     try {
2474       Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
2475     } catch (InterruptedException e) {
2476       LOG.error("Got exception while waiting for hbase:meta assignment");
2477       Thread.currentThread().interrupt();
2478     }
2479   }
2480 
2481   /**
2482    * Unassigns the specified region.
2483    * <p>
2484    * Updates the RegionState and sends the CLOSE RPC unless region is being
2485    * split by regionserver; then the unassign fails (silently) because we
2486    * presume the region being unassigned no longer exists (it's been split out
2487    * of existence). TODO: What to do if split fails and is rolled back and
2488    * parent is revivified?
2489    * <p>
2490    * If a RegionPlan is already set, it will remain.
2491    *
2492    * @param region the region to be unassigned
2493    */
2494   public void unassign(HRegionInfo region) {
2495     unassign(region, false);
2496   }
2497 
2498 
2499   /**
2500    * Unassigns the specified region.
2501    * <p>
2502    * Updates the RegionState and sends the CLOSE RPC unless region is being
2503    * split by regionserver; then the unassign fails (silently) because we
2504    * presume the region being unassigned no longer exists (it's been split out
2505    * of existence). TODO: What to do if split fails and is rolled back and
2506    * parent is revivified?
2507    * <p>
2508    * If a RegionPlan is already set, it will remain.
2509    *
2510    * @param region the region to be unassigned
2511    * @param force if region should be closed even if already closing
        * @param dest the destination server the region is intended to move to, or null
2512    */
2513   public void unassign(HRegionInfo region, boolean force, ServerName dest) {
2514     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
2515     LOG.debug("Starting unassign of " + region.getRegionNameAsString()
2516       + " (offlining), current state: " + regionStates.getRegionState(region));
2517 
2518     String encodedName = region.getEncodedName();
2519     // Grab the state of this region and synchronize on it
2520     int versionOfClosingNode = -1;
2521     // We need a lock here as we're going to do a put later and we don't want
2522     // multiple state creations
2523     ReentrantLock lock = locker.acquireLock(encodedName);
2524     RegionState state = regionStates.getRegionTransitionState(encodedName);
2525     boolean reassign = true;
2526     try {
2527       if (state == null) {
2528         // Region is not in transition.
2529         // We can unassign it only if it's not SPLIT/MERGED.
2530         state = regionStates.getRegionState(encodedName);
2531         if (state != null && state.isUnassignable()) {
2532           LOG.info("Attempting to unassign " + state + ", ignored");
2533           // Offline region will be reassigned below
2534           return;
2535         }
2536         // Create the znode in CLOSING state
2537         try {
2538           if (state == null || state.getServerName() == null) {
2539             // We don't know where the region is, offline it.
2540             // No need to send CLOSE RPC
2541             LOG.warn("Attempting to unassign a region not in RegionStates "
2542               + region.getRegionNameAsString() + ", offlined");
2543             regionOffline(region);
2544             return;
2545           }
2546           if (useZKForAssignment) {
2547             versionOfClosingNode = ZKAssign.createNodeClosing(
2548               watcher, region, state.getServerName());
2549             if (versionOfClosingNode == -1) {
2550               LOG.info("Attempting to unassign " +
2551                 region.getRegionNameAsString() + " but ZK closing node "
2552                 + "can't be created.");
2553               reassign = false; // not unassigned at all
2554               return;
2555             }
2556           }
2557         } catch (KeeperException e) {
2558           if (e instanceof NodeExistsException) {
2559             // Handle race between master initiated close and regionserver
2560             // orchestrated splitting. See if existing node is in a
2561             // SPLITTING or SPLIT state.  If so, the regionserver started
2562             // an op on node before we could get our CLOSING in.  Deal.
2563             NodeExistsException nee = (NodeExistsException)e;
2564             String path = nee.getPath();
2565             try {
2566               if (isSplitOrSplittingOrMergedOrMerging(path)) {
2567                 LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
2568                   "skipping unassign because region no longer exists -- its split or merge");
2569                 reassign = false; // no need to reassign for split/merged region
2570                 return;
2571               }
2572             } catch (KeeperException.NoNodeException ke) {
2573               LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
2574                 "; presuming split and that the region to unassign, " +
2575                 encodedName + ", no longer exists -- confirm", ke);
2576               return;
2577             } catch (KeeperException ke) {
2578               LOG.error("Unexpected zk state", ke);
2579             } catch (DeserializationException de) {
2580               LOG.error("Failed parse", de);
2581             }
2582           }
2583           // If we get here, we don't understand what's going on -- abort.
2584           server.abort("Unexpected ZK exception creating node CLOSING", e);
2585           reassign = false; // heading out already
2586           return;
2587         }
2588         state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2589       } else if (state.isFailedOpen()) {
2590         // The region is not open yet
2591         regionOffline(region);
2592         return;
2593       } else if (force && state.isPendingCloseOrClosing()) {
2594         LOG.debug("Attempting to unassign " + region.getRegionNameAsString() +
2595           " which is already " + state.getState()  +
2596           " but forcing to send a CLOSE RPC again ");
2597         if (state.isFailedClose()) {
2598           state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2599         }
2600         state.updateTimestampToNow();
2601       } else {
2602         LOG.debug("Attempting to unassign " +
2603           region.getRegionNameAsString() + " but it is " +
2604           "already in transition (" + state.getState() + ", force=" + force + ")");
2605         return;
2606       }
2607 
2608       unassign(region, state, versionOfClosingNode, dest, useZKForAssignment, null);
2609     } finally {
2610       lock.unlock();
2611 
2612       // Region is expected to be reassigned afterwards
2613       if (!replicasToClose.contains(region) && reassign && regionStates.isRegionOffline(region)) {
2614         assign(region, true);
2615       }
2616     }
2617   }
2618 
2619   public void unassign(HRegionInfo region, boolean force) {
2620     unassign(region, force, null);
2621   }
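       // Illustrative usage sketch, assuming 'hri' is an online region and 'destSn' is a
       // hypothetical destination ServerName chosen by the caller:
       //
       //   unassign(hri, false);          // normal close; region is reassigned afterwards
       //   unassign(hri, true);           // force another CLOSE RPC even if already closing
       //   unassign(hri, false, destSn);  // close, planning the reopen on 'destSn'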
2622 
2623   /**
2624    * @param region regioninfo of the znode to be deleted.
        * @param sn server the region was being closed on
2625    */
2626   public void deleteClosingOrClosedNode(HRegionInfo region, ServerName sn) {
2627     String encodedName = region.getEncodedName();
2628     deleteNodeInStates(encodedName, "closing", sn, EventType.M_ZK_REGION_CLOSING,
2629       EventType.RS_ZK_REGION_CLOSED);
2630   }
2631 
2632   /**
2633    * @param path
2634    * @return True if znode is in SPLIT or SPLITTING or MERGED or MERGING state.
2635    * @throws KeeperException Can happen if the znode went away in meantime.
2636    * @throws DeserializationException
2637    */
2638   private boolean isSplitOrSplittingOrMergedOrMerging(final String path)
2639       throws KeeperException, DeserializationException {
2640     boolean result = false;
2641     // This may fail if the SPLIT or SPLITTING or MERGED or MERGING znode gets
2642     // cleaned up before we can get data from it.
2643     byte [] data = ZKAssign.getData(watcher, path);
2644     if (data == null) {
2645       LOG.info("Node " + path + " is gone");
2646       return false;
2647     }
2648     RegionTransition rt = RegionTransition.parseFrom(data);
2649     switch (rt.getEventType()) {
2650     case RS_ZK_REQUEST_REGION_SPLIT:
2651     case RS_ZK_REGION_SPLIT:
2652     case RS_ZK_REGION_SPLITTING:
2653     case RS_ZK_REQUEST_REGION_MERGE:
2654     case RS_ZK_REGION_MERGED:
2655     case RS_ZK_REGION_MERGING:
2656       result = true;
2657       break;
2658     default:
2659       LOG.info("Node " + path + " is in " + rt.getEventType());
2660       break;
2661     }
2662     return result;
2663   }
2664 
2665   /**
2666    * Used by unit tests. Return the number of regions opened so far in the life
2667    * of the master. Increases by one every time the master opens a region.
2668    * @return the counter value of the number of regions opened so far
2669    */
2670   public int getNumRegionsOpened() {
2671     return numRegionsOpened.get();
2672   }
2673 
2674   /**
2675    * Waits until the specified region has completed assignment.
2676    * <p>
2677    * If the region is already assigned, returns immediately.  Otherwise, this method
2678    * blocks until the region is assigned.
2679    * @param regionInfo region to wait on assignment for
2680    * @return true if the region is assigned, false otherwise.
2681    * @throws InterruptedException
2682    */
2683   public boolean waitForAssignment(HRegionInfo regionInfo)
2684       throws InterruptedException {
2685     ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
2686     regionSet.add(regionInfo);
2687     return waitForAssignment(regionSet, true, Long.MAX_VALUE);
2688   }
2689 
2690   /**
2691    * Waits until the specified regions have completed assignment, or the deadline is reached.
2692    */
2693   protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
2694       final boolean waitTillAllAssigned, final int reassigningRegions,
2695       final long minEndTime) throws InterruptedException {
2696     long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
2697     if (deadline < 0) { // Overflow
2698       deadline = Long.MAX_VALUE; // wait forever
2699     }
2700     return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
2701   }
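       // Worked example for the deadline computed above, using hypothetical values: if
       // bulkPerRegionOpenTimeGuesstimate is 10000 ms and 5 regions are being reassigned,
       // the wait extends minEndTime by 10000 * (5 + 1) = 60000 ms.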
2702 
2703   /**
2704    * Waits until the specified regions have completed assignment, or the deadline is reached.
2705    * @param regionSet set of regions to wait on. The set is modified; assigned regions are removed.
2706    * @param waitTillAllAssigned true if we should wait for all the regions to be assigned
2707    * @param deadline the timestamp after which the wait is aborted
2708    * @return true if all the regions are assigned, false otherwise.
2709    * @throws InterruptedException
2710    */
2711   protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
2712       final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
2713     // We're not synchronizing on regionsInTransition now because we don't use any iterator.
2714     while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
2715       int failedOpenCount = 0;
2716       Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
2717       while (regionInfoIterator.hasNext()) {
2718         HRegionInfo hri = regionInfoIterator.next();
2719         if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
2720             State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
2721           regionInfoIterator.remove();
2722         } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
2723           failedOpenCount++;
2724         }
2725       }
2726       if (!waitTillAllAssigned) {
2727         // No need to wait, let assignment go on asynchronously
2728         break;
2729       }
2730       if (!regionSet.isEmpty()) {
2731         if (failedOpenCount == regionSet.size()) {
2732           // all the regions we are waiting on had an error on open.
2733           break;
2734         }
2735         regionStates.waitForUpdate(100);
2736       }
2737     }
2738     return regionSet.isEmpty();
2739   }
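       // Illustrative usage sketch, assuming 'justAssigned' is a hypothetical collection of
       // regions whose assignment was just invoked:
       //
       //   Collection<HRegionInfo> pending = new ArrayList<HRegionInfo>(justAssigned);
       //   long deadline = System.currentTimeMillis() + 60000; // wait up to one minute
       //   boolean allAssigned = waitForAssignment(pending, true, deadline);
       //   // 'pending' now contains only the regions that are still not assigned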
2740 
2741   /**
2742    * Assigns the hbase:meta region or a replica.
2743    * <p>
2744    * Assumes that hbase:meta is currently closed and is not being actively served by
2745    * any RegionServer.
2746    * <p>
2747    * Forcibly unsets the current meta region location in ZooKeeper and assigns
2748    * hbase:meta to a random RegionServer.
2749    * @param hri region info for the hbase:meta region, or one of its replicas, to assign
2750    * @throws KeeperException
2751    */
2752   public void assignMeta(HRegionInfo hri) throws KeeperException {
2753     this.server.getMetaTableLocator().deleteMetaLocation(this.watcher, hri.getReplicaId());
2754     assign(hri, true);
2755   }
2756 
2757   /**
2758    * Assigns specified regions retaining assignments, if any.
2759    * <p>
2760    * This is a synchronous call and will return once every region has been
2761    * assigned.  If anything fails, an exception is thrown.
2762    * @throws InterruptedException
2763    * @throws IOException
2764    */
2765   public void assign(Map<HRegionInfo, ServerName> regions)
2766         throws IOException, InterruptedException {
2767     if (regions == null || regions.isEmpty()) {
2768       return;
2769     }
2770     List<ServerName> servers = serverManager.createDestinationServersList();
2771     if (servers == null || servers.isEmpty()) {
2772       throw new IOException("Found no destination server to assign region(s)");
2773     }
2774 
2775     // Reuse existing assignment info
2776     Map<ServerName, List<HRegionInfo>> bulkPlan =
2777       balancer.retainAssignment(regions, servers);
2778     if (bulkPlan == null) {
2779       throw new IOException("Unable to determine a plan to assign region(s)");
2780     }
2781 
2782     assign(regions.size(), servers.size(),
2783       "retainAssignment=true", bulkPlan);
2784   }
2785 
2786   /**
2787    * Assigns specified regions round robin, if any.
2788    * <p>
2789    * This is a synchronous call and will return once every region has been
2790    * assigned.  If anything fails, an exception is thrown.
2791    * @throws InterruptedException
2792    * @throws IOException
2793    */
2794   public void assign(List<HRegionInfo> regions)
2795         throws IOException, InterruptedException {
2796     if (regions == null || regions.isEmpty()) {
2797       return;
2798     }
2799 
2800     List<ServerName> servers = serverManager.createDestinationServersList();
2801     if (servers == null || servers.isEmpty()) {
2802       throw new IOException("Found no destination server to assign region(s)");
2803     }
2804 
2805     // Generate a round-robin bulk assignment plan
2806     Map<ServerName, List<HRegionInfo>> bulkPlan
2807       = balancer.roundRobinAssignment(regions, servers);
2808     if (bulkPlan == null) {
2809       throw new IOException("Unable to determine a plan to assign region(s)");
2810     }
2811 
2812     processFavoredNodes(regions);
2813     assign(regions.size(), servers.size(),
2814       "round-robin=true", bulkPlan);
2815   }
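       // Illustrative usage sketch for the two bulk-assign entry points above, assuming
       // 'newRegions' (a List) and 'previousAssignments' (a Map<HRegionInfo, ServerName>)
       // are hypothetical inputs built by the caller:
       //
       //   assign(newRegions);            // round-robin placement across live servers
       //   assign(previousAssignments);   // retain the old locations where possible
       //
       // Both calls are synchronous and throw IOException if no destination servers or no
       // assignment plan can be found.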
2816 
2817   private void assign(int regions, int totalServers,
2818       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
2819           throws InterruptedException, IOException {
2820 
2821     int servers = bulkPlan.size();
2822     if (servers == 1 || (regions < bulkAssignThresholdRegions
2823         && servers < bulkAssignThresholdServers)) {
2824 
2825       // Not using bulk assignment.  This could be more efficient in a small
2826       // cluster, especially a mini cluster for testing, so that tests won't time out
2827       if (LOG.isTraceEnabled()) {
2828         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
2829           " region(s) to " + servers + " server(s)");
2830       }
2831 
2832       // invoke assignment (async)
2833       ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
2834       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
2835         if (!assign(plan.getKey(), plan.getValue())) {
2836           for (HRegionInfo region: plan.getValue()) {
2837             if (!regionStates.isRegionOnline(region)) {
2838               invokeAssign(region);
2839               if (!region.getTable().isSystemTable()) {
2840                 userRegionSet.add(region);
2841               }
2842             }
2843           }
2844         }
2845       }
2846 
2847       // wait for assignment completion
2848       if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
2849             System.currentTimeMillis())) {
2850         LOG.debug("some user regions are still in transition: " + userRegionSet);
2851       }
2852     } else {
2853       LOG.info("Bulk assigning " + regions + " region(s) across "
2854         + totalServers + " server(s), " + message);
2855 
2856       // Use fixed count thread pool assigning.
2857       BulkAssigner ba = new GeneralBulkAssigner(
2858         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
2859       ba.bulkAssign();
2860       LOG.info("Bulk assigning done");
2861     }
2862   }
2863 
2864   /**
2865    * Assigns all user regions, if any exist.  Used during cluster startup.
2866    * <p>
2867    * This is a synchronous call and will return once every region has been
2868    * assigned.  If anything fails, an exception is thrown and the cluster
2869    * should be shut down.
2870    * @throws InterruptedException
2871    * @throws IOException
2872    */
2873   private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
2874       throws IOException, InterruptedException {
2875     if (allRegions == null || allRegions.isEmpty()) return;
2876 
2877     // Determine what type of assignment to do on startup
2878     boolean retainAssignment = server.getConfiguration().
2879       getBoolean("hbase.master.startup.retainassign", true);
2880 
2881     Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
2882     if (retainAssignment) {
2883       assign(allRegions);
2884     } else {
2885       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
2886       assign(regions);
2887     }
2888 
2889     for (HRegionInfo hri : regionsFromMetaScan) {
2890       TableName tableName = hri.getTable();
2891       if (!tableStateManager.isTableState(tableName,
2892           ZooKeeperProtos.Table.State.ENABLED)) {
2893         setEnabledTable(tableName);
2894       }
2895     }
2896     // assign all the replicas that were not recorded in the meta
2897     assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server));
2898   }
2899 
2900   /**
2901    * Get a list of replica regions that are not recorded in meta yet.
2902    * We might not have recorded the locations for the replicas because the replicas
2903    * may not have been online yet, the master restarted in the middle of assigning,
2904    * ZK was erased, etc.
2905    * @param regionsRecordedInMeta the list of regions we know are recorded in meta
2906    * either as a default, or, as the location of a replica
2907    * @param master the master services, used to look up the table descriptors
2908    * @return list of replica regions
2909    * @throws IOException
2910    */
2911   public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
2912       Set<HRegionInfo> regionsRecordedInMeta, MasterServices master)throws IOException {
2913     List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
2914     for (HRegionInfo hri : regionsRecordedInMeta) {
2915       TableName table = hri.getTable();
2916       HTableDescriptor htd = master.getTableDescriptors().get(table);
2917       // look at the HTD for the replica count. That's the source of truth
2918       int desiredRegionReplication = htd.getRegionReplication();
2919       for (int i = 0; i < desiredRegionReplication; i++) {
2920         HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
2921         if (regionsRecordedInMeta.contains(replica)) continue;
2922         regionsNotRecordedInMeta.add(replica);
2923       }
2924     }
2925     return regionsNotRecordedInMeta;
2926   }
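       // Illustrative sketch, assuming 'recordedInMeta' (a Set<HRegionInfo>) and
       // 'masterServices' are supplied by the caller: for a table whose descriptor declares
       // region replication 3, each primary yields replica ids 0..2, and any replica
       // HRegionInfo missing from 'recordedInMeta' is returned so it can be assigned.
       //
       //   List<HRegionInfo> replicasToAssign =
       //       replicaRegionsNotRecordedInMeta(recordedInMeta, masterServices);
       //   assign(replicasToAssign);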
2927 
2928   /**
2929    * Wait until no regions in transition.
2930    * @param timeout How long to wait.
2931    * @return True if nothing in regions in transition.
2932    * @throws InterruptedException
2933    */
2934   boolean waitUntilNoRegionsInTransition(final long timeout)
2935       throws InterruptedException {
2936     // Blocks until there are no regions in transition. It is possible that
2937     // there are regions in transition immediately after this returns, but this
2938     // method guarantees that if it returns without an exception there was a
2939     // period of time with no regions in transition from the point-of-view of
2940     // the in-memory state of the Master.
2941
2942     final long endTime = System.currentTimeMillis() + timeout;
2943 
2944     while (!this.server.isStopped() && regionStates.isRegionsInTransition()
2945         && endTime > System.currentTimeMillis()) {
2946       regionStates.waitForUpdate(100);
2947     }
2948 
2949     return !regionStates.isRegionsInTransition();
2950   }
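       // Illustrative usage sketch:
       //
       //   if (!waitUntilNoRegionsInTransition(60000)) {
       //     // some regions were still in transition after 60 seconds; inspect
       //     // regionStates.getRegionsInTransition() before proceeding
       //   }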
2951 
2952   /**
2953    * Rebuild the list of user regions and assignment information.
2954    * <p>
2955    * Returns a set of servers that hosted some regions but are not found
2956    * to be online.
2957    * @return set of servers not online that hosted some regions per meta
2958    * @throws IOException
2959    */
2960   Set<ServerName> rebuildUserRegions() throws
2961       IOException, KeeperException, CoordinatedStateException {
2962     Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
2963       ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.ENABLING);
2964 
2965     Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
2966       ZooKeeperProtos.Table.State.DISABLED,
2967       ZooKeeperProtos.Table.State.DISABLING,
2968       ZooKeeperProtos.Table.State.ENABLING);
2969 
2970     // Region assignment from META
2971     List<Result> results = MetaTableAccessor.fullScanOfMeta(server.getConnection());
2972     // Get any new but slow to checkin region server that joined the cluster
2973     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
2974     // Set of offline servers to be returned
2975     Set<ServerName> offlineServers = new HashSet<ServerName>();
2976     // Iterate regions in META
2977     for (Result result : results) {
2978       if (result == null && LOG.isDebugEnabled()){
2979         LOG.debug("null result from meta - ignoring but this is strange.");
2980         continue;
2981       }
2982       // keep track of replicas to close. These were the replicas of the originally
2983       // unmerged regions. The master might have closed them before, but it may not
2984       // have, perhaps because it crashed.
2985       PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
2986       if (p.getFirst() != null && p.getSecond() != null) {
2987         int numReplicas = ((MasterServices)server).getTableDescriptors().get(p.getFirst().
2988             getTable()).getRegionReplication();
2989         for (HRegionInfo merge : p) {
2990           for (int i = 1; i < numReplicas; i++) {
2991             replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
2992           }
2993         }
2994       }
2995       RegionLocations rl =  MetaTableAccessor.getRegionLocations(result);
2996       if (rl == null) continue;
2997       HRegionLocation[] locations = rl.getRegionLocations();
2998       if (locations == null) continue;
2999       for (HRegionLocation hrl : locations) {
3000         if (hrl == null) continue;
3001         HRegionInfo regionInfo = hrl.getRegionInfo();
3002         if (regionInfo == null) continue;
3003         int replicaId = regionInfo.getReplicaId();
3004         State state = RegionStateStore.getRegionState(result, replicaId);
3005         // keep track of replicas to close. These were the replicas of the split parents
3006         // from the previous life of the master. The master should have closed them before,
3007         // but it may not have, perhaps because it crashed.
3008         if (replicaId == 0 && state.equals(State.SPLIT)) {
3009           for (HRegionLocation h : locations) {
3010             replicasToClose.add(h.getRegionInfo());
3011           }
3012         }
3013         ServerName lastHost = hrl.getServerName();
3014         ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
3015         if (tableStateManager.isTableState(regionInfo.getTable(),
3016              ZooKeeperProtos.Table.State.DISABLED)) {
3017           // force the region to forget its last host for disabled/disabling tables.
3018           // see HBASE-13326
3019           lastHost = null;
3020           regionLocation = null;
3021         }
3022         regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
3023         if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
3024           // Region is not open (either offline or in transition), skip
3025           continue;
3026         }
3027         TableName tableName = regionInfo.getTable();
3028         if (!onlineServers.contains(regionLocation)) {
3029           // Region is located on a server that isn't online
3030           offlineServers.add(regionLocation);
3031           if (useZKForAssignment) {
3032             regionStates.regionOffline(regionInfo);
3033           }
3034         } else if (!disabledOrEnablingTables.contains(tableName)) {
3035           // Region is being served and on an active server
3036           // add only if region not in disabled or enabling table
3037           regionStates.regionOnline(regionInfo, regionLocation);
3038           balancer.regionOnline(regionInfo, regionLocation);
3039         } else if (useZKForAssignment) {
3040           regionStates.regionOffline(regionInfo);
3041         }
3042         // need to enable the table if not disabled or disabling or enabling
3043         // this will be used in rolling restarts
3044         if (!disabledOrDisablingOrEnabling.contains(tableName)
3045           && !getTableStateManager().isTableState(tableName,
3046             ZooKeeperProtos.Table.State.ENABLED)) {
3047           setEnabledTable(tableName);
3048         }
3049       }
3050     }
3051     return offlineServers;
3052   }
3053 
3054   /**
3055    * Recover the tables that were not fully moved to DISABLED state. These
3056    * tables were in DISABLING state when the master restarted/switched.
3057    *
3058    * @throws KeeperException
3059    * @throws TableNotFoundException
3060    * @throws IOException
3061    */
3062   private void recoverTableInDisablingState()
3063       throws KeeperException, IOException, CoordinatedStateException {
3064     Set<TableName> disablingTables =
3065       tableStateManager.getTablesInStates(ZooKeeperProtos.Table.State.DISABLING);
3066     if (disablingTables.size() != 0) {
3067       for (TableName tableName : disablingTables) {
3068         // Recover by calling DisableTableHandler
3069         LOG.info("The table " + tableName
3070             + " is in DISABLING state.  Hence recovering by moving the table"
3071             + " to DISABLED state.");
3072         new DisableTableHandler(this.server, tableName,
3073             this, tableLockManager, true).prepare().process();
3074       }
3075     }
3076   }
3077 
3078   /**
3079    * Recover the tables that are not fully moved to ENABLED state. These tables
3080    * were in ENABLING state when the master restarted/switched.
3081    *
3082    * @throws KeeperException
3083    * @throws org.apache.hadoop.hbase.TableNotFoundException
3084    * @throws IOException
3085    */
3086   private void recoverTableInEnablingState()
3087       throws KeeperException, IOException, CoordinatedStateException {
3088     Set<TableName> enablingTables = tableStateManager.
3089       getTablesInStates(ZooKeeperProtos.Table.State.ENABLING);
3090     if (enablingTables.size() != 0) {
3091       for (TableName tableName : enablingTables) {
3092         // Recover by calling EnableTableHandler
3093         LOG.info("The table " + tableName
3094             + " is in ENABLING state.  Hence recovering by moving the table"
3095             + " to ENABLED state.");
3096         // enableTable in sync way during master startup,
3097         // no need to invoke coprocessor
3098         EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
3099           this, tableLockManager, true);
3100         try {
3101           eth.prepare();
3102         } catch (TableNotFoundException e) {
3103           LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
3104           continue;
3105         }
3106         eth.process();
3107       }
3108     }
3109   }
3110 
3111   /**
3112    * Processes list of dead servers from result of hbase:meta scan and regions in RIT
3113    * <p>
3114    * This is used for failover to recover the lost regions that belonged to
3115    * RegionServers which failed while there was no active master or regions
3116    * that were in RIT.
3117    * <p>
3118    *
3119    *
3120    * @param deadServers
3121    *          The list of dead servers which failed while there was no active
3122    *          master. Can be null.
3123    * @throws IOException
3124    * @throws KeeperException
3125    */
3126   private void processDeadServersAndRecoverLostRegions(
3127       Set<ServerName> deadServers) throws IOException, KeeperException {
3128     if (deadServers != null && !deadServers.isEmpty()) {
3129       for (ServerName serverName: deadServers) {
3130         if (!serverManager.isServerDead(serverName)) {
3131           serverManager.expireServer(serverName); // Let SSH do region re-assign
3132         }
3133       }
3134     }
3135 
3136     List<String> nodes = useZKForAssignment ?
3137       ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode)
3138       : ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
3139     if (nodes != null && !nodes.isEmpty()) {
3140       for (String encodedRegionName : nodes) {
3141         processRegionInTransition(encodedRegionName, null);
3142       }
3143     } else if (!useZKForAssignment) {
3144       processRegionInTransitionZkLess();
3145     }
3146   }
3147 
3148   void processRegionInTransitionZkLess() {
3149     // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
3150     // in case the RPC call is not sent out yet before the master was shut down
3151     // since we update the state before we send the RPC call. We can't update
3152     // the state after the RPC call. Otherwise, we don't know what's happened
3153     // to the region if the master dies right after the RPC call is out.
3154     Map<String, RegionState> rits = regionStates.getRegionsInTransition();
3155     for (RegionState regionState : rits.values()) {
3156       LOG.info("Processing " + regionState);
3157       ServerName serverName = regionState.getServerName();
3158       // Server could be null in case of FAILED_OPEN when master cannot find a region plan. In that
3159       // case, try assigning it here.
3160       if (serverName != null
3161           && !serverManager.getOnlineServers().containsKey(serverName)) {
3162         LOG.info("Server " + serverName + " isn't online. SSH will handle this");
3163         continue;
3164       }
3165       HRegionInfo regionInfo = regionState.getRegion();
3166       State state = regionState.getState();
3167 
3168       switch (state) {
3169       case CLOSED:
3170         invokeAssign(regionInfo);
3171         break;
3172       case PENDING_OPEN:
3173         retrySendRegionOpen(regionState);
3174         break;
3175       case PENDING_CLOSE:
3176         retrySendRegionClose(regionState);
3177         break;
3178       case FAILED_CLOSE:
3179       case FAILED_OPEN:
3180         invokeUnAssign(regionInfo);
3181         break;
3182       default:
3183         // No process for other states
3184       }
3185     }
3186   }
3187 
3188   /**
3189    * At master failover, for pending_open region, make sure
3190    * sendRegionOpen RPC call is sent to the target regionserver
3191    */
3192   private void retrySendRegionOpen(final RegionState regionState) {
3193     this.executorService.submit(
3194       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
3195         @Override
3196         public void process() throws IOException {
3197           HRegionInfo hri = regionState.getRegion();
3198           ServerName serverName = regionState.getServerName();
3199           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
3200           try {
3201             for (int i = 1; i <= maximumAttempts; i++) {
3202               if (!serverManager.isServerOnline(serverName)
3203                   || server.isStopped() || server.isAborted()) {
3204                 return; // No need any more
3205               }
3206               try {
3207                 if (!regionState.equals(regionStates.getRegionState(hri))) {
3208                   return; // Region is not in the expected state any more
3209                 }
3210                 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
3211                 if (shouldAssignRegionsWithFavoredNodes) {
3212                   favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
3213                 }
3214                 RegionOpeningState regionOpenState = serverManager.sendRegionOpen(
3215                   serverName, hri, -1, favoredNodes);
3216 
3217                 if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
3218                   // Failed opening this region, this means the target server didn't get
3219                   // the original region open RPC, so re-assign it with a new plan
3220                   LOG.debug("Got failed_opening in retry sendRegionOpen for "
3221                     + regionState + ", re-assign it");
3222                   invokeAssign(hri, true);
3223                 }
3224                 return; // Done.
3225               } catch (Throwable t) {
3226                 if (t instanceof RemoteException) {
3227                   t = ((RemoteException) t).unwrapRemoteException();
3228                 }
3229                 // In case SocketTimeoutException/FailedServerException, retry
3230                 if (t instanceof java.net.SocketTimeoutException
3231                     || t instanceof FailedServerException) {
3232                   Threads.sleep(100);
3233                   continue;
3234                 }
3235                 // For other exceptions, re-assign it
3236                 LOG.debug("Got exception in retry sendRegionOpen for "
3237                   + regionState + ", re-assign it", t);
3238                 invokeAssign(hri);
3239                 return; // Done.
3240               }
3241             }
3242           } finally {
3243             lock.unlock();
3244           }
3245         }
3246       });
3247   }
3248 
3249   /**
3250    * At master failover, for pending_close region, make sure
3251    * sendRegionClose RPC call is sent to the target regionserver
3252    */
3253   private void retrySendRegionClose(final RegionState regionState) {
3254     this.executorService.submit(
3255       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
3256         @Override
3257         public void process() throws IOException {
3258           HRegionInfo hri = regionState.getRegion();
3259           ServerName serverName = regionState.getServerName();
3260           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
3261           try {
3262             for (int i = 1; i <= maximumAttempts; i++) {
3263               if (!serverManager.isServerOnline(serverName)
3264                   || server.isStopped() || server.isAborted()) {
3265                 return; // No need any more
3266               }
3267               try {
3268                 if (!regionState.equals(regionStates.getRegionState(hri))) {
3269                   return; // Region is not in the expected state any more
3270                 }
3271                 if (!serverManager.sendRegionClose(serverName, hri, -1, null, false)) {
3272                   // This means the region is still on the target server
3273                   LOG.debug("Got false in retry sendRegionClose for "
3274                     + regionState + ", re-close it");
3275                   invokeUnAssign(hri);
3276                 }
3277                 return; // Done.
3278               } catch (Throwable t) {
3279                 if (t instanceof RemoteException) {
3280                   t = ((RemoteException) t).unwrapRemoteException();
3281                 }
3282                 // In case SocketTimeoutException/FailedServerException, retry
3283                 if (t instanceof java.net.SocketTimeoutException
3284                     || t instanceof FailedServerException) {
3285                   Threads.sleep(100);
3286                   continue;
3287                 }
3288                 if (!(t instanceof NotServingRegionException
3289                     || t instanceof RegionAlreadyInTransitionException)) {
3290                   // NotServingRegionException/RegionAlreadyInTransitionException
3291                   // means the target server got the original region close request.
3292                   // For other exceptions, re-close it
3293                   LOG.debug("Got exception in retry sendRegionClose for "
3294                     + regionState + ", re-close it", t);
3295                   invokeUnAssign(hri);
3296                 }
3297                 return; // Done.
3298               }
3299             }
3300           } finally {
3301             lock.unlock();
3302           }
3303         }
3304       });
3305   }
3306 
3307   /**
3308    * Set Regions in transitions metrics.
3309    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
3310    * This iterator is not fail fast, which may lead to stale read; but that's better than
3311    * creating a copy of the map for metrics computation, as this method will be invoked
3312    * on a frequent interval.
3313    */
3314   public void updateRegionsInTransitionMetrics() {
3315     long currentTime = System.currentTimeMillis();
3316     int totalRITs = 0;
3317     int totalRITsOverThreshold = 0;
3318     long oldestRITTime = 0;
3319     int ritThreshold = this.server.getConfiguration().
3320       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
3321     for (RegionState state: regionStates.getRegionsInTransition().values()) {
3322       totalRITs++;
3323       long ritTime = currentTime - state.getStamp();
3324       if (ritTime > ritThreshold) { // more than the threshold
3325         totalRITsOverThreshold++;
3326       }
3327       if (oldestRITTime < ritTime) {
3328         oldestRITTime = ritTime;
3329       }
3330     }
3331     if (this.metricsAssignmentManager != null) {
3332       this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
3333       this.metricsAssignmentManager.updateRITCount(totalRITs);
3334       this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
3335     }
3336   }
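       // Worked example for the metrics above, assuming the default 60000 ms threshold: a
       // region that has been in transition for 90 seconds counts toward
       // totalRITsOverThreshold and pushes oldestRITTime to at least 90000 ms.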
3337 
3338   /**
3339    * @param region Region whose plan we are to clear.
3340    */
3341   void clearRegionPlan(final HRegionInfo region) {
3342     synchronized (this.regionPlans) {
3343       this.regionPlans.remove(region.getEncodedName());
3344     }
3345   }
3346 
3347   /**
3348    * Wait on region to clear regions-in-transition.
3349    * @param hri Region to wait on.
3350    * @throws IOException
3351    */
3352   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
3353       throws IOException, InterruptedException {
3354     waitOnRegionToClearRegionsInTransition(hri, -1L);
3355   }
3356 
3357   /**
3358    * Wait on region to clear regions-in-transition or time out
3359    * @param hri
3360    * @param timeOut Milliseconds to wait for current region to be out of transition state.
3361    * @return True when a region clears regions-in-transition before timeout, otherwise false
3362    * @throws InterruptedException
3363    */
3364   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
3365       throws InterruptedException {
3366     if (!regionStates.isRegionInTransition(hri)) return true;
3367     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
3368         + timeOut;
3369     // There is already a timeout monitor on regions in transition so I
3370     // should not have to have one here too?
3371     LOG.info("Waiting for " + hri.getEncodedName() +
3372         " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
3373     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
3374       regionStates.waitForUpdate(100);
3375       if (EnvironmentEdgeManager.currentTime() > end) {
3376         LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
3377         return false;
3378       }
3379     }
3380     if (this.server.isStopped()) {
3381       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
3382       return false;
3383     }
3384     return true;
3385   }
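       // Illustrative usage sketch, assuming 'hri' is the region an operation depends on:
       //
       //   if (!waitOnRegionToClearRegionsInTransition(hri, 30000)) {
       //     // still in transition (or the server is stopping); give up on the operation
       //   }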
3386 
3387   void invokeAssign(HRegionInfo regionInfo) {
3388     invokeAssign(regionInfo, true);
3389   }
3390 
3391   void invokeAssign(HRegionInfo regionInfo, boolean newPlan) {
3392     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, newPlan));
3393   }
3394 
3395   void invokeUnAssign(HRegionInfo regionInfo) {
3396     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
3397   }
3398 
3399   public ServerHostRegion isCarryingMeta(ServerName serverName) {
3400     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
3401   }
3402 
3403   public ServerHostRegion isCarryingMetaReplica(ServerName serverName, int replicaId) {
3404     return isCarryingRegion(serverName,
3405         RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId));
3406   }
3407 
3408   public ServerHostRegion isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) {
3409     return isCarryingRegion(serverName, metaHri);
3410   }
3411 
3412   /**
3413    * Check if the shutdown server carries the specific region.
3414    * We have a bunch of places that store the region location, and those values
3415    * aren't always consistent; there is a delay of notification.
3416    * The location from the zookeeper unassigned node has the most recent data,
3417    * but the node could be deleted after the region is opened by the AM.
3418    * The AM's info could be stale if OpenedRegionHandler processing hasn't
3419    * finished yet when the server shutdown occurs.
3420    * @return whether the serverName currently hosts the region
3421    */
3422   private ServerHostRegion isCarryingRegion(ServerName serverName, HRegionInfo hri) {
3423     RegionTransition rt = null;
3424     try {
3425       byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
3426       // This call can legitimately return null
3427       rt = data == null? null: RegionTransition.parseFrom(data);
3428     } catch (KeeperException e) {
3429       server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
3430     } catch (DeserializationException e) {
3431       server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
3432     }
3433 
3434     ServerName addressFromZK = rt != null? rt.getServerName():  null;
3435     if (addressFromZK != null) {
3436       // if we get something from ZK, we will use the data
3437       boolean matchZK = addressFromZK.equals(serverName);
3438       LOG.debug("Checking region=" + hri.getRegionNameAsString() + ", zk server=" + addressFromZK +
3439         " current=" + serverName + ", matches=" + matchZK);
3440       return matchZK ? ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3441     }
3442 
3443     ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
3444     if (addressFromAM != null) {
3445       boolean matchAM = addressFromAM.equals(serverName);
3446       LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
3447         " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
3448         " server being checked: " + serverName);
3449       return matchAM ? ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3450     }
3451 
3452     if (hri.isMetaRegion() && RegionReplicaUtil.isDefaultReplica(hri)) {
3453       // For the Meta region (default replica), we can do one more check on MetaTableLocator
3454       final ServerName serverNameInZK =
3455           server.getMetaTableLocator().getMetaRegionLocation(this.server.getZooKeeper());
3456       LOG.debug("Based on MetaTableLocator, the META region is on server="
3457           + (serverNameInZK == null ? "null" : serverNameInZK)
3458           + " server being checked: " + serverName);
3459       if (serverNameInZK != null) {
3460         return serverNameInZK.equals(serverName) ?
3461             ServerHostRegion.HOSTING_REGION : ServerHostRegion.NOT_HOSTING_REGION;
3462       }
3463     }
3464     // Checked everywhere, if reaching here, we are sure that the server is not
3465     // carrying region.
3466     return ServerHostRegion.UNKNOWN;
3467   }
3468 
3469   /**
3470    * Process shutdown server removing any assignments.
3471    * @param sn Server that went down.
3472    * @return list of regions in transition on this server
3473    */
3474   public List<HRegionInfo> processServerShutdown(final ServerName sn) {
3475     // Clean out any existing assignment plans for this server
3476     synchronized (this.regionPlans) {
3477       for (Iterator <Map.Entry<String, RegionPlan>> i =
3478           this.regionPlans.entrySet().iterator(); i.hasNext();) {
3479         Map.Entry<String, RegionPlan> e = i.next();
3480         ServerName otherSn = e.getValue().getDestination();
3481         // The name will be null if the region is planned for a random assign.
3482         if (otherSn != null && otherSn.equals(sn)) {
3483           // Use iterator's remove else we'll get CME
3484           i.remove();
3485         }
3486       }
3487     }
3488     List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn);
3489     for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
3490       HRegionInfo hri = it.next();
3491       String encodedName = hri.getEncodedName();
3492 
3493       // We need a lock on the region as we could update it
3494       Lock lock = locker.acquireLock(encodedName);
3495       try {
3496         RegionState regionState =
3497           regionStates.getRegionTransitionState(encodedName);
3498         if (regionState == null
3499             || (regionState.getServerName() != null && !regionState.isOnServer(sn))
3500             || !(regionState.isFailedClose() || regionState.isOffline()
3501               || regionState.isPendingOpenOrOpening())) {
3502           LOG.info("Skip " + regionState + " since it is not opening/failed_close"
3503             + " on the dead server any more: " + sn);
3504           it.remove();
3505         } else {
3506           try {
3507             // Delete the ZNode if exists
3508             ZKAssign.deleteNodeFailSilent(watcher, hri);
3509           } catch (KeeperException ke) {
3510             server.abort("Unexpected ZK exception deleting node " + hri, ke);
3511           }
3512           if (tableStateManager.isTableState(hri.getTable(),
3513               ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3514             regionStates.regionOffline(hri);
3515             it.remove();
3516             continue;
3517           }
3518           // Mark the region offline and assign it again by SSH
3519           regionStates.updateRegionState(hri, State.OFFLINE);
3520         }
3521       } finally {
3522         lock.unlock();
3523       }
3524     }
3525     return regions;
3526   }
3527 
3528   /**
3529    * @param plan Plan to execute.
3530    */
3531   public void balance(final RegionPlan plan) {
3532 
3533     HRegionInfo hri = plan.getRegionInfo();
3534     TableName tableName = hri.getTable();
3535     if (tableStateManager.isTableState(tableName,
3536       ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3537       LOG.info("Ignored moving region of disabling/disabled table "
3538         + tableName);
3539       return;
3540     }
3541 
3542     // Move the region only if it's assigned
3543     String encodedName = hri.getEncodedName();
3544     ReentrantLock lock = locker.acquireLock(encodedName);
3545     try {
3546       if (!regionStates.isRegionOnline(hri)) {
3547         RegionState state = regionStates.getRegionState(encodedName);
3548         LOG.info("Ignored moving region not assigned: " + hri + ", "
3549           + (state == null ? "not in region states" : state));
3550         return;
3551       }
3552       synchronized (this.regionPlans) {
3553         this.regionPlans.put(plan.getRegionName(), plan);
3554       }
3555       unassign(hri, false, plan.getDestination());
3556     } finally {
3557       lock.unlock();
3558     }
3559   }
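       // Illustrative usage sketch, assuming 'hri' is an online region and 'destinationSn'
       // is a hypothetical target ServerName:
       //
       //   ServerName source = regionStates.getRegionServerOfRegion(hri);
       //   balance(new RegionPlan(hri, source, destinationSn));
       //   // the region is closed on 'source' and reopened on 'destinationSn'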
3560 
3561   public void stop() {
3562     shutdown(); // Stop executor service, etc
3563   }
3564 
3565   /**
3566    * Shutdown the threadpool executor service
3567    */
3568   public void shutdown() {
3569     // It's an immediate shutdown, so we're clearing the remaining tasks.
3570     synchronized (zkEventWorkerWaitingList){
3571       zkEventWorkerWaitingList.clear();
3572     }
3573 
3574     // Shutdown the threadpool executor service
3575     threadPoolExecutorService.shutdownNow();
3576     zkEventWorkers.shutdownNow();
3577     regionStateStore.stop();
3578   }
3579 
3580   protected void setEnabledTable(TableName tableName) {
3581     try {
3582       this.tableStateManager.setTableState(tableName,
3583         ZooKeeperProtos.Table.State.ENABLED);
3584     } catch (CoordinatedStateException e) {
3585       // here we can abort as it is the start up flow
3586       String errorMsg = "Unable to ensure that the table " + tableName
3587           + " will be" + " enabled because of a ZooKeeper issue";
3588       LOG.error(errorMsg);
3589       this.server.abort(errorMsg, e);
3590     }
3591   }
3592 
3593   /**
3594    * Set region as OFFLINED up in zookeeper asynchronously.
3595    * @param state
3596    * @return True if we succeeded, false otherwise (State was incorrect or failed
3597    * updating zk).
3598    */
3599   private boolean asyncSetOfflineInZooKeeper(final RegionState state,
3600       final AsyncCallback.StringCallback cb, final ServerName destination) {
3601     if (!state.isClosed() && !state.isOffline()) {
3602       this.server.abort("Unexpected state trying to OFFLINE; " + state,
3603         new IllegalStateException());
3604       return false;
3605     }
3606     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
3607     try {
3608       ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
3609         destination, cb, state);
3610     } catch (KeeperException e) {
3611       if (e instanceof NodeExistsException) {
3612         LOG.warn("Node for " + state.getRegion() + " already exists");
3613       } else {
3614         server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
3615       }
3616       return false;
3617     }
3618     return true;
3619   }
3620 
3621   private boolean deleteNodeInStates(String encodedName,
3622       String desc, ServerName sn, EventType... types) {
3623     try {
3624       for (EventType et: types) {
3625         if (ZKAssign.deleteNode(watcher, encodedName, et, sn)) {
3626           return true;
3627         }
3628       }
3629       LOG.info("Failed to delete the " + desc + " node for "
3630         + encodedName + ". The node type may not match");
3631     } catch (NoNodeException e) {
3632       if (LOG.isDebugEnabled()) {
3633         LOG.debug("The " + desc + " node for " + encodedName + " already deleted");
3634       }
3635     } catch (KeeperException ke) {
3636       server.abort("Unexpected ZK exception deleting " + desc
3637         + " node for the region " + encodedName, ke);
3638     }
3639     return false;
3640   }
3641 
3642   private void deleteMergingNode(String encodedName, ServerName sn) {
3643     deleteNodeInStates(encodedName, "merging", sn, EventType.RS_ZK_REGION_MERGING,
3644       EventType.RS_ZK_REQUEST_REGION_MERGE, EventType.RS_ZK_REGION_MERGED);
3645   }
3646 
3647   private void deleteSplittingNode(String encodedName, ServerName sn) {
3648     deleteNodeInStates(encodedName, "splitting", sn, EventType.RS_ZK_REGION_SPLITTING,
3649       EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT);
3650   }
3651 
3652   private void onRegionFailedOpen(
3653       final HRegionInfo hri, final ServerName sn) {
3654     String encodedName = hri.getEncodedName();
3655     AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
3656     if (failedOpenCount == null) {
3657       failedOpenCount = new AtomicInteger();
3658       // No need to use putIfAbsent, or extra synchronization since
3659       // this whole handleRegion block is locked on the encoded region
3660       // name, and failedOpenTracker is updated only in this block
3661       failedOpenTracker.put(encodedName, failedOpenCount);
3662     }
3663     if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion()) {
3664       regionStates.updateRegionState(hri, State.FAILED_OPEN);
3665       // remove the tracking info to save memory, also reset
3666       // the count for next open initiative
3667       failedOpenTracker.remove(encodedName);
3668     } else {
3669       if (hri.isMetaRegion() && failedOpenCount.get() >= maximumAttempts) {
3670         // Log a warning message if a meta region failedOpenCount exceeds maximumAttempts
3671         // so that we are aware of a potential problem if it persists for a long time.
3672         LOG.warn("Failed to open the hbase:meta region " +
3673             hri.getRegionNameAsString() + " after " +
3674             failedOpenCount.get() + " retries. Continue retrying.");
3675       }
3676 
3677       // Handle this the same as if it were opened and then closed.
3678       RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
3679       if (regionState != null) {
3680         // When there are more than one region server a new RS is selected as the
3681         // destination and the same is updated in the region plan. (HBASE-5546)
3682         if (getTableStateManager().isTableState(hri.getTable(),
3683             ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
3684             replicasToClose.contains(hri)) {
3685           offlineDisabledRegion(hri);
3686           return;
3687         }
3688         // ZK Node is in CLOSED state, assign it.
3689         regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3690         // This below has to do w/ online enable/disable of a table
3691         removeClosedRegion(hri);
3692         try {
3693           getRegionPlan(hri, sn, true);
3694         } catch (HBaseIOException e) {
3695           LOG.warn("Failed to get region plan", e);
3696         }
3697         invokeAssign(hri, false);
3698       }
3699     }
3700   }
3701 
3702   private void onRegionOpen(
3703       final HRegionInfo hri, final ServerName sn, long openSeqNum) {
3704     regionOnline(hri, sn, openSeqNum);
3705     if (useZKForAssignment) {
3706       try {
3707         // Delete the ZNode if exists
3708         ZKAssign.deleteNodeFailSilent(watcher, hri);
3709       } catch (KeeperException ke) {
3710         server.abort("Unexpected ZK exception deleting node " + hri, ke);
3711       }
3712     }
3713 
3714     // reset the count, if any
3715     failedOpenTracker.remove(hri.getEncodedName());
3716     if (getTableStateManager().isTableState(hri.getTable(),
3717         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3718       invokeUnAssign(hri);
3719     }
3720   }
3721 
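  /**
   * Handles a CLOSED report from a region server. Regions of disabled
   * or disabling tables (and replicas pending close) are taken offline;
   * any other region is marked CLOSED and reassigned.
   */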
3722   private void onRegionClosed(final HRegionInfo hri) {
3723     if (getTableStateManager().isTableState(hri.getTable(),
3724         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
3725         replicasToClose.contains(hri)) {
3726       offlineDisabledRegion(hri);
3727       return;
3728     }
3729     regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3730     sendRegionClosedNotification(hri);
3731     // This below has to do w/ online enable/disable of a table
3732     removeClosedRegion(hri);
3733     invokeAssign(hri, false);
3734   }
3735 
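  /**
   * Handles the split transition codes reported by a region server for
   * parent p and daughters a/b. Returns an error message if the regions
   * are not in a state that allows the split, or if the split could not
   * be recorded in meta; returns null on success.
   */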
3736   private String onRegionSplit(ServerName sn, TransitionCode code,
3737       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3738     final RegionState rs_p = regionStates.getRegionState(p);
3739     RegionState rs_a = regionStates.getRegionState(a);
3740     RegionState rs_b = regionStates.getRegionState(b);
3741     if (!(rs_p.isOpenOrSplittingOnServer(sn)
3742         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
3743         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
3744       return "Not in state good for split";
3745     }
3746 
3747     regionStates.updateRegionState(a, State.SPLITTING_NEW, sn);
3748     regionStates.updateRegionState(b, State.SPLITTING_NEW, sn);
3749     regionStates.updateRegionState(p, State.SPLITTING);
3750 
3751     if (code == TransitionCode.SPLIT) {
3752       if (TEST_SKIP_SPLIT_HANDLING) {
3753         return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
3754       }
3755       regionOffline(p, State.SPLIT);
3756       regionOnline(a, sn, 1);
3757       regionOnline(b, sn, 1);
3758 
3759       // User could disable the table before master knows the new region.
3760       if (getTableStateManager().isTableState(p.getTable(),
3761           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3762         invokeUnAssign(a);
3763         invokeUnAssign(b);
3764       } else {
3765         Callable<Object> splitReplicasCallable = new Callable<Object>() {
3766           @Override
3767           public Object call() {
3768             doSplittingOfReplicas(p, a, b);
3769             return null;
3770           }
3771         };
3772         threadPoolExecutorService.submit(splitReplicasCallable);
3773       }
3774     } else if (code == TransitionCode.SPLIT_PONR) {
3775       try {
3776         regionStates.splitRegion(p, a, b, sn);
3777       } catch (IOException ioe) {
3778         LOG.info("Failed to record split region " + p.getShortNameToLog(), ioe);
3779         return "Failed to record the splitting in meta";
3780       }
3781     } else if (code == TransitionCode.SPLIT_REVERTED) {
3782       // Always bring the parent back online. Even if it's not offline,
3783       // there's no harm in making it online again.
3784       regionOnline(p, sn);
3785 
3786       // Only offline the daughter regions if they are known to exist.
3787       RegionState regionStateA = regionStates.getRegionState(a);
3788       RegionState regionStateB = regionStates.getRegionState(b);
3789       if (regionStateA != null) {
3790         regionOffline(a);
3791       }
3792       if (regionStateB != null) {
3793         regionOffline(b);
3794       }
3795 
3796       if (getTableStateManager().isTableState(p.getTable(),
3797           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3798         invokeUnAssign(p);
3799       }
3800     }
3801     return null;
3802   }
3803 
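  /**
   * Handles the merge transition codes reported by a region server for
   * merged region p and merging regions a/b. Returns an error message
   * if the regions are not in a state that allows the merge, or if the
   * merge could not be recorded in meta; returns null on success.
   */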
3804   private String onRegionMerge(ServerName sn, TransitionCode code,
3805       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
3806     RegionState rs_p = regionStates.getRegionState(p);
3807     RegionState rs_a = regionStates.getRegionState(a);
3808     RegionState rs_b = regionStates.getRegionState(b);
3809     if (!(rs_a.isOpenOrMergingOnServer(sn) && rs_b.isOpenOrMergingOnServer(sn)
3810         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3811       return "Not in state good for merge";
3812     }
3813 
3814     regionStates.updateRegionState(a, State.MERGING);
3815     regionStates.updateRegionState(b, State.MERGING);
3816     regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3817 
3818     String encodedName = p.getEncodedName();
3819     if (code == TransitionCode.READY_TO_MERGE) {
3820       mergingRegions.put(encodedName,
3821         new PairOfSameType<HRegionInfo>(a, b));
3822     } else if (code == TransitionCode.MERGED) {
3823       mergingRegions.remove(encodedName);
3824       regionOffline(a, State.MERGED);
3825       regionOffline(b, State.MERGED);
3826       regionOnline(p, sn, 1);
3827 
3828       // User could disable the table before master knows the new region.
3829       if (getTableStateManager().isTableState(p.getTable(),
3830           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3831         invokeUnAssign(p);
3832       } else {
3833         Callable<Object> mergeReplicasCallable = new Callable<Object>() {
3834           @Override
3835           public Object call() {
3836             doMergingOfReplicas(p, a, b);
3837             return null;
3838           }
3839         };
3840         threadPoolExecutorService.submit(mergeReplicasCallable);
3841       }
3842     } else if (code == TransitionCode.MERGE_PONR) {
3843       try {
3844         regionStates.mergeRegions(p, a, b, sn);
3845       } catch (IOException ioe) {
3846         LOG.info("Failed to record merged region " + p.getShortNameToLog(), ioe);
3847         return "Failed to record the merging in meta";
3848       }
3849     } else {
3850       mergingRegions.remove(encodedName);
3851       regionOnline(a, sn);
3852       regionOnline(b, sn);
3853       regionOffline(p);
3854 
3855       if (getTableStateManager().isTableState(p.getTable(),
3856           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3857         invokeUnAssign(a);
3858         invokeUnAssign(b);
3859       }
3860     }
3861     return null;
3862   }
3863 
3864   /**
3865    * A helper to handle a region merging transition event.
3866    * It transitions the merging regions to MERGING state.
3867    */
3868   private boolean handleRegionMerging(final RegionTransition rt, final String encodedName,
3869       final String prettyPrintedRegionName, final ServerName sn) {
3870     if (!serverManager.isServerOnline(sn)) {
3871       LOG.warn("Dropped merging! ServerName=" + sn + " unknown.");
3872       return false;
3873     }
3874     byte [] payloadOfMerging = rt.getPayload();
3875     List<HRegionInfo> mergingRegions;
3876     try {
3877       mergingRegions = HRegionInfo.parseDelimitedFrom(
3878         payloadOfMerging, 0, payloadOfMerging.length);
3879     } catch (IOException e) {
3880       LOG.error("Dropped merging! Failed reading "  + rt.getEventType()
3881         + " payload for " + prettyPrintedRegionName);
3882       return false;
3883     }
3884     assert mergingRegions.size() == 3;
3885     HRegionInfo p = mergingRegions.get(0);
3886     HRegionInfo hri_a = mergingRegions.get(1);
3887     HRegionInfo hri_b = mergingRegions.get(2);
3888 
3889     RegionState rs_p = regionStates.getRegionState(p);
3890     RegionState rs_a = regionStates.getRegionState(hri_a);
3891     RegionState rs_b = regionStates.getRegionState(hri_b);
3892 
3893     if (!((rs_a == null || rs_a.isOpenOrMergingOnServer(sn))
3894         && (rs_b == null || rs_b.isOpenOrMergingOnServer(sn))
3895         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3896       LOG.warn("Dropped merging! Not in state good for MERGING; rs_p="
3897         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
3898       return false;
3899     }
3900 
3901     EventType et = rt.getEventType();
3902     if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) {
3903       try {
3904         RegionMergeCoordination.RegionMergeDetails std =
3905             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3906                 .getRegionMergeCoordination().getDefaultDetails();
3907         ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
3908             .getRegionMergeCoordination().processRegionMergeRequest(p, hri_a, hri_b, sn, std);
3909         if (((ZkRegionMergeCoordination.ZkRegionMergeDetails) std).getZnodeVersion() == -1) {
3910           byte[] data = ZKAssign.getData(watcher, encodedName);
3911           EventType currentType = null;
3912           if (data != null) {
3913             RegionTransition newRt = RegionTransition.parseFrom(data);
3914             currentType = newRt.getEventType();
3915           }
3916           if (currentType == null || (currentType != EventType.RS_ZK_REGION_MERGED
3917               && currentType != EventType.RS_ZK_REGION_MERGING)) {
3918             LOG.warn("Failed to transition pending_merge node "
3919               + encodedName + " to merging, it's now " + currentType);
3920             return false;
3921           }
3922         }
3923       } catch (Exception e) {
3924         LOG.warn("Failed to transition pending_merge node "
3925           + encodedName + " to merging", e);
3926         return false;
3927       }
3928     }
3929 
3930     synchronized (regionStates) {
3931       regionStates.updateRegionState(hri_a, State.MERGING);
3932       regionStates.updateRegionState(hri_b, State.MERGING);
3933       regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3934 
3935       if (et != EventType.RS_ZK_REGION_MERGED) {
3936         this.mergingRegions.put(encodedName,
3937           new PairOfSameType<HRegionInfo>(hri_a, hri_b));
3938       } else {
3939         this.mergingRegions.remove(encodedName);
3940         regionOffline(hri_a, State.MERGED);
3941         regionOffline(hri_b, State.MERGED);
3942         regionOnline(p, sn);
3943       }
3944     }
3945 
3946     if (et == EventType.RS_ZK_REGION_MERGED) {
3947       doMergingOfReplicas(p, hri_a, hri_b);
3948       LOG.debug("Handling MERGED event for " + encodedName + "; deleting node");
3949       // Remove region from ZK
3950       try {
3951         boolean successful = false;
3952         while (!successful) {
3953           // It's possible that the RS updates the znode between our read
3954           // and the delete, so it's safe to retry.
3955           successful = ZKAssign.deleteNode(watcher, encodedName,
3956             EventType.RS_ZK_REGION_MERGED, sn);
3957         }
3958       } catch (KeeperException e) {
3959         if (e instanceof NoNodeException) {
3960           String znodePath = ZKUtil.joinZNode(watcher.assignmentZNode, encodedName);
3961           LOG.debug("The znode " + znodePath + " does not exist. May have been deleted already.");
3962         } else {
3963           server.abort("Error deleting MERGED node " + encodedName, e);
3964         }
3965       }
3966       LOG.info("Handled MERGED event; merged=" + p.getRegionNameAsString()
3967         + ", region_a=" + hri_a.getRegionNameAsString() + ", region_b="
3968         + hri_b.getRegionNameAsString() + ", on " + sn);
3969 
3970       // User could disable the table before master knows the new region.
3971       if (tableStateManager.isTableState(p.getTable(),
3972           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
3973         unassign(p);
3974       }
3975     }
3976     return true;
3977   }
3978 
3979   /**
3980    * A helper to handle a region splitting transition event.
3981    */
3982   private boolean handleRegionSplitting(final RegionTransition rt, final String encodedName,
3983       final String prettyPrintedRegionName, final ServerName sn) {
3984     if (!serverManager.isServerOnline(sn)) {
3985       LOG.warn("Dropped splitting! ServerName=" + sn + " unknown.");
3986       return false;
3987     }
3988     byte [] payloadOfSplitting = rt.getPayload();
3989     List<HRegionInfo> splittingRegions;
3990     try {
3991       splittingRegions = HRegionInfo.parseDelimitedFrom(
3992         payloadOfSplitting, 0, payloadOfSplitting.length);
3993     } catch (IOException e) {
3994       LOG.error("Dropped splitting! Failed reading " + rt.getEventType()
3995         + " payload for " + prettyPrintedRegionName);
3996       return false;
3997     }
3998     assert splittingRegions.size() == 2;
3999     HRegionInfo hri_a = splittingRegions.get(0);
4000     HRegionInfo hri_b = splittingRegions.get(1);
4001 
4002     RegionState rs_p = regionStates.getRegionState(encodedName);
4003     RegionState rs_a = regionStates.getRegionState(hri_a);
4004     RegionState rs_b = regionStates.getRegionState(hri_b);
4005 
4006     if (!((rs_p == null || rs_p.isOpenOrSplittingOnServer(sn))
4007         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
4008         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
4009       LOG.warn("Dropped splitting! Not in state good for SPLITTING; rs_p="
4010         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
4011       return false;
4012     }
4013 
4014     if (rs_p == null) {
4015       // Splitting region should be online
4016       rs_p = regionStates.updateRegionState(rt, State.OPEN);
4017       if (rs_p == null) {
4018         LOG.warn("Received splitting for region " + prettyPrintedRegionName
4019           + " from server " + sn + " but it doesn't exist anymore,"
4020           + " probably already processed its split");
4021         return false;
4022       }
4023       regionStates.regionOnline(rs_p.getRegion(), sn);
4024     }
4025 
4026     HRegionInfo p = rs_p.getRegion();
4027     EventType et = rt.getEventType();
4028     if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) {
4029       try {
4030         SplitTransactionDetails std =
4031             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
4032                 .getSplitTransactionCoordination().getDefaultDetails();
4033         if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
4034             .getSplitTransactionCoordination().processTransition(p, hri_a, hri_b, sn, std) == -1) {
4035           byte[] data = ZKAssign.getData(watcher, encodedName);
4036           EventType currentType = null;
4037           if (data != null) {
4038             RegionTransition newRt = RegionTransition.parseFrom(data);
4039             currentType = newRt.getEventType();
4040           }
4041           if (currentType == null
4042               || (currentType != EventType.RS_ZK_REGION_SPLIT && currentType != EventType.RS_ZK_REGION_SPLITTING)) {
4043             LOG.warn("Failed to transition pending_split node " + encodedName
4044                 + " to splitting, it's now " + currentType);
4045             return false;
4046           }
4047         }
4048       } catch (Exception e) {
4049         LOG.warn("Failed to transition pending_split node " + encodedName + " to splitting", e);
4050         return false;
4051       }
4052     }
4053 
4054     synchronized (regionStates) {
4055       splitRegions.put(p, new PairOfSameType<HRegionInfo>(hri_a, hri_b));
4056       regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn);
4057       regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn);
4058       regionStates.updateRegionState(rt, State.SPLITTING);
4059 
4060       // The below is for testing ONLY!  We can't do fault injection easily, so
4061       // resort to this kind of ugliness -- St.Ack 02/25/2011.
4062       if (TEST_SKIP_SPLIT_HANDLING) {
4063         LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set");
4064         return true; // return true so that the splitting node stays
4065       }
4066 
4067       if (et == EventType.RS_ZK_REGION_SPLIT) {
4068         regionOffline(p, State.SPLIT);
4069         regionOnline(hri_a, sn);
4070         regionOnline(hri_b, sn);
4071         splitRegions.remove(p);
4072       }
4073     }
4074 
4075     if (et == EventType.RS_ZK_REGION_SPLIT) {
4076       // split replicas
4077       doSplittingOfReplicas(rs_p.getRegion(), hri_a, hri_b);
4078       LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
4079       // Remove region from ZK
4080       try {
4081         boolean successful = false;
4082         while (!successful) {
4083           // It's possible that the RS updates the znode between our read
4084           // and the delete, so it's safe to retry.
4085           successful = ZKAssign.deleteNode(watcher, encodedName,
4086             EventType.RS_ZK_REGION_SPLIT, sn);
4087         }
4088       } catch (KeeperException e) {
4089         if (e instanceof NoNodeException) {
4090           String znodePath = ZKUtil.joinZNode(watcher.assignmentZNode, encodedName);
4091           LOG.debug("The znode " + znodePath + " does not exist. May have been deleted already.");
4092         } else {
4093           server.abort("Error deleting SPLIT node " + encodedName, e);
4094         }
4095       }
4096       LOG.info("Handled SPLIT event; parent=" + p.getRegionNameAsString()
4097         + ", daughter a=" + hri_a.getRegionNameAsString() + ", daughter b="
4098         + hri_b.getRegionNameAsString() + ", on " + sn);
4099 
4100       // User could disable the table before master knows the new region.
4101       if (tableStateManager.isTableState(p.getTable(),
4102           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
4103         unassign(hri_a);
4104         unassign(hri_b);
4105       }
4106     }
4107     return true;
4108   }
4109 
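  /**
   * Closes the replicas of the two regions that were merged and assigns
   * new replicas for the merged parent, one per configured region replica.
   */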
4110   private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
4111       final HRegionInfo hri_b) {
4112     // Close replicas for the original unmerged regions, and create/assign new
4113     // replicas for the merged parent.
4114     List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
4115     unmergedRegions.add(hri_a);
4116     unmergedRegions.add(hri_b);
4117     Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
4118     Collection<List<HRegionInfo>> c = map.values();
4119     for (List<HRegionInfo> l : c) {
4120       for (HRegionInfo h : l) {
4121         if (!RegionReplicaUtil.isDefaultReplica(h)) {
4122           LOG.debug("Unassigning un-merged replica " + h);
4123           unassign(h);
4124         }
4125       }
4126     }
4127     int numReplicas = 1;
4128     try {
4129       numReplicas = ((MasterServices)server).getTableDescriptors().get(mergedHri.getTable()).
4130           getRegionReplication();
4131     } catch (IOException e) {
4132       LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
4133           " due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
4134           "will not be done");
4135     }
4136     List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
4137     for (int i = 1; i < numReplicas; i++) {
4138       regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
4139     }
4140     try {
4141       assign(regions);
4142     } catch (IOException ioe) {
4143       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
4144                 ioe.getMessage());
4145     } catch (InterruptedException ie) {
4146       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
4147                 ie.getMessage());
4148     }
4149   }
4150 
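  /**
   * Unassigns the replicas of the split parent and assigns replicas for
   * both daughters, co-locating each daughter replica with the server
   * hosting the corresponding parent replica when possible.
   */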
4151   private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
4152       final HRegionInfo hri_b) {
4153     // Create new regions for the replicas, and assign them to match the
4154     // current replica assignments. If replica 1 of the parent is assigned to RS1,
4155     // the replica 1s of the daughters will be on the same machine.
4156     int numReplicas = 1;
4157     try {
4158       numReplicas = ((MasterServices)server).getTableDescriptors().get(parentHri.getTable()).
4159           getRegionReplication();
4160     } catch (IOException e) {
4161       LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
4162           " due to " + e.getMessage() + ". The assignment of daughter " +
4163           "replicas will not be done");
4164     }
4165     // unassign the old replicas
4166     List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
4167     parentRegion.add(parentHri);
4168     Map<ServerName, List<HRegionInfo>> currentAssign =
4169         regionStates.getRegionAssignments(parentRegion);
4170     Collection<List<HRegionInfo>> c = currentAssign.values();
4171     for (List<HRegionInfo> l : c) {
4172       for (HRegionInfo h : l) {
4173         if (!RegionReplicaUtil.isDefaultReplica(h)) {
4174           LOG.debug("Unassigning parent's replica " + h);
4175           unassign(h);
4176         }
4177       }
4178     }
4179     // assign daughter replicas
4180     Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
4181     for (int i = 1; i < numReplicas; i++) {
4182       prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
4183       prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
4184     }
4185     try {
4186       assign(map);
4187     } catch (IOException e) {
4188       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
4189     } catch (InterruptedException e) {
4190       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
4191     }
4192   }
4193 
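  /**
   * Adds the given daughter replica to the assignment map, preferring
   * the server hosting the corresponding parent replica and falling
   * back to a random online server.
   */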
4194   private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
4195       int replicaId, Map<HRegionInfo, ServerName> map) {
4196     HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
4197     HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
4198         replicaId);
4199     LOG.debug("Created replica region for daughter " + daughterReplica);
4200     ServerName sn;
4201     if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
4202       map.put(daughterReplica, sn);
4203     } else {
4204       List<ServerName> servers = serverManager.getOnlineServersList();
4205       sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
4206       map.put(daughterReplica, sn);
4207     }
4208   }
4209 
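  /**
   * @return Replica regions that still need to be closed because their
   *   primary region was split or merged
   */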
4210   public Set<HRegionInfo> getReplicasToClose() {
4211     return replicasToClose;
4212   }
4213 
4214   /**
4215    * Marks a region offline. The new state is the specified one,
4216    * if not null; otherwise the new state is Offline.
4217    * The specified state can only be Split/Merged/Offline/null.
4218    */
4219   private void regionOffline(final HRegionInfo regionInfo, final State state) {
4220     regionStates.regionOffline(regionInfo, state);
4221     removeClosedRegion(regionInfo);
4222     // remove the region plan as well just in case.
4223     clearRegionPlan(regionInfo);
4224     balancer.regionOffline(regionInfo);
4225 
4226     // Tell our listeners that a region was closed
4227     sendRegionClosedNotification(regionInfo);
4228     // also note that all the replicas of the primary should be closed
4229     if (state != null && (state.equals(State.SPLIT) || state.equals(State.MERGED))) {
4230       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
4231       c.add(regionInfo);
4232       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
4233       Collection<List<HRegionInfo>> allReplicas = map.values();
4234       for (List<HRegionInfo> list : allReplicas) {
4235         replicasToClose.addAll(list);
4236       }
4237     }
4247   }
4248 
4249   private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
4250       final ServerName serverName) {
4251     if (!this.listeners.isEmpty()) {
4252       for (AssignmentListener listener : this.listeners) {
4253         listener.regionOpened(regionInfo, serverName);
4254       }
4255     }
4256   }
4257 
4258   private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
4259     if (!this.listeners.isEmpty()) {
4260       for (AssignmentListener listener : this.listeners) {
4261         listener.regionClosed(regionInfo);
4262       }
4263     }
4264   }
4265 
4266   /**
4267    * Try to update some region states. If the state machine prevents
4268    * such update, an error message is returned to explain the reason.
4269    *
4270    * It's expected that each transition involves just one region
4271    * for opening/closing, and three regions for splitting/merging.
4272    * These regions should be on the server that requested the change.
4273    *
4274    * Region state machine: only these transitions are expected
4275    * to be triggered by a region server.
4276    *
4277    * On the state transition:
4278    *  (1) Open/Close should be initiated by master
4279    *      (a) Master sets the region to pending_open/pending_close
4280    *        in memory and hbase:meta after sending the request
4281    *        to the region server
4282    *      (b) Region server reports back to the master
4283    *        after open/close is done (either success/failure)
4284    *      (c) If the region server has a problem reporting the status
4285    *        to the master, it must be because the master is down or there
4286    *        is a temporary network issue. Otherwise, the region server
4287    *        should abort since it must be a bug. If the master is not
4288    *        accessible, the region server should keep trying until the
4289    *        server is stopped or the status is reported to the (new) master
4290    *      (d) If the region server dies in the middle of opening/closing
4291    *        a region, SSH picks it up and finishes it
4292    *      (e) If the master dies in the middle, the new master recovers
4293    *        the state during initialization from hbase:meta. The region
4294    *        server can report any transition that has not been reported to
4295    *        the previous active master yet
4296    *  (2) Split/merge is initiated by region servers
4297    *      (a) To split a region, a region server sends a request to the
4298    *        master to try to set the region to splitting, together with
4299    *        the two daughters (to be created) to splitting_new. If approved
4300    *        by the master, the splitting can then move ahead
4301    *      (b) To merge two regions, a region server sends a request to the
4302    *        master to try to set the new merged region (to be created) to
4303    *        merging_new, together with the two regions (to be merged) to merging.
4304    *        If the master agrees, the merge can then move ahead
4305    *      (c) Once the splitting/merging is done, the region server
4306    *        reports the status (success or failure) back to the master.
4307    *      (d) Other scenarios should be handled similarly to
4308    *        region open/close
4309    */
4310   protected String onRegionTransition(final ServerName serverName,
4311       final RegionStateTransition transition) {
4312     TransitionCode code = transition.getTransitionCode();
4313     HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
4314     RegionState current = regionStates.getRegionState(hri);
4315     if (LOG.isDebugEnabled()) {
4316       LOG.debug("Got transition " + code + " for "
4317         + (current != null ? current.toString() : hri.getShortNameToLog())
4318         + " from " + serverName);
4319     }
4320     String errorMsg = null;
4321     switch (code) {
4322     case OPENED:
4323       if (current != null && current.isOpened() && current.isOnServer(serverName)) {
4324         LOG.info("Region " + hri.getShortNameToLog() + " is already " + current.getState() + " on "
4325             + serverName);
4326         break;
4327       }
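      // Fall through: an OPENED report that is not already recorded as
      // open shares the validation and handling of the FAILED_OPEN case.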
4328     case FAILED_OPEN:
4329       if (current == null
4330           || !current.isPendingOpenOrOpeningOnServer(serverName)) {
4331         errorMsg = hri.getShortNameToLog()
4332           + " is not pending open on " + serverName;
4333       } else if (code == TransitionCode.FAILED_OPEN) {
4334         onRegionFailedOpen(hri, serverName);
4335       } else {
4336         long openSeqNum = HConstants.NO_SEQNUM;
4337         if (transition.hasOpenSeqNum()) {
4338           openSeqNum = transition.getOpenSeqNum();
4339         }
4340         if (openSeqNum < 0) {
4341           errorMsg = "Newly opened region has invalid open seq num " + openSeqNum;
4342         } else {
4343           onRegionOpen(hri, serverName, openSeqNum);
4344         }
4345       }
4346       break;
4347 
4348     case CLOSED:
4349       if (current == null
4350           || !current.isPendingCloseOrClosingOnServer(serverName)) {
4351         errorMsg = hri.getShortNameToLog()
4352           + " is not pending close on " + serverName;
4353       } else {
4354         onRegionClosed(hri);
4355       }
4356       break;
4357 
4358     case READY_TO_SPLIT:
4359       try {
4360         regionStateListener.onRegionSplit(hri);
4361       } catch (IOException exp) {
4362         errorMsg = StringUtils.stringifyException(exp);
4363       }
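      // Fall through: all split transition codes are handled by onRegionSplit.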
4364     case SPLIT_PONR:
4365     case SPLIT:
4366     case SPLIT_REVERTED:
4367       errorMsg =
4368           onRegionSplit(serverName, code, hri, HRegionInfo.convert(transition.getRegionInfo(1)),
4369             HRegionInfo.convert(transition.getRegionInfo(2)));
4370       if (code == TransitionCode.SPLIT_REVERTED && org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
4371         try {
4372           regionStateListener.onRegionSplitReverted(hri);
4373         } catch (IOException exp) {
4374           LOG.warn(StringUtils.stringifyException(exp));
4375         }
4376       }
4377       break;
4378     case READY_TO_MERGE:
4379     case MERGE_PONR:
4380     case MERGED:
4381     case MERGE_REVERTED:
4382       errorMsg = onRegionMerge(serverName, code, hri,
4383         HRegionInfo.convert(transition.getRegionInfo(1)),
4384         HRegionInfo.convert(transition.getRegionInfo(2)));
4385       if (code == TransitionCode.MERGED && org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
4386         try {
4387           regionStateListener.onRegionMerged(hri);
4388         } catch (IOException exp) {
4389           errorMsg = StringUtils.stringifyException(exp);
4390         }
4391       }
4392       break;
4393 
4394     default:
4395       errorMsg = "Unexpected transition code " + code;
4396     }
4397     if (errorMsg != null) {
4398       LOG.error("Failed to transition region from " + current + " to "
4399         + code + " by " + serverName + ": " + errorMsg);
4400     }
4401     return errorMsg;
4402   }
4403 
4404   /**
4405    * @return Instance of load balancer
4406    */
4407   public LoadBalancer getBalancer() {
4408     return this.balancer;
4409   }
4410 
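  /**
   * @return A snapshot of the current assignments for the given regions,
   *   keyed by hosting server
   */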
4411   public Map<ServerName, List<HRegionInfo>>
4412     getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
4413     return getRegionStates().getRegionAssignments(infos);
4414   }
4415 
4416   void setRegionStateListener(RegionStateListener listener) {
4417     this.regionStateListener = listener;
4418   }
4419 }