View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.NavigableMap;
32  import java.util.Set;
33  import java.util.TreeMap;
34  import java.util.concurrent.ConcurrentHashMap;
35  import java.util.concurrent.ConcurrentSkipListSet;
36  import java.util.concurrent.ThreadFactory;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.locks.Lock;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.hadoop.classification.InterfaceAudience;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.hbase.Chore;
48  import org.apache.hadoop.hbase.HBaseIOException;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.HRegionInfo;
51  import org.apache.hadoop.hbase.NotServingRegionException;
52  import org.apache.hadoop.hbase.RegionTransition;
53  import org.apache.hadoop.hbase.Server;
54  import org.apache.hadoop.hbase.ServerName;
55  import org.apache.hadoop.hbase.Stoppable;
56  import org.apache.hadoop.hbase.TableName;
57  import org.apache.hadoop.hbase.TableNotFoundException;
58  import org.apache.hadoop.hbase.catalog.CatalogTracker;
59  import org.apache.hadoop.hbase.catalog.MetaReader;
60  import org.apache.hadoop.hbase.client.Result;
61  import org.apache.hadoop.hbase.exceptions.DeserializationException;
62  import org.apache.hadoop.hbase.executor.EventHandler;
63  import org.apache.hadoop.hbase.executor.EventType;
64  import org.apache.hadoop.hbase.executor.ExecutorService;
65  import org.apache.hadoop.hbase.ipc.RpcClient.FailedServerException;
66  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
67  import org.apache.hadoop.hbase.master.RegionState.State;
68  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
69  import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
70  import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
71  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
72  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
73  import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
74  import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
75  import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
76  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
77  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
78  import org.apache.hadoop.hbase.regionserver.SplitTransaction;
79  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
80  import org.apache.hadoop.hbase.util.KeyLocker;
81  import org.apache.hadoop.hbase.util.Pair;
82  import org.apache.hadoop.hbase.util.PairOfSameType;
83  import org.apache.hadoop.hbase.util.Threads;
84  import org.apache.hadoop.hbase.util.Triple;
85  import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
86  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
87  import org.apache.hadoop.hbase.zookeeper.ZKTable;
88  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
89  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
90  import org.apache.hadoop.ipc.RemoteException;
91  import org.apache.zookeeper.AsyncCallback;
92  import org.apache.zookeeper.KeeperException;
93  import org.apache.zookeeper.KeeperException.NoNodeException;
94  import org.apache.zookeeper.KeeperException.NodeExistsException;
95  import org.apache.zookeeper.data.Stat;
96  
97  import com.google.common.base.Preconditions;
98  import com.google.common.collect.LinkedHashMultimap;
99  
100 /**
101  * Manages and performs region assignment.
102  * <p>
103  * Monitors ZooKeeper for events related to regions in transition.
104  * <p>
105  * Handles existing regions in transition during master failover.
106  */
107 @InterfaceAudience.Private
108 public class AssignmentManager extends ZooKeeperListener {
109   private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
110 
111   public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(HConstants.HBCK_CODE_NAME,
112       -1, -1L);
113 
114   public static final String ASSIGNMENT_TIMEOUT = "hbase.master.assignment.timeoutmonitor.timeout";
115   public static final int DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT = 600000;
116   public static final String ASSIGNMENT_TIMEOUT_MANAGEMENT = "hbase.assignment.timeout.management";
117   public static final boolean DEFAULT_ASSIGNMENT_TIMEOUT_MANAGEMENT = false;
118 
119   public static final String ALREADY_IN_TRANSITION_WAITTIME
120     = "hbase.assignment.already.intransition.waittime";
121   public static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute
122 
123   protected final Server server;
124 
125   private ServerManager serverManager;
126 
127   private boolean shouldAssignRegionsWithFavoredNodes;
128 
129   private CatalogTracker catalogTracker;
130 
131   protected final TimeoutMonitor timeoutMonitor;
132 
133   private final TimerUpdater timerUpdater;
134 
135   private LoadBalancer balancer;
136 
137   private final MetricsAssignmentManager metricsAssignmentManager;
138 
139   private final TableLockManager tableLockManager;
140 
141   private AtomicInteger numRegionsOpened = new AtomicInteger(0);
142 
143   final private KeyLocker<String> locker = new KeyLocker<String>();
144 
145   /**
146    * Map of regions to reopen after the schema of a table is changed. Key -
147    * encoded region name, value - HRegionInfo
148    */
149   private final Map <String, HRegionInfo> regionsToReopen;
150 
151   /*
152    * Maximum times we recurse an assignment/unassignment.
153    * See below in {@link #assign()} and {@link #unassign()}.
154    */
155   private final int maximumAttempts;
156 
157   /**
158    * Map of two merging regions from the region to be created.
159    */
160   private final Map<String, PairOfSameType<HRegionInfo>> mergingRegions
161     = new HashMap<String, PairOfSameType<HRegionInfo>>();
162 
163   /**
164    * The sleep time for which the assignment will wait before retrying in case of hbase:meta assignment
165    * failure due to lack of availability of region plan
166    */
167   private final long sleepTimeBeforeRetryingMetaAssignment;
168 
169   /** Plans for region movement. Key is the encoded version of a region name*/
170   // TODO: When do plans get cleaned out?  Ever? In server open and in server
171   // shutdown processing -- St.Ack
172   // All access to this Map must be synchronized.
173   final NavigableMap<String, RegionPlan> regionPlans =
174     new TreeMap<String, RegionPlan>();
175 
176   private final ZKTable zkTable;
177 
178   /**
179    * Contains the server which need to update timer, these servers will be
180    * handled by {@link TimerUpdater}
181    */
182   private final ConcurrentSkipListSet<ServerName> serversInUpdatingTimer;
183 
184   private final ExecutorService executorService;
185 
186   // For unit tests, keep track of calls to ClosedRegionHandler
187   private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = null;
188 
189   // For unit tests, keep track of calls to OpenedRegionHandler
190   private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null;
191 
192   //Thread pool executor service for timeout monitor
193   private java.util.concurrent.ExecutorService threadPoolExecutorService;
194 
195   // A bunch of ZK events workers. Each is a single thread executor service
196   private final java.util.concurrent.ExecutorService zkEventWorkers;
197 
198   private List<EventType> ignoreStatesRSOffline = Arrays.asList(
199       EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);
200 
201   private final RegionStates regionStates;
202 
203   // The threshold to use bulk assigning. Using bulk assignment
204   // only if assigning at least this many regions to at least this
205   // many servers. If assigning fewer regions to fewer servers,
206   // bulk assigning may be not as efficient.
207   private final int bulkAssignThresholdRegions;
208   private final int bulkAssignThresholdServers;
209 
210   // Should bulk assignment wait till all regions are assigned,
211   // or it is timed out?  This is useful to measure bulk assignment
212   // performance, but not needed in most use cases.
213   private final boolean bulkAssignWaitTillAllAssigned;
214 
215   /**
216    * Indicator that AssignmentManager has recovered the region states so
217    * that ServerShutdownHandler can be fully enabled and re-assign regions
218    * of dead servers. So that when re-assignment happens, AssignmentManager
219    * has proper region states.
220    *
221    * Protected to ease testing.
222    */
223   protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
224 
225   /** Is the TimeOutManagement activated **/
226   private final boolean tomActivated;
227 
228   /**
229    * A map to track the count a region fails to open in a row.
230    * So that we don't try to open a region forever if the failure is
231    * unrecoverable.  We don't put this information in region states
232    * because we don't expect this to happen frequently; we don't
233    * want to copy this information over during each state transition either.
234    */
235   private final ConcurrentHashMap<String, AtomicInteger>
236     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
237 
238   /**
239    * For testing only!  Set to true to skip handling of split.
240    */
241   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
242   public static boolean TEST_SKIP_SPLIT_HANDLING = false;
243 
244   /**
245    * Constructs a new assignment manager.
246    *
247    * @param server
248    * @param serverManager
249    * @param catalogTracker
250    * @param service
251    * @throws KeeperException
252    * @throws IOException
253    */
254   public AssignmentManager(Server server, ServerManager serverManager,
255       CatalogTracker catalogTracker, final LoadBalancer balancer,
256       final ExecutorService service, MetricsMaster metricsMaster,
257       final TableLockManager tableLockManager) throws KeeperException, IOException {
258     super(server.getZooKeeper());
259     this.server = server;
260     this.serverManager = serverManager;
261     this.catalogTracker = catalogTracker;
262     this.executorService = service;
263     this.regionsToReopen = Collections.synchronizedMap
264                            (new HashMap<String, HRegionInfo> ());
265     Configuration conf = server.getConfiguration();
266     // Only read favored nodes if using the favored nodes load balancer.
267     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
268            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
269            FavoredNodeLoadBalancer.class);
270     this.tomActivated = conf.getBoolean(
271       ASSIGNMENT_TIMEOUT_MANAGEMENT, DEFAULT_ASSIGNMENT_TIMEOUT_MANAGEMENT);
272     if (tomActivated){
273       this.serversInUpdatingTimer =  new ConcurrentSkipListSet<ServerName>();
274       this.timeoutMonitor = new TimeoutMonitor(
275         conf.getInt("hbase.master.assignment.timeoutmonitor.period", 30000),
276         server, serverManager,
277         conf.getInt(ASSIGNMENT_TIMEOUT, DEFAULT_ASSIGNMENT_TIMEOUT_DEFAULT));
278       this.timerUpdater = new TimerUpdater(conf.getInt(
279         "hbase.master.assignment.timerupdater.period", 10000), server);
280       Threads.setDaemonThreadRunning(timerUpdater.getThread(),
281         server.getServerName() + ".timerUpdater");
282     } else {
283       this.serversInUpdatingTimer =  null;
284       this.timeoutMonitor = null;
285       this.timerUpdater = null;
286     }
287     try {
288       this.zkTable = new ZKTable(this.watcher);
289     } catch (InterruptedException e) {
290       throw new InterruptedIOException();
291     }
292     // This is the max attempts, not retries, so it should be at least 1.
293     this.maximumAttempts = Math.max(1,
294       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
295     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
296         "hbase.meta.assignment.retry.sleeptime", 1000l);
297     this.balancer = balancer;
298     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
299     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
300       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
301     this.regionStates = new RegionStates(server, serverManager);
302 
303     this.bulkAssignWaitTillAllAssigned =
304       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
305     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
306     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
307 
308     int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
309     ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
310     zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
311             TimeUnit.SECONDS, threadFactory);
312     this.tableLockManager = tableLockManager;
313 
314     this.metricsAssignmentManager = new MetricsAssignmentManager();
315   }
316 
317   void startTimeOutMonitor() {
318     if (tomActivated) {
319       Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), server.getServerName()
320           + ".timeoutMonitor");
321     }
322   }
323 
324   /**
325    * @return Instance of ZKTable.
326    */
327   public ZKTable getZKTable() {
328     // These are 'expensive' to make involving trip to zk ensemble so allow
329     // sharing.
330     return this.zkTable;
331   }
332 
333   /**
334    * This SHOULD not be public. It is public now
335    * because of some unit tests.
336    *
337    * TODO: make it package private and keep RegionStates in the master package
338    */
339   public RegionStates getRegionStates() {
340     return regionStates;
341   }
342 
343   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
344     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
345   }
346 
347   /**
348    * Add a regionPlan for the specified region.
349    * @param encodedName
350    * @param plan
351    */
352   public void addPlan(String encodedName, RegionPlan plan) {
353     synchronized (regionPlans) {
354       regionPlans.put(encodedName, plan);
355     }
356   }
357 
358   /**
359    * Add a map of region plans.
360    */
361   public void addPlans(Map<String, RegionPlan> plans) {
362     synchronized (regionPlans) {
363       regionPlans.putAll(plans);
364     }
365   }
366 
367   /**
368    * Set the list of regions that will be reopened
369    * because of an update in table schema
370    *
371    * @param regions
372    *          list of regions that should be tracked for reopen
373    */
374   public void setRegionsToReopen(List <HRegionInfo> regions) {
375     for(HRegionInfo hri : regions) {
376       regionsToReopen.put(hri.getEncodedName(), hri);
377     }
378   }
379 
380   /**
381    * Used by the client to identify if all regions have the schema updates
382    *
383    * @param tableName
384    * @return Pair indicating the status of the alter command
385    * @throws IOException
386    */
387   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
388       throws IOException {
389     List <HRegionInfo> hris =
390       MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName, true);
391     Integer pending = 0;
392     for (HRegionInfo hri : hris) {
393       String name = hri.getEncodedName();
394       // no lock concurrent access ok: sequential consistency respected.
395       if (regionsToReopen.containsKey(name)
396           || regionStates.isRegionInTransition(name)) {
397         pending++;
398       }
399     }
400     return new Pair<Integer, Integer>(pending, hris.size());
401   }
402 
403   /**
404    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
405    * the failover cleanup before re-assigning regions of dead servers. So that
406    * when re-assignment happens, AssignmentManager has proper region states.
407    */
408   public boolean isFailoverCleanupDone() {
409     return failoverCleanupDone.get();
410   }
411 
412   /**
413    * To avoid racing with AM, external entities may need to lock a region,
414    * for example, when SSH checks what regions to skip re-assigning.
415    */
416   public Lock acquireRegionLock(final String encodedName) {
417     return locker.acquireLock(encodedName);
418   }
419 
420   /**
421    * Now, failover cleanup is completed. Notify server manager to
422    * process queued up dead servers processing, if any.
423    */
424   void failoverCleanupDone() {
425     failoverCleanupDone.set(true);
426     serverManager.processQueuedDeadServers();
427   }
428 
429   /**
430    * Called on startup.
431    * Figures whether a fresh cluster start of we are joining extant running cluster.
432    * @throws IOException
433    * @throws KeeperException
434    * @throws InterruptedException
435    */
436   void joinCluster() throws IOException,
437       KeeperException, InterruptedException {
438     // Concurrency note: In the below the accesses on regionsInTransition are
439     // outside of a synchronization block where usually all accesses to RIT are
440     // synchronized.  The presumption is that in this case it is safe since this
441     // method is being played by a single thread on startup.
442 
443     // TODO: Regions that have a null location and are not in regionsInTransitions
444     // need to be handled.
445 
446     // Scan hbase:meta to build list of existing regions, servers, and assignment
447     // Returns servers who have not checked in (assumed dead) and their regions
448     Map<ServerName, List<HRegionInfo>> deadServers = rebuildUserRegions();
449 
450     // This method will assign all user regions if a clean server startup or
451     // it will reconstruct master state and cleanup any leftovers from
452     // previous master process.
453     processDeadServersAndRegionsInTransition(deadServers);
454 
455     recoverTableInDisablingState();
456     recoverTableInEnablingState();
457   }
458 
459   /**
460    * Process all regions that are in transition in zookeeper and also
461    * processes the list of dead servers by scanning the META.
462    * Used by master joining an cluster.  If we figure this is a clean cluster
463    * startup, will assign all user regions.
464    * @param deadServers
465    *          Map of dead servers and their regions. Can be null.
466    * @throws KeeperException
467    * @throws IOException
468    * @throws InterruptedException
469    */
470   void processDeadServersAndRegionsInTransition(
471       final Map<ServerName, List<HRegionInfo>> deadServers)
472           throws KeeperException, IOException, InterruptedException {
473     List<String> nodes = ZKUtil.listChildrenNoWatch(watcher,
474       watcher.assignmentZNode);
475 
476     if (nodes == null) {
477       String errorMessage = "Failed to get the children from ZK";
478       server.abort(errorMessage, new IOException(errorMessage));
479       return;
480     }
481 
482     boolean failover = (!serverManager.getDeadServers().isEmpty() || !serverManager
483         .getRequeuedDeadServers().isEmpty());
484 
485     if (!failover) {
486       // If any one region except meta is assigned, it's a failover.
487       Map<HRegionInfo, ServerName> regions = regionStates.getRegionAssignments();
488       for (HRegionInfo hri: regions.keySet()) {
489         if (!hri.isMetaTable()) {
490           LOG.debug("Found " + hri + " out on cluster");
491           failover = true;
492           break;
493         }
494       }
495       if (!failover) {
496         // If any one region except meta is in transition, it's a failover.
497         for (String encodedName: nodes) {
498           RegionState state = regionStates.getRegionState(encodedName);
499           if (state != null && !state.getRegion().isMetaRegion()) {
500             LOG.debug("Found " + state.getRegion().getRegionNameAsString() + " in RITs");
501             failover = true;
502             break;
503           }
504         }
505       }
506     }
507 
508     // If we found user regions out on cluster, its a failover.
509     if (failover) {
510       LOG.info("Found regions out on cluster or in RIT; presuming failover");
511       // Process list of dead servers and regions in RIT.
512       // See HBASE-4580 for more information.
513       processDeadServersAndRecoverLostRegions(deadServers);
514     } else {
515       // Fresh cluster startup.
516       LOG.info("Clean cluster startup. Assigning userregions");
517       assignAllUserRegions();
518     }
519   }
520 
521   /**
522    * If region is up in zk in transition, then do fixup and block and wait until
523    * the region is assigned and out of transition.  Used on startup for
524    * catalog regions.
525    * @param hri Region to look for.
526    * @return True if we processed a region in transition else false if region
527    * was not up in zk in transition.
528    * @throws InterruptedException
529    * @throws KeeperException
530    * @throws IOException
531    */
532   boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
533       throws InterruptedException, KeeperException, IOException {
534     String encodedRegionName = hri.getEncodedName();
535     if (!processRegionInTransition(encodedRegionName, hri)) {
536       return false; // The region is not in transition
537     }
538     LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedRegionName));
539     while (!this.server.isStopped() &&
540         this.regionStates.isRegionInTransition(encodedRegionName)) {
541       RegionState state = this.regionStates.getRegionTransitionState(encodedRegionName);
542       if (state == null || !serverManager.isServerOnline(state.getServerName())) {
543         // The region is not in transition, or not in transition on an online
544         // server. Doesn't help to block here any more. Caller need to
545         // verify the region is actually assigned.
546         break;
547       }
548       this.regionStates.waitForUpdate(100);
549     }
550     return true;
551   }
552 
553   /**
554    * Process failover of new master for region <code>encodedRegionName</code>
555    * up in zookeeper.
556    * @param encodedRegionName Region to process failover for.
557    * @param regionInfo If null we'll go get it from meta table.
558    * @return True if we processed <code>regionInfo</code> as a RIT.
559    * @throws KeeperException
560    * @throws IOException
561    */
562   boolean processRegionInTransition(final String encodedRegionName,
563       final HRegionInfo regionInfo) throws KeeperException, IOException {
564     // We need a lock here to ensure that we will not put the same region twice
565     // It has no reason to be a lock shared with the other operations.
566     // We can do the lock on the region only, instead of a global lock: what we want to ensure
567     // is that we don't have two threads working on the same region.
568     Lock lock = locker.acquireLock(encodedRegionName);
569     try {
570       Stat stat = new Stat();
571       byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
572       if (data == null) return false;
573       RegionTransition rt;
574       try {
575         rt = RegionTransition.parseFrom(data);
576       } catch (DeserializationException e) {
577         LOG.warn("Failed parse znode data", e);
578         return false;
579       }
580       HRegionInfo hri = regionInfo;
581       if (hri == null) {
582         // The region info is not passed in. We will try to find the region
583         // from region states map/meta based on the encoded region name. But we
584         // may not be able to find it. This is valid for online merge that
585         // the region may have not been created if the merge is not completed.
586         // Therefore, it is not in meta at master recovery time.
587         hri = regionStates.getRegionInfo(rt.getRegionName());
588         EventType et = rt.getEventType();
589         if (hri == null && et != EventType.RS_ZK_REGION_MERGING
590             && et != EventType.RS_ZK_REQUEST_REGION_MERGE) {
591           LOG.warn("Couldn't find the region in recovering " + rt);
592           return false;
593         }
594       }
595       return processRegionsInTransition(
596         rt, hri, stat.getVersion());
597     } finally {
598       lock.unlock();
599     }
600   }
601 
602   /**
603    * This call is invoked only (1) master assign meta;
604    * (2) during failover mode startup, zk assignment node processing.
605    * The locker is set in the caller. It returns true if the region
606    * is in transition for sure, false otherwise.
607    *
608    * It should be private but it is used by some test too.
609    */
610   boolean processRegionsInTransition(
611       final RegionTransition rt, final HRegionInfo regionInfo,
612       final int expectedVersion) throws KeeperException {
613     EventType et = rt.getEventType();
614     // Get ServerName.  Could not be null.
615     final ServerName sn = rt.getServerName();
616     final byte[] regionName = rt.getRegionName();
617     final String encodedName = HRegionInfo.encodeRegionName(regionName);
618     final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
619     LOG.info("Processing " + prettyPrintedRegionName + " in state: " + et);
620 
621     if (regionStates.isRegionInTransition(encodedName)) {
622       LOG.info("Processed region " + prettyPrintedRegionName + " in state: "
623         + et + ", does nothing since the region is already in transition "
624         + regionStates.getRegionTransitionState(encodedName));
625       // Just return
626       return true;
627     }
628     if (!serverManager.isServerOnline(sn)) {
629       // It was transitioning on a dead server, so it's closed now.
630       // Force to OFFLINE and put it in transition, but not assign it
631       // since log splitting for the dead server is not done yet.
632       LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() +
633         " was on deadserver; forcing offline");
634       if (regionStates.isRegionOnline(regionInfo)) {
635         // Meta could still show the region is assigned to the previous
636         // server. If that server is online, when we reload the meta, the
637         // region is put back to online, we need to offline it.
638         regionStates.regionOffline(regionInfo);
639       }
640       // Put it back in transition so that SSH can re-assign it
641       regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);
642 
643       if (regionInfo.isMetaRegion()) {
644         // If it's meta region, reset the meta location.
645         // So that master knows the right meta region server.
646         MetaRegionTracker.setMetaLocation(watcher, sn);
647       } else {
648         // No matter the previous server is online or offline,
649         // we need to reset the last region server of the region.
650         regionStates.setLastRegionServerOfRegion(sn, encodedName);
651         // Make sure we know the server is dead.
652         if (!serverManager.isServerDead(sn)) {
653           serverManager.expireServer(sn);
654         }
655       }
656       return false;
657     }
658     switch (et) {
659       case M_ZK_REGION_CLOSING:
660         // Insert into RIT & resend the query to the region server: may be the previous master
661         // died before sending the query the first time.
662         final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING);
663         this.executorService.submit(
664           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
665             @Override
666             public void process() throws IOException {
667               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
668               try {
669                 unassign(regionInfo, rsClosing, expectedVersion, null, true, null);
670                 if (regionStates.isRegionOffline(regionInfo)) {
671                   assign(regionInfo, true);
672                 }
673               } finally {
674                 lock.unlock();
675               }
676             }
677           });
678         break;
679 
680       case RS_ZK_REGION_CLOSED:
681       case RS_ZK_REGION_FAILED_OPEN:
682         // Region is closed, insert into RIT and handle it
683         regionStates.updateRegionState(regionInfo, State.CLOSED, sn);
684         invokeAssign(regionInfo);
685         break;
686 
687       case M_ZK_REGION_OFFLINE:
688         // Insert in RIT and resend to the regionserver
689         regionStates.updateRegionState(rt, State.PENDING_OPEN);
690         final RegionState rsOffline = regionStates.getRegionState(regionInfo);
691         this.executorService.submit(
692           new EventHandler(server, EventType.M_MASTER_RECOVERY) {
693             @Override
694             public void process() throws IOException {
695               ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
696               try {
697                 RegionPlan plan = new RegionPlan(regionInfo, null, sn);
698                 addPlan(encodedName, plan);
699                 assign(rsOffline, false, false);
700               } finally {
701                 lock.unlock();
702               }
703             }
704           });
705         break;
706 
707       case RS_ZK_REGION_OPENING:
708         regionStates.updateRegionState(rt, State.OPENING);
709         break;
710 
711       case RS_ZK_REGION_OPENED:
712         // Region is opened, insert into RIT and handle it
713         // This could be done asynchronously, we would need then to acquire the lock in the
714         //  handler.
715         regionStates.updateRegionState(rt, State.OPEN);
716         new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process();
717         break;
718       case RS_ZK_REQUEST_REGION_SPLIT:
719       case RS_ZK_REGION_SPLITTING:
720       case RS_ZK_REGION_SPLIT:
721         // Splitting region should be online. We could have skipped it during
722         // user region rebuilding since we may consider the split is completed.
723         // Put it in SPLITTING state to avoid complications.
724         regionStates.regionOnline(regionInfo, sn);
725         regionStates.updateRegionState(rt, State.SPLITTING);
726         if (!handleRegionSplitting(
727             rt, encodedName, prettyPrintedRegionName, sn)) {
728           deleteSplittingNode(encodedName, sn);
729         }
730         break;
731       case RS_ZK_REQUEST_REGION_MERGE:
732       case RS_ZK_REGION_MERGING:
733       case RS_ZK_REGION_MERGED:
734         if (!handleRegionMerging(
735             rt, encodedName, prettyPrintedRegionName, sn)) {
736           deleteMergingNode(encodedName, sn);
737         }
738         break;
739       default:
740         throw new IllegalStateException("Received region in state:" + et + " is not valid.");
741     }
742     LOG.info("Processed region " + prettyPrintedRegionName + " in state "
743       + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ")
744       + "server: " + sn);
745     return true;
746   }
747 
748   /**
749    * When a region is closed, it should be removed from the regionsToReopen
750    * @param hri HRegionInfo of the region which was closed
751    */
752   public void removeClosedRegion(HRegionInfo hri) {
753     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
754       LOG.debug("Removed region from reopening regions because it was closed");
755     }
756   }
757 
758   /**
759    * Handles various states an unassigned node can be in.
760    * <p>
761    * Method is called when a state change is suspected for an unassigned node.
762    * <p>
763    * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
764    * yet).
765    * @param rt
766    * @param expectedVersion
767    */
768   void handleRegion(final RegionTransition rt, int expectedVersion) {
769     if (rt == null) {
770       LOG.warn("Unexpected NULL input for RegionTransition rt");
771       return;
772     }
773     final ServerName sn = rt.getServerName();
774     // Check if this is a special HBCK transition
775     if (sn.equals(HBCK_CODE_SERVERNAME)) {
776       handleHBCK(rt);
777       return;
778     }
779     final long createTime = rt.getCreateTime();
780     final byte[] regionName = rt.getRegionName();
781     String encodedName = HRegionInfo.encodeRegionName(regionName);
782     String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
783     // Verify this is a known server
784     if (!serverManager.isServerOnline(sn)
785       && !ignoreStatesRSOffline.contains(rt.getEventType())) {
786       LOG.warn("Attempted to handle region transition for server but " +
787         "it is not online: " + prettyPrintedRegionName + ", " + rt);
788       return;
789     }
790 
791     RegionState regionState =
792       regionStates.getRegionState(encodedName);
793     long startTime = System.currentTimeMillis();
794     if (LOG.isDebugEnabled()) {
795       boolean lateEvent = createTime < (startTime - 15000);
796       LOG.debug("Handling " + rt.getEventType() +
797         ", server=" + sn + ", region=" +
798         (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
799         (lateEvent ? ", which is more than 15 seconds late" : "") +
800         ", current_state=" + regionState);
801     }
802     // We don't do anything for this event,
803     // so separate it out, no need to lock/unlock anything
804     if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
805       return;
806     }
807 
808     // We need a lock on the region as we could update it
809     Lock lock = locker.acquireLock(encodedName);
810     try {
811       RegionState latestState =
812         regionStates.getRegionState(encodedName);
813       if ((regionState == null && latestState != null)
814           || (regionState != null && latestState == null)
815           || (regionState != null && latestState != null
816             && latestState.getState() != regionState.getState())) {
817         LOG.warn("Region state changed from " + regionState + " to "
818           + latestState + ", while acquiring lock");
819       }
820       long waitedTime = System.currentTimeMillis() - startTime;
821       if (waitedTime > 5000) {
822         LOG.warn("Took " + waitedTime + "ms to acquire the lock");
823       }
824       regionState = latestState;
825       switch (rt.getEventType()) {
826       case RS_ZK_REQUEST_REGION_SPLIT:
827       case RS_ZK_REGION_SPLITTING:
828       case RS_ZK_REGION_SPLIT:
829         if (!handleRegionSplitting(
830             rt, encodedName, prettyPrintedRegionName, sn)) {
831           deleteSplittingNode(encodedName, sn);
832         }
833         break;
834 
835       case RS_ZK_REQUEST_REGION_MERGE:
836       case RS_ZK_REGION_MERGING:
837       case RS_ZK_REGION_MERGED:
838         // Merged region is a new region, we can't find it in the region states now.
839         // However, the two merging regions are not new. They should be in state for merging.
840         if (!handleRegionMerging(
841             rt, encodedName, prettyPrintedRegionName, sn)) {
842           deleteMergingNode(encodedName, sn);
843         }
844         break;
845 
846       case M_ZK_REGION_CLOSING:
847         // Should see CLOSING after we have asked it to CLOSE or additional
848         // times after already being in state of CLOSING
849         if (regionState == null
850             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
851           LOG.warn("Received CLOSING for " + prettyPrintedRegionName
852             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
853             + regionStates.getRegionState(encodedName));
854           return;
855         }
856         // Transition to CLOSING (or update stamp if already CLOSING)
857         regionStates.updateRegionState(rt, State.CLOSING);
858         break;
859 
860       case RS_ZK_REGION_CLOSED:
861         // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
862         if (regionState == null
863             || !regionState.isPendingCloseOrClosingOnServer(sn)) {
864           LOG.warn("Received CLOSED for " + prettyPrintedRegionName
865             + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
866             + regionStates.getRegionState(encodedName));
867           return;
868         }
869         // Handle CLOSED by assigning elsewhere or stopping if a disable
870         // If we got here all is good.  Need to update RegionState -- else
871         // what follows will fail because not in expected state.
872         new ClosedRegionHandler(server, this, regionState.getRegion()).process();
873         updateClosedRegionHandlerTracker(regionState.getRegion());
874         break;
875 
876         case RS_ZK_REGION_FAILED_OPEN:
877           if (regionState == null
878               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
879             LOG.warn("Received FAILED_OPEN for " + prettyPrintedRegionName
880               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
881               + regionStates.getRegionState(encodedName));
882             return;
883           }
884           AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
885           if (failedOpenCount == null) {
886             failedOpenCount = new AtomicInteger();
887             // No need to use putIfAbsent, or extra synchronization since
888             // this whole handleRegion block is locked on the encoded region
889             // name, and failedOpenTracker is updated only in this block
890             failedOpenTracker.put(encodedName, failedOpenCount);
891           }
892           if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
893             regionStates.updateRegionState(rt, State.FAILED_OPEN);
894             // remove the tracking info to save memory, also reset
895             // the count for next open initiative
896             failedOpenTracker.remove(encodedName);
897           } else {
898             // Handle this the same as if it were opened and then closed.
899             regionState = regionStates.updateRegionState(rt, State.CLOSED);
900             if (regionState != null) {
901               // When there are more than one region server a new RS is selected as the
902               // destination and the same is updated in the regionplan. (HBASE-5546)
903               try {
904                 getRegionPlan(regionState.getRegion(), sn, true);
905                 new ClosedRegionHandler(server, this, regionState.getRegion()).process();
906               } catch (HBaseIOException e) {
907                 LOG.warn("Failed to get region plan", e);
908               }
909             }
910           }
911           break;
912 
913         case RS_ZK_REGION_OPENING:
914           // Should see OPENING after we have asked it to OPEN or additional
915           // times after already being in state of OPENING
916           if (regionState == null
917               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
918             LOG.warn("Received OPENING for " + prettyPrintedRegionName
919               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
920               + regionStates.getRegionState(encodedName));
921             return;
922           }
923           // Transition to OPENING (or update stamp if already OPENING)
924           regionStates.updateRegionState(rt, State.OPENING);
925           break;
926 
927         case RS_ZK_REGION_OPENED:
928           // Should see OPENED after OPENING but possible after PENDING_OPEN.
929           if (regionState == null
930               || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
931             LOG.warn("Received OPENED for " + prettyPrintedRegionName
932               + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
933               + regionStates.getRegionState(encodedName));
934 
935             if (regionState != null) {
936               // Close it without updating the internal region states,
937               // so as not to create double assignments in unlucky scenarios
938               // mentioned in OpenRegionHandler#process
939               unassign(regionState.getRegion(), null, -1, null, false, sn);
940             }
941             return;
942           }
943           // Handle OPENED by removing from transition and deleted zk node
944           regionState = regionStates.updateRegionState(rt, State.OPEN);
945           if (regionState != null) {
946             failedOpenTracker.remove(encodedName); // reset the count, if any
947             new OpenedRegionHandler(
948               server, this, regionState.getRegion(), sn, expectedVersion).process();
949             updateOpenedRegionHandlerTracker(regionState.getRegion());
950           }
951           break;
952 
953         default:
954           throw new IllegalStateException("Received event is not valid.");
955       }
956     } finally {
957       lock.unlock();
958     }
959   }
960 
961   //For unit tests only
962   boolean wasClosedHandlerCalled(HRegionInfo hri) {
963     AtomicBoolean b = closedRegionHandlerCalled.get(hri);
964     //compareAndSet to be sure that unit tests don't see stale values. Means,
965     //we will return true exactly once unless the handler code resets to true
966     //this value.
967     return b == null ? false : b.compareAndSet(true, false);
968   }
969 
970   //For unit tests only
971   boolean wasOpenedHandlerCalled(HRegionInfo hri) {
972     AtomicBoolean b = openedRegionHandlerCalled.get(hri);
973     //compareAndSet to be sure that unit tests don't see stale values. Means,
974     //we will return true exactly once unless the handler code resets to true
975     //this value.
976     return b == null ? false : b.compareAndSet(true, false);
977   }
978 
979   //For unit tests only
980   void initializeHandlerTrackers() {
981     closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
982     openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
983   }
984 
985   void updateClosedRegionHandlerTracker(HRegionInfo hri) {
986     if (closedRegionHandlerCalled != null) { //only for unit tests this is true
987       closedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
988     }
989   }
990 
991   void updateOpenedRegionHandlerTracker(HRegionInfo hri) {
992     if (openedRegionHandlerCalled != null) { //only for unit tests this is true
993       openedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
994     }
995   }
996 
997   // TODO: processFavoredNodes might throw an exception, for e.g., if the
998   // meta could not be contacted/updated. We need to see how seriously to treat
999   // this problem as. Should we fail the current assignment. We should be able
1000   // to recover from this problem eventually (if the meta couldn't be updated
1001   // things should work normally and eventually get fixed up).
1002   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
1003     if (!shouldAssignRegionsWithFavoredNodes) return;
1004     // The AM gets the favored nodes info for each region and updates the meta
1005     // table with that info
1006     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
1007         new HashMap<HRegionInfo, List<ServerName>>();
1008     for (HRegionInfo region : regions) {
1009       regionToFavoredNodes.put(region,
1010           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
1011     }
1012     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes, catalogTracker);
1013   }
1014 
1015   /**
1016    * Handle a ZK unassigned node transition triggered by HBCK repair tool.
1017    * <p>
1018    * This is handled in a separate code path because it breaks the normal rules.
1019    * @param rt
1020    */
1021   private void handleHBCK(RegionTransition rt) {
1022     String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
1023     LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
1024       ", server=" + rt.getServerName() + ", region=" +
1025       HRegionInfo.prettyPrint(encodedName));
1026     RegionState regionState = regionStates.getRegionTransitionState(encodedName);
1027     switch (rt.getEventType()) {
1028       case M_ZK_REGION_OFFLINE:
1029         HRegionInfo regionInfo;
1030         if (regionState != null) {
1031           regionInfo = regionState.getRegion();
1032         } else {
1033           try {
1034             byte [] name = rt.getRegionName();
1035             Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(catalogTracker, name);
1036             regionInfo = p.getFirst();
1037           } catch (IOException e) {
1038             LOG.info("Exception reading hbase:meta doing HBCK repair operation", e);
1039             return;
1040           }
1041         }
1042         LOG.info("HBCK repair is triggering assignment of region=" +
1043             regionInfo.getRegionNameAsString());
1044         // trigger assign, node is already in OFFLINE so don't need to update ZK
1045         assign(regionInfo, false);
1046         break;
1047 
1048       default:
1049         LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
1050         break;
1051     }
1052 
1053   }
1054 
1055   // ZooKeeper events
1056 
1057   /**
1058    * New unassigned node has been created.
1059    *
1060    * <p>This happens when an RS begins the OPENING or CLOSING of a region by
1061    * creating an unassigned node.
1062    *
1063    * <p>When this happens we must:
1064    * <ol>
1065    *   <li>Watch the node for further events</li>
1066    *   <li>Read and handle the state in the node</li>
1067    * </ol>
1068    */
1069   @Override
1070   public void nodeCreated(String path) {
1071     handleAssignmentEvent(path);
1072   }
1073 
1074   /**
1075    * Existing unassigned node has had data changed.
1076    *
1077    * <p>This happens when an RS transitions from OFFLINE to OPENING, or between
1078    * OPENING/OPENED and CLOSING/CLOSED.
1079    *
1080    * <p>When this happens we must:
1081    * <ol>
1082    *   <li>Watch the node for further events</li>
1083    *   <li>Read and handle the state in the node</li>
1084    * </ol>
1085    */
1086   @Override
1087   public void nodeDataChanged(String path) {
1088     handleAssignmentEvent(path);
1089   }
1090 
1091 
1092   // We  don't want to have two events on the same region managed simultaneously.
1093   // For this reason, we need to wait if an event on the same region is currently in progress.
1094   // So we track the region names of the events in progress, and we keep a waiting list.
1095   private final Set<String> regionsInProgress = new HashSet<String>();
1096   // In a LinkedHashMultimap, the put order is kept when we retrieve the collection back. We need
1097   //  this as we want the events to be managed in the same order as we received them.
1098   private final LinkedHashMultimap <String, RegionRunnable>
1099       zkEventWorkerWaitingList = LinkedHashMultimap.create();
1100 
1101   /**
1102    * A specific runnable that works only on a region.
1103    */
1104   private interface RegionRunnable extends Runnable{
1105     /**
1106      * @return - the name of the region it works on.
1107      */
1108     String getRegionName();
1109   }
1110 
1111   /**
1112    * Submit a task, ensuring that there is only one task at a time that working on a given region.
1113    * Order is respected.
1114    */
1115   protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {
1116 
1117     synchronized (regionsInProgress) {
1118       // If we're there is already a task with this region, we add it to the
1119       //  waiting list and return.
1120       if (regionsInProgress.contains(regRunnable.getRegionName())) {
1121         synchronized (zkEventWorkerWaitingList){
1122           zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
1123         }
1124         return;
1125       }
1126 
1127       // No event in progress on this region => we can submit a new task immediately.
1128       regionsInProgress.add(regRunnable.getRegionName());
1129       zkEventWorkers.submit(new Runnable() {
1130         @Override
1131         public void run() {
1132           try {
1133             regRunnable.run();
1134           } finally {
1135             // now that we have finished, let's see if there is an event for the same region in the
1136             //  waiting list. If it's the case, we can now submit it to the pool.
1137             synchronized (regionsInProgress) {
1138               regionsInProgress.remove(regRunnable.getRegionName());
1139               synchronized (zkEventWorkerWaitingList) {
1140                 java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
1141                     regRunnable.getRegionName());
1142                 if (!waiting.isEmpty()) {
1143                   // We want the first object only. The only way to get it is through an iterator.
1144                   RegionRunnable toSubmit = waiting.iterator().next();
1145                   zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
1146                   zkEventWorkersSubmit(toSubmit);
1147                 }
1148               }
1149             }
1150           }
1151         }
1152       });
1153     }
1154   }
1155 
1156   @Override
1157   public void nodeDeleted(final String path) {
1158     if (path.startsWith(watcher.assignmentZNode)) {
1159       final String regionName = ZKAssign.getRegionName(watcher, path);
1160       zkEventWorkersSubmit(new RegionRunnable() {
1161         @Override
1162         public String getRegionName() {
1163           return regionName;
1164         }
1165 
1166         @Override
1167         public void run() {
1168           Lock lock = locker.acquireLock(regionName);
1169           try {
1170             RegionState rs = regionStates.getRegionTransitionState(regionName);
1171             if (rs == null) {
1172               rs = regionStates.getRegionState(regionName);
1173               if (rs == null || !rs.isMergingNew()) {
1174                 // MergingNew is an offline state
1175                 return;
1176               }
1177             }
1178 
1179             HRegionInfo regionInfo = rs.getRegion();
1180             String regionNameStr = regionInfo.getRegionNameAsString();
1181             LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);
1182             boolean disabled = getZKTable().isDisablingOrDisabledTable(regionInfo.getTable());
1183             ServerName serverName = rs.getServerName();
1184             if (serverManager.isServerOnline(serverName)) {
1185               if (rs.isOnServer(serverName)
1186                   && (rs.isOpened() || rs.isSplitting())) {
1187                 regionOnline(regionInfo, serverName);
1188                 if (disabled) {
1189                   // if server is offline, no hurt to unassign again
1190                   LOG.info("Opened " + regionNameStr
1191                     + "but this table is disabled, triggering close of region");
1192                   unassign(regionInfo);
1193                 }
1194               } else if (rs.isMergingNew()) {
1195                 synchronized (regionStates) {
1196                   String p = regionInfo.getEncodedName();
1197                   PairOfSameType<HRegionInfo> regions = mergingRegions.get(p);
1198                   if (regions != null) {
1199                     onlineMergingRegion(disabled, regions.getFirst(), serverName);
1200                     onlineMergingRegion(disabled, regions.getSecond(), serverName);
1201                   }
1202                 }
1203               }
1204             }
1205           } finally {
1206             lock.unlock();
1207           }
1208         }
1209 
1210         private void onlineMergingRegion(boolean disabled,
1211             final HRegionInfo hri, final ServerName serverName) {
1212           RegionState regionState = regionStates.getRegionState(hri);
1213           if (regionState != null && regionState.isMerging()
1214               && regionState.isOnServer(serverName)) {
1215             regionOnline(regionState.getRegion(), serverName);
1216             if (disabled) {
1217               unassign(hri);
1218             }
1219           }
1220         }
1221       });
1222     }
1223   }
1224 
1225   /**
1226    * New unassigned node has been created.
1227    *
1228    * <p>This happens when an RS begins the OPENING, SPLITTING or CLOSING of a
1229    * region by creating a znode.
1230    *
1231    * <p>When this happens we must:
1232    * <ol>
1233    *   <li>Watch the node for further children changed events</li>
1234    *   <li>Watch all new children for changed events</li>
1235    * </ol>
1236    */
1237   @Override
1238   public void nodeChildrenChanged(String path) {
1239     if (path.equals(watcher.assignmentZNode)) {
1240       zkEventWorkers.submit(new Runnable() {
1241         @Override
1242         public void run() {
1243           try {
1244             // Just make sure we see the changes for the new znodes
1245             List<String> children =
1246               ZKUtil.listChildrenAndWatchForNewChildren(
1247                 watcher, watcher.assignmentZNode);
1248             if (children != null) {
1249               Stat stat = new Stat();
1250               for (String child : children) {
1251                 // if region is in transition, we already have a watch
1252                 // on it, so no need to watch it again. So, as I know for now,
1253                 // this is needed to watch splitting nodes only.
1254                 if (!regionStates.isRegionInTransition(child)) {
1255                   ZKAssign.getDataAndWatch(watcher, child, stat);
1256                 }
1257               }
1258             }
1259           } catch (KeeperException e) {
1260             server.abort("Unexpected ZK exception reading unassigned children", e);
1261           }
1262         }
1263       });
1264     }
1265   }
1266 
1267   /**
1268    * Marks the region as online.  Removes it from regions in transition and
1269    * updates the in-memory assignment information.
1270    * <p>
1271    * Used when a region has been successfully opened on a region server.
1272    * @param regionInfo
1273    * @param sn
1274    */
1275   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
1276     numRegionsOpened.incrementAndGet();
1277     regionStates.regionOnline(regionInfo, sn);
1278 
1279     // Remove plan if one.
1280     clearRegionPlan(regionInfo);
1281     // Add the server to serversInUpdatingTimer
1282     addToServersInUpdatingTimer(sn);
1283     balancer.regionOnline(regionInfo, sn);
1284   }
1285 
1286   /**
1287    * Pass the assignment event to a worker for processing.
1288    * Each worker is a single thread executor service.  The reason
1289    * for just one thread is to make sure all events for a given
1290    * region are processed in order.
1291    *
1292    * @param path
1293    */
1294   private void handleAssignmentEvent(final String path) {
1295     if (path.startsWith(watcher.assignmentZNode)) {
1296       final String regionName = ZKAssign.getRegionName(watcher, path);
1297 
1298       zkEventWorkersSubmit(new RegionRunnable() {
1299         @Override
1300         public String getRegionName() {
1301           return regionName;
1302         }
1303 
1304         @Override
1305         public void run() {
1306           try {
1307             Stat stat = new Stat();
1308             byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
1309             if (data == null) return;
1310 
1311             RegionTransition rt = RegionTransition.parseFrom(data);
1312             handleRegion(rt, stat.getVersion());
1313           } catch (KeeperException e) {
1314             server.abort("Unexpected ZK exception reading unassigned node data", e);
1315           } catch (DeserializationException e) {
1316             server.abort("Unexpected exception deserializing node data", e);
1317           }
1318         }
1319       });
1320     }
1321   }
1322 
1323   /**
1324    * Add the server to the set serversInUpdatingTimer, then {@link TimerUpdater}
1325    * will update timers for this server in background
1326    * @param sn
1327    */
1328   private void addToServersInUpdatingTimer(final ServerName sn) {
1329     if (tomActivated){
1330       this.serversInUpdatingTimer.add(sn);
1331     }
1332   }
1333 
1334   /**
1335    * Touch timers for all regions in transition that have the passed
1336    * <code>sn</code> in common.
1337    * Call this method whenever a server checks in.  Doing so helps the case where
1338    * a new regionserver has joined the cluster and its been given 1k regions to
1339    * open.  If this method is tickled every time the region reports in a
1340    * successful open then the 1k-th region won't be timed out just because its
1341    * sitting behind the open of 999 other regions.  This method is NOT used
1342    * as part of bulk assign -- there we have a different mechanism for extending
1343    * the regions in transition timer (we turn it off temporarily -- because
1344    * there is no regionplan involved when bulk assigning.
1345    * @param sn
1346    */
1347   private void updateTimers(final ServerName sn) {
1348     Preconditions.checkState(tomActivated);
1349     if (sn == null) return;
1350 
1351     // This loop could be expensive.
1352     // First make a copy of current regionPlan rather than hold sync while
1353     // looping because holding sync can cause deadlock.  Its ok in this loop
1354     // if the Map we're going against is a little stale
1355     List<Map.Entry<String, RegionPlan>> rps;
1356     synchronized(this.regionPlans) {
1357       rps = new ArrayList<Map.Entry<String, RegionPlan>>(regionPlans.entrySet());
1358     }
1359 
1360     for (Map.Entry<String, RegionPlan> e : rps) {
1361       if (e.getValue() != null && e.getKey() != null && sn.equals(e.getValue().getDestination())) {
1362         RegionState regionState = regionStates.getRegionTransitionState(e.getKey());
1363         if (regionState != null) {
1364           regionState.updateTimestampToNow();
1365         }
1366       }
1367     }
1368   }
1369 
1370   /**
1371    * Marks the region as offline.  Removes it from regions in transition and
1372    * removes in-memory assignment information.
1373    * <p>
1374    * Used when a region has been closed and should remain closed.
1375    * @param regionInfo
1376    */
1377   public void regionOffline(final HRegionInfo regionInfo) {
1378     regionOffline(regionInfo, null);
1379   }
1380 
1381   public void offlineDisabledRegion(HRegionInfo regionInfo) {
1382     // Disabling so should not be reassigned, just delete the CLOSED node
1383     LOG.debug("Table being disabled so deleting ZK node and removing from " +
1384       "regions in transition, skipping assignment of region " +
1385         regionInfo.getRegionNameAsString());
1386     String encodedName = regionInfo.getEncodedName();
1387     deleteNodeInStates(encodedName, "closed", null,
1388       EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
1389     regionOffline(regionInfo);
1390   }
1391 
1392   // Assignment methods
1393 
1394   /**
1395    * Assigns the specified region.
1396    * <p>
1397    * If a RegionPlan is available with a valid destination then it will be used
1398    * to determine what server region is assigned to.  If no RegionPlan is
1399    * available, region will be assigned to a random available server.
1400    * <p>
1401    * Updates the RegionState and sends the OPEN RPC.
1402    * <p>
1403    * This will only succeed if the region is in transition and in a CLOSED or
1404    * OFFLINE state or not in transition (in-memory not zk), and of course, the
1405    * chosen server is up and running (It may have just crashed!).  If the
1406    * in-memory checks pass, the zk node is forced to OFFLINE before assigning.
1407    *
1408    * @param region server to be assigned
1409    * @param setOfflineInZK whether ZK node should be created/transitioned to an
1410    *                       OFFLINE state before assigning the region
1411    */
1412   public void assign(HRegionInfo region, boolean setOfflineInZK) {
1413     assign(region, setOfflineInZK, false);
1414   }
1415 
1416   /**
1417    * Use care with forceNewPlan. It could cause double assignment.
1418    */
1419   public void assign(HRegionInfo region,
1420       boolean setOfflineInZK, boolean forceNewPlan) {
1421     if (isDisabledorDisablingRegionInRIT(region)) {
1422       return;
1423     }
1424     if (this.serverManager.isClusterShutdown()) {
1425       LOG.info("Cluster shutdown is set; skipping assign of " +
1426         region.getRegionNameAsString());
1427       return;
1428     }
1429     String encodedName = region.getEncodedName();
1430     Lock lock = locker.acquireLock(encodedName);
1431     try {
1432       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
1433       if (state != null) {
1434         if (regionStates.wasRegionOnDeadServer(encodedName)) {
1435           LOG.info("Skip assigning " + region.getRegionNameAsString()
1436             + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
1437             + " is dead but not processed yet");
1438           return;
1439         }
1440         assign(state, setOfflineInZK, forceNewPlan);
1441       }
1442     } finally {
1443       lock.unlock();
1444     }
1445   }
1446 
1447   /**
1448    * Bulk assign regions to <code>destination</code>.
1449    * @param destination
1450    * @param regions Regions to assign.
1451    * @return true if successful
1452    */
1453   boolean assign(final ServerName destination, final List<HRegionInfo> regions)
1454     throws InterruptedException {
1455     long startTime = EnvironmentEdgeManager.currentTimeMillis();
1456     try {
1457       int regionCount = regions.size();
1458       if (regionCount == 0) {
1459         return true;
1460       }
1461       LOG.debug("Assigning " + regionCount + " region(s) to " + destination.toString());
1462       Set<String> encodedNames = new HashSet<String>(regionCount);
1463       for (HRegionInfo region : regions) {
1464         encodedNames.add(region.getEncodedName());
1465       }
1466 
1467       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
1468       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
1469       try {
1470         AtomicInteger counter = new AtomicInteger(0);
1471         Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
1472         OfflineCallback cb = new OfflineCallback(
1473           watcher, destination, counter, offlineNodesVersions);
1474         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
1475         List<RegionState> states = new ArrayList<RegionState>(regions.size());
1476         for (HRegionInfo region : regions) {
1477           String encodedName = region.getEncodedName();
1478           if (!isDisabledorDisablingRegionInRIT(region)) {
1479             RegionState state = forceRegionStateToOffline(region, false);
1480             boolean onDeadServer = false;
1481             if (state != null) {
1482               if (regionStates.wasRegionOnDeadServer(encodedName)) {
1483                 LOG.info("Skip assigning " + region.getRegionNameAsString()
1484                   + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
1485                   + " is dead but not processed yet");
1486                 onDeadServer = true;
1487               } else if (asyncSetOfflineInZooKeeper(state, cb, destination)) {
1488                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
1489                 plans.put(encodedName, plan);
1490                 states.add(state);
1491                 continue;
1492               }
1493             }
1494             // Reassign if the region wasn't on a dead server
1495             if (!onDeadServer) {
1496               LOG.info("failed to force region state to offline or "
1497                 + "failed to set it offline in ZK, will reassign later: " + region);
1498               failedToOpenRegions.add(region); // assign individually later
1499             }
1500           }
1501           // Release the lock, this region is excluded from bulk assign because
1502           // we can't update its state, or set its znode to offline.
1503           Lock lock = locks.remove(encodedName);
1504           lock.unlock();
1505         }
1506 
1507         // Wait until all unassigned nodes have been put up and watchers set.
1508         int total = states.size();
1509         for (int oldCounter = 0; !server.isStopped();) {
1510           int count = counter.get();
1511           if (oldCounter != count) {
1512             LOG.info(destination.toString() + " unassigned znodes=" + count +
1513               " of total=" + total);
1514             oldCounter = count;
1515           }
1516           if (count >= total) break;
1517           Thread.sleep(5);
1518         }
1519 
1520         if (server.isStopped()) {
1521           return false;
1522         }
1523 
1524         // Add region plans, so we can updateTimers when one region is opened so
1525         // that unnecessary timeout on RIT is reduced.
1526         this.addPlans(plans);
1527 
1528         List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
1529           new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
1530         for (RegionState state: states) {
1531           HRegionInfo region = state.getRegion();
1532           String encodedRegionName = region.getEncodedName();
1533           Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
1534           if (nodeVersion == null || nodeVersion == -1) {
1535             LOG.warn("failed to offline in zookeeper: " + region);
1536             failedToOpenRegions.add(region); // assign individually later
1537             Lock lock = locks.remove(encodedRegionName);
1538             lock.unlock();
1539           } else {
1540             regionStates.updateRegionState(
1541               region, State.PENDING_OPEN, destination);
1542             List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1543             if (this.shouldAssignRegionsWithFavoredNodes) {
1544               favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1545             }
1546             regionOpenInfos.add(new Triple<HRegionInfo, Integer,  List<ServerName>>(
1547               region, nodeVersion, favoredNodes));
1548           }
1549         }
1550 
1551         // Move on to open regions.
1552         try {
1553           // Send OPEN RPC. If it fails on a IOE or RemoteException,
1554           // regions will be assigned individually.
1555           long maxWaitTime = System.currentTimeMillis() +
1556             this.server.getConfiguration().
1557               getLong("hbase.regionserver.rpc.startup.waittime", 60000);
1558           for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
1559             try {
1560               List<RegionOpeningState> regionOpeningStateList = serverManager
1561                 .sendRegionOpen(destination, regionOpenInfos);
1562               if (regionOpeningStateList == null) {
1563                 // Failed getting RPC connection to this server
1564                 return false;
1565               }
1566               for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
1567                 RegionOpeningState openingState = regionOpeningStateList.get(k);
1568                 if (openingState != RegionOpeningState.OPENED) {
1569                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
1570                   if (openingState == RegionOpeningState.ALREADY_OPENED) {
1571                     processAlreadyOpenedRegion(region, destination);
1572                   } else if (openingState == RegionOpeningState.FAILED_OPENING) {
1573                     // Failed opening this region, reassign it later
1574                     failedToOpenRegions.add(region);
1575                   } else {
1576                     LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
1577                       + openingState + " in assigning region " + region);
1578                   }
1579                 }
1580               }
1581               break;
1582             } catch (IOException e) {
1583               if (e instanceof RemoteException) {
1584                 e = ((RemoteException)e).unwrapRemoteException();
1585               }
1586               if (e instanceof RegionServerStoppedException) {
1587                 LOG.warn("The region server was shut down, ", e);
1588                 // No need to retry, the region server is a goner.
1589                 return false;
1590               } else if (e instanceof ServerNotRunningYetException) {
1591                 long now = System.currentTimeMillis();
1592                 if (now < maxWaitTime) {
1593                   LOG.debug("Server is not yet up; waiting up to " +
1594                     (maxWaitTime - now) + "ms", e);
1595                   Thread.sleep(100);
1596                   i--; // reset the try count
1597                   continue;
1598                 }
1599               } else if (e instanceof java.net.SocketTimeoutException
1600                   && this.serverManager.isServerOnline(destination)) {
1601                 // In case socket is timed out and the region server is still online,
1602                 // the openRegion RPC could have been accepted by the server and
1603                 // just the response didn't go through.  So we will retry to
1604                 // open the region on the same server.
1605                 if (LOG.isDebugEnabled()) {
1606                   LOG.debug("Bulk assigner openRegion() to " + destination
1607                     + " has timed out, but the regions might"
1608                     + " already be opened on it.", e);
1609                 }
1610                 continue;
1611               }
1612               throw e;
1613             }
1614           }
1615         } catch (IOException e) {
1616           // Can be a socket timeout, EOF, NoRouteToHost, etc
1617           LOG.info("Unable to communicate with " + destination
1618             + " in order to assign regions, ", e);
1619           return false;
1620         }
1621       } finally {
1622         for (Lock lock : locks.values()) {
1623           lock.unlock();
1624         }
1625       }
1626 
1627       if (!failedToOpenRegions.isEmpty()) {
1628         for (HRegionInfo region : failedToOpenRegions) {
1629           if (!regionStates.isRegionOnline(region)) {
1630             invokeAssign(region);
1631           }
1632         }
1633       }
1634       LOG.debug("Bulk assigning done for " + destination);
1635       return true;
1636     } finally {
1637       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
1638     }
1639   }
1640 
1641   /**
1642    * Send CLOSE RPC if the server is online, otherwise, offline the region.
1643    *
1644    * The RPC will be sent only to the region sever found in the region state
1645    * if it is passed in, otherwise, to the src server specified. If region
1646    * state is not specified, we don't update region state at all, instead
1647    * we just send the RPC call. This is useful for some cleanup without
1648    * messing around the region states (see handleRegion, on region opened
1649    * on an unexpected server scenario, for an example)
1650    */
1651   private void unassign(final HRegionInfo region,
1652       final RegionState state, final int versionOfClosingNode,
1653       final ServerName dest, final boolean transitionInZK,
1654       final ServerName src) {
1655     ServerName server = src;
1656     if (state != null) {
1657       server = state.getServerName();
1658     }
1659     long maxWaitTime = -1;
1660     for (int i = 1; i <= this.maximumAttempts; i++) {
1661       if (this.server.isStopped() || this.server.isAborted()) {
1662         LOG.debug("Server stopped/aborted; skipping unassign of " + region);
1663         return;
1664       }
1665       // ClosedRegionhandler can remove the server from this.regions
1666       if (!serverManager.isServerOnline(server)) {
1667         LOG.debug("Offline " + region.getRegionNameAsString()
1668           + ", no need to unassign since it's on a dead server: " + server);
1669         if (transitionInZK) {
1670           // delete the node. if no node exists need not bother.
1671           deleteClosingOrClosedNode(region, server);
1672         }
1673         if (state != null) {
1674           regionOffline(region);
1675         }
1676         return;
1677       }
1678       try {
1679         // Send CLOSE RPC
1680         if (serverManager.sendRegionClose(server, region,
1681           versionOfClosingNode, dest, transitionInZK)) {
1682           LOG.debug("Sent CLOSE to " + server + " for region " +
1683             region.getRegionNameAsString());
1684           if (!transitionInZK && state != null) {
1685             // Retry to make sure the region is
1686             // closed so as to avoid double assignment.
1687             unassign(region, state, versionOfClosingNode,
1688               dest, transitionInZK,src);
1689           }
1690           return;
1691         }
1692         // This never happens. Currently regionserver close always return true.
1693         // Todo; this can now happen (0.96) if there is an exception in a coprocessor
1694         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
1695           region.getRegionNameAsString());
1696       } catch (Throwable t) {
1697         if (t instanceof RemoteException) {
1698           t = ((RemoteException)t).unwrapRemoteException();
1699         }
1700         if (t instanceof NotServingRegionException
1701             || t instanceof RegionServerStoppedException
1702             || t instanceof ServerNotRunningYetException
1703             || t instanceof FailedServerException) {
1704           LOG.debug("Offline " + region.getRegionNameAsString()
1705             + ", it's not any more on " + server, t);
1706           if (transitionInZK) {
1707             deleteClosingOrClosedNode(region, server);
1708           }
1709           if (state != null) {
1710             regionOffline(region);
1711           }
1712           return;
1713         } else if (state != null
1714             && t instanceof RegionAlreadyInTransitionException) {
1715           // RS is already processing this region, only need to update the timestamp
1716           LOG.debug("update " + state + " the timestamp.");
1717           state.updateTimestampToNow();
1718           if (maxWaitTime < 0) {
1719             maxWaitTime = EnvironmentEdgeManager.currentTimeMillis()
1720               + this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
1721                 DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
1722           }
1723           try {
1724             long now = EnvironmentEdgeManager.currentTimeMillis();
1725             if (now < maxWaitTime) {
1726               LOG.debug("Region is already in transition; "
1727                 + "waiting up to " + (maxWaitTime - now) + "ms", t);
1728               Thread.sleep(100);
1729               i--; // reset the try count
1730             }
1731           } catch (InterruptedException ie) {
1732             LOG.warn("Failed to unassign "
1733               + region.getRegionNameAsString() + " since interrupted", ie);
1734             Thread.currentThread().interrupt();
1735             if (!tomActivated) {
1736               regionStates.updateRegionState(region, State.FAILED_CLOSE);
1737             }
1738             return;
1739           }
1740         } else {
1741           LOG.info("Server " + server + " returned " + t + " for "
1742             + region.getRegionNameAsString() + ", try=" + i
1743             + " of " + this.maximumAttempts, t);
1744           // Presume retry or server will expire.
1745         }
1746       }
1747     }
1748     // Run out of attempts
1749     if (!tomActivated && state != null) {
1750       regionStates.updateRegionState(region, State.FAILED_CLOSE);
1751     }
1752   }
1753 
1754   /**
1755    * Set region to OFFLINE unless it is opening and forceNewPlan is false.
1756    */
1757   private RegionState forceRegionStateToOffline(
1758       final HRegionInfo region, final boolean forceNewPlan) {
1759     RegionState state = regionStates.getRegionState(region);
1760     if (state == null) {
1761       LOG.warn("Assigning a region not in region states: " + region);
1762       state = regionStates.createRegionState(region);
1763     }
1764 
1765     ServerName sn = state.getServerName();
1766     if (forceNewPlan && LOG.isDebugEnabled()) {
1767       LOG.debug("Force region state offline " + state);
1768     }
1769 
1770     switch (state.getState()) {
1771     case OPEN:
1772     case OPENING:
1773     case PENDING_OPEN:
1774     case CLOSING:
1775     case PENDING_CLOSE:
1776       if (!forceNewPlan) {
1777         LOG.debug("Skip assigning " +
1778           region + ", it is already " + state);
1779         return null;
1780       }
1781     case FAILED_CLOSE:
1782     case FAILED_OPEN:
1783       unassign(region, state, -1, null, false, null);
1784       state = regionStates.getRegionState(region);
1785       if (state.isFailedClose()) {
1786         // If we can't close the region, we can't re-assign
1787         // it so as to avoid possible double assignment/data loss.
1788         LOG.info("Skip assigning " +
1789           region + ", we couldn't close it: " + state);
1790         return null;
1791       }
1792     case OFFLINE:
1793       // This region could have been open on this server
1794       // for a while. If the server is dead and not processed
1795       // yet, we can move on only if the meta shows the
1796       // region is not on this server actually, or on a server
1797       // not dead, or dead and processed already.
1798       if (regionStates.isServerDeadAndNotProcessed(sn)
1799           && wasRegionOnDeadServerByMeta(region, sn)) {
1800         LOG.info("Skip assigning " + region.getRegionNameAsString()
1801           + ", it is on a dead but not processed yet server");
1802         return null;
1803       }
1804     case CLOSED:
1805       break;
1806     default:
1807       LOG.error("Trying to assign region " + region
1808         + ", which is " + state);
1809       return null;
1810     }
1811     return state;
1812   }
1813 
1814   private boolean wasRegionOnDeadServerByMeta(
1815       final HRegionInfo region, final ServerName sn) {
1816     try {
1817       if (region.isMetaRegion()) {
1818         ServerName server = catalogTracker.getMetaLocation();
1819         return regionStates.isServerDeadAndNotProcessed(server);
1820       }
1821       while (!server.isStopped()) {
1822         try {
1823           catalogTracker.waitForMeta();
1824           Pair<HRegionInfo, ServerName> r =
1825             MetaReader.getRegion(catalogTracker, region.getRegionName());
1826           ServerName server = r == null ? null : r.getSecond();
1827           return regionStates.isServerDeadAndNotProcessed(server);
1828         } catch (IOException ioe) {
1829           LOG.info("Received exception accessing hbase:meta during force assign "
1830             + region.getRegionNameAsString() + ", retrying", ioe);
1831         }
1832       }
1833     } catch (InterruptedException e) {
1834       Thread.currentThread().interrupt();
1835       LOG.info("Interrupted accessing hbase:meta", e);
1836     }
1837     // Call is interrupted or server is stopped.
1838     return regionStates.isServerDeadAndNotProcessed(sn);
1839   }
1840 
1841   /**
1842    * Caller must hold lock on the passed <code>state</code> object.
1843    * @param state
1844    * @param setOfflineInZK
1845    * @param forceNewPlan
1846    */
1847   private void assign(RegionState state,
1848       final boolean setOfflineInZK, final boolean forceNewPlan) {
1849     long startTime = EnvironmentEdgeManager.currentTimeMillis();
1850     try {
1851       RegionState currentState = state;
1852       int versionOfOfflineNode = -1;
1853       RegionPlan plan = null;
1854       long maxWaitTime = -1;
1855       HRegionInfo region = state.getRegion();
1856       RegionOpeningState regionOpenState;
1857       for (int i = 1; i <= maximumAttempts; i++) {
1858         if (server.isStopped() || server.isAborted()) {
1859           LOG.info("Skip assigning " + region.getRegionNameAsString()
1860             + ", the server is stopped/aborted");
1861           return;
1862         }
1863         if (plan == null) { // Get a server for the region at first
1864           try {
1865             plan = getRegionPlan(region, forceNewPlan);
1866           } catch (HBaseIOException e) {
1867             LOG.warn("Failed to get region plan", e);
1868           }
1869         }
1870         if (plan == null) {
1871           LOG.warn("Unable to determine a plan to assign " + region);
1872           if (tomActivated){
1873             this.timeoutMonitor.setAllRegionServersOffline(true);
1874           } else {
1875             if (region.isMetaRegion()) {
1876               try {
1877                 Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
1878                 if (i == maximumAttempts) i = 1;
1879                 continue;
1880               } catch (InterruptedException e) {
1881                 LOG.error("Got exception while waiting for hbase:meta assignment");
1882                 Thread.currentThread().interrupt();
1883               }
1884             }
1885             regionStates.updateRegionState(region, State.FAILED_OPEN);
1886           }
1887           return;
1888         }
1889         if (setOfflineInZK && versionOfOfflineNode == -1) {
1890           // get the version of the znode after setting it to OFFLINE.
1891           // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
1892           versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
1893           if (versionOfOfflineNode != -1) {
1894             if (isDisabledorDisablingRegionInRIT(region)) {
1895               return;
1896             }
1897             // In case of assignment from EnableTableHandler table state is ENABLING. Any how
1898             // EnableTableHandler will set ENABLED after assigning all the table regions. If we
1899             // try to set to ENABLED directly then client API may think table is enabled.
1900             // When we have a case such as all the regions are added directly into hbase:meta and we call
1901             // assignRegion then we need to make the table ENABLED. Hence in such case the table
1902             // will not be in ENABLING or ENABLED state.
1903             TableName tableName = region.getTable();
1904             if (!zkTable.isEnablingTable(tableName) && !zkTable.isEnabledTable(tableName)) {
1905               LOG.debug("Setting table " + tableName + " to ENABLED state.");
1906               setEnabledTable(tableName);
1907             }
1908           }
1909         }
1910         if (setOfflineInZK && versionOfOfflineNode == -1) {
1911           LOG.info("Unable to set offline in ZooKeeper to assign " + region);
1912           // Setting offline in ZK must have been failed due to ZK racing or some
1913           // exception which may make the server to abort. If it is ZK racing,
1914           // we should retry since we already reset the region state,
1915           // existing (re)assignment will fail anyway.
1916           if (!server.isAborted()) {
1917             continue;
1918           }
1919         }
1920         LOG.info("Assigning " + region.getRegionNameAsString() +
1921             " to " + plan.getDestination().toString());
1922         // Transition RegionState to PENDING_OPEN
1923         currentState = regionStates.updateRegionState(region,
1924           State.PENDING_OPEN, plan.getDestination());
1925 
1926         boolean needNewPlan;
1927         final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
1928             " to " + plan.getDestination();
1929         try {
1930           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1931           if (this.shouldAssignRegionsWithFavoredNodes) {
1932             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1933           }
1934           regionOpenState = serverManager.sendRegionOpen(
1935               plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
1936 
1937           if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
1938             // Failed opening this region, looping again on a new server.
1939             needNewPlan = true;
1940             LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
1941                 " trying to assign elsewhere instead; " +
1942                 "try=" + i + " of " + this.maximumAttempts);
1943           } else {
1944             // we're done
1945             if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
1946               processAlreadyOpenedRegion(region, plan.getDestination());
1947             }
1948             return;
1949           }
1950 
1951         } catch (Throwable t) {
1952           if (t instanceof RemoteException) {
1953             t = ((RemoteException) t).unwrapRemoteException();
1954           }
1955 
1956           // Should we wait a little before retrying? If the server is starting it's yes.
1957           // If the region is already in transition, it's yes as well: we want to be sure that
1958           //  the region will get opened but we don't want a double assignment.
1959           boolean hold = (t instanceof RegionAlreadyInTransitionException ||
1960               t instanceof ServerNotRunningYetException);
1961 
1962           // In case socket is timed out and the region server is still online,
1963           // the openRegion RPC could have been accepted by the server and
1964           // just the response didn't go through.  So we will retry to
1965           // open the region on the same server to avoid possible
1966           // double assignment.
1967           boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
1968               && this.serverManager.isServerOnline(plan.getDestination()));
1969 
1970 
1971           if (hold) {
1972             LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
1973               "try=" + i + " of " + this.maximumAttempts, t);
1974 
1975             if (maxWaitTime < 0) {
1976               if (t instanceof RegionAlreadyInTransitionException) {
1977                 maxWaitTime = EnvironmentEdgeManager.currentTimeMillis()
1978                   + this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
1979                     DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
1980               } else {
1981                 maxWaitTime = this.server.getConfiguration().
1982                   getLong("hbase.regionserver.rpc.startup.waittime", 60000);
1983               }
1984             }
1985             try {
1986               needNewPlan = false;
1987               long now = EnvironmentEdgeManager.currentTimeMillis();
1988               if (now < maxWaitTime) {
1989                 LOG.debug("Server is not yet up or region is already in transition; "
1990                   + "waiting up to " + (maxWaitTime - now) + "ms", t);
1991                 Thread.sleep(100);
1992                 i--; // reset the try count
1993               } else if (!(t instanceof RegionAlreadyInTransitionException)) {
1994                 LOG.debug("Server is not up for a while; try a new one", t);
1995                 needNewPlan = true;
1996               }
1997             } catch (InterruptedException ie) {
1998               LOG.warn("Failed to assign "
1999                   + region.getRegionNameAsString() + " since interrupted", ie);
2000               Thread.currentThread().interrupt();
2001               if (!tomActivated) {
2002                 regionStates.updateRegionState(region, State.FAILED_OPEN);
2003               }
2004               return;
2005             }
2006           } else if (retry) {
2007             needNewPlan = false;
2008             LOG.warn(assignMsg + ", trying to assign to the same region server " +
2009                 "try=" + i + " of " + this.maximumAttempts, t);
2010           } else {
2011             needNewPlan = true;
2012             LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
2013                 " try=" + i + " of " + this.maximumAttempts, t);
2014           }
2015         }
2016 
2017         if (i == this.maximumAttempts) {
2018           // Don't reset the region state or get a new plan any more.
2019           // This is the last try.
2020           continue;
2021         }
2022 
2023         // If region opened on destination of present plan, reassigning to new
2024         // RS may cause double assignments. In case of RegionAlreadyInTransitionException
2025         // reassigning to same RS.
2026         if (needNewPlan) {
2027           // Force a new plan and reassign. Will return null if no servers.
2028           // The new plan could be the same as the existing plan since we don't
2029           // exclude the server of the original plan, which should not be
2030           // excluded since it could be the only server up now.
2031           RegionPlan newPlan = null;
2032           try {
2033             newPlan = getRegionPlan(region, true);
2034           } catch (HBaseIOException e) {
2035             LOG.warn("Failed to get region plan", e);
2036           }
2037           if (newPlan == null) {
2038             if (tomActivated) {
2039               this.timeoutMonitor.setAllRegionServersOffline(true);
2040             } else {
2041               regionStates.updateRegionState(region, State.FAILED_OPEN);
2042             }
2043             LOG.warn("Unable to find a viable location to assign region " +
2044                 region.getRegionNameAsString());
2045             return;
2046           }
2047 
2048           if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
2049             // Clean out plan we failed execute and one that doesn't look like it'll
2050             // succeed anyways; we need a new plan!
2051             // Transition back to OFFLINE
2052             currentState = regionStates.updateRegionState(region, State.OFFLINE);
2053             versionOfOfflineNode = -1;
2054             plan = newPlan;
2055           }
2056         }
2057       }
2058       // Run out of attempts
2059       if (!tomActivated) {
2060         regionStates.updateRegionState(region, State.FAILED_OPEN);
2061       }
2062     } finally {
2063       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
2064     }
2065   }
2066 
2067   private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
2068     // Remove region from in-memory transition and unassigned node from ZK
2069     // While trying to enable the table the regions of the table were
2070     // already enabled.
2071     LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
2072       + " to " + sn);
2073     String encodedName = region.getEncodedName();
2074     deleteNodeInStates(encodedName, "offline", sn, EventType.M_ZK_REGION_OFFLINE);
2075     regionStates.regionOnline(region, sn);
2076   }
2077 
2078   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
2079     TableName tableName = region.getTable();
2080     boolean disabled = this.zkTable.isDisabledTable(tableName);
2081     if (disabled || this.zkTable.isDisablingTable(tableName)) {
2082       LOG.info("Table " + tableName + (disabled ? " disabled;" : " disabling;") +
2083         " skipping assign of " + region.getRegionNameAsString());
2084       offlineDisabledRegion(region);
2085       return true;
2086     }
2087     return false;
2088   }
2089 
2090   /**
2091    * Set region as OFFLINED up in zookeeper
2092    *
2093    * @param state
2094    * @return the version of the offline node if setting of the OFFLINE node was
2095    *         successful, -1 otherwise.
2096    */
2097   private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
2098     if (!state.isClosed() && !state.isOffline()) {
2099       String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE.";
2100       this.server.abort(msg, new IllegalStateException(msg));
2101       return -1;
2102     }
2103     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
2104     int versionOfOfflineNode;
2105     try {
2106       // get the version after setting the znode to OFFLINE
2107       versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
2108         state.getRegion(), destination);
2109       if (versionOfOfflineNode == -1) {
2110         LOG.warn("Attempted to create/force node into OFFLINE state before "
2111             + "completing assignment but failed to do so for " + state);
2112         return -1;
2113       }
2114     } catch (KeeperException e) {
2115       server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
2116       return -1;
2117     }
2118     return versionOfOfflineNode;
2119   }
2120 
2121   /**
2122    * @param region the region to assign
2123    * @return Plan for passed <code>region</code> (If none currently, it creates one or
2124    * if no servers to assign, it returns null).
2125    */
2126   private RegionPlan getRegionPlan(final HRegionInfo region,
2127       final boolean forceNewPlan)  throws HBaseIOException {
2128     return getRegionPlan(region, null, forceNewPlan);
2129   }
2130 
2131   /**
2132    * @param region the region to assign
2133    * @param serverToExclude Server to exclude (we know its bad). Pass null if
2134    * all servers are thought to be assignable.
2135    * @param forceNewPlan If true, then if an existing plan exists, a new plan
2136    * will be generated.
2137    * @return Plan for passed <code>region</code> (If none currently, it creates one or
2138    * if no servers to assign, it returns null).
2139    */
2140   private RegionPlan getRegionPlan(final HRegionInfo region,
2141       final ServerName serverToExclude, final boolean forceNewPlan) throws HBaseIOException {
2142     // Pickup existing plan or make a new one
2143     final String encodedName = region.getEncodedName();
2144     final List<ServerName> destServers =
2145       serverManager.createDestinationServersList(serverToExclude);
2146 
2147     if (destServers.isEmpty()){
2148       LOG.warn("Can't move " + encodedName +
2149         ", there is no destination server available.");
2150       return null;
2151     }
2152 
2153     RegionPlan randomPlan = null;
2154     boolean newPlan = false;
2155     RegionPlan existingPlan;
2156 
2157     synchronized (this.regionPlans) {
2158       existingPlan = this.regionPlans.get(encodedName);
2159 
2160       if (existingPlan != null && existingPlan.getDestination() != null) {
2161         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
2162           + " destination server is " + existingPlan.getDestination() +
2163             " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
2164       }
2165 
2166       if (forceNewPlan
2167           || existingPlan == null
2168           || existingPlan.getDestination() == null
2169           || !destServers.contains(existingPlan.getDestination())) {
2170         newPlan = true;
2171         randomPlan = new RegionPlan(region, null,
2172             balancer.randomAssignment(region, destServers));
2173         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
2174           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
2175           regions.add(region);
2176           try {
2177             processFavoredNodes(regions);
2178           } catch (IOException ie) {
2179             LOG.warn("Ignoring exception in processFavoredNodes " + ie);
2180           }
2181         }
2182         this.regionPlans.put(encodedName, randomPlan);
2183       }
2184     }
2185 
2186     if (newPlan) {
2187       if (randomPlan.getDestination() == null) {
2188         LOG.warn("Can't find a destination for " + encodedName);
2189         return null;
2190       }
2191       LOG.debug("No previous transition plan found (or ignoring " +
2192         "an existing plan) for " + region.getRegionNameAsString() +
2193         "; generated random plan=" + randomPlan + "; " +
2194         serverManager.countOfRegionServers() +
2195                " (online=" + serverManager.getOnlineServers().size() +
2196                ", available=" + destServers.size() + ") available servers" +
2197                ", forceNewPlan=" + forceNewPlan);
2198         return randomPlan;
2199       }
2200     LOG.debug("Using pre-existing plan for " +
2201       region.getRegionNameAsString() + "; plan=" + existingPlan);
2202     return existingPlan;
2203   }
2204 
2205   /**
2206    * Unassigns the specified region.
2207    * <p>
2208    * Updates the RegionState and sends the CLOSE RPC unless region is being
2209    * split by regionserver; then the unassign fails (silently) because we
2210    * presume the region being unassigned no longer exists (its been split out
2211    * of existence). TODO: What to do if split fails and is rolled back and
2212    * parent is revivified?
2213    * <p>
2214    * If a RegionPlan is already set, it will remain.
2215    *
2216    * @param region server to be unassigned
2217    */
2218   public void unassign(HRegionInfo region) {
2219     unassign(region, false);
2220   }
2221 
2222 
2223   /**
2224    * Unassigns the specified region.
2225    * <p>
2226    * Updates the RegionState and sends the CLOSE RPC unless region is being
2227    * split by regionserver; then the unassign fails (silently) because we
2228    * presume the region being unassigned no longer exists (its been split out
2229    * of existence). TODO: What to do if split fails and is rolled back and
2230    * parent is revivified?
2231    * <p>
2232    * If a RegionPlan is already set, it will remain.
2233    *
2234    * @param region server to be unassigned
2235    * @param force if region should be closed even if already closing
2236    */
2237   public void unassign(HRegionInfo region, boolean force, ServerName dest) {
2238     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
2239     LOG.debug("Starting unassign of " + region.getRegionNameAsString()
2240       + " (offlining), current state: " + regionStates.getRegionState(region));
2241 
2242     String encodedName = region.getEncodedName();
2243     // Grab the state of this region and synchronize on it
2244     int versionOfClosingNode = -1;
2245     // We need a lock here as we're going to do a put later and we don't want multiple states
2246     //  creation
2247     ReentrantLock lock = locker.acquireLock(encodedName);
2248     RegionState state = regionStates.getRegionTransitionState(encodedName);
2249     boolean reassign = true;
2250     try {
2251       if (state == null) {
2252         // Region is not in transition.
2253         // We can unassign it only if it's not SPLIT/MERGED.
2254         state = regionStates.getRegionState(encodedName);
2255         if (state != null && state.isUnassignable()) {
2256           LOG.info("Attempting to unassign " + state + ", ignored");
2257           // Offline region will be reassigned below
2258           return;
2259         }
2260         // Create the znode in CLOSING state
2261         try {
2262           if (state == null || state.getServerName() == null) {
2263             // We don't know where the region is, offline it.
2264             // No need to send CLOSE RPC
2265             LOG.warn("Attempting to unassign a region not in RegionStates"
2266               + region.getRegionNameAsString() + ", offlined");
2267             regionOffline(region);
2268             return;
2269           }
2270           versionOfClosingNode = ZKAssign.createNodeClosing(
2271             watcher, region, state.getServerName());
2272           if (versionOfClosingNode == -1) {
2273             LOG.info("Attempting to unassign " +
2274               region.getRegionNameAsString() + " but ZK closing node "
2275               + "can't be created.");
2276             reassign = false; // not unassigned at all
2277             return;
2278           }
2279         } catch (KeeperException e) {
2280           if (e instanceof NodeExistsException) {
2281             // Handle race between master initiated close and regionserver
2282             // orchestrated splitting. See if existing node is in a
2283             // SPLITTING or SPLIT state.  If so, the regionserver started
2284             // an op on node before we could get our CLOSING in.  Deal.
2285             NodeExistsException nee = (NodeExistsException)e;
2286             String path = nee.getPath();
2287             try {
2288               if (isSplitOrSplittingOrMergedOrMerging(path)) {
2289                 LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
2290                   "skipping unassign because region no longer exists -- its split or merge");
2291                 reassign = false; // no need to reassign for split/merged region
2292                 return;
2293               }
2294             } catch (KeeperException.NoNodeException ke) {
2295               LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
2296                 "; presuming split and that the region to unassign, " +
2297                 encodedName + ", no longer exists -- confirm", ke);
2298               return;
2299             } catch (KeeperException ke) {
2300               LOG.error("Unexpected zk state", ke);
2301             } catch (DeserializationException de) {
2302               LOG.error("Failed parse", de);
2303             }
2304           }
2305           // If we get here, don't understand whats going on -- abort.
2306           server.abort("Unexpected ZK exception creating node CLOSING", e);
2307           reassign = false; // heading out already
2308           return;
2309         }
2310         state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2311       } else if (state.isFailedOpen()) {
2312         // The region is not open yet
2313         regionOffline(region);
2314         return;
2315       } else if (force && state.isPendingCloseOrClosing()) {
2316         LOG.debug("Attempting to unassign " + region.getRegionNameAsString() +
2317           " which is already " + state.getState()  +
2318           " but forcing to send a CLOSE RPC again ");
2319         if (state.isFailedClose()) {
2320           state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
2321         }
2322         state.updateTimestampToNow();
2323       } else {
2324         LOG.debug("Attempting to unassign " +
2325           region.getRegionNameAsString() + " but it is " +
2326           "already in transition (" + state.getState() + ", force=" + force + ")");
2327         return;
2328       }
2329 
2330       unassign(region, state, versionOfClosingNode, dest, true, null);
2331     } finally {
2332       lock.unlock();
2333 
2334       // Region is expected to be reassigned afterwards
2335       if (reassign && regionStates.isRegionOffline(region)) {
2336         assign(region, true);
2337       }
2338     }
2339   }
2340 
2341   public void unassign(HRegionInfo region, boolean force){
2342      unassign(region, force, null);
2343   }
2344 
2345   /**
2346    * @param region regioninfo of znode to be deleted.
2347    */
2348   public void deleteClosingOrClosedNode(HRegionInfo region, ServerName sn) {
2349     String encodedName = region.getEncodedName();
2350     deleteNodeInStates(encodedName, "closing", sn, EventType.M_ZK_REGION_CLOSING,
2351       EventType.RS_ZK_REGION_CLOSED);
2352   }
2353 
2354   /**
2355    * @param path
2356    * @return True if znode is in SPLIT or SPLITTING or MERGED or MERGING state.
2357    * @throws KeeperException Can happen if the znode went away in meantime.
2358    * @throws DeserializationException
2359    */
2360   private boolean isSplitOrSplittingOrMergedOrMerging(final String path)
2361       throws KeeperException, DeserializationException {
2362     boolean result = false;
2363     // This may fail if the SPLIT or SPLITTING or MERGED or MERGING znode gets
2364     // cleaned up before we can get data from it.
2365     byte [] data = ZKAssign.getData(watcher, path);
2366     if (data == null) {
2367       LOG.info("Node " + path + " is gone");
2368       return false;
2369     }
2370     RegionTransition rt = RegionTransition.parseFrom(data);
2371     switch (rt.getEventType()) {
2372     case RS_ZK_REQUEST_REGION_SPLIT:
2373     case RS_ZK_REGION_SPLIT:
2374     case RS_ZK_REGION_SPLITTING:
2375     case RS_ZK_REQUEST_REGION_MERGE:
2376     case RS_ZK_REGION_MERGED:
2377     case RS_ZK_REGION_MERGING:
2378       result = true;
2379       break;
2380     default:
2381       LOG.info("Node " + path + " is in " + rt.getEventType());
2382       break;
2383     }
2384     return result;
2385   }
2386 
2387   /**
2388    * Used by unit tests. Return the number of regions opened so far in the life
2389    * of the master. Increases by one every time the master opens a region
2390    * @return the counter value of the number of regions opened so far
2391    */
2392   public int getNumRegionsOpened() {
2393     return numRegionsOpened.get();
2394   }
2395 
2396   /**
2397    * Waits until the specified region has completed assignment.
2398    * <p>
2399    * If the region is already assigned, returns immediately.  Otherwise, method
2400    * blocks until the region is assigned.
2401    * @param regionInfo region to wait on assignment for
2402    * @throws InterruptedException
2403    */
2404   public boolean waitForAssignment(HRegionInfo regionInfo)
2405       throws InterruptedException {
2406     while (!regionStates.isRegionOnline(regionInfo)) {
2407       if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN)
2408           || this.server.isStopped()) {
2409         return false;
2410       }
2411 
2412       // We should receive a notification, but it's
2413       //  better to have a timeout to recheck the condition here:
2414       //  it lowers the impact of a race condition if any
2415       regionStates.waitForUpdate(100);
2416     }
2417     return true;
2418   }
2419 
2420   /**
2421    * Assigns the hbase:meta region.
2422    * <p>
2423    * Assumes that hbase:meta is currently closed and is not being actively served by
2424    * any RegionServer.
2425    * <p>
2426    * Forcibly unsets the current meta region location in ZooKeeper and assigns
2427    * hbase:meta to a random RegionServer.
2428    * @throws KeeperException
2429    */
2430   public void assignMeta() throws KeeperException {
2431     MetaRegionTracker.deleteMetaLocation(this.watcher);
2432     assign(HRegionInfo.FIRST_META_REGIONINFO, true);
2433   }
2434 
2435   /**
2436    * Assigns specified regions retaining assignments, if any.
2437    * <p>
2438    * This is a synchronous call and will return once every region has been
2439    * assigned.  If anything fails, an exception is thrown
2440    * @throws InterruptedException
2441    * @throws IOException
2442    */
2443   public void assign(Map<HRegionInfo, ServerName> regions)
2444         throws IOException, InterruptedException {
2445     if (regions == null || regions.isEmpty()) {
2446       return;
2447     }
2448     List<ServerName> servers = serverManager.createDestinationServersList();
2449     if (servers == null || servers.isEmpty()) {
2450       throw new IOException("Found no destination server to assign region(s)");
2451     }
2452 
2453     // Reuse existing assignment info
2454     Map<ServerName, List<HRegionInfo>> bulkPlan =
2455       balancer.retainAssignment(regions, servers);
2456 
2457     assign(regions.size(), servers.size(),
2458       "retainAssignment=true", bulkPlan);
2459   }
2460 
2461   /**
2462    * Assigns specified regions round robin, if any.
2463    * <p>
2464    * This is a synchronous call and will return once every region has been
2465    * assigned.  If anything fails, an exception is thrown
2466    * @throws InterruptedException
2467    * @throws IOException
2468    */
2469   public void assign(List<HRegionInfo> regions)
2470         throws IOException, InterruptedException {
2471     if (regions == null || regions.isEmpty()) {
2472       return;
2473     }
2474 
2475     List<ServerName> servers = serverManager.createDestinationServersList();
2476     if (servers == null || servers.isEmpty()) {
2477       throw new IOException("Found no destination server to assign region(s)");
2478     }
2479 
2480     // Generate a round-robin bulk assignment plan
2481     Map<ServerName, List<HRegionInfo>> bulkPlan
2482       = balancer.roundRobinAssignment(regions, servers);
2483     processFavoredNodes(regions);
2484 
2485     assign(regions.size(), servers.size(),
2486       "round-robin=true", bulkPlan);
2487   }
2488 
2489   private void assign(int regions, int totalServers,
2490       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
2491           throws InterruptedException, IOException {
2492 
2493     int servers = bulkPlan.size();
2494     if (servers == 1 || (regions < bulkAssignThresholdRegions
2495         && servers < bulkAssignThresholdServers)) {
2496 
2497       // Not use bulk assignment.  This could be more efficient in small
2498       // cluster, especially mini cluster for testing, so that tests won't time out
2499       if (LOG.isTraceEnabled()) {
2500         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
2501           " region(s) to " + servers + " server(s)");
2502       }
2503       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
2504         if (!assign(plan.getKey(), plan.getValue())) {
2505           for (HRegionInfo region: plan.getValue()) {
2506             if (!regionStates.isRegionOnline(region)) {
2507               invokeAssign(region);
2508             }
2509           }
2510         }
2511       }
2512     } else {
2513       LOG.info("Bulk assigning " + regions + " region(s) across "
2514         + totalServers + " server(s), " + message);
2515 
2516       // Use fixed count thread pool assigning.
2517       BulkAssigner ba = new GeneralBulkAssigner(
2518         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
2519       ba.bulkAssign();
2520       LOG.info("Bulk assigning done");
2521     }
2522   }
2523 
2524   /**
2525    * Assigns all user regions, if any exist.  Used during cluster startup.
2526    * <p>
2527    * This is a synchronous call and will return once every region has been
2528    * assigned.  If anything fails, an exception is thrown and the cluster
2529    * should be shutdown.
2530    * @throws InterruptedException
2531    * @throws IOException
2532    * @throws KeeperException
2533    */
2534   private void assignAllUserRegions()
2535       throws IOException, InterruptedException, KeeperException {
2536     // Cleanup any existing ZK nodes and start watching
2537     ZKAssign.deleteAllNodes(watcher);
2538     ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
2539       this.watcher.assignmentZNode);
2540     failoverCleanupDone();
2541 
2542     // Skip assignment for regions of tables in DISABLING state because during clean cluster startup
2543     // no RS is alive and regions map also doesn't have any information about the regions.
2544     // See HBASE-6281.
2545     Set<TableName> disabledOrDisablingOrEnabling = ZKTable.getDisabledOrDisablingTables(watcher);
2546     disabledOrDisablingOrEnabling.addAll(ZKTable.getEnablingTables(watcher));
2547     // Scan hbase:meta for all user regions, skipping any disabled tables
2548     Map<HRegionInfo, ServerName> allRegions;
2549     SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment =
2550        new SnapshotOfRegionAssignmentFromMeta(catalogTracker, disabledOrDisablingOrEnabling, true);
2551     snapshotOfRegionAssignment.initialize();
2552     allRegions = snapshotOfRegionAssignment.getRegionToRegionServerMap();
2553     if (allRegions == null || allRegions.isEmpty()) return;
2554 
2555     // Determine what type of assignment to do on startup
2556     boolean retainAssignment = server.getConfiguration().
2557       getBoolean("hbase.master.startup.retainassign", true);
2558 
2559     if (retainAssignment) {
2560       assign(allRegions);
2561     } else {
2562       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(allRegions.keySet());
2563       assign(regions);
2564     }
2565 
2566     for (HRegionInfo hri : allRegions.keySet()) {
2567       TableName tableName = hri.getTable();
2568       if (!zkTable.isEnabledTable(tableName)) {
2569         setEnabledTable(tableName);
2570       }
2571     }
2572   }
2573 
2574   /**
2575    * Wait until no regions in transition.
2576    * @param timeout How long to wait.
2577    * @return True if nothing in regions in transition.
2578    * @throws InterruptedException
2579    */
2580   boolean waitUntilNoRegionsInTransition(final long timeout)
2581       throws InterruptedException {
2582     // Blocks until there are no regions in transition. It is possible that
2583     // there
2584     // are regions in transition immediately after this returns but guarantees
2585     // that if it returns without an exception that there was a period of time
2586     // with no regions in transition from the point-of-view of the in-memory
2587     // state of the Master.
2588     final long endTime = System.currentTimeMillis() + timeout;
2589 
2590     while (!this.server.isStopped() && regionStates.isRegionsInTransition()
2591         && endTime > System.currentTimeMillis()) {
2592       regionStates.waitForUpdate(100);
2593     }
2594 
2595     return !regionStates.isRegionsInTransition();
2596   }
2597 
2598   /**
2599    * Rebuild the list of user regions and assignment information.
2600    * <p>
2601    * Returns a map of servers that are not found to be online and the regions
2602    * they were hosting.
2603    * @return map of servers not online to their assigned regions, as stored
2604    *         in META
2605    * @throws IOException
2606    */
2607   Map<ServerName, List<HRegionInfo>> rebuildUserRegions() throws IOException, KeeperException {
2608     Set<TableName> enablingTables = ZKTable.getEnablingTables(watcher);
2609     Set<TableName> disabledOrEnablingTables = ZKTable.getDisabledTables(watcher);
2610     disabledOrEnablingTables.addAll(enablingTables);
2611     Set<TableName> disabledOrDisablingOrEnabling = ZKTable.getDisablingTables(watcher);
2612     disabledOrDisablingOrEnabling.addAll(disabledOrEnablingTables);
2613 
2614     // Region assignment from META
2615     List<Result> results = MetaReader.fullScan(this.catalogTracker);
2616     // Get any new but slow to checkin region server that joined the cluster
2617     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
2618     // Map of offline servers and their regions to be returned
2619     Map<ServerName, List<HRegionInfo>> offlineServers =
2620       new TreeMap<ServerName, List<HRegionInfo>>();
2621     // Iterate regions in META
2622     for (Result result : results) {
2623       Pair<HRegionInfo, ServerName> region = HRegionInfo.getHRegionInfoAndServerName(result);
2624       if (region == null) continue;
2625       HRegionInfo regionInfo = region.getFirst();
2626       ServerName regionLocation = region.getSecond();
2627       if (regionInfo == null) continue;
2628       regionStates.createRegionState(regionInfo);
2629       if (regionStates.isRegionInState(regionInfo, State.SPLIT)) {
2630         // Split is considered to be completed. If the split znode still
2631         // exists, the region will be put back to SPLITTING state later
2632         LOG.debug("Region " + regionInfo.getRegionNameAsString()
2633            + " split is completed. Hence need not add to regions list");
2634         continue;
2635       }
2636       TableName tableName = regionInfo.getTable();
2637       if (regionLocation == null) {
2638         // regionLocation could be null if createTable didn't finish properly.
2639         // When createTable is in progress, HMaster restarts.
2640         // Some regions have been added to hbase:meta, but have not been assigned.
2641         // When this happens, the region's table must be in ENABLING state.
2642         // It can't be in ENABLED state as that is set when all regions are
2643         // assigned.
2644         // It can't be in DISABLING state, because DISABLING state transitions
2645         // from ENABLED state when application calls disableTable.
2646         // It can't be in DISABLED state, because DISABLED states transitions
2647         // from DISABLING state.
2648         if (!enablingTables.contains(tableName)) {
2649           LOG.warn("Region " + regionInfo.getEncodedName() +
2650             " has null regionLocation." + " But its table " + tableName +
2651             " isn't in ENABLING state.");
2652         }
2653       } else if (!onlineServers.contains(regionLocation)) {
2654         // Region is located on a server that isn't online
2655         List<HRegionInfo> offlineRegions = offlineServers.get(regionLocation);
2656         if (offlineRegions == null) {
2657           offlineRegions = new ArrayList<HRegionInfo>(1);
2658           offlineServers.put(regionLocation, offlineRegions);
2659         }
2660         offlineRegions.add(regionInfo);
2661         // need to enable the table if not disabled or disabling or enabling
2662         // this will be used in rolling restarts
2663         if (!disabledOrDisablingOrEnabling.contains(tableName)
2664             && !getZKTable().isEnabledTable(tableName)) {
2665           setEnabledTable(tableName);
2666         }
2667       } else {
2668         // Region is being served and on an active server
2669         // add only if region not in disabled or enabling table
2670         if (!disabledOrEnablingTables.contains(tableName)) {
2671           regionStates.updateRegionState(regionInfo, State.OPEN, regionLocation);
2672           regionStates.regionOnline(regionInfo, regionLocation);
2673           balancer.regionOnline(regionInfo, regionLocation);
2674         }
2675         // need to enable the table if not disabled or disabling or enabling
2676         // this will be used in rolling restarts
2677         if (!disabledOrDisablingOrEnabling.contains(tableName)
2678             && !getZKTable().isEnabledTable(tableName)) {
2679           setEnabledTable(tableName);
2680         }
2681       }
2682     }
2683     return offlineServers;
2684   }
2685 
2686   /**
2687    * Recover the tables that were not fully moved to DISABLED state. These
2688    * tables are in DISABLING state when the master restarted/switched.
2689    *
2690    * @throws KeeperException
2691    * @throws TableNotFoundException
2692    * @throws IOException
2693    */
2694   private void recoverTableInDisablingState()
2695       throws KeeperException, TableNotFoundException, IOException {
2696     Set<TableName> disablingTables = ZKTable.getDisablingTables(watcher);
2697     if (disablingTables.size() != 0) {
2698       for (TableName tableName : disablingTables) {
2699         // Recover by calling DisableTableHandler
2700         LOG.info("The table " + tableName
2701             + " is in DISABLING state.  Hence recovering by moving the table"
2702             + " to DISABLED state.");
2703         new DisableTableHandler(this.server, tableName, catalogTracker,
2704             this, tableLockManager, true).prepare().process();
2705       }
2706     }
2707   }
2708 
2709   /**
2710    * Recover the tables that are not fully moved to ENABLED state. These tables
2711    * are in ENABLING state when the master restarted/switched
2712    *
2713    * @throws KeeperException
2714    * @throws org.apache.hadoop.hbase.TableNotFoundException
2715    * @throws IOException
2716    */
2717   private void recoverTableInEnablingState()
2718       throws KeeperException, TableNotFoundException, IOException {
2719     Set<TableName> enablingTables = ZKTable.getEnablingTables(watcher);
2720     if (enablingTables.size() != 0) {
2721       for (TableName tableName : enablingTables) {
2722         // Recover by calling EnableTableHandler
2723         LOG.info("The table " + tableName
2724             + " is in ENABLING state.  Hence recovering by moving the table"
2725             + " to ENABLED state.");
2726         // enableTable in sync way during master startup,
2727         // no need to invoke coprocessor
2728         EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
2729           catalogTracker, this, tableLockManager, true);
2730         try {
2731           eth.prepare();
2732         } catch (TableNotFoundException e) {
2733           LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
2734           continue;
2735         }
2736         eth.process();
2737       }
2738     }
2739   }
2740 
2741   /**
2742    * Processes list of dead servers from result of hbase:meta scan and regions in RIT
2743    * <p>
2744    * This is used for failover to recover the lost regions that belonged to
2745    * RegionServers which failed while there was no active master or regions
2746    * that were in RIT.
2747    * <p>
2748    *
2749    *
2750    * @param deadServers
2751    *          The list of dead servers which failed while there was no active
2752    *          master. Can be null.
2753    * @throws IOException
2754    * @throws KeeperException
2755    */
2756   private void processDeadServersAndRecoverLostRegions(
2757       Map<ServerName, List<HRegionInfo>> deadServers)
2758           throws IOException, KeeperException {
2759     if (deadServers != null) {
2760       for (Map.Entry<ServerName, List<HRegionInfo>> server: deadServers.entrySet()) {
2761         ServerName serverName = server.getKey();
2762         // We need to keep such info even if the server is known dead
2763         regionStates.setLastRegionServerOfRegions(serverName, server.getValue());
2764         if (!serverManager.isServerDead(serverName)) {
2765           serverManager.expireServer(serverName); // Let SSH do region re-assign
2766         }
2767       }
2768     }
2769     List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(
2770       this.watcher, this.watcher.assignmentZNode);
2771     if (!nodes.isEmpty()) {
2772       for (String encodedRegionName : nodes) {
2773         processRegionInTransition(encodedRegionName, null);
2774       }
2775     }
2776 
2777     // Now we can safely claim failover cleanup completed and enable
2778     // ServerShutdownHandler for further processing. The nodes (below)
2779     // in transition, if any, are for regions not related to those
2780     // dead servers at all, and can be done in parallel to SSH.
2781     failoverCleanupDone();
2782   }
2783 
2784   /**
2785    * Set Regions in transitions metrics.
2786    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
2787    * This iterator is not fail fast, which may lead to stale read; but that's better than
2788    * creating a copy of the map for metrics computation, as this method will be invoked
2789    * on a frequent interval.
2790    */
2791   public void updateRegionsInTransitionMetrics() {
2792     long currentTime = System.currentTimeMillis();
2793     int totalRITs = 0;
2794     int totalRITsOverThreshold = 0;
2795     long oldestRITTime = 0;
2796     int ritThreshold = this.server.getConfiguration().
2797       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
2798     for (RegionState state: regionStates.getRegionsInTransition().values()) {
2799       totalRITs++;
2800       long ritTime = currentTime - state.getStamp();
2801       if (ritTime > ritThreshold) { // more than the threshold
2802         totalRITsOverThreshold++;
2803       }
2804       if (oldestRITTime < ritTime) {
2805         oldestRITTime = ritTime;
2806       }
2807     }
2808     if (this.metricsAssignmentManager != null) {
2809       this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
2810       this.metricsAssignmentManager.updateRITCount(totalRITs);
2811       this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
2812     }
2813   }
2814 
2815   /**
2816    * @param region Region whose plan we are to clear.
2817    */
2818   void clearRegionPlan(final HRegionInfo region) {
2819     synchronized (this.regionPlans) {
2820       this.regionPlans.remove(region.getEncodedName());
2821     }
2822   }
2823 
2824   /**
2825    * Wait on region to clear regions-in-transition.
2826    * @param hri Region to wait on.
2827    * @throws IOException
2828    */
2829   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
2830       throws IOException, InterruptedException {
2831     waitOnRegionToClearRegionsInTransition(hri, -1L);
2832   }
2833 
2834   /**
2835    * Wait on region to clear regions-in-transition or time out
2836    * @param hri
2837    * @param timeOut Milliseconds to wait for current region to be out of transition state.
2838    * @return True when a region clears regions-in-transition before timeout otherwise false
2839    * @throws InterruptedException
2840    */
2841   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
2842       throws InterruptedException {
2843     if (!regionStates.isRegionInTransition(hri)) return true;
2844     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTimeMillis()
2845         + timeOut;
2846     // There is already a timeout monitor on regions in transition so I
2847     // should not have to have one here too?
2848     LOG.info("Waiting for " + hri.getEncodedName() +
2849         " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
2850     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
2851       regionStates.waitForUpdate(100);
2852       if (EnvironmentEdgeManager.currentTimeMillis() > end) {
2853         LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
2854         return false;
2855       }
2856     }
2857     if (this.server.isStopped()) {
2858       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
2859       return false;
2860     }
2861     return true;
2862   }
2863 
2864   /**
2865    * Update timers for all regions in transition going against the server in the
2866    * serversInUpdatingTimer.
2867    */
2868   public class TimerUpdater extends Chore {
2869 
2870     public TimerUpdater(final int period, final Stoppable stopper) {
2871       super("AssignmentTimerUpdater", period, stopper);
2872     }
2873 
2874     @Override
2875     protected void chore() {
2876       Preconditions.checkState(tomActivated);
2877       ServerName serverToUpdateTimer = null;
2878       while (!serversInUpdatingTimer.isEmpty() && !stopper.isStopped()) {
2879         if (serverToUpdateTimer == null) {
2880           serverToUpdateTimer = serversInUpdatingTimer.first();
2881         } else {
2882           serverToUpdateTimer = serversInUpdatingTimer
2883               .higher(serverToUpdateTimer);
2884         }
2885         if (serverToUpdateTimer == null) {
2886           break;
2887         }
2888         updateTimers(serverToUpdateTimer);
2889         serversInUpdatingTimer.remove(serverToUpdateTimer);
2890       }
2891     }
2892   }
2893 
2894   /**
2895    * Monitor to check for time outs on region transition operations
2896    */
2897   public class TimeoutMonitor extends Chore {
2898     private boolean allRegionServersOffline = false;
2899     private ServerManager serverManager;
2900     private final int timeout;
2901 
2902     /**
2903      * Creates a periodic monitor to check for time outs on region transition
2904      * operations.  This will deal with retries if for some reason something
2905      * doesn't happen within the specified timeout.
2906      * @param period
2907    * @param stopper When {@link Stoppable#isStopped()} is true, this thread will
2908    * cleanup and exit cleanly.
2909      * @param timeout
2910      */
2911     public TimeoutMonitor(final int period, final Stoppable stopper,
2912         ServerManager serverManager,
2913         final int timeout) {
2914       super("AssignmentTimeoutMonitor", period, stopper);
2915       this.timeout = timeout;
2916       this.serverManager = serverManager;
2917     }
2918 
2919     private synchronized void setAllRegionServersOffline(
2920       boolean allRegionServersOffline) {
2921       this.allRegionServersOffline = allRegionServersOffline;
2922     }
2923 
2924     @Override
2925     protected void chore() {
2926       Preconditions.checkState(tomActivated);
2927       boolean noRSAvailable = this.serverManager.createDestinationServersList().isEmpty();
2928 
2929       // Iterate all regions in transition checking for time outs
2930       long now = System.currentTimeMillis();
2931       // no lock concurrent access ok: we will be working on a copy, and it's java-valid to do
2932       //  a copy while another thread is adding/removing items
2933       for (String regionName : regionStates.getRegionsInTransition().keySet()) {
2934         RegionState regionState = regionStates.getRegionTransitionState(regionName);
2935         if (regionState == null) continue;
2936 
2937         if (regionState.getStamp() + timeout <= now) {
2938           // decide on action upon timeout
2939           actOnTimeOut(regionState);
2940         } else if (this.allRegionServersOffline && !noRSAvailable) {
2941           RegionPlan existingPlan = regionPlans.get(regionName);
2942           if (existingPlan == null
2943               || !this.serverManager.isServerOnline(existingPlan
2944                   .getDestination())) {
2945             // if some RSs just came back online, we can start the assignment
2946             // right away
2947             actOnTimeOut(regionState);
2948           }
2949         }
2950       }
2951       setAllRegionServersOffline(noRSAvailable);
2952     }
2953 
2954     private void actOnTimeOut(RegionState regionState) {
2955       HRegionInfo regionInfo = regionState.getRegion();
2956       LOG.info("Regions in transition timed out:  " + regionState);
2957       // Expired! Do a retry.
2958       switch (regionState.getState()) {
2959       case CLOSED:
2960         LOG.info("Region " + regionInfo.getEncodedName()
2961             + " has been CLOSED for too long, waiting on queued "
2962             + "ClosedRegionHandler to run or server shutdown");
2963         // Update our timestamp.
2964         regionState.updateTimestampToNow();
2965         break;
2966       case OFFLINE:
2967         LOG.info("Region has been OFFLINE for too long, " + "reassigning "
2968             + regionInfo.getRegionNameAsString() + " to a random server");
2969         invokeAssign(regionInfo);
2970         break;
2971       case PENDING_OPEN:
2972         LOG.info("Region has been PENDING_OPEN for too "
2973             + "long, reassigning region=" + regionInfo.getRegionNameAsString());
2974         invokeAssign(regionInfo);
2975         break;
2976       case OPENING:
2977         processOpeningState(regionInfo);
2978         break;
2979       case OPEN:
2980         LOG.error("Region has been OPEN for too long, " +
2981             "we don't know where region was opened so can't do anything");
2982         regionState.updateTimestampToNow();
2983         break;
2984 
2985       case PENDING_CLOSE:
2986         LOG.info("Region has been PENDING_CLOSE for too "
2987             + "long, running forced unassign again on region="
2988             + regionInfo.getRegionNameAsString());
2989         invokeUnassign(regionInfo);
2990         break;
2991       case CLOSING:
2992         LOG.info("Region has been CLOSING for too " +
2993           "long, this should eventually complete or the server will " +
2994           "expire, send RPC again");
2995         invokeUnassign(regionInfo);
2996         break;
2997 
2998       case SPLIT:
2999       case SPLITTING:
3000       case FAILED_OPEN:
3001       case FAILED_CLOSE:
3002       case MERGING:
3003         break;
3004 
3005       default:
3006         throw new IllegalStateException("Received event is not valid.");
3007       }
3008     }
3009   }
3010 
3011   private void processOpeningState(HRegionInfo regionInfo) {
3012     LOG.info("Region has been OPENING for too long, reassigning region="
3013         + regionInfo.getRegionNameAsString());
3014     // Should have a ZK node in OPENING state
3015     try {
3016       String node = ZKAssign.getNodeName(watcher, regionInfo.getEncodedName());
3017       Stat stat = new Stat();
3018       byte [] data = ZKAssign.getDataNoWatch(watcher, node, stat);
3019       if (data == null) {
3020         LOG.warn("Data is null, node " + node + " no longer exists");
3021         return;
3022       }
3023       RegionTransition rt = RegionTransition.parseFrom(data);
3024       EventType et = rt.getEventType();
3025       if (et == EventType.RS_ZK_REGION_OPENED) {
3026         LOG.debug("Region has transitioned to OPENED, allowing "
3027             + "watched event handlers to process");
3028         return;
3029       } else if (et != EventType.RS_ZK_REGION_OPENING && et != EventType.RS_ZK_REGION_FAILED_OPEN ) {
3030         LOG.warn("While timing out a region, found ZK node in unexpected state: " + et);
3031         return;
3032       }
3033       invokeAssign(regionInfo);
3034     } catch (KeeperException ke) {
3035       LOG.error("Unexpected ZK exception timing out CLOSING region", ke);
3036     } catch (DeserializationException e) {
3037       LOG.error("Unexpected exception parsing CLOSING region", e);
3038     }
3039   }
3040 
3041   void invokeAssign(HRegionInfo regionInfo) {
3042     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
3043   }
3044 
3045   private void invokeUnassign(HRegionInfo regionInfo) {
3046     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
3047   }
3048 
3049   public boolean isCarryingMeta(ServerName serverName) {
3050     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
3051   }
3052 
3053   /**
3054    * Check if the shutdown server carries the specific region.
3055    * We have a bunch of places that store region location
3056    * Those values aren't consistent. There is a delay of notification.
3057    * The location from zookeeper unassigned node has the most recent data;
3058    * but the node could be deleted after the region is opened by AM.
3059    * The AM's info could be old when OpenedRegionHandler
3060    * processing hasn't finished yet when server shutdown occurs.
3061    * @return whether the serverName currently hosts the region
3062    */
3063   private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
3064     RegionTransition rt = null;
3065     try {
3066       byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
3067       // This call can legitimately come by null
3068       rt = data == null? null: RegionTransition.parseFrom(data);
3069     } catch (KeeperException e) {
3070       server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
3071     } catch (DeserializationException e) {
3072       server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
3073     }
3074 
3075     ServerName addressFromZK = rt != null? rt.getServerName():  null;
3076     if (addressFromZK != null) {
3077       // if we get something from ZK, we will use the data
3078       boolean matchZK = addressFromZK.equals(serverName);
3079       LOG.debug("Checking region=" + hri.getRegionNameAsString() + ", zk server=" + addressFromZK +
3080         " current=" + serverName + ", matches=" + matchZK);
3081       return matchZK;
3082     }
3083 
3084     ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
3085     boolean matchAM = (addressFromAM != null &&
3086       addressFromAM.equals(serverName));
3087     LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
3088       " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
3089       " server being checked: " + serverName);
3090 
3091     return matchAM;
3092   }
3093 
3094   /**
3095    * Process shutdown server removing any assignments.
3096    * @param sn Server that went down.
3097    * @return list of regions in transition on this server
3098    */
3099   public List<HRegionInfo> processServerShutdown(final ServerName sn) {
3100     // Clean out any existing assignment plans for this server
3101     synchronized (this.regionPlans) {
3102       for (Iterator <Map.Entry<String, RegionPlan>> i =
3103           this.regionPlans.entrySet().iterator(); i.hasNext();) {
3104         Map.Entry<String, RegionPlan> e = i.next();
3105         ServerName otherSn = e.getValue().getDestination();
3106         // The name will be null if the region is planned for a random assign.
3107         if (otherSn != null && otherSn.equals(sn)) {
3108           // Use iterator's remove else we'll get CME
3109           i.remove();
3110         }
3111       }
3112     }
3113     List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn);
3114     for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
3115       HRegionInfo hri = it.next();
3116       String encodedName = hri.getEncodedName();
3117 
3118       // We need a lock on the region as we could update it
3119       Lock lock = locker.acquireLock(encodedName);
3120       try {
3121         RegionState regionState =
3122           regionStates.getRegionTransitionState(encodedName);
3123         if (regionState == null
3124             || (regionState.getServerName() != null && !regionState.isOnServer(sn))
3125             || !(regionState.isFailedClose() || regionState.isOffline()
3126               || regionState.isPendingOpenOrOpening())) {
3127           LOG.info("Skip " + regionState + " since it is not opening/failed_close"
3128             + " on the dead server any more: " + sn);
3129           it.remove();
3130         } else {
3131           try {
3132             // Delete the ZNode if exists
3133             ZKAssign.deleteNodeFailSilent(watcher, hri);
3134           } catch (KeeperException ke) {
3135             server.abort("Unexpected ZK exception deleting node " + hri, ke);
3136           }
3137           if (zkTable.isDisablingOrDisabledTable(hri.getTable())) {
3138             regionStates.regionOffline(hri);
3139             it.remove();
3140             continue;
3141           }
3142           // Mark the region offline and assign it again by SSH
3143           regionStates.updateRegionState(hri, State.OFFLINE);
3144         }
3145       } finally {
3146         lock.unlock();
3147       }
3148     }
3149     return regions;
3150   }
3151 
3152   /**
3153    * @param plan Plan to execute.
3154    */
3155   public void balance(final RegionPlan plan) {
3156     HRegionInfo hri = plan.getRegionInfo();
3157     TableName tableName = hri.getTable();
3158     if (zkTable.isDisablingOrDisabledTable(tableName)) {
3159       LOG.info("Ignored moving region of disabling/disabled table "
3160         + tableName);
3161       return;
3162     }
3163 
3164     // Move the region only if it's assigned
3165     String encodedName = hri.getEncodedName();
3166     ReentrantLock lock = locker.acquireLock(encodedName);
3167     try {
3168       if (!regionStates.isRegionOnline(hri)) {
3169         RegionState state = regionStates.getRegionState(encodedName);
3170         LOG.info("Ignored moving region not assigned: " + hri + ", "
3171           + (state == null ? "not in region states" : state));
3172         return;
3173       }
3174       synchronized (this.regionPlans) {
3175         this.regionPlans.put(plan.getRegionName(), plan);
3176       }
3177       unassign(hri, false, plan.getDestination());
3178     } finally {
3179       lock.unlock();
3180     }
3181   }
3182 
3183   public void stop() {
3184     shutdown(); // Stop executor service, etc
3185     if (tomActivated){
3186       this.timeoutMonitor.interrupt();
3187       this.timerUpdater.interrupt();
3188     }
3189   }
3190 
3191   /**
3192    * Shutdown the threadpool executor service
3193    */
3194   public void shutdown() {
3195     // It's an immediate shutdown, so we're clearing the remaining tasks.
3196     synchronized (zkEventWorkerWaitingList){
3197       zkEventWorkerWaitingList.clear();
3198     }
3199     threadPoolExecutorService.shutdownNow();
3200     zkEventWorkers.shutdownNow();
3201   }
3202 
3203   protected void setEnabledTable(TableName tableName) {
3204     try {
3205       this.zkTable.setEnabledTable(tableName);
3206     } catch (KeeperException e) {
3207       // here we can abort as it is the start up flow
3208       String errorMsg = "Unable to ensure that the table " + tableName
3209           + " will be" + " enabled because of a ZooKeeper issue";
3210       LOG.error(errorMsg);
3211       this.server.abort(errorMsg, e);
3212     }
3213   }
3214 
3215   /**
3216    * Set region as OFFLINED up in zookeeper asynchronously.
3217    * @param state
3218    * @return True if we succeeded, false otherwise (State was incorrect or failed
3219    * updating zk).
3220    */
3221   private boolean asyncSetOfflineInZooKeeper(final RegionState state,
3222       final AsyncCallback.StringCallback cb, final ServerName destination) {
3223     if (!state.isClosed() && !state.isOffline()) {
3224       this.server.abort("Unexpected state trying to OFFLINE; " + state,
3225         new IllegalStateException());
3226       return false;
3227     }
3228     regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
3229     try {
3230       ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
3231         destination, cb, state);
3232     } catch (KeeperException e) {
3233       if (e instanceof NodeExistsException) {
3234         LOG.warn("Node for " + state.getRegion() + " already exists");
3235       } else {
3236         server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
3237       }
3238       return false;
3239     }
3240     return true;
3241   }
3242 
3243   private boolean deleteNodeInStates(String encodedName,
3244       String desc, ServerName sn, EventType... types) {
3245     try {
3246       for (EventType et: types) {
3247         if (ZKAssign.deleteNode(watcher, encodedName, et, sn)) {
3248           return true;
3249         }
3250       }
3251       LOG.info("Failed to delete the " + desc + " node for "
3252         + encodedName + ". The node type may not match");
3253     } catch (NoNodeException e) {
3254       if (LOG.isDebugEnabled()) {
3255         LOG.debug("The " + desc + " node for " + encodedName + " already deleted");
3256       }
3257     } catch (KeeperException ke) {
3258       server.abort("Unexpected ZK exception deleting " + desc
3259         + " node for the region " + encodedName, ke);
3260     }
3261     return false;
3262   }
3263 
3264   private void deleteMergingNode(String encodedName, ServerName sn) {
3265     deleteNodeInStates(encodedName, "merging", sn, EventType.RS_ZK_REGION_MERGING,
3266       EventType.RS_ZK_REQUEST_REGION_MERGE, EventType.RS_ZK_REGION_MERGED);
3267   }
3268 
3269   private void deleteSplittingNode(String encodedName, ServerName sn) {
3270     deleteNodeInStates(encodedName, "splitting", sn, EventType.RS_ZK_REGION_SPLITTING,
3271       EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT);
3272   }
3273 
3274   /**
3275    * A helper to handle region merging transition event.
3276    * It transitions merging regions to MERGING state.
3277    */
3278   private boolean handleRegionMerging(final RegionTransition rt, final String encodedName,
3279       final String prettyPrintedRegionName, final ServerName sn) {
3280     if (!serverManager.isServerOnline(sn)) {
3281       LOG.warn("Dropped merging! ServerName=" + sn + " unknown.");
3282       return false;
3283     }
3284     byte [] payloadOfMerging = rt.getPayload();
3285     List<HRegionInfo> mergingRegions;
3286     try {
3287       mergingRegions = HRegionInfo.parseDelimitedFrom(
3288         payloadOfMerging, 0, payloadOfMerging.length);
3289     } catch (IOException e) {
3290       LOG.error("Dropped merging! Failed reading "  + rt.getEventType()
3291         + " payload for " + prettyPrintedRegionName);
3292       return false;
3293     }
3294     assert mergingRegions.size() == 3;
3295     HRegionInfo p = mergingRegions.get(0);
3296     HRegionInfo hri_a = mergingRegions.get(1);
3297     HRegionInfo hri_b = mergingRegions.get(2);
3298 
3299     RegionState rs_p = regionStates.getRegionState(p);
3300     RegionState rs_a = regionStates.getRegionState(hri_a);
3301     RegionState rs_b = regionStates.getRegionState(hri_b);
3302 
3303     if (!((rs_a == null || rs_a.isOpenOrMergingOnServer(sn))
3304         && (rs_b == null || rs_b.isOpenOrMergingOnServer(sn))
3305         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
3306       LOG.warn("Dropped merging! Not in state good for MERGING; rs_p="
3307         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
3308       return false;
3309     }
3310 
3311     EventType et = rt.getEventType();
3312     if (et == EventType.RS_ZK_REQUEST_REGION_MERGE) {
3313       try {
3314         if (RegionMergeTransaction.transitionMergingNode(watcher, p,
3315             hri_a, hri_b, sn, -1, EventType.RS_ZK_REQUEST_REGION_MERGE,
3316             EventType.RS_ZK_REGION_MERGING) == -1) {
3317           byte[] data = ZKAssign.getData(watcher, encodedName);
3318           EventType currentType = null;
3319           if (data != null) {
3320             RegionTransition newRt = RegionTransition.parseFrom(data);
3321             currentType = newRt.getEventType();
3322           }
3323           if (currentType == null || (currentType != EventType.RS_ZK_REGION_MERGED
3324               && currentType != EventType.RS_ZK_REGION_MERGING)) {
3325             LOG.warn("Failed to transition pending_merge node "
3326               + encodedName + " to merging, it's now " + currentType);
3327             return false;
3328           }
3329         }
3330       } catch (Exception e) {
3331         LOG.warn("Failed to transition pending_merge node "
3332           + encodedName + " to merging", e);
3333         return false;
3334       }
3335     }
3336 
3337     synchronized (regionStates) {
3338       regionStates.updateRegionState(hri_a, State.MERGING);
3339       regionStates.updateRegionState(hri_b, State.MERGING);
3340       regionStates.updateRegionState(p, State.MERGING_NEW, sn);
3341 
3342       if (et != EventType.RS_ZK_REGION_MERGED) {
3343         regionStates.regionOffline(p, State.MERGING_NEW);
3344         this.mergingRegions.put(encodedName,
3345           new PairOfSameType<HRegionInfo>(hri_a, hri_b));
3346       } else {
3347         this.mergingRegions.remove(encodedName);
3348         regionOffline(hri_a, State.MERGED);
3349         regionOffline(hri_b, State.MERGED);
3350         regionOnline(p, sn);
3351       }
3352     }
3353 
3354     if (et == EventType.RS_ZK_REGION_MERGED) {
3355       LOG.debug("Handling MERGED event for " + encodedName + "; deleting node");
3356       // Remove region from ZK
3357       try {
3358         boolean successful = false;
3359         while (!successful) {
3360           // It's possible that the RS tickles in between the reading of the
3361           // znode and the deleting, so it's safe to retry.
3362           successful = ZKAssign.deleteNode(watcher, encodedName,
3363             EventType.RS_ZK_REGION_MERGED, sn);
3364         }
3365       } catch (KeeperException e) {
3366         if (e instanceof NoNodeException) {
3367           String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
3368           LOG.debug("The znode " + znodePath + " does not exist.  May be deleted already.");
3369         } else {
3370           server.abort("Error deleting MERGED node " + encodedName, e);
3371         }
3372       }
3373       LOG.info("Handled MERGED event; merged=" + p.getRegionNameAsString()
3374         + ", region_a=" + hri_a.getRegionNameAsString() + ", region_b="
3375         + hri_b.getRegionNameAsString() + ", on " + sn);
3376 
3377       // User could disable the table before master knows the new region.
3378       if (zkTable.isDisablingOrDisabledTable(p.getTable())) {
3379         unassign(p);
3380       }
3381     }
3382     return true;
3383   }
3384 
3385   /**
3386    * A helper to handle region splitting transition event.
3387    */
3388   private boolean handleRegionSplitting(final RegionTransition rt, final String encodedName,
3389       final String prettyPrintedRegionName, final ServerName sn) {
3390     if (!serverManager.isServerOnline(sn)) {
3391       LOG.warn("Dropped splitting! ServerName=" + sn + " unknown.");
3392       return false;
3393     }
3394     byte [] payloadOfSplitting = rt.getPayload();
3395     List<HRegionInfo> splittingRegions;
3396     try {
3397       splittingRegions = HRegionInfo.parseDelimitedFrom(
3398         payloadOfSplitting, 0, payloadOfSplitting.length);
3399     } catch (IOException e) {
3400       LOG.error("Dropped splitting! Failed reading " + rt.getEventType()
3401         + " payload for " + prettyPrintedRegionName);
3402       return false;
3403     }
3404     assert splittingRegions.size() == 2;
3405     HRegionInfo hri_a = splittingRegions.get(0);
3406     HRegionInfo hri_b = splittingRegions.get(1);
3407 
3408     RegionState rs_p = regionStates.getRegionState(encodedName);
3409     RegionState rs_a = regionStates.getRegionState(hri_a);
3410     RegionState rs_b = regionStates.getRegionState(hri_b);
3411 
3412     if (!((rs_p == null || rs_p.isOpenOrSplittingOnServer(sn))
3413         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
3414         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
3415       LOG.warn("Dropped splitting! Not in state good for SPLITTING; rs_p="
3416         + rs_p + ", rs_a=" + rs_a + ", rs_b=" + rs_b);
3417       return false;
3418     }
3419 
3420     if (rs_p == null) {
3421       // Splitting region should be online
3422       rs_p = regionStates.updateRegionState(rt, State.OPEN);
3423       if (rs_p == null) {
3424         LOG.warn("Received splitting for region " + prettyPrintedRegionName
3425           + " from server " + sn + " but it doesn't exist anymore,"
3426           + " probably already processed its split");
3427         return false;
3428       }
3429       regionStates.regionOnline(rs_p.getRegion(), sn);
3430     }
3431 
3432     HRegionInfo p = rs_p.getRegion();
3433     EventType et = rt.getEventType();
3434     if (et == EventType.RS_ZK_REQUEST_REGION_SPLIT) {
3435       try {
3436         if (SplitTransaction.transitionSplittingNode(watcher, p,
3437             hri_a, hri_b, sn, -1, EventType.RS_ZK_REQUEST_REGION_SPLIT,
3438             EventType.RS_ZK_REGION_SPLITTING) == -1) {
3439           byte[] data = ZKAssign.getData(watcher, encodedName);
3440           EventType currentType = null;
3441           if (data != null) {
3442             RegionTransition newRt = RegionTransition.parseFrom(data);
3443             currentType = newRt.getEventType();
3444           }
3445           if (currentType == null || (currentType != EventType.RS_ZK_REGION_SPLIT
3446               && currentType != EventType.RS_ZK_REGION_SPLITTING)) {
3447             LOG.warn("Failed to transition pending_split node "
3448               + encodedName + " to splitting, it's now " + currentType);
3449             return false;
3450           }
3451         }
3452       } catch (Exception e) {
3453         LOG.warn("Failed to transition pending_split node "
3454           + encodedName + " to splitting", e);
3455         return false;
3456       }
3457     }
3458 
3459     synchronized (regionStates) {
3460       regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn);
3461       regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn);
3462       regionStates.regionOffline(hri_a, State.SPLITTING_NEW);
3463       regionStates.regionOffline(hri_b, State.SPLITTING_NEW);
3464       regionStates.updateRegionState(rt, State.SPLITTING);
3465 
3466       // The below is for testing ONLY!  We can't do fault injection easily, so
3467       // resort to this kinda uglyness -- St.Ack 02/25/2011.
3468       if (TEST_SKIP_SPLIT_HANDLING) {
3469         LOG.warn("Skipping split message, TEST_SKIP_SPLIT_HANDLING is set");
3470         return true; // return true so that the splitting node stays
3471       }
3472 
3473       if (et == EventType.RS_ZK_REGION_SPLIT) {
3474         regionOffline(p, State.SPLIT);
3475         regionOnline(hri_a, sn);
3476         regionOnline(hri_b, sn);
3477       }
3478     }
3479 
3480     if (et == EventType.RS_ZK_REGION_SPLIT) {
3481       LOG.debug("Handling SPLIT event for " + encodedName + "; deleting node");
3482       // Remove region from ZK
3483       try {
3484         boolean successful = false;
3485         while (!successful) {
3486           // It's possible that the RS tickles in between the reading of the
3487           // znode and the deleting, so it's safe to retry.
3488           successful = ZKAssign.deleteNode(watcher, encodedName,
3489             EventType.RS_ZK_REGION_SPLIT, sn);
3490         }
3491       } catch (KeeperException e) {
3492         if (e instanceof NoNodeException) {
3493           String znodePath = ZKUtil.joinZNode(watcher.splitLogZNode, encodedName);
3494           LOG.debug("The znode " + znodePath + " does not exist.  May be deleted already.");
3495         } else {
3496           server.abort("Error deleting SPLIT node " + encodedName, e);
3497         }
3498       }
3499       LOG.info("Handled SPLIT event; parent=" + p.getRegionNameAsString()
3500         + ", daughter a=" + hri_a.getRegionNameAsString() + ", daughter b="
3501         + hri_b.getRegionNameAsString() + ", on " + sn);
3502 
3503       // User could disable the table before master knows the new region.
3504       if (zkTable.isDisablingOrDisabledTable(p.getTable())) {
3505         unassign(hri_a);
3506         unassign(hri_b);
3507       }
3508     }
3509     return true;
3510   }
3511 
3512   /**
3513    * A region is offline.  The new state should be the specified one,
3514    * if not null.  If the specified state is null, the new state is Offline.
3515    * The specified state can be Split/Merged/Offline/null only.
3516    */
3517   private void regionOffline(final HRegionInfo regionInfo, final State state) {
3518     regionStates.regionOffline(regionInfo, state);
3519     removeClosedRegion(regionInfo);
3520     // remove the region plan as well just in case.
3521     clearRegionPlan(regionInfo);
3522     balancer.regionOffline(regionInfo);
3523   }
3524 
3525   /**
3526    * @return Instance of load balancer
3527    */
3528   public LoadBalancer getBalancer() {
3529     return this.balancer;
3530   }
3531 }