1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.master;
21  
22  import java.io.DataInput;
23  import java.io.DataOutput;
24  import java.io.IOException;
25  import java.util.ArrayList;
26  import java.util.Arrays;
27  import java.util.Collections;
28  import java.util.Date;
29  import java.util.HashMap;
30  import java.util.HashSet;
31  import java.util.Iterator;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.NavigableMap;
35  import java.util.Set;
36  import java.util.SortedMap;
37  import java.util.TreeMap;
38  import java.util.TreeSet;
39  import java.util.concurrent.ConcurrentSkipListMap;
40  import java.util.concurrent.ConcurrentSkipListSet;
41  import java.util.concurrent.Executors;
42  import java.util.concurrent.atomic.AtomicInteger;
43  import java.util.concurrent.atomic.AtomicLong;
44  
45  import org.apache.commons.logging.Log;
46  import org.apache.commons.logging.LogFactory;
47  import org.apache.hadoop.conf.Configuration;
48  import org.apache.hadoop.hbase.Chore;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.HRegionInfo;
51  import org.apache.hadoop.hbase.HServerLoad;
52  import org.apache.hadoop.hbase.NotServingRegionException;
53  import org.apache.hadoop.hbase.Server;
54  import org.apache.hadoop.hbase.ServerName;
55  import org.apache.hadoop.hbase.Stoppable;
56  import org.apache.hadoop.hbase.TableNotFoundException;
57  import org.apache.hadoop.hbase.catalog.CatalogTracker;
58  import org.apache.hadoop.hbase.catalog.MetaReader;
59  import org.apache.hadoop.hbase.catalog.RootLocationEditor;
60  import org.apache.hadoop.hbase.client.Result;
61  import org.apache.hadoop.hbase.executor.EventHandler;
62  import org.apache.hadoop.hbase.executor.EventHandler.EventType;
63  import org.apache.hadoop.hbase.executor.ExecutorService;
64  import org.apache.hadoop.hbase.executor.RegionTransitionData;
65  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
66  import org.apache.hadoop.hbase.master.AssignmentManager.RegionState.State;
67  import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
68  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
69  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
70  import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
71  import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
72  import org.apache.hadoop.hbase.master.handler.SplitRegionHandler;
73  import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
74  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
75  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
76  import org.apache.hadoop.hbase.util.Bytes;
77  import org.apache.hadoop.hbase.util.Pair;
78  import org.apache.hadoop.hbase.util.Threads;
79  import org.apache.hadoop.hbase.util.Writables;
80  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
81  import org.apache.hadoop.hbase.zookeeper.ZKTable;
82  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
83  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
84  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
85  import org.apache.hadoop.ipc.RemoteException;
86  import org.apache.zookeeper.AsyncCallback;
87  import org.apache.zookeeper.KeeperException;
88  import org.apache.zookeeper.KeeperException.NoNodeException;
89  import org.apache.zookeeper.KeeperException.NodeExistsException;
90  import org.apache.zookeeper.data.Stat;
91  
92  /**
93   * Manages and performs region assignment.
94   * <p>
95   * Monitors ZooKeeper for events related to regions in transition.
96   * <p>
97   * Handles existing regions in transition during master failover.
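 * <p>
 * A rough usage sketch (hypothetical wiring, not the actual HMaster code; it assumes
 * the master, serverManager, catalogTracker, balancer and executorService collaborators
 * already exist):
 * <pre>
 *   AssignmentManager am = new AssignmentManager(master, serverManager,
 *     catalogTracker, balancer, executorService);
 *   master.getZooKeeper().registerListener(am); // receive unassigned-node events
 *   am.startTimeOutMonitor();                   // start the RIT timeout chore
 *   am.joinCluster();                           // fresh start or failover processing
 * </pre>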
98   */
99  public class AssignmentManager extends ZooKeeperListener {
100 
101   private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
102 
103   protected Server master;
104 
105   private ServerManager serverManager;
106 
107   private CatalogTracker catalogTracker;
108 
109   private TimeoutMonitor timeoutMonitor;
110 
111   private TimerUpdater timerUpdater;
112 
113   private LoadBalancer balancer;
114 
115   /**
116    * Map of regions to reopen after the schema of a table is changed. Key -
117    * encoded region name, value - HRegionInfo
118    */
119   private final Map <String, HRegionInfo> regionsToReopen;
120 
121   /*
122    * Maximum times we recurse an assignment.  See below in {@link #assign()}.
123    */
124   private final int maximumAssignmentAttempts;
125 
126   /**
127    * Regions currently in transition.  Map of encoded region names to the master
128    * in-memory state for that region.
129    */
130   final ConcurrentSkipListMap<String, RegionState> regionsInTransition =
131     new ConcurrentSkipListMap<String, RegionState>();
132 
133   /** Plans for region movement. Key is the encoded version of a region name. */
134   // TODO: When do plans get cleaned out?  Ever? In server open and in server
135   // shutdown processing -- St.Ack
136   // All access to this Map must be synchronized.
137   final NavigableMap<String, RegionPlan> regionPlans =
138     new TreeMap<String, RegionPlan>();
139 
140   private final ZKTable zkTable;
141 
142   // store all the table names in disabling state
143   Set<String> disablingTables = new HashSet<String>(1);
144   // store all the enabling state table names and corresponding online servers' regions.
145   // This may be needed to avoid calling assign twice for the regions of the ENABLING table
146   // that could have been assigned through processRIT.
147   Map<String, List<HRegionInfo>> enablingTables = new HashMap<String, List<HRegionInfo>>(1);
148   /**
149    * Server to regions assignment map.
150    * Contains the set of regions currently assigned to a given server.
151    * This Map and {@link #regions} are tied.  Always update this in tandem
152    * with the other under a lock on {@link #regions}.
153    * @see #regions
154    */
155   private final NavigableMap<ServerName, Set<HRegionInfo>> servers =
156     new TreeMap<ServerName, Set<HRegionInfo>>();
157 
158   /**
159    * Contains the servers which need their timers updated; these servers will be
160    * handled by {@link TimerUpdater}.
161    */
162   private final ConcurrentSkipListSet<ServerName> serversInUpdatingTimer = 
163     new ConcurrentSkipListSet<ServerName>();
164 
165   /**
166    * Region to server assignment map.
167    * Contains the server a given region is currently assigned to.
168    * This Map and {@link #servers} are tied.  Always update this in tandem
169    * with the other under a lock on {@link #regions}.
170    * @see #servers
171    */
172   private final SortedMap<HRegionInfo, ServerName> regions =
173     new TreeMap<HRegionInfo, ServerName>();
174 
175   private final ExecutorService executorService;
176 
177   // Thread pool executor service for the timeout monitor
178   private java.util.concurrent.ExecutorService threadPoolExecutorService;
179   
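  // Region-transition events that handleRegion() still processes even when the
  // originating regionserver is no longer online (see the server-online check there).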
180   private List<EventType> ignoreStatesRSOffline = Arrays.asList(new EventType[]{
181       EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED });
182 
183   /**
184    * Set when we are doing master failover processing; cleared when failover
185    * completes.
186    */
187   private volatile boolean failover = false;
188 
189   // Map holding all the regions which got processed while RIT was not yet
190   // populated during master failover.
191   private Map<String, HRegionInfo> failoverProcessedRegions =
192     new HashMap<String, HRegionInfo>();
193 
194   /**
195    * Constructs a new assignment manager.
196    *
197    * @param master
198    * @param serverManager
199    * @param catalogTracker
200    * @param service
201    * @throws KeeperException
202    * @throws IOException 
203    */
204   public AssignmentManager(Server master, ServerManager serverManager,
205       CatalogTracker catalogTracker, final LoadBalancer balancer,
206       final ExecutorService service) throws KeeperException, IOException {
207     super(master.getZooKeeper());
208     this.master = master;
209     this.serverManager = serverManager;
210     this.catalogTracker = catalogTracker;
211     this.executorService = service;
212     this.regionsToReopen = Collections.synchronizedMap
213                            (new HashMap<String, HRegionInfo> ());
214     Configuration conf = master.getConfiguration();
215     this.timeoutMonitor = new TimeoutMonitor(
216       conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000),
217       master, serverManager,
218       conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 1800000));
219     this.timerUpdater = new TimerUpdater(conf.getInt(
220         "hbase.master.assignment.timerupdater.period", 10000), master);
221     Threads.setDaemonThreadRunning(timerUpdater.getThread(),
222         master.getServerName() + ".timerUpdater");
223     this.zkTable = new ZKTable(this.master.getZooKeeper());
224     this.maximumAssignmentAttempts =
225       this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
226     this.balancer = balancer;
227     this.threadPoolExecutorService = Executors.newCachedThreadPool();
228   }
229   
230   void startTimeOutMonitor() {
231     Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), master.getServerName()
232         + ".timeoutMonitor");
233   }
234 
235   /**
236    * Compute the average load across all region servers.
237    * Currently, this uses a very naive computation - just uses the number of
238    * regions being served, ignoring stats about number of requests.
239    * @return the average load
240    */
241   double getAverageLoad() {
242     int totalLoad = 0;
243     int numServers = 0;
244     // Sync on this.regions because access to this.servers always synchronizes
245     // in this order.
246     synchronized (this.regions) {
247       for (Map.Entry<ServerName, Set<HRegionInfo>> e: servers.entrySet()) {
248         numServers++;
249         totalLoad += e.getValue().size();
250       }
251     }
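    // Note: if no servers have reported in yet, numServers is 0 and the division
    // below yields NaN rather than throwing an exception.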
252     return (double)totalLoad / (double)numServers;
253   }
254 
255   /**
256    * @return Instance of ZKTable.
257    */
258   public ZKTable getZKTable() {
259     // These are 'expensive' to make, involving a trip to the zk ensemble, so
260     // allow sharing.
261     return this.zkTable;
262   }
263   /**
264    * Returns the RegionServer to which hri is assigned.
265    *
266    * @param hri
267    *          HRegionInfo for which this function returns the region server
268    * @return ServerName The region server to which hri belongs
269    */
270   public ServerName getRegionServerOfRegion(HRegionInfo hri) {
271     synchronized (this.regions) {
272       return regions.get(hri);
273     }
274   }
275 
276   /**
277    * Checks whether the region is assigned.
278    * @param hri HRegion for which this function returns the result
279    * @return True iff assigned.
280    */
281   public boolean isRegionAssigned(HRegionInfo hri) {
282     synchronized (this.regions) {
283       return regions.containsKey(hri);
284     }
285   }
286 
287   /**
288    * Returns the regions of the given enabling table.
289    * 
290    * @param tableName
291    * @return list of regionInfos
292    */
293   public List<HRegionInfo> getEnablingTableRegions(String tableName){
294     return this.enablingTables.get(tableName);
295   }
296 
297   /**
298    * Add a regionPlan for the specified region.
299    * @param encodedName 
300    * @param plan 
301    */
302   public void addPlan(String encodedName, RegionPlan plan) {
303     synchronized (regionPlans) {
304       regionPlans.put(encodedName, plan);
305     }
306   }
307 
308   /**
309    * Add a map of region plans.
310    */
311   public void addPlans(Map<String, RegionPlan> plans) {
312     synchronized (regionPlans) {
313       regionPlans.putAll(plans);
314     }
315   }
316 
317   /**
318    * Set the list of regions that will be reopened
319    * because of an update in table schema
320    *
321    * @param regions
322    *          list of regions that should be tracked for reopen
323    */
324   public void setRegionsToReopen(List <HRegionInfo> regions) {
325     for(HRegionInfo hri : regions) {
326       regionsToReopen.put(hri.getEncodedName(), hri);
327     }
328   }
329 
330   /**
331    * Used by the client to check whether all regions have the schema updates.
332    *
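   * Illustrative use (a sketch; {@code am} is assumed to be this AssignmentManager):
   * <pre>
   *   Pair&lt;Integer, Integer&gt; status = am.getReopenStatus(tableName);
   *   int pending = status.getFirst();  // regions still awaiting reopen
   *   int total = status.getSecond();   // total regions of the table
   * </pre>
   *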
333    * @param tableName
334    * @return Pair indicating the status of the alter command
335    * @throws IOException
336    */
337   public Pair<Integer, Integer> getReopenStatus(byte[] tableName)
338   throws IOException {
339     List <HRegionInfo> hris =
340       MetaReader.getTableRegions(this.master.getCatalogTracker(), tableName);
341     Integer pending = 0;
342     for(HRegionInfo hri : hris) {
343       String name = hri.getEncodedName();
344       if (regionsToReopen.containsKey(name) || regionsInTransition.containsKey(name)) {
345         pending++;
346       }
347     }
348     return new Pair<Integer, Integer>(pending, hris.size());
349   }
350   /**
351    * Reset all unassigned znodes.  Called on startup of master.
352    * Call {@link #assignAllUserRegions()} after root and meta have been assigned.
353    * @throws IOException
354    * @throws KeeperException
355    */
356   void cleanoutUnassigned() throws IOException, KeeperException {
357     // Cleanup any existing ZK nodes and start watching
358     ZKAssign.deleteAllNodes(watcher);
359     ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
360       this.watcher.assignmentZNode);
361   }
362 
363   /**
364    * Called on startup.
365    * Figures out whether this is a fresh cluster start or we are joining an extant running cluster.
366    * @throws IOException
367    * @throws KeeperException
368    * @throws InterruptedException
369    */
370   void joinCluster() throws IOException,
371       KeeperException, InterruptedException {
372     // Concurrency note: In the below the accesses on regionsInTransition are
373     // outside of a synchronization block where usually all accesses to RIT are
374     // synchronized.  The presumption is that in this case it is safe since this
375     // method is being run by a single thread on startup.
376 
377     // TODO: Regions that have a null location and are not in regionsInTransitions
378     // need to be handled.
379 
380     // Scan META to build list of existing regions, servers, and assignment
381     // Returns servers who have not checked in (assumed dead) and their regions
382     Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers = rebuildUserRegions();
383 
384     processDeadServersAndRegionsInTransition(deadServers);
385 
386     // Recover the tables that were not fully moved to DISABLED state.
387     // These tables are in DISABLING state when the master restarted/switched.
388     boolean isWatcherCreated = recoverTableInDisablingState(this.disablingTables);
389     recoverTableInEnablingState(this.enablingTables.keySet(), isWatcherCreated);
390     this.enablingTables.clear();
391     this.disablingTables.clear();
392   }
393 
394   /**
395    * Process all regions that are in transition up in zookeeper.  Used by
396    * master joining an already running cluster.
397    * @throws KeeperException
398    * @throws IOException
399    * @throws InterruptedException
400    */
401   void processDeadServersAndRegionsInTransition()
402   throws KeeperException, IOException, InterruptedException {
403     // Pass null to signify no dead servers in this context.
404     processDeadServersAndRegionsInTransition(null);
405   }
406 
407   /**
408    * Processes all regions that are in transition in zookeeper and also
409    * processes the list of dead servers by scanning the META.
410    * Used by a master joining a cluster.
411    * @param deadServers
412    *          Map of dead servers and their regions. Can be null.
413    * @throws KeeperException
414    * @throws IOException
415    * @throws InterruptedException
416    */
417   void processDeadServersAndRegionsInTransition(
418       final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers)
419   throws KeeperException, IOException, InterruptedException {
420     List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(watcher,
421       watcher.assignmentZNode);
422     
423     if (nodes == null) {
424       String errorMessage = "Failed to get the children from ZK";
425       master.abort(errorMessage, new IOException(errorMessage));
426       return;
427     }
428     // Run through all regions.  If they are not assigned and not in RIT, then
429     // it's a clean cluster startup, else it's a failover.
430     synchronized (this.regions) {
431       for (Map.Entry<HRegionInfo, ServerName> e : this.regions.entrySet()) {
432         if (!e.getKey().isMetaTable() && e.getValue() != null) {
433           LOG.debug("Found " + e + " out on cluster");
434           this.failover = true;
435           break;
436         }
437         if (nodes.contains(e.getKey().getEncodedName())) {
438           LOG.debug("Found " + e.getKey().getRegionNameAsString() + " in RITs");
439           // Could be a meta region.
440           this.failover = true;
441           break;
442         }
443       }
444     }
445 
446     // Remove regions in RIT, they are possibly being processed by
447     // ServerShutdownHandler.
448     synchronized (regionsInTransition) {
449       nodes.removeAll(regionsInTransition.keySet());
450     }
451 
452     // If some dead servers are processed by ServerShutdownHandler, we shouldn't
453     // assign all user regions (some would be assigned by
454     // ServerShutdownHandler); consider it a failover.
455     if (!this.serverManager.getDeadServers().isEmpty()) {
456       this.failover = true;
457     }
458 
459     // If we found user regions out on the cluster, it's a failover.
460     if (this.failover) {
461       LOG.info("Found regions out on cluster or in RIT; failover");
462       // Process list of dead servers and regions in RIT.
463       // See HBASE-4580 for more information.
464       processDeadServersAndRecoverLostRegions(deadServers, nodes);
465       this.failover = false;
466       failoverProcessedRegions.clear();
467     } else {
468       // Fresh cluster startup.
469       LOG.info("Clean cluster startup. Assigning user regions");
470       cleanoutUnassigned();
471       assignAllUserRegions();
472     }
473   }
474 
475   /**
476    * If region is up in zk in transition, then do fixup and block and wait until
477    * the region is assigned and out of transition.  Used on startup for
478    * catalog regions.
479    * @param hri Region to look for.
480    * @return True if we processed a region in transition else false if region
481    * was not up in zk in transition.
482    * @throws InterruptedException
483    * @throws KeeperException
484    * @throws IOException
485    */
486   boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
487   throws InterruptedException, KeeperException, IOException {
488     boolean intransition =
489       processRegionInTransition(hri.getEncodedName(), hri, null);
490     if (!intransition) return intransition;
491     LOG.debug("Waiting on " + HRegionInfo.prettyPrint(hri.getEncodedName()));
492     synchronized(this.regionsInTransition) {
493       while (!this.master.isStopped() &&
494           this.regionsInTransition.containsKey(hri.getEncodedName())) {
495         // We expect a notify, but as a safeguard we set a timeout
496         this.regionsInTransition.wait(100);
497       }
498     }
499     return intransition;
500   }
501 
502   /**
503    * Process failover of new master for region <code>encodedRegionName</code>
504    * up in zookeeper.
505    * @param encodedRegionName Region to process failover for.
506    * @param regionInfo If null we'll go get it from meta table.
507    * @param deadServers Can be null 
508    * @return True if we processed <code>regionInfo</code> as a RIT.
509    * @throws KeeperException
510    * @throws IOException
511    */
512   boolean processRegionInTransition(final String encodedRegionName,
513       final HRegionInfo regionInfo,
514       final Map<ServerName,List<Pair<HRegionInfo,Result>>> deadServers)
515   throws KeeperException, IOException {
516     Stat stat = new Stat();
517     RegionTransitionData data = ZKAssign.getDataAndWatch(watcher,
518         encodedRegionName, stat);
519     if (data == null) return false;
520     HRegionInfo hri = regionInfo;
521     if (hri == null) {
522       if ((hri = getHRegionInfo(data)) == null) return false; 
523     }
524     processRegionsInTransition(data, hri, deadServers, stat.getVersion());
525     return true;
526   }
527 
528   void processRegionsInTransition(final RegionTransitionData data,
529       final HRegionInfo regionInfo,
530       final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers,
531       int expectedVersion)
532   throws KeeperException {
533     String encodedRegionName = regionInfo.getEncodedName();
534     LOG.info("Processing region " + regionInfo.getRegionNameAsString() +
535       " in state " + data.getEventType());
536     List<HRegionInfo> hris = this.enablingTables.get(regionInfo.getTableNameAsString());
537     if (hris != null && !hris.isEmpty()) {
538       hris.remove(regionInfo);
539     }
540     synchronized (regionsInTransition) {
541       RegionState regionState = regionsInTransition.get(encodedRegionName);
542       if (regionState != null ||
543           failoverProcessedRegions.containsKey(encodedRegionName)) {
544         // Just return
545         return;
546       }
547       switch (data.getEventType()) {
548       case M_ZK_REGION_CLOSING:
549         // If zk node of the region was updated by a live server skip this
550         // region and just add it into RIT.
551         if (isOnDeadServer(regionInfo, deadServers) &&
552             (data.getOrigin() == null || !serverManager.isServerOnline(data.getOrigin()))) {
553           // If it was on a dead server, it's closed now. Force to OFFLINE and this
554           // will get it reassigned if appropriate
555           forceOffline(regionInfo, data);
556         } else {
557           // Just insert region into RIT.
558           // If this never updates the timeout will trigger new assignment
559           regionsInTransition.put(encodedRegionName, new RegionState(
560             regionInfo, RegionState.State.CLOSING,
561             data.getStamp(), data.getOrigin()));
562         }
563         failoverProcessedRegions.put(encodedRegionName, regionInfo);
564         break;
565 
566       case RS_ZK_REGION_CLOSED:
567       case RS_ZK_REGION_FAILED_OPEN:
568         // Region is closed, insert into RIT and handle it
569         addToRITandCallClose(regionInfo, RegionState.State.CLOSED, data);
570         failoverProcessedRegions.put(encodedRegionName, regionInfo);
571         break;
572 
573       case M_ZK_REGION_OFFLINE:
574         // If zk node of the region was updated by a live server skip this
575         // region and just add it into RIT.
576         if (isOnDeadServer(regionInfo, deadServers) &&
577             (data.getOrigin() == null ||
578               !serverManager.isServerOnline(data.getOrigin()))) {
579           // Region is offline, insert into RIT and handle it like a closed
580           addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data);
581         } else if (data.getOrigin() != null &&
582             !serverManager.isServerOnline(data.getOrigin())) {
583           // to handle cases where offline node is created but sendRegionOpen
584           // RPC is not yet sent
585           addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data);
586         } else {
587           regionsInTransition.put(encodedRegionName, new RegionState(
588               regionInfo, RegionState.State.PENDING_OPEN, data.getStamp(), data
589                   .getOrigin()));
590         }
591         failoverProcessedRegions.put(encodedRegionName, regionInfo);
592         break;
593 
594       case RS_ZK_REGION_OPENING:
595         // TODO: Could check if it was on deadServers.  If it was, then we could
596         // do what happens in TimeoutMonitor when it sees this condition.
597 
598         // Just insert region into RIT
599         // If this never updates the timeout will trigger new assignment
600         if (regionInfo.isMetaTable()) {
601           regionsInTransition.put(encodedRegionName, new RegionState(
602               regionInfo, RegionState.State.OPENING, data.getStamp(), data
603                   .getOrigin()));
604           // If ROOT or .META. table is waiting for timeout monitor to assign
605           // it may take a lot of time when the assignment.timeout.period is
606           // the default value which may be very long.  We will not be able
607           // to serve any request during this time.
608           // So we will assign the ROOT and .META. region immediately.
609           processOpeningState(regionInfo);
610           break;
611         }
612         regionsInTransition.put(encodedRegionName, new RegionState(regionInfo,
613             RegionState.State.OPENING, data.getStamp(), data.getOrigin()));
614         failoverProcessedRegions.put(encodedRegionName, regionInfo);
615         break;
616 
617       case RS_ZK_REGION_OPENED:
618         // Region is opened, insert into RIT and handle it
619         regionsInTransition.put(encodedRegionName, new RegionState(
620             regionInfo, RegionState.State.OPEN,
621             data.getStamp(), data.getOrigin()));
622         ServerName sn = data.getOrigin() == null? null: data.getOrigin();
623         // sn could be null if this server is no longer online.  If
624         // that is the case, just let this RIT timeout; it'll be assigned
625         // to new server then.
626         if (sn == null) {
627           LOG.warn("Region in transition " + regionInfo.getEncodedName() +
628             " references a null server; letting RIT timeout so will be " +
629             "assigned elsewhere");
630         } else if (!serverManager.isServerOnline(sn)
631             && (isOnDeadServer(regionInfo, deadServers)
632                 || regionInfo.isMetaRegion() || regionInfo.isRootRegion())) {
633           forceOffline(regionInfo, data);
634         } else {
635           new OpenedRegionHandler(master, this, regionInfo, sn, expectedVersion)
636               .process();
637         }
638         failoverProcessedRegions.put(encodedRegionName, regionInfo);
639         break;
640       }
641     }
642   }
643   
644 
645   /**
646    * Put the region <code>hri</code> into an offline state up in zk.
647    * @param hri
648    * @param oldData
649    * @throws KeeperException
650    */
651   private void forceOffline(final HRegionInfo hri,
652       final RegionTransitionData oldData)
653   throws KeeperException {
654     // If it was on a dead server, it's closed now.  Force to OFFLINE and then
655     // handle it like a close; this will get it reassigned if appropriate
656     LOG.debug("RIT " + hri.getEncodedName() + " in state=" +
657       oldData.getEventType() + " was on deadserver; forcing offline");
658     ZKAssign.createOrForceNodeOffline(this.watcher, hri,
659       this.master.getServerName());
660     addToRITandCallClose(hri, RegionState.State.OFFLINE, oldData);
661   }
662 
663   /**
664    * Add to the in-memory copy of regions in transition and then call close
665    * handler on passed region <code>hri</code>
666    * @param hri
667    * @param state
668    * @param oldData
669    */
670   private void addToRITandCallClose(final HRegionInfo hri,
671       final RegionState.State state, final RegionTransitionData oldData) {
672     this.regionsInTransition.put(hri.getEncodedName(),
673       new RegionState(hri, state, oldData.getStamp(), oldData.getOrigin()));
674     new ClosedRegionHandler(this.master, this, hri).process();
675   }
676 
677   /**
678    * When a region is closed, it should be removed from the regionsToReopen
679    * @param hri HRegionInfo of the region which was closed
680    */
681   public void removeClosedRegion(HRegionInfo hri) {
682     if (!regionsToReopen.isEmpty()) {
683       if (regionsToReopen.remove(hri.getEncodedName()) != null) {
684           LOG.debug("Removed region from reopening regions because it was closed");
685       }
686     }
687   }
688 
689   /**
690    * @param regionInfo
691    * @param deadServers Map of deadServers and the regions they were carrying;
692    * can be null.
693    * @return True if the passed regionInfo is in the passed map of deadServers.
694    */
695   private boolean isOnDeadServer(final HRegionInfo regionInfo,
696       final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers) {
697     if (deadServers == null) return false;
698     for (Map.Entry<ServerName, List<Pair<HRegionInfo, Result>>> deadServer:
699         deadServers.entrySet()) {
700       for (Pair<HRegionInfo, Result> e: deadServer.getValue()) {
701         if (e.getFirst().equals(regionInfo)) return true;
702       }
703     }
704     return false;
705   }
706 
707   /**
708    * Handles various states an unassigned node can be in.
709    * <p>
710    * Method is called when a state change is suspected for an unassigned node.
711    * <p>
712    * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
713    * yet).
714    * @param data
715    * @param expectedVersion
716    */
717   private void handleRegion(final RegionTransitionData data, int expectedVersion) {
718     synchronized(regionsInTransition) {
719       HRegionInfo hri = null;
720       if (data == null || data.getOrigin() == null) {
721         LOG.warn("Unexpected NULL input " + data);
722         return;
723       }
724       ServerName sn = data.getOrigin();
725       // Check if this is a special HBCK transition
726       if (sn.equals(HConstants.HBCK_CODE_SERVERNAME)) {
727         handleHBCK(data);
728         return;
729       }
730       String encodedName = HRegionInfo.encodeRegionName(data.getRegionName());
731       String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
732       // Verify this is a known server
733       if (!serverManager.isServerOnline(sn) &&
734           !this.master.getServerName().equals(sn)
735           && !ignoreStatesRSOffline.contains(data.getEventType())) {
736         LOG.warn("Attempted to handle region transition for server but " +
737           "server is not online: " + prettyPrintedRegionName);
738         return;
739       }
740       // Printing if the event was created a long time ago helps debugging
741       boolean lateEvent = data.getStamp() <
742           (System.currentTimeMillis() - 15000);
743       LOG.debug("Handling transition=" + data.getEventType() +
744         ", server=" + data.getOrigin() + ", region=" +
745           (prettyPrintedRegionName == null? "null": prettyPrintedRegionName)  +
746           (lateEvent? ", which is more than 15 seconds late" : ""));
747       RegionState regionState = regionsInTransition.get(encodedName);
748       switch (data.getEventType()) {
749         case M_ZK_REGION_OFFLINE:
750           // Nothing to do.
751           break;
752 
753         case RS_ZK_REGION_SPLITTING:
754           if (!isInStateForSplitting(regionState)) break;
755           addSplittingToRIT(sn, encodedName);
756           break;
757 
758         case RS_ZK_REGION_SPLIT:
759           // RegionState must be null, or SPLITTING or PENDING_CLOSE.
760           if (!isInStateForSplitting(regionState)) break;
761           // If null, add SPLITTING state before going to SPLIT
762           if (regionState == null) {
763             regionState = addSplittingToRIT(sn, encodedName);
764             String message = "Received SPLIT for region " + prettyPrintedRegionName +
765               " from server " + sn;
766             // If still null, it means we cannot find it and it was already processed
767             if (regionState == null) {
768               LOG.warn(message + " but it doesn't exist anymore," +
769                   " probably already processed its split");
770               break;
771             }
772             LOG.info(message +
773                 " but region was not first in SPLITTING state; continuing");
774           }
775           // Check it has daughters.
776           byte [] payload = data.getPayload();
777           List<HRegionInfo> daughters = null;
778           try {
779             daughters = Writables.getHRegionInfos(payload, 0, payload.length);
780           } catch (IOException e) {
781             LOG.error("Dropped split! Failed reading split payload for " +
782               prettyPrintedRegionName);
783             break;
784           }
785           assert daughters.size() == 2;
786           // Assert that we can get a serverinfo for this server.
787           if (!this.serverManager.isServerOnline(sn)) {
788             LOG.error("Dropped split! ServerName=" + sn + " unknown.");
789             break;
790           }
791           // Run handler to do the rest of the SPLIT handling.
792           this.executorService.submit(new SplitRegionHandler(master, this,
793             regionState.getRegion(), sn, daughters));
794           break;
795 
796         case M_ZK_REGION_CLOSING:
797           hri = checkIfInFailover(regionState, encodedName, data);
798           if (hri != null) {
799             regionState = new RegionState(hri, RegionState.State.CLOSING, data
800                .getStamp(), data.getOrigin());
801             regionsInTransition.put(encodedName, regionState);
802             failoverProcessedRegions.put(encodedName, hri);
803             break;
804           }
805           // Should see CLOSING after we have asked it to CLOSE or additional
806           // times after already being in state of CLOSING
807           if (regionState == null ||
808               (!regionState.isPendingClose() && !regionState.isClosing())) {
809             LOG.warn("Received CLOSING for region " + prettyPrintedRegionName +
810               " from server " + data.getOrigin() + " but region was in " +
811               " the state " + regionState + " and not " +
812               "in expected PENDING_CLOSE or CLOSING states");
813             return;
814           }
815           // Transition to CLOSING (or update stamp if already CLOSING)
816           regionState.update(RegionState.State.CLOSING,
817               data.getStamp(), data.getOrigin());
818           break;
819 
820         case RS_ZK_REGION_CLOSED:
821           hri = checkIfInFailover(regionState, encodedName, data);
822           if (hri != null) {
823             regionState = new RegionState(hri, RegionState.State.CLOSED, data
824                 .getStamp(), data.getOrigin());
825             regionsInTransition.put(encodedName, regionState);
826             removeClosedRegion(regionState.getRegion());
827             new ClosedRegionHandler(master, this, regionState.getRegion())
828               .process();
829             failoverProcessedRegions.put(encodedName, hri);
830             break;
831           }
832           // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
833           if (regionState == null ||
834               (!regionState.isPendingClose() && !regionState.isClosing())) {
835             LOG.warn("Received CLOSED for region " + prettyPrintedRegionName +
836                 " from server " + data.getOrigin() + " but region was in " +
837                 " the state " + regionState + " and not " +
838                 "in expected PENDING_CLOSE or CLOSING states");
839             return;
840           }
841           // Handle CLOSED by assigning elsewhere or stopping if a disable
842           // If we got here all is good.  Need to update RegionState -- else
843           // what follows will fail because not in expected state.
844           regionState.update(RegionState.State.CLOSED,
845               data.getStamp(), data.getOrigin());
846           removeClosedRegion(regionState.getRegion());
847           this.executorService.submit(new ClosedRegionHandler(master,
848             this, regionState.getRegion()));
849           break;
850           
851         case RS_ZK_REGION_FAILED_OPEN:
852           hri = checkIfInFailover(regionState, encodedName, data);
853           if (hri != null) {
854             regionState = new RegionState(hri, RegionState.State.CLOSED, data
855                 .getStamp(), data.getOrigin());
856             regionsInTransition.put(encodedName, regionState);
857             new ClosedRegionHandler(master, this, regionState.getRegion())
858               .process();
859             failoverProcessedRegions.put(encodedName, hri);
860             break;
861           }
862           if (regionState == null ||
863               (!regionState.isOffline() && !regionState.isPendingOpen() && !regionState.isOpening())) {
864             LOG.warn("Received FAILED_OPEN for region " + prettyPrintedRegionName +
865                 " from server " + data.getOrigin() + " but region was in " +
866                 " the state " + regionState + " and not in OFFLINE, PENDING_OPEN or OPENING");
867             return;
868           }
869           // Handle this the same as if it were opened and then closed.
870           regionState.update(RegionState.State.CLOSED,
871               data.getStamp(), data.getOrigin());
872           // When there is more than one region server, a new RS is selected as the
873           // destination and the region plan is updated accordingly. (HBASE-5546)
874           getRegionPlan(regionState, sn, true);
875           this.executorService.submit(new ClosedRegionHandler(master,
876             this, regionState.getRegion()));
877           break;
878 
879         case RS_ZK_REGION_OPENING:
880           hri = checkIfInFailover(regionState, encodedName, data);       
881           if (hri != null) {
882             regionState = new RegionState(hri, RegionState.State.OPENING, data
883                 .getStamp(), data.getOrigin());
884             regionsInTransition.put(encodedName, regionState);
885             failoverProcessedRegions.put(encodedName, hri);
886             break;
887           }
888           if (regionState == null ||
889               (!regionState.isOffline() && !regionState.isPendingOpen() &&
890                   !regionState.isOpening())) {
891               LOG.warn("Received OPENING for region " + prettyPrintedRegionName +  " from server " +
892                 sn + " but region was in " + " the state " + regionState + " and not " +
893                 "in expected OFFLINE, PENDING_OPEN or OPENING states");
894               return;
895           }
896           // Transition to OPENING (or update stamp if already OPENING)
897           regionState.update(RegionState.State.OPENING,
898               data.getStamp(), data.getOrigin());
899           break;
900 
901         case RS_ZK_REGION_OPENED:
902           hri = checkIfInFailover(regionState, encodedName, data);
903           if (hri != null) {
904             regionState = new RegionState(hri, RegionState.State.OPEN, data
905                 .getStamp(), data.getOrigin());
906             regionsInTransition.put(encodedName, regionState);
907             new OpenedRegionHandler(master, this, regionState.getRegion(), data
908               .getOrigin(), expectedVersion).process();
909             failoverProcessedRegions.put(encodedName, hri);
910             break;
911           }
912           // Should see OPENED after OPENING but possible after PENDING_OPEN
913           if (regionState == null ||
914               (!regionState.isOffline() && !regionState.isPendingOpen() && !regionState.isOpening())) {
915             LOG.warn("Received OPENED for region " +
916                 prettyPrintedRegionName +
917                 " from server " + data.getOrigin() + " but region was in " +
918                 " the state " + regionState + " and not " +
919                 "in expected OFFLINE, PENDING_OPEN or OPENING states");
920             return;
921           }
922           // Handle OPENED by removing from transition and deleting the zk node
923           regionState.update(RegionState.State.OPEN,
924               data.getStamp(), data.getOrigin());
925           this.executorService.submit(
926             new OpenedRegionHandler(master, this, regionState.getRegion(),
927               data.getOrigin(), expectedVersion));
928           break;
929       }
930     }
931   }
932 
933   /**
934    * Checks whether the callback came while RIT was not yet populated during
935    * master failover.
936    * @param regionState
937    * @param encodedName
938    * @param data
939    * @return hri
940    */
941   private HRegionInfo checkIfInFailover(RegionState regionState,
942       String encodedName, RegionTransitionData data) {
943     if (regionState == null && this.failover &&
944         (failoverProcessedRegions.containsKey(encodedName) == false ||
945           failoverProcessedRegions.get(encodedName) == null)) {
946       HRegionInfo hri = this.failoverProcessedRegions.get(encodedName);
947       if (hri == null) hri = getHRegionInfo(data);
948       return hri;
949     }
950     return null;
951   }
952   
953   /**
954    * Gets the HRegionInfo from the META table
955    * @param  data
956    * @return HRegionInfo hri for the region 
957    */
958   private HRegionInfo getHRegionInfo(RegionTransitionData data) {
959     Pair<HRegionInfo, ServerName> p = null;
960     try {
961       p = MetaReader.getRegion(catalogTracker, data.getRegionName());
962       if (p == null) return null;
963       return p.getFirst();
964     } catch (IOException e) {
965       master.abort("Aborting because an error occurred while reading "
966           + data.getRegionName() + " from .META.", e);
967       return null;
968     }
969   }
970 
971   /**
972    * @return Returns true if this RegionState is splittable; i.e. the
973    * RegionState is currently in splitting state or pending_close or
974    * null. (Anything else will return false.)
975    */
976   private boolean isInStateForSplitting(final RegionState rs) {
977     if (rs == null) return true;
978     if (rs.isSplitting()) return true;
979     if (convertPendingCloseToSplitting(rs)) return true;
980     LOG.warn("Dropped region split! Not in state good for SPLITTING; rs=" + rs);
981     return false;
982   }
983 
984   /**
985    * If the passed regionState is in PENDING_CLOSE, clean up PENDING_CLOSE
986    * state and convert it to SPLITTING instead.
987    * This can happen in the case where the master wants to close a region at the same
988    * time a regionserver starts a split.  The split won; clean out the old
989    * PENDING_CLOSE state.
990    * @param rs
991    * @return True if we converted from PENDING_CLOSE to SPLITTING
992    */
993   private boolean convertPendingCloseToSplitting(final RegionState rs) {
994     if (!rs.isPendingClose()) return false;
995     LOG.debug("Converting PENDING_CLOSE to SPLITTING; rs=" + rs);
996     rs.update(RegionState.State.SPLITTING);
997     // Clean up existing state.  Clearing the region plan seems to be all we
998     // have to do here by way of cleanup of PENDING_CLOSE.
999     clearRegionPlan(rs.getRegion());
1000     return true;
1001   }
1002 
1003   /**
1004    * @param serverName
1005    * @param encodedName
1006    * @return The SPLITTING RegionState we added to RIT for the passed region
1007    * <code>encodedName</code>
1008    */
1009   private RegionState addSplittingToRIT(final ServerName serverName,
1010       final String encodedName) {
1011     RegionState regionState = null;
1012     synchronized (this.regions) {
1013       regionState = findHRegionInfoThenAddToRIT(serverName, encodedName);
1014       if (regionState != null) {
1015         regionState.update(RegionState.State.SPLITTING,
1016           System.currentTimeMillis(), serverName);
1017       }
1018     }
1019     return regionState;
1020   }
1021 
1022   /**
1023    * Caller must hold lock on <code>this.regions</code>.
1024    * @param serverName
1025    * @param encodedName
1026    * @return The instance of RegionState that was added to RIT or null if error.
1027    */
1028   private RegionState findHRegionInfoThenAddToRIT(final ServerName serverName,
1029       final String encodedName) {
1030     HRegionInfo hri = findHRegionInfo(serverName, encodedName);
1031     if (hri == null) {
1032       LOG.warn("Region " + encodedName + " not found on server " + serverName +
1033         "; failed processing");
1034       return null;
1035     }
1036     // Add to regions in transition, then update state to SPLITTING.
1037     return addToRegionsInTransition(hri);
1038   }
1039 
1040   /**
1041    * Caller must hold lock on <code>this.regions</code>.
1042    * @param sn
1043    * @param encodedName
1044    * @return Found HRegionInfo or null.
1045    */
1046   private HRegionInfo findHRegionInfo(final ServerName sn,
1047       final String encodedName) {
1048     if (!this.serverManager.isServerOnline(sn)) return null;
1049     Set<HRegionInfo> hris = this.servers.get(sn);
1050     HRegionInfo foundHri = null;
1051     for (HRegionInfo hri: hris) {
1052       if (hri.getEncodedName().equals(encodedName)) {
1053         foundHri = hri;
1054         break;
1055       }
1056     }
1057     return foundHri;
1058   }
1059 
1060   /**
1061    * Handle a ZK unassigned node transition triggered by HBCK repair tool.
1062    * <p>
1063    * This is handled in a separate code path because it breaks the normal rules.
1064    * @param data
1065    */
1066   private void handleHBCK(RegionTransitionData data) {
1067     String encodedName = HRegionInfo.encodeRegionName(data.getRegionName());
1068     LOG.info("Handling HBCK triggered transition=" + data.getEventType() +
1069       ", server=" + data.getOrigin() + ", region=" +
1070       HRegionInfo.prettyPrint(encodedName));
1071     RegionState regionState = regionsInTransition.get(encodedName);
1072     switch (data.getEventType()) {
1073       case M_ZK_REGION_OFFLINE:
1074         HRegionInfo regionInfo = null;
1075         if (regionState != null) {
1076           regionInfo = regionState.getRegion();
1077         } else {
1078           try {
1079             byte[] name = data.getRegionName();
1080             Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(catalogTracker, name);
1081             regionInfo = p.getFirst();
1082           } catch (IOException e) {
1083             LOG.info("Exception reading META doing HBCK repair operation", e);
1084             return;
1085           }
1086         }
1087         LOG.info("HBCK repair is triggering assignment of region=" +
1088             regionInfo.getRegionNameAsString());
1089         // trigger assign, node is already in OFFLINE so don't need to update ZK
1090         assign(regionInfo, false);
1091         break;
1092 
1093       default:
1094         LOG.warn("Received unexpected region state from HBCK (" +
1095             data.getEventType() + ")");
1096         break;
1097     }
1098   }
1099 
1100   // ZooKeeper events
1101 
1102   /**
1103    * New unassigned node has been created.
1104    *
1105    * <p>This happens when an RS begins the OPENING or CLOSING of a region by
1106    * creating an unassigned node.
1107    *
1108    * <p>When this happens we must:
1109    * <ol>
1110    *   <li>Watch the node for further events</li>
1111    *   <li>Read and handle the state in the node</li>
1112    * </ol>
1113    */
1114   @Override
1115   public void nodeCreated(String path) {
1116     if(path.startsWith(watcher.assignmentZNode)) {
1117       try {
1118         Stat stat = new Stat();
1119         RegionTransitionData data = ZKAssign.getDataAndWatch(watcher, path, stat);
1120         if (data == null) {
1121           return;
1122         }
1123         handleRegion(data, stat.getVersion());
1124       } catch (KeeperException e) {
1125         master.abort("Unexpected ZK exception reading unassigned node data", e);
1126       }
1127     }
1128   }
1129 
1130   /**
1131    * Existing unassigned node has had data changed.
1132    *
1133    * <p>This happens when an RS transitions from OFFLINE to OPENING, or between
1134    * OPENING/OPENED and CLOSING/CLOSED.
1135    *
1136    * <p>When this happens we must:
1137    * <ol>
1138    *   <li>Watch the node for further events</li>
1139    *   <li>Read and handle the state in the node</li>
1140    * </ol>
1141    */
1142   @Override
1143   public void nodeDataChanged(String path) {
1144     if(path.startsWith(watcher.assignmentZNode)) {
1145       try {
1146         Stat stat = new Stat();
1147         RegionTransitionData data = ZKAssign.getDataAndWatch(watcher, path, stat);
1148         if (data == null) {
1149           return;
1150         }
1151         handleRegion(data, stat.getVersion());
1152       } catch (KeeperException e) {
1153         master.abort("Unexpected ZK exception reading unassigned node data", e);
1154       }
1155     }
1156   }
1157 
1158   @Override
1159   public void nodeDeleted(final String path) {
1160     if (path.startsWith(this.watcher.assignmentZNode)) {
1161       String regionName = ZKAssign.getRegionName(this.master.getZooKeeper(), path);
1162       RegionState rs = this.regionsInTransition.get(regionName);
1163       if (rs != null) {
1164         HRegionInfo regionInfo = rs.getRegion();
1165         if (rs.isSplit()) {
1166           LOG.debug("Ephemeral node deleted, regionserver crashed?, offlining the region"
1167               + rs.getRegion() + " clearing from RIT;");
1168           regionOffline(rs.getRegion());
1169         } else if (rs.isSplitting()) {
1170           LOG.debug("Ephemeral node deleted.  Found in SPLITTING state. " + "Removing from RIT "
1171               + rs.getRegion());
1172           synchronized(this.regionsInTransition) {
1173             this.regionsInTransition.remove(regionName);
1174           }
1175         } else {
1176           LOG.debug("The znode of region " + regionInfo.getRegionNameAsString()
1177               + " has been deleted.");
1178           if (rs.isOpened()) {
1179             makeRegionOnline(rs, regionInfo);
1180           }
1181         }
1182       }
1183     }
1184   }
1185 
1186   private void makeRegionOnline(RegionState rs, HRegionInfo regionInfo) {
1187     regionOnline(regionInfo, rs.serverName);
1188     LOG.info("The master has opened the region "
1189         + regionInfo.getRegionNameAsString() + " that was online on "
1190         + rs.serverName);
1191     if (this.getZKTable().isDisablingOrDisabledTable(
1192         regionInfo.getTableNameAsString())) {
1193       LOG.debug("Opened region "
1194           + regionInfo.getRegionNameAsString() + " but "
1195           + "this table is disabled, triggering close of region");
1196       unassign(regionInfo);
1197     }
1198   }
1199 
1200   /**
1201    * New unassigned node has been created.
1202    *
1203    * <p>This happens when an RS begins the OPENING, SPLITTING or CLOSING of a
1204    * region by creating a znode.
1205    *
1206    * <p>When this happens we must:
1207    * <ol>
1208    *   <li>Watch the node for further children changed events</li>
1209    *   <li>Watch all new children for changed events</li>
1210    * </ol>
1211    */
1212   @Override
1213   public void nodeChildrenChanged(String path) {
1214     if(path.equals(watcher.assignmentZNode)) {
1215       try {
1216         List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(watcher,
1217             watcher.assignmentZNode);
1218         if (children != null) {
1219           Stat stat = new Stat();
1220           for (String child : children) {
1221             stat.setVersion(0);
1222             RegionTransitionData data = ZKAssign.getDataAndWatch(watcher,
1223                 ZKUtil.joinZNode(watcher.assignmentZNode, child), stat);
1224             // See HBASE-7551, handle splitting here as well, in case we miss the node change event
1225             if (stat.getVersion() > 0 && data.getEventType() == EventType.RS_ZK_REGION_SPLITTING) {
1226               handleRegion(data, stat.getVersion());
1227             }
1228           }
1229         }
1230       } catch(KeeperException e) {
1231         master.abort("Unexpected ZK exception reading unassigned children", e);
1232       }
1233     }
1234   }
1235 
1236   /**
1237    * Marks the region as online.  Removes it from regions in transition and
1238    * updates the in-memory assignment information.
1239    * <p>
1240    * Used when a region has been successfully opened on a region server.
1241    * @param regionInfo
1242    * @param sn
1243    */
1244   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
1245     synchronized (this.regionsInTransition) {
1246       RegionState rs =
1247         this.regionsInTransition.remove(regionInfo.getEncodedName());
1248       if (rs != null) {
1249         this.regionsInTransition.notifyAll();
1250       }
1251     }
1252     synchronized (this.regions) {
1253       // Sanity check: warn if we are overwriting an existing assignment.
1254       ServerName oldSn = this.regions.get(regionInfo);
1255       if (oldSn != null) LOG.warn("Overwriting " + regionInfo.getEncodedName() +
1256         " on " + oldSn + " with " + sn);
1257       
1258       if (isServerOnline(sn)) {
1259         this.regions.put(regionInfo, sn);
1260         addToServers(sn, regionInfo);
1261         this.regions.notifyAll();
1262       } else {
1263         LOG.info("The server is not in online servers, ServerName=" + 
1264           sn.getServerName() + ", region=" + regionInfo.getEncodedName());
1265       }
1266     }
1267     // Remove plan if one.
1268     clearRegionPlan(regionInfo);
1269     // Add the server to serversInUpdatingTimer
1270     addToServersInUpdatingTimer(sn);
1271   }
1272 
1273   /**
1274    * Add the server to the set serversInUpdatingTimer, then {@link TimerUpdater}
1275    * will update timers for this server in the background.
1276    * @param sn
1277    */
1278   private void addToServersInUpdatingTimer(final ServerName sn) {
1279     this.serversInUpdatingTimer.add(sn);
1280   }
1281 
1282   /**
1283    * Touch timers for all regions in transition that have the passed
1284    * <code>sn</code> in common.
1285    * Call this method whenever a server checks in.  Doing so helps the case where
1286    * a new regionserver has joined the cluster and it has been given 1k regions to
1287    * open.  If this method is tickled every time a region reports a
1288    * successful open then the 1k-th region won't be timed out just because it is
1289    * sitting behind the open of 999 other regions.  This method is NOT used
1290    * as part of bulk assign -- there we have a different mechanism for extending
1291    * the regions in transition timer (we turn it off temporarily -- because
1292    * there is no regionplan involved when bulk assigning).
1293    * @param sn
1294    */
1295   private void updateTimers(final ServerName sn) {
1296     // This loop could be expensive.
1297     // First make a copy of current regionPlan rather than hold sync while
1298     // looping because holding sync can cause deadlock.  It's ok in this loop
1299     // if the Map we're going against is a little stale
1300     Map<String, RegionPlan> copy = new HashMap<String, RegionPlan>();
1301     synchronized(this.regionPlans) {
1302       copy.putAll(this.regionPlans);
1303     }
1304     for (Map.Entry<String, RegionPlan> e: copy.entrySet()) {
1305       if (e.getValue() == null || e.getValue().getDestination() == null) continue;
1306       if (!e.getValue().getDestination().equals(sn)) continue;
1307       RegionState rs = null;
1308       synchronized (this.regionsInTransition) {
1309         rs = this.regionsInTransition.get(e.getKey());
1310       }
1311       if (rs == null) continue;
1312       rs.updateTimestampToNow();
1313     }
1314   }
1315 
1316   /**
1317    * Marks the region as offline.  Removes it from regions in transition and
1318    * removes in-memory assignment information.
1319    * <p>
1320    * Used when a region has been closed and should remain closed.
1321    * @param regionInfo
1322    */
1323   public void regionOffline(final HRegionInfo regionInfo) {
1324     // remove the region plan as well just in case.
1325     clearRegionPlan(regionInfo);
1326     setOffline(regionInfo);
1327 
1328     synchronized(this.regionsInTransition) {
1329       if (this.regionsInTransition.remove(regionInfo.getEncodedName()) != null) {
1330         this.regionsInTransition.notifyAll();
1331       }
1332     }
1333   }
1334 
1335   /**
1336    * Sets the region as offline by removing in-memory assignment information but
1337    * retaining transition information.
1338    * <p>
1339    * Used when a region has been closed but should be reassigned.
1340    * @param regionInfo
1341    */
1342   public void setOffline(HRegionInfo regionInfo) {
1343     synchronized (this.regions) {
1344       ServerName sn = this.regions.remove(regionInfo);
1345       if (sn == null) return;
1346       Set<HRegionInfo> serverRegions = this.servers.get(sn);
1347       if (serverRegions == null || !serverRegions.remove(regionInfo)) {
1348         LOG.warn("No " + regionInfo + " on " + sn);
1349       }
1350     }
1351   }
1352 
1353   public void offlineDisabledRegion(HRegionInfo regionInfo) {
1354     // Disabling so should not be reassigned, just delete the CLOSED node
1355     LOG.debug("Table being disabled so deleting ZK node and removing from " +
1356         "regions in transition, skipping assignment of region " +
1357           regionInfo.getRegionNameAsString());
1358     try {
1359       if (!ZKAssign.deleteClosedNode(watcher, regionInfo.getEncodedName())) {
1360         // Could also be in OFFLINE mode
1361         ZKAssign.deleteOfflineNode(watcher, regionInfo.getEncodedName());
1362       }
1363     } catch (KeeperException.NoNodeException nne) {
1364       LOG.debug("Tried to delete closed node for " + regionInfo + " but it " +
1365           "does not exist so just offlining");
1366     } catch (KeeperException e) {
1367       this.master.abort("Error deleting CLOSED node in ZK", e);
1368     }
1369     regionOffline(regionInfo);
1370   }
1371 
1372   // Assignment methods
1373 
1374   /**
1375    * Assigns the specified region.
1376    * <p>
1377    * If a RegionPlan is available with a valid destination then it will be used
1378    * to determine which server the region is assigned to.  If no RegionPlan is
1379    * available, the region will be assigned to a random available server.
1380    * <p>
1381    * Updates the RegionState and sends the OPEN RPC.
1382    * <p>
1383    * This will only succeed if the region is in transition (in a CLOSED or
1384    * OFFLINE state) or not in transition at all (in-memory, not zk), and, of course,
1385    * if the chosen server is up and running (it may have just crashed!).  If the
1386    * in-memory checks pass, the zk node is forced to OFFLINE before assigning.
1387    *
1388    * @param region region to be assigned
1389    * @param setOfflineInZK whether ZK node should be created/transitioned to an
1390    *                       OFFLINE state before assigning the region
1391    */
1392   public void assign(HRegionInfo region, boolean setOfflineInZK) {
1393     assign(region, setOfflineInZK, false);
1394   }
1395 
1396   public void assign(HRegionInfo region, boolean setOfflineInZK,
1397       boolean forceNewPlan) {
1398     assign(region, setOfflineInZK, forceNewPlan, false);
1399   }
1400 
1401   /**
1402    * @param region
1403    * @param setOfflineInZK
1404    * @param forceNewPlan
1405    * @param hijack
1406    *          - true if the znode needs to be hijacked and a new assignment made, false otherwise
1407    */
1408   public void assign(HRegionInfo region, boolean setOfflineInZK,
1409       boolean forceNewPlan, boolean hijack) {
1410     // If hijack is true, do not call isDisabledorDisablingRegionInRIT as
1411     // we have not yet moved the znode to the OFFLINE state.
1412     if (!hijack && isDisabledorDisablingRegionInRIT(region)) {
1413       return;
1414     }
1415     if (this.serverManager.isClusterShutdown()) {
1416       LOG.info("Cluster shutdown is set; skipping assign of " +
1417         region.getRegionNameAsString());
1418       return;
1419     }
1420     if (isAssigningSplitParentRegion(region)) {
1421       return;
1422     }
1423     RegionState state = addToRegionsInTransition(region,
1424         hijack);
1425     synchronized (state) {
1426       assign(region, state, setOfflineInZK, forceNewPlan, hijack);
1427     }
1428   }
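  // Usage sketch (illustrative only, not part of the original code): assigning a single
  // region and blocking until it is online, assuming hypothetical locals "am" (this
  // AssignmentManager) and "hri" (the HRegionInfo to assign); InterruptedException
  // handling elided:
  //
  //   am.assign(hri, true);         // force the unassigned znode to OFFLINE, then send OPEN
  //   am.waitForAssignment(hri);    // returns once hri shows up in the assignment map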
1429 
1430   /**
1431    * Bulk assign regions to <code>destination</code>.
1432    * @param destination
1433    * @param regions Regions to assign.
1434    */
1435   void assign(final ServerName destination,
1436       final List<HRegionInfo> regions) {
1437     if (regions.size() == 0) {
1438       return;
1439     }
1440     LOG.debug("Bulk assigning " + regions.size() + " region(s) to " +
1441       destination.toString());
1442 
1443     List<RegionState> states = new ArrayList<RegionState>(regions.size());
1444     synchronized (this.regionsInTransition) {
1445       for (HRegionInfo region: regions) {
1446         states.add(forceRegionStateToOffline(region));
1447       }
1448     }
1449     // Add region plans so that we can update timers when a region is opened,
1450     // reducing unnecessary RIT timeouts.
1451     Map<String, RegionPlan> plans=new HashMap<String, RegionPlan>();
1452     for (HRegionInfo region : regions) {
1453       plans.put(region.getEncodedName(), new RegionPlan(region, null,
1454           destination));
1455     }
1456     this.addPlans(plans);
1457     
1458     // Presumption is that only this thread will be updating the state at this
1459     // time; i.e. handlers on backend won't be trying to set it to OPEN, etc.
1460     AtomicInteger counter = new AtomicInteger(0);
1461     CreateUnassignedAsyncCallback cb =
1462       new CreateUnassignedAsyncCallback(this.watcher, destination, counter);
1463     for (RegionState state: states) {
1464       if (!asyncSetOfflineInZooKeeper(state, cb, state)) {
1465         return;
1466       }
1467     }
1468     // Wait until all unassigned nodes have been put up and watchers set.
1469     int total = regions.size();
1470     for (int oldCounter = 0; true;) {
1471       int count = counter.get();
1472       if (oldCounter != count) {
1473         LOG.info(destination.toString() + " outstanding calls=" + count +
1474           " of total=" + total);
1475         oldCounter = count;
1476       }
1477       if (count == total) break;
1478       Threads.sleep(1);
1479     }
1480     // Check if any failed.
1481     if (cb.hasErrors()) {
1482       // TODO: createOrForceNodeOffline actually handles this condition; whereas this
1483       //       code used to just abort master. Now, it will bail more "gracefully".
1484       LOG.error("Error creating nodes for some of the regions we are trying to bulk assign");
1485       return;
1486     }
1487 
1488     // Move on to open regions.
1489     try {
1490       // Send OPEN RPC. If it fails on an IOE or RemoteException, the
1491       // TimeoutMonitor will pick up the pieces.
1492       long maxWaitTime = System.currentTimeMillis() +
1493         this.master.getConfiguration().
1494           getLong("hbase.regionserver.rpc.startup.waittime", 60000);
1495       while (!this.master.isStopped()) {
1496         try {
1497           this.serverManager.sendRegionOpen(destination, regions);
1498           break;
1499         } catch (RemoteException e) {
1500           IOException decodedException = e.unwrapRemoteException();
1501           if (decodedException instanceof RegionServerStoppedException) {
1502             LOG.warn("The region server was shut down", decodedException);
1503             // No need to retry, the region server is a goner.
1504             return;
1505           } else if (decodedException instanceof ServerNotRunningYetException) {
1506             // This is the one exception to retry.  For all else we should just fail
1507             // the startup.
1508             long now = System.currentTimeMillis();
1509             if (now > maxWaitTime) throw e;
1510             LOG.debug("Server is not yet up; waiting up to " +
1511                 (maxWaitTime - now) + "ms", e);
1512             Thread.sleep(1000);
1513             continue; // retry the sendRegionOpen until maxWaitTime is exceeded
1514           }
1515           throw decodedException;
1516         }
1517       }
1518     } catch (IOException e) {
1519       // Can be a socket timeout, EOF, NoRouteToHost, etc
1520       LOG.info("Unable to communicate with the region server in order" +
1521           " to assign regions", e);
1522     } catch (InterruptedException e) {
1523       throw new RuntimeException(e);
1524     }
1525     LOG.debug("Bulk assigning done for " + destination.toString());
1526   }
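  // The wait loop above is a simple completion-count rendezvous.  A minimal sketch of
  // the same pattern (illustrative only), where "total" async ZK calls are issued and
  // each callback ends in reportCompletion(), which increments the shared counter:
  //
  //   AtomicInteger counter = new AtomicInteger(0);
  //   // ... issue "total" async calls whose callbacks each do counter.incrementAndGet() ...
  //   while (counter.get() < total) {
  //     Threads.sleep(1);           // poll until every callback has reported in
  //   }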
1527 
1528   /**
1529    * Callback handler for creating unassigned znodes, used during bulk assign.
1530    */
1531   static class CreateUnassignedAsyncCallback implements AsyncCallback.StringCallback {
1532     private final Log LOG = LogFactory.getLog(CreateUnassignedAsyncCallback.class);
1533     private final ZooKeeperWatcher zkw;
1534     private final ServerName destination;
1535     private final AtomicInteger counter;
1536     private final AtomicInteger errorCount = new AtomicInteger(0);
1537 
1538     CreateUnassignedAsyncCallback(final ZooKeeperWatcher zkw,
1539         final ServerName destination, final AtomicInteger counter) {
1540       this.zkw = zkw;
1541       this.destination = destination;
1542       this.counter = counter;
1543     }
1544 
1545     boolean hasErrors() {
1546       return this.errorCount.get() > 0;
1547     }
1548 
1549     @Override
1550     public void processResult(int rc, String path, Object ctx, String name) {
1551       if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
1552         LOG.warn("Node for " + path + " already exists");
1553         reportCompletion(false);
1554         return;
1555       }
1556       if (rc != 0) {
1557         // This is resultcode. If non-zero, we will abort :(
1558         LOG.warn("rc != 0 for " + path + " -- some error, may be retryable connection loss -- "
1559             + "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
1560         this.zkw.abort("Some error, may be connection loss writing unassigned at " + path +
1561           ", rc=" + rc, null);
1562         return;
1563       }
1564       LOG.debug("rs=" + (RegionState)ctx + ", server=" + this.destination.toString());
1565       // Async exists() call to set a watcher so we'll get triggered when the
1566       // unassigned node changes.
1567       this.zkw.getRecoverableZooKeeper().getZooKeeper().exists(path, this.zkw,
1568         new ExistsUnassignedAsyncCallback(this, destination), ctx);
1569     }
1570 
1571     void reportCompletion(boolean success) {
1572       if (!success) {
1573         this.errorCount.incrementAndGet();
1574       }
1575       this.counter.incrementAndGet();
1576     }
1577   }
1578 
1579   /**
1580    * Callback handler for the exists call that sets watcher on unassigned znodes.
1581    * Used during bulk assign on startup.
1582    */
1583   static class ExistsUnassignedAsyncCallback implements AsyncCallback.StatCallback {
1584     private final Log LOG = LogFactory.getLog(ExistsUnassignedAsyncCallback.class);
1585     private ServerName destination;
1586     private CreateUnassignedAsyncCallback parent;
1587 
1588     ExistsUnassignedAsyncCallback(
1589         CreateUnassignedAsyncCallback parent, ServerName destination) {
1590       this.parent = parent;
1591       this.destination = destination;
1592     }
1593 
1594     @Override
1595     public void processResult(int rc, String path, Object ctx, Stat stat) {
1596       if (rc != 0) {
1597         // This is resultcode.  If non-zero, need to resubmit.
1598         LOG.warn("rc != 0 for " + path + " -- some error, may be connection loss -- " +
1599           "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
1600         parent.reportCompletion(false);
1601         return;
1602       }
1603       RegionState state = (RegionState)ctx;
1604       LOG.debug("rs=" + state);
1605       // Transition RegionState to PENDING_OPEN here in master; means we've
1606       // sent the open.  We're a little ahead of ourselves here since we've not
1607       // yet sent out the actual open but putting this state change after the
1608       // call to open risks our writing PENDING_OPEN after state has been moved
1609       // to OPENING by the regionserver.
1610       state.update(RegionState.State.PENDING_OPEN, System.currentTimeMillis(), destination);
1611       parent.reportCompletion(true);
1612     }
1613   }
1614 
1615   /**
1616    * @param region
1617    * @return The current RegionState
1618    */
1619   private RegionState addToRegionsInTransition(final HRegionInfo region) {
1620     return addToRegionsInTransition(region, false);
1621   }
1622   /**
1623    * @param region
1624    * @param hijack
1625    * @return The current RegionState
1626    */
1627   private RegionState addToRegionsInTransition(final HRegionInfo region,
1628       boolean hijack) {
1629     synchronized (regionsInTransition) {
1630       return forceRegionStateToOffline(region, hijack);
1631     }
1632   }
1633   /**
1634    * Sets the region's {@link RegionState} to {@link RegionState.State#OFFLINE}.
1635    * Caller must hold lock on this.regionsInTransition.
1636    * @param region
1637    * @return Amended RegionState.
1638    */
1639   private RegionState forceRegionStateToOffline(final HRegionInfo region) {
1640     return forceRegionStateToOffline(region, false);
1641   }
1642 
1643   /**
1644    * Sets the region's {@link RegionState} to {@link RegionState.State#OFFLINE}.
1645    * Caller must hold lock on this.regionsInTransition.
1646    * @param region
1647    * @param hijack
1648    * @return Amended RegionState.
1649    */
1650   private RegionState forceRegionStateToOffline(final HRegionInfo region,
1651       boolean hijack) {
1652     String encodedName = region.getEncodedName();
1653     RegionState state = this.regionsInTransition.get(encodedName);
1654     if (state == null) {
1655       state = new RegionState(region, RegionState.State.OFFLINE);
1656       this.regionsInTransition.put(encodedName, state);
1657     } else {
1658       // If we are reassigning the node do not force in-memory state to OFFLINE.
1659       // Based on the znode state we will decide whether to change the in-memory state to
1660       // OFFLINE or not. That is done before setting the znode to the OFFLINE state.
1661 
1662       // We often get here with state == CLOSED because ClosedRegionHandler will
1663       // assign on its tail as part of the handling of a region close.
1664       if (!hijack) {
1665         LOG.debug("Forcing OFFLINE; was=" + state);
1666         state.update(RegionState.State.OFFLINE);
1667       }
1668     }
1669     return state;
1670   }
1671 
1672   /**
1673    * Caller must hold lock on the passed <code>state</code> object.
1674    * @param state
1675    * @param setOfflineInZK
1676    * @param forceNewPlan
1677    * @param hijack
1678    */
1679   private void assign(final HRegionInfo region, final RegionState state,
1680       final boolean setOfflineInZK, final boolean forceNewPlan,
1681       boolean hijack) {
1682     boolean regionAlreadyInTransitionException = false;
1683     boolean serverNotRunningYet = false;
1684     boolean socketTimeoutException = false;
1685 
1686     long maxRegionServerStartupWaitTime = -1;
1687     for (int i = 0; i < this.maximumAssignmentAttempts; i++) {
1688       int versionOfOfflineNode = -1;
1689       if (setOfflineInZK) {
1690         // get the version of the znode after setting it to OFFLINE.
1691         // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
1692         versionOfOfflineNode = setOfflineInZooKeeper(state, hijack,
1693             regionAlreadyInTransitionException);
1694         if(versionOfOfflineNode != -1){
1695           if (isDisabledorDisablingRegionInRIT(region)) {
1696             return;
1697           }
1698           // In case of an assign from EnableTableHandler, the table state is ENABLING. In any case,
1699           // EnableTableHandler will set ENABLED after assigning all the table regions. If we
1700           // set ENABLED directly here, the client API may think the table ENABLE has completed.
1701           // When regions are added directly into META and we then call
1702           // assignRegion, we need to make the table ENABLED ourselves. In such a case the table
1703           // will not be in the ENABLING or ENABLED state.
1704           String tableName = region.getTableNameAsString();
1705           if (!zkTable.isEnablingTable(tableName) && !zkTable.isEnabledTable(tableName)) {
1706             LOG.debug("Setting table " + tableName + " to ENABLED state.");
1707             setEnabledTable(region);
1708           }
1709         }
1710       }
1711       
1712       if (setOfflineInZK && versionOfOfflineNode == -1) {
1713         return;
1714       }
1715       
1716       if (this.master.isStopped()) {
1717         LOG.debug("Server stopped; skipping assign of " + state);
1718         return;
1719       }
1720       RegionPlan plan = getRegionPlan(state, !regionAlreadyInTransitionException
1721           && !serverNotRunningYet && forceNewPlan);
1722       if (plan == null) {
1723         LOG.debug("Unable to determine a plan to assign " + state);
1724         this.timeoutMonitor.setAllRegionServersOffline(true);
1725         return; // Should get reassigned later when RIT times out.
1726       }
1727       try {
1728         LOG.debug("Assigning region " + state.getRegion().getRegionNameAsString() +
1729           " to " + plan.getDestination().toString());
1730         long currentOfflineTimeStamp = state.getStamp();
1731         RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan.getDestination(),
1732             state.getRegion(), versionOfOfflineNode);
1733         if (regionOpenState == RegionOpeningState.OPENED) {
1734           // Transition RegionState to PENDING_OPEN
1735           // Check whether the offline state has already been updated due to a
1736           // failure in a previous assign.
1737           if (state.isOffline() && currentOfflineTimeStamp != state.getStamp()) {
1738             return;
1739           }
1740           if (state.isOffline() && !state.isOpening()) {
1741             state.update(RegionState.State.PENDING_OPEN,
1742                 System.currentTimeMillis(), plan.getDestination());
1743           }
1744           if (state.isOpening()) return;
1745           if (state.isOpened()) return;
1746         } else if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
1747           // Remove the region from in-memory transition and its unassigned node from ZK.
1748           // While trying to enable the table, the regions of the table were
1749           // found to be already open.
1750           LOG.debug("ALREADY_OPENED region " + state.getRegion().getRegionNameAsString() +
1751               " to " + plan.getDestination().toString());
1752           String encodedRegionName = state.getRegion()
1753               .getEncodedName();
1754           try {
1755             ZKAssign.deleteOfflineNode(master.getZooKeeper(), encodedRegionName);
1756           } catch (KeeperException.NoNodeException e) {
1757             if(LOG.isDebugEnabled()){
1758               LOG.debug("The unassigned node " + encodedRegionName + " does not exist.");
1759             }
1760           } catch (KeeperException e) {
1761             master.abort(
1762                 "Error deleting OFFLINED node in ZK for transition ZK node ("
1763                     + encodedRegionName + ")", e);
1764           }
1765           synchronized (this.regionsInTransition) {
1766             this.regionsInTransition.remove(plan.getRegionInfo()
1767                 .getEncodedName());
1768           }
1769           synchronized (this.regions) {
1770             this.regions.put(plan.getRegionInfo(), plan.getDestination());
1771             addToServers(plan.getDestination(), plan.getRegionInfo());
1772           }
1773         }
1774         break;
1775       } catch (Throwable t) {
1776         if (t instanceof RemoteException) {
1777           t = ((RemoteException) t).unwrapRemoteException();
1778         }
1779         regionAlreadyInTransitionException = false;
1780         serverNotRunningYet = false;
1781         socketTimeoutException = false;
1782 
1783         if (t instanceof RegionAlreadyInTransitionException) {
1784           regionAlreadyInTransitionException = true;
1785           if (LOG.isDebugEnabled()) {
1786             LOG.debug("Failed assignment in: " + plan.getDestination() + " due to "
1787                 + t.getMessage());
1788           }
1789         } else if (t instanceof ServerNotRunningYetException) {
1790           if (maxRegionServerStartupWaitTime < 0) {
1791             maxRegionServerStartupWaitTime = System.currentTimeMillis()
1792                 + this.master.getConfiguration().getLong("hbase.regionserver.rpc.startup.waittime",
1793                     60000);
1794           }
1795           try {
1796             long now = System.currentTimeMillis();
1797             if (now < maxRegionServerStartupWaitTime) {
1798               LOG.debug("Server is not yet up; waiting up to "
1799                   + (maxRegionServerStartupWaitTime - now) + "ms", t);
1800               serverNotRunningYet = true;
1801               Thread.sleep(100);
1802               i--; // reset the try count
1803             } else {
1804               LOG.debug("Server is not up for a while; try a new one", t);
1805             }
1806           } catch (InterruptedException ie) {
1807             LOG.warn("Failed to assign " + state.getRegion().getRegionNameAsString()
1808                 + " since interrupted", ie);
1809             Thread.currentThread().interrupt();
1810             return;
1811           }
1812         } else if (t instanceof java.net.SocketTimeoutException 
1813             && this.serverManager.isServerOnline(plan.getDestination())) {
1814           LOG.warn("Call openRegion() to " + plan.getDestination()
1815               + " has timed out when trying to assign "
1816               + region.getRegionNameAsString()
1817               + ", but the region might already be opened on "
1818               + plan.getDestination() + ".", t);
1819           socketTimeoutException = true;
1820           try {
1821             Thread.sleep(100);
1822             i--; // reset the try count
1823           } catch (InterruptedException ie) {
1824             LOG.warn("Failed to assign " + state.getRegion().getRegionNameAsString()
1825                 + " since interrupted", ie);
1826             Thread.currentThread().interrupt();
1827             return;
1828           }
1829         }
1830         LOG.warn("Failed assignment of "
1831           + state.getRegion().getRegionNameAsString()
1832           + " to "
1833           + plan.getDestination()
1834           + ", trying to assign "
1835           + (regionAlreadyInTransitionException || serverNotRunningYet || socketTimeoutException
1836             ? "to the same region server because of "
1837             + "RegionAlreadyInTransitionException/ServerNotRunningYetException/"
1838             + "SocketTimeoutException;"
1839             : "elsewhere instead; ")
1840           + "retry=" + i, t);
1841         // Clean out the plan we failed to execute; it doesn't look like it'll
1842         // succeed anyway -- we need a new plan!
1843         // Transition back to OFFLINE
1844         state.update(RegionState.State.OFFLINE);
1845         // If the region opened on the destination of the present plan, reassigning to a new
1846         // RS may cause double assignment. In case of RegionAlreadyInTransitionException
1847         // we reassign to the same RS.
1848         RegionPlan newPlan = plan;
1849         if (!regionAlreadyInTransitionException
1850             && !serverNotRunningYet && !socketTimeoutException) {
1851           // Force a new plan and reassign. Will return null if no servers.
1852           // The new plan could be the same as the existing plan since we don't
1853           // exclude the server of the original plan, which should not be
1854           // excluded since it could be the only server up now.
1855           newPlan = getRegionPlan(state, true);
1856         }
1857         if (newPlan == null) {
1858           this.timeoutMonitor.setAllRegionServersOffline(true);
1859           LOG.warn("Unable to find a viable location to assign region " +
1860             state.getRegion().getRegionNameAsString());
1861           return;
1862         }
1863       }
1864     }
1865   }
1866 
1867   private static boolean isAssigningSplitParentRegion(final HRegionInfo region) {
1868     if (region.isSplitParent()) {
1869       LOG.info("Skipping assign of " + region.getRegionNameAsString()
1870         + ", already split, or still splitting");
1871       return true;
1872     }
1873     return false;
1874   }
1875 
1876   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
1877     String tableName = region.getTableNameAsString();
1878     boolean disabled = this.zkTable.isDisabledTable(tableName);
1879     if (disabled || this.zkTable.isDisablingTable(tableName)) {
1880       LOG.info("Table " + tableName + (disabled ? " disabled;" : " disabling;") +
1881         " skipping assign of " + region.getRegionNameAsString());
1882       offlineDisabledRegion(region);
1883       return true;
1884     }
1885     return false;
1886   }
1887 
1888   /**
1889    * Set region as OFFLINED up in zookeeper
1890    * 
1891    * @param state
1892    * @param hijack
1893    *          - true if the znode needs to be hijacked and the region reassigned, false otherwise.
1894    * @param regionAlreadyInTransitionException  
1895    *          - true if we need to retry assignment because of RegionAlreadyInTransitionException.       
1896    * @return the version of the offline node if setting of the OFFLINE node was
1897    *         successful, -1 otherwise.
1898    */
1899   int setOfflineInZooKeeper(final RegionState state, boolean hijack,
1900       boolean regionAlreadyInTransitionException) {
1901     // In case of reassignment the current state in memory need not be
1902     // OFFLINE. 
1903     if (!hijack && !state.isClosed() && !state.isOffline()) {
1904       if (!regionAlreadyInTransitionException) {
1905         LOG.warn("Unexpected state: " + state + "; cannot transition it to OFFLINE.");
1906         return -1;
1907       }
1908       LOG.debug("Unexpected state: " + state
1909           + ", but retrying the assignment because of RegionAlreadyInTransitionException.");
1910     }
1911     boolean allowZNodeCreation = false;
1912     // Under reassignment if the current state is PENDING_OPEN
1913     // or OPENING then refresh the in-memory state to PENDING_OPEN. This is
1914     // important because if the region was in 
1915     // RS_OPENING state for a long time the master will try to force the znode
1916     // to OFFLINE state meanwhile the RS could have opened the corresponding
1917     // region and the state in znode will be RS_ZK_REGION_OPENED.
1918     // For all other cases we can change the in-memory state to OFFLINE.
1919     if (hijack &&
1920         (state.getState().equals(RegionState.State.PENDING_OPEN) || 
1921             state.getState().equals(RegionState.State.OPENING))) {
1922       state.update(RegionState.State.PENDING_OPEN);
1923       allowZNodeCreation = false;
1924     } else {
1925       state.update(RegionState.State.OFFLINE);
1926       allowZNodeCreation = true;
1927     }
1928     int versionOfOfflineNode = -1;
1929     try {
1930       // get the version after setting the znode to OFFLINE
1931       versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(master.getZooKeeper(), 
1932           state.getRegion(), this.master.getServerName(),
1933           hijack, allowZNodeCreation);
1934       if (versionOfOfflineNode == -1) {
1935         LOG.warn("Attempted to create/force node into OFFLINE state before "
1936             + "completing assignment but failed to do so for " + state);
1937         return -1;
1938       }
1939     } catch (KeeperException e) {
1940       master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
1941       return -1;
1942     }
1943     return versionOfOfflineNode;
1944   }
1945 
1946   /**
1947    * Set region as OFFLINED up in zookeeper asynchronously.
1948    * @param state
1949    * @return True if we succeeded, false otherwise (State was incorrect or failed
1950    * updating zk).
1951    */
1952   boolean asyncSetOfflineInZooKeeper(final RegionState state,
1953       final AsyncCallback.StringCallback cb, final Object ctx) {
1954     if (!state.isClosed() && !state.isOffline()) {
1955       // Abort; the state we were asked to OFFLINE from is neither CLOSED nor OFFLINE.
1956       this.master.abort("Unexpected state trying to OFFLINE; " + state,
1957         new IllegalStateException("Unexpected state trying to OFFLINE; " + state));
1958       return false;
1959     }
1960     state.update(RegionState.State.OFFLINE);
1961     try {
1962       ZKAssign.asyncCreateNodeOffline(master.getZooKeeper(), state.getRegion(),
1963         this.master.getServerName(), cb, ctx);
1964     } catch (KeeperException e) {
1965       // TODO: this error handling will never execute, as the callback is async.
1966       if (e instanceof NodeExistsException) {
1967         LOG.warn("Node for " + state.getRegion() + " already exists");
1968       } else { 
1969         master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
1970       }
1971       return false;
1972     }
1973     return true;
1974   }
1975 
1976   /**
1977    * @param state
1978    * @return Plan for passed <code>state</code> (If none currently, it creates one or
1979    * if no servers to assign, it returns null).
1980    */
1981   RegionPlan getRegionPlan(final RegionState state,
1982       final boolean forceNewPlan) {
1983     return getRegionPlan(state, null, forceNewPlan);
1984   }
1985 
1986   /**
1987    * @param state
1988    * @param serverToExclude Server to exclude (we know its bad). Pass null if
1989    * all servers are thought to be assignable.
1990    * @param forceNewPlan If true, then if an existing plan exists, a new plan
1991    * will be generated.
1992    * @return Plan for passed <code>state</code> (If none currently, it creates one or
1993    * if no servers to assign, it returns null).
1994    */
1995   RegionPlan getRegionPlan(final RegionState state,
1996       final ServerName serverToExclude, final boolean forceNewPlan) {
1997     // Pickup existing plan or make a new one
1998     final String encodedName = state.getRegion().getEncodedName();
1999     final List<ServerName> servers = this.serverManager.getOnlineServersList();
2000     final List<ServerName> drainingServers = this.serverManager.getDrainingServersList();
2001 
2002 
2003     if (serverToExclude != null) servers.remove(serverToExclude);
2004 
2005     // Loop through the draining server list and remove them from the server
2006     // list.
2007     if (!drainingServers.isEmpty()) {
2008       for (final ServerName server: drainingServers) {
2009         LOG.debug("Removing draining server: " + server +
2010             " from eligible server pool.");
2011         servers.remove(server);
2012       }
2013     }
2014 
2015     // Remove the deadNotExpired servers from the server list.
2016     removeDeadNotExpiredServers(servers);
2017 
2018 
2019 
2020     if (servers.isEmpty()) return null;
2021 
2022     RegionPlan randomPlan = null;
2023     boolean newPlan = false;
2024     RegionPlan existingPlan = null;
2025 
2026     synchronized (this.regionPlans) {
2027       existingPlan = this.regionPlans.get(encodedName);
2028 
2029       if (existingPlan != null && existingPlan.getDestination() != null) {
2030         LOG.debug("Found an existing plan for " +
2031             state.getRegion().getRegionNameAsString() +
2032             "; destination server is " + existingPlan.getDestination().toString());
2033       }
2034 
2035       if (forceNewPlan
2036           || existingPlan == null
2037           || existingPlan.getDestination() == null
2038           || drainingServers.contains(existingPlan.getDestination())) {
2039         newPlan = true;
2040         randomPlan = new RegionPlan(state.getRegion(), null, balancer
2041             .randomAssignment(servers));
2042         this.regionPlans.put(encodedName, randomPlan);
2043       }
2044     }
2045 
2046     if (newPlan) {
2047       LOG.debug("No previous transition plan was found (or we are ignoring " +
2048         "an existing plan) for " + state.getRegion().getRegionNameAsString() +
2049         " so generated a random one; " + randomPlan + "; " +
2050         serverManager.countOfRegionServers() +
2051         " (online=" + serverManager.getOnlineServers().size() +
2052         ", available=" + servers.size() + ") available servers");
2053       return randomPlan;
2054     }
2055     LOG.debug("Using pre-existing plan for region " +
2056       state.getRegion().getRegionNameAsString() + "; plan=" + existingPlan);
2057     return existingPlan;
2058   }
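  // Illustrative sketch (not part of the original code): how the two plan-lookup modes
  // above behave, assuming a hypothetical RegionState "state" for a region in transition:
  //
  //   RegionPlan cached = getRegionPlan(state, false);  // reuse an existing usable plan if present
  //   RegionPlan fresh  = getRegionPlan(state, true);   // ignore any cached plan; pick a random server
  //   // both return null when no eligible (online, non-draining, non-dead) servers remain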
2059 
2060   /**
2061    * Loop through the deadNotExpired server list and remove them from the
2062    * servers.
2063    * @param servers
2064    */
2065   public void removeDeadNotExpiredServers(List<ServerName> servers) {
2066     Set<ServerName> deadNotExpiredServers = this.serverManager
2067         .getDeadNotExpiredServers();
2068     if (!deadNotExpiredServers.isEmpty()) {
2069       for (ServerName server : deadNotExpiredServers) {
2070         LOG.debug("Removing dead but not expired server: " + server
2071             + " from eligible server pool.");
2072         servers.remove(server);
2073       }
2074     }
2075   }
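  // Sketch of how an eligible server pool is typically built before planning, mirroring
  // the filtering done in getRegionPlan above (illustrative only; "candidates" is a
  // hypothetical local):
  //
  //   List<ServerName> candidates = this.serverManager.getOnlineServersList();
  //   candidates.removeAll(this.serverManager.getDrainingServersList());
  //   removeDeadNotExpiredServers(candidates);   // drop dead-but-not-yet-expired servers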
2076 
2077   /**
2078    * Unassign the list of regions. Configuration knobs:
2079    * hbase.bulk.waitbetween.reopen indicates the number of milliseconds to
2080    * wait before unassigning another region from this region server
2081    *
2082    * @param regions
2083    * @throws InterruptedException
2084    */
2085   public void unassign(List<HRegionInfo> regions) {
2086     int waitTime = this.master.getConfiguration().getInt(
2087         "hbase.bulk.waitbetween.reopen", 0);
2088     for (HRegionInfo region : regions) {
2089       if (isRegionInTransition(region) != null)
2090         continue;
2091       unassign(region, false);
2092       while (isRegionInTransition(region) != null) {
2093         try {
2094           Thread.sleep(10);
2095         } catch (InterruptedException e) {
2096           // Do nothing, continue
2097         }
2098       }
2099       if (waitTime > 0)
2100         try {
2101           Thread.sleep(waitTime);
2102         } catch (InterruptedException e) {
2103           // Do nothing, continue
2104         }
2105     }
2106   }
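  // Usage sketch (illustrative only): unassigning a batch of regions with a small pause
  // between them.  Assumes hypothetical locals "conf" (the master's Configuration), "am"
  // (this AssignmentManager) and "regionsToMove" (a List<HRegionInfo>):
  //
  //   conf.setInt("hbase.bulk.waitbetween.reopen", 100);   // wait 100ms between regions
  //   am.unassign(regionsToMove);                          // waits for each region to leave RIT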
2107 
2108   /**
2109    * Unassigns the specified region.
2110    * <p>
2111    * Updates the RegionState and sends the CLOSE RPC unless region is being
2112    * split by regionserver; then the unassign fails (silently) because we
2113    * presume the region being unassigned no longer exists (it's been split out
2114    * of existence). TODO: What to do if split fails and is rolled back and
2115    * parent is revivified?
2116    * <p>
2117    * If a RegionPlan is already set, it will remain.
2118    *
2119    * @param region region to be unassigned
2120    */
2121   public void unassign(HRegionInfo region) {
2122     unassign(region, false);
2123   }
2124 
2125   /**
2126    * Unassigns the specified region.
2127    * <p>
2128    * Updates the RegionState and sends the CLOSE RPC unless region is being
2129    * split by regionserver; then the unassign fails (silently) because we
2130    * presume the region being unassigned no longer exists (it's been split out
2131    * of existence). TODO: What to do if split fails and is rolled back and
2132    * parent is revivified?
2133    * <p>
2134    * If a RegionPlan is already set, it will remain.
2135    *
2136    * @param region region to be unassigned
2137    * @param force if region should be closed even if already closing
2138    */
2139   public void unassign(HRegionInfo region, boolean force) {
2140     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
2141     LOG.debug("Starting unassignment of region " +
2142       region.getRegionNameAsString() + " (offlining)");
2143 
2144     synchronized (this.regions) {
2145       // Check if this region is currently assigned
2146       if (!regions.containsKey(region)) {
2147         LOG.debug("Attempted to unassign region " +
2148           region.getRegionNameAsString() + " but it is not " +
2149           "currently assigned anywhere");
2150         return;
2151       }
2152     }
2153     String encodedName = region.getEncodedName();
2154     // Grab the state of this region and synchronize on it
2155     RegionState state;
2156     int versionOfClosingNode = -1;
2157     synchronized (regionsInTransition) {
2158       state = regionsInTransition.get(encodedName);
2159       if (state == null) {
2160         // Create the znode in CLOSING state
2161         try {
2162           versionOfClosingNode = ZKAssign.createNodeClosing(
2163             master.getZooKeeper(), region, master.getServerName());
2164           if (versionOfClosingNode == -1) {
2165             LOG.debug("Attempting to unassign region " +
2166                 region.getRegionNameAsString() + " but ZK closing node "
2167                 + "can't be created.");
2168             return;
2169           }
2170         } catch (KeeperException e) {
2171           if (e instanceof NodeExistsException) {
2172             // Handle race between master initiated close and regionserver
2173             // orchestrated splitting. See if existing node is in a
2174             // SPLITTING or SPLIT state.  If so, the regionserver started
2175             // an op on node before we could get our CLOSING in.  Deal.
2176             NodeExistsException nee = (NodeExistsException)e;
2177             String path = nee.getPath();
2178             try {
2179               if (isSplitOrSplitting(path)) {
2180                 LOG.debug(path + " is SPLIT or SPLITTING; " +
2181                   "skipping unassign because region no longer exists -- its split");
2182                 return;
2183               }
2184             } catch (KeeperException.NoNodeException ke) {
2185               LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
2186                 "; presuming split and that the region to unassign, " +
2187                 encodedName + ", no longer exists -- confirm", ke);
2188               return;
2189             } catch (KeeperException ke) {
2190               LOG.error("Unexpected zk state", ke);
2191               // Fall through to the abort below.
2192             }
2193           }
2194           // If we get here, we don't understand what's going on -- abort.
2195           master.abort("Unexpected ZK exception creating node CLOSING", e);
2196           return;
2197         }
2198         state = new RegionState(region, RegionState.State.PENDING_CLOSE);
2199         regionsInTransition.put(encodedName, state);
2200       } else if (force && (state.isPendingClose() || state.isClosing())) {
2201         LOG.debug("Attempting to unassign region " + region.getRegionNameAsString() + 
2202           " which is already " + state.getState()  + 
2203           " but forcing to send a CLOSE RPC again ");
2204         state.update(state.getState());
2205       } else {
2206         LOG.debug("Attempting to unassign region " +
2207           region.getRegionNameAsString() + " but it is " +
2208           "already in transition (" + state.getState() + ", force=" + force + ")");
2209         return;
2210       }
2211     } 
2212     // Send CLOSE RPC
2213     ServerName server = null;
2214     synchronized (this.regions) {
2215       server = regions.get(region);
2216     }
2217     // ClosedRegionHandler can remove the server from this.regions
2218     if (server == null) {
2219       // Possibility of disable flow removing from RIT.
2220       synchronized (regionsInTransition) {
2221         state = regionsInTransition.get(encodedName);
2222         if (state != null) {
2223           // remove only if the state is PENDING_CLOSE or CLOSING
2224           State presentState = state.getState();
2225           if (presentState == State.PENDING_CLOSE
2226               || presentState == State.CLOSING) {
2227             this.regionsInTransition.remove(encodedName);
2228           }
2229         }
2230       }
2231       // delete the node. if no node exists need not bother.
2232       deleteClosingOrClosedNode(region);
2233       return;
2234     }
2235     try {
2236       // TODO: We should consider making this look more like it does for the
2237       // region open where we catch all throwables and never abort
2238       if (serverManager.sendRegionClose(server, state.getRegion(),
2239         versionOfClosingNode)) {
2240         LOG.debug("Sent CLOSE to " + server + " for region " +
2241           region.getRegionNameAsString());
2242         return;
2243       }
2244       // This never happens. Currently a regionserver close always returns true.
2245       LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
2246         region.getRegionNameAsString());
2247     } catch (NotServingRegionException nsre) {
2248       LOG.info("Server " + server + " returned " + nsre + " for " +
2249         region.getRegionNameAsString());
2250       // Presume that master has stale data.  Presume remote side just split.
2251       // Presume that the split message when it comes in will fix up the master's
2252       // in memory cluster state.
2253     } catch (Throwable t) {
2254       if (t instanceof RemoteException) {
2255         t = ((RemoteException)t).unwrapRemoteException();
2256         if (t instanceof NotServingRegionException) {
2257           if (checkIfRegionBelongsToDisabling(region)
2258               || checkIfRegionBelongsToDisabled(region)) {
2259             // Remove from the regionsinTransition map
2260             LOG.info("While trying to recover the table "
2261                 + region.getTableNameAsString()
2262                 + " to DISABLED state the region " + region
2263                 + " was offlined but the table was in DISABLING state");
2264             synchronized (this.regionsInTransition) {
2265               this.regionsInTransition.remove(region.getEncodedName());
2266             }
2267             // Remove from the regionsMap
2268             synchronized (this.regions) {
2269               ServerName sn = this.regions.remove(region);
2270               if (sn != null) {
2271                 Set<HRegionInfo> serverRegions = this.servers.get(sn);
2272                 if (serverRegions == null || !serverRegions.remove(region)) {
2273                   LOG.warn("No " + region + " on " + sn);
2274                 }
2275               }
2276             }
2277             deleteClosingOrClosedNode(region);
2278           }
2279         }
2280         // RS is already processing this region, only need to update the timestamp
2281         if (t instanceof RegionAlreadyInTransitionException) {
2282           LOG.debug("Updating the timestamp for " + state + ".");
2283           state.update(state.getState());
2284         }
2285       }
2286       LOG.info("Server " + server + " returned " + t + " for " +
2287         region.getEncodedName());
2288       // Presume retry or server will expire.
2289     }
2290   }
2291   
2292   /**
2293    * Deletes the CLOSING or CLOSED znode for the passed region, if one exists.
2294    * @param region region whose CLOSING/CLOSED znode should be deleted.
2295    */
2296   public void deleteClosingOrClosedNode(HRegionInfo region) {
2297     try {
2298       if (!ZKAssign.deleteNode(master.getZooKeeper(), region.getEncodedName(),
2299           EventHandler.EventType.M_ZK_REGION_CLOSING)) {
2300         boolean deleteNode = ZKAssign.deleteNode(master.getZooKeeper(), region
2301             .getEncodedName(), EventHandler.EventType.RS_ZK_REGION_CLOSED);
2302         // TODO : We don't abort if the delete node returns false. Is there any
2303         // such corner case?
2304         if (!deleteNode) {
2305           LOG.error("The deletion of the CLOSED node for the region "
2306               + region.getEncodedName() + " returned " + deleteNode);
2307         }
2308       }
2309     } catch (NoNodeException e) {
2310       LOG.debug("CLOSING/CLOSED node for the region " + region.getEncodedName()
2311           + " already deleted");
2312     } catch (KeeperException ke) {
2313       master.abort(
2314           "Unexpected ZK exception deleting node CLOSING/CLOSED for the region "
2315               + region.getEncodedName(), ke);
2316       return;
2317     }
2318   }
2319 
2320   /**
2321    * @param path
2322    * @return True if znode is in SPLIT or SPLITTING state.
2323    * @throws KeeperException Can happen if the znode went away in meantime.
2324    */
2325   private boolean isSplitOrSplitting(final String path) throws KeeperException {
2326     boolean result = false;
2327     // This may fail if the SPLIT or SPLITTING znode gets cleaned up before we
2328     // can get data from it.
2329     RegionTransitionData data = ZKAssign.getData(master.getZooKeeper(), path);
2330     EventType evt = data.getEventType();
2331     switch (evt) {
2332     case RS_ZK_REGION_SPLIT:
2333     case RS_ZK_REGION_SPLITTING:
2334       result = true;
2335       break;
2336     default:
2337       break;
2338     }
2339     return result;
2340   }
2341 
2342   /**
2343    * Waits until the specified region has completed assignment.
2344    * <p>
2345    * If the region is already assigned, returns immediately.  Otherwise, method
2346    * blocks until the region is assigned.
2347    * @param regionInfo region to wait on assignment for
2348    * @throws InterruptedException
2349    */
2350   public void waitForAssignment(HRegionInfo regionInfo)
2351   throws InterruptedException {
2352     synchronized(regions) {
2353       while (!this.master.isStopped() && !regions.containsKey(regionInfo)) {
2354         // We should receive a notification, but it's
2355         //  better to have a timeout to recheck the condition here:
2356         //  it lowers the impact of a race condition if any
2357         regions.wait(100);
2358       }
2359     }
2360   }
2361 
2362   /**
2363    * Assigns the ROOT region.
2364    * <p>
2365    * Assumes that ROOT is currently closed and is not being actively served by
2366    * any RegionServer.
2367    * <p>
2368    * Forcibly unsets the current root region location in ZooKeeper and assigns
2369    * ROOT to a random RegionServer.
2370    * @throws KeeperException
2371    */
2372   public void assignRoot() throws KeeperException {
2373     RootLocationEditor.deleteRootLocation(this.master.getZooKeeper());
2374     assign(HRegionInfo.ROOT_REGIONINFO, true);
2375   }
2376 
2377   /**
2378    * Assigns the META region.
2379    * <p>
2380    * Assumes that META is currently closed and is not being actively served by
2381    * any RegionServer.
2382    * <p>
2383    * Forcibly assigns META to a random RegionServer.
2384    */
2385   public void assignMeta() {
2386     // Force assignment to a random server
2387     assign(HRegionInfo.FIRST_META_REGIONINFO, true);
2388   }
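  // Catalog bootstrap sketch (illustrative only; exception handling elided).  This is
  // roughly the order a caller would want: ROOT first, then META, blocking on each:
  //
  //   assignRoot();                                          // clears old ROOT location, assigns ROOT
  //   waitForAssignment(HRegionInfo.ROOT_REGIONINFO);        // block until ROOT is online
  //   assignMeta();
  //   waitForAssignment(HRegionInfo.FIRST_META_REGIONINFO);  // block until META is online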
2389 
2390   /**
2391    * Assigns all user regions to online servers. Use round-robin assignment.
2392    * 
2393    * @param regions
2394    * @throws IOException
2395    * @throws InterruptedException
2396    */
2397   public void assignUserRegionsToOnlineServers(List<HRegionInfo> regions)
2398       throws IOException,
2399       InterruptedException {
2400     List<ServerName> servers = this.serverManager.getOnlineServersList();
2401     removeDeadNotExpiredServers(servers);
2402     assignUserRegions(regions, servers);
2403   }
2404 
2405   /**
2406    * Assigns all user regions, if any.  Used during cluster startup.
2407    * <p>
2408    * This is a synchronous call and will return once every region has been
2409    * assigned.  If anything fails, an exception is thrown
2410    * @throws InterruptedException
2411    * @throws IOException
2412    */
2413   public void assignUserRegions(List<HRegionInfo> regions, List<ServerName> servers)
2414   throws IOException, InterruptedException {
2415     if (regions == null)
2416       return;
2417     Map<ServerName, List<HRegionInfo>> bulkPlan = null;
2418     // Generate a round-robin bulk assignment plan
2419     bulkPlan = balancer.roundRobinAssignment(regions, servers);
2420     LOG.info("Bulk assigning " + regions.size() + " region(s) round-robin across " +
2421                servers.size() + " server(s)");
2422     // Use fixed count thread pool assigning.
2423     BulkAssigner ba = new StartupBulkAssigner(this.master, bulkPlan, this);
2424     ba.bulkAssign();
2425     LOG.info("Bulk assigning done");
2426   }
2427 
2428   private void setEnabledTable(HRegionInfo hri) {
2429     String tableName = hri.getTableNameAsString();
2430     boolean isTableEnabled = this.zkTable.isEnabledTable(tableName);
2431     if (!isTableEnabled) {
2432       setEnabledTable(tableName);
2433     }    
2434   }
2435 
2436   /**
2437    * Assigns all user regions, if any exist.  Used during cluster startup.
2438    * <p>
2439    * This is a synchronous call and will return once every region has been
2440    * assigned.  If anything fails, an exception is thrown and the cluster
2441    * should be shutdown.
2442    * @throws InterruptedException
2443    * @throws IOException
2444    */
2445   public void assignAllUserRegions() throws IOException, InterruptedException {
2446     // Also skip assignment for regions of tables in DISABLING state, because
2447     // during a clean cluster startup no RS is alive and the regions map doesn't
2448     // have any information about the regions. See HBASE-6281.
2449     Set<String> disablingDisabledAndEnablingTables = new HashSet<String>(this.disablingTables);
2450     disablingDisabledAndEnablingTables.addAll(this.zkTable.getDisabledTables());
2451     disablingDisabledAndEnablingTables.addAll(this.enablingTables.keySet());
2452     // Scan META for all user regions, skipping any disabled tables
2453     Map<HRegionInfo, ServerName> allRegions = MetaReader.fullScan(catalogTracker,
2454         disablingDisabledAndEnablingTables, true);
2455     if (allRegions == null || allRegions.isEmpty()) return;
2456 
2457     // Get all available servers
2458     List<ServerName> servers = serverManager.getOnlineServersList();
2459 
2460     // Remove the deadNotExpired servers from the server list.
2461     removeDeadNotExpiredServers(servers);
2462 
2463     // If there are no servers we need not proceed with region assignment.
2464     if(servers.isEmpty()) return;
2465 
2466     // Determine what type of assignment to do on startup
2467     boolean retainAssignment = master.getConfiguration().
2468       getBoolean("hbase.master.startup.retainassign", true);
2469 
2470     Map<ServerName, List<HRegionInfo>> bulkPlan = null;
2471     if (retainAssignment) {
2472       // Reuse existing assignment info
2473       bulkPlan = balancer.retainAssignment(allRegions, servers);
2474     } else {
2475       // assign regions in round-robin fashion
2476       assignUserRegions(new ArrayList<HRegionInfo>(allRegions.keySet()), servers);
2477       for (HRegionInfo hri : allRegions.keySet()) {
2478         setEnabledTable(hri);
2479       }
2480       return;
2481     }
2482     LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " +
2483       servers.size() + " server(s), retainAssignment=" + retainAssignment);
2484 
2485     // Use fixed count thread pool assigning.
2486     BulkAssigner ba = new StartupBulkAssigner(this.master, bulkPlan, this);
2487     ba.bulkAssign();
2488     for (HRegionInfo hri : allRegions.keySet()) {
2489       setEnabledTable(hri);
2490     }
2491     LOG.info("Bulk assigning done");
2492   }
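  // Configuration sketch (illustrative only): choosing the startup assignment mode.
  // Assuming "conf" is the master's Configuration:
  //
  //   conf.setBoolean("hbase.master.startup.retainassign", true);   // default: reuse locations found in META
  //   conf.setBoolean("hbase.master.startup.retainassign", false);  // instead round-robin across live servers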
2493 
2494   /**
2495    * Run bulk assign on startup.  Does one RPC per regionserver, passing a
2496    * batch of regions using {@link SingleServerBulkAssigner}.
2497    * Uses the default {@link #getUncaughtExceptionHandler()},
2498    * which will abort the Server on exception.
2499    */
2500   static class StartupBulkAssigner extends BulkAssigner {
2501     final Map<ServerName, List<HRegionInfo>> bulkPlan;
2502     final AssignmentManager assignmentManager;
2503 
2504     StartupBulkAssigner(final Server server,
2505         final Map<ServerName, List<HRegionInfo>> bulkPlan,
2506         final AssignmentManager am) {
2507       super(server);
2508       this.bulkPlan = bulkPlan;
2509       this.assignmentManager = am;
2510     }
2511 
2512     @Override
2513     public boolean bulkAssign(boolean sync) throws InterruptedException,
2514         IOException {
2515       // Disable timing out regions in transition up in zk while bulk assigning.
2516       this.assignmentManager.timeoutMonitor.bulkAssign(true);
2517       try {
2518         return super.bulkAssign(sync);
2519       } finally {
2520         // Re-enable timing out regions in transition up in zk.
2521         this.assignmentManager.timeoutMonitor.bulkAssign(false);
2522       }
2523     }
2524 
2525     @Override
2526     protected String getThreadNamePrefix() {
2527       return this.server.getServerName() + "-StartupBulkAssigner";
2528     }
2529 
2530     @Override
2531     protected void populatePool(java.util.concurrent.ExecutorService pool) {
2532       for (Map.Entry<ServerName, List<HRegionInfo>> e: this.bulkPlan.entrySet()) {
2533         pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(),
2534           this.assignmentManager));
2535       }
2536     }
2537 
2538     protected boolean waitUntilDone(final long timeout)
2539     throws InterruptedException {
2540       Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
2541       for (List<HRegionInfo> regionList : bulkPlan.values()) {
2542         regionSet.addAll(regionList);
2543       }
2544       return this.assignmentManager.waitUntilNoRegionsInTransition(timeout, regionSet);
2545     }
2546 
2547     @Override
2548     protected long getTimeoutOnRIT() {
2549       // Guess timeout.  Multiply the number of regions on an arbitrary server
2550       // by how long we think one region takes to open.
2551       long perRegionOpenTimeGuesstimate =
2552         this.server.getConfiguration().getLong("hbase.bulk.assignment.perregion.open.time", 1000);
2553       int regionsPerServer =
2554         this.bulkPlan.entrySet().iterator().next().getValue().size();
2555       long timeout = perRegionOpenTimeGuesstimate * regionsPerServer;
2556       LOG.debug("Timeout-on-RIT=" + timeout);
2557       return timeout;
2558     }
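    // Worked example of the guess above (illustrative figures only): with the default
    // hbase.bulk.assignment.perregion.open.time of 1000ms and a sampled server that is
    // to receive 500 regions, the RIT timeout would be 1000 * 500 = 500,000ms.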
2559   }
2560 
2561   /**
2562    * Manage bulk assigning to a server.
2563    */
2564   static class SingleServerBulkAssigner implements Runnable {
2565     private final ServerName regionserver;
2566     private final List<HRegionInfo> regions;
2567     private final AssignmentManager assignmentManager;
2568 
2569     SingleServerBulkAssigner(final ServerName regionserver,
2570         final List<HRegionInfo> regions, final AssignmentManager am) {
2571       for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
2572         if (isAssigningSplitParentRegion(it.next())) {
2573           it.remove();
2574         }
2575       }
2576       this.regionserver = regionserver;
2577       this.regions = regions;
2578       this.assignmentManager = am;
2579     }
2580     @Override
2581     public void run() {
2582       this.assignmentManager.assign(this.regionserver, this.regions);
2583     }
2584   }
2585 
2586   /**
2587    * Wait until no regions in transition.
2588    * @param timeout How long to wait.
2589    * @return True if nothing in regions in transition.
2590    * @throws InterruptedException
2591    */
2592   boolean waitUntilNoRegionsInTransition(final long timeout)
2593   throws InterruptedException {
2594     // Blocks until there are no regions in transition. It is possible that
2595     // there are regions in transition immediately after this returns, but this
2596     // method guarantees that if it returns without an exception, there was a
2597     // period of time with no regions in transition from the point of view of
2598     // the in-memory state of the Master.
2600     long startTime = System.currentTimeMillis();
2601     long remaining = timeout;
2602     synchronized (regionsInTransition) {
2603       while (regionsInTransition.size() > 0 && !this.master.isStopped()
2604           && remaining > 0) {
2605         regionsInTransition.wait(remaining);
2606         remaining = timeout - (System.currentTimeMillis() - startTime);
2607       }
2608     }
2609     return regionsInTransition.isEmpty();
2610   }
2611 
2612   /**
2613    * Wait until no regions from set regions are in transition.
2614    * @param timeout How long to wait.
2615    * @param regions set of regions to wait for
2616    * @return True if nothing in regions in transition.
2617    * @throws InterruptedException
2618    */
2619   boolean waitUntilNoRegionsInTransition(final long timeout, Set<HRegionInfo> regions)
2620   throws InterruptedException {
2621     // Blocks until there are no regions in transition.
2622     long startTime = System.currentTimeMillis();
2623     long remaining = timeout;
2624     boolean stillInTransition = true;
2625     synchronized (regionsInTransition) {
2626       while (regionsInTransition.size() > 0 && !this.master.isStopped() &&
2627           remaining > 0 && stillInTransition) {
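             // Check whether any of the watched regions is still in transition;
             // one match is enough, so stop scanning after the first hit.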
2628         int count = 0;
2629         for (RegionState rs : regionsInTransition.values()) {
2630           if (regions.contains(rs.getRegion())) {
2631             count++;
2632             break;
2633           }
2634         }
2635         if (count == 0) {
2636           stillInTransition = false;
2637           break;
2638         }
2639         regionsInTransition.wait(remaining);
2640         remaining = timeout - (System.currentTimeMillis() - startTime);
2641       }
2642     }
2643     return stillInTransition;
2644   }
2645 
2646   /**
2647    * Rebuild the list of user regions and assignment information.
2648    * <p>
2649    * Returns a map of servers that are not found to be online and the regions
2650    * they were hosting.
2651    * @return map of servers not online to their assigned regions, as stored
2652    *         in META
2653    * @throws IOException
2654    */
2655   Map<ServerName, List<Pair<HRegionInfo, Result>>> rebuildUserRegions() throws IOException,
2656       KeeperException {
2657     // Region assignment from META
2658     List<Result> results = MetaReader.fullScan(this.catalogTracker);
2659     // Current set of online servers; a newly joined server may be slow to check in.
2660     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
2661     // Map of offline servers and their regions to be returned
2662     Map<ServerName, List<Pair<HRegionInfo,Result>>> offlineServers =
2663       new TreeMap<ServerName, List<Pair<HRegionInfo, Result>>>();
2664     // Iterate regions in META
2665     for (Result result : results) {
2666       boolean disabled = false;
2667       boolean disablingOrEnabling = false;
2668       Pair<HRegionInfo, ServerName> region = MetaReader.parseCatalogResult(result);
2669       if (region == null) continue;
2670       HRegionInfo regionInfo = region.getFirst();
2671       ServerName regionLocation = region.getSecond();
2672       if (regionInfo == null) continue;
2673       String tableName = regionInfo.getTableNameAsString();
2674       if (regionLocation == null) {
2675         // regionLocation could be null if createTable didn't finish properly:
2676         // the HMaster restarted while createTable was in progress, so some
2677         // regions were added to .META. but never assigned.
2678         // When this happens, the region's table must be in ENABLING state.
2679         // It can't be in ENABLED state as that is set when all regions are
2680         // assigned.
2681         // It can't be in DISABLING state, because DISABLING state transitions
2682         // from ENABLED state when application calls disableTable.
2683         // It can't be in DISABLED state, because DISABLED states transitions
2684         // from DISABLING state.
2685         boolean enabling = checkIfRegionsBelongsToEnabling(regionInfo);
2686         addTheTablesInPartialState(regionInfo);
2687         if (enabling) {
2688           addToEnablingTableRegions(regionInfo);
2689         } else {
2690           LOG.warn("Region " + regionInfo.getEncodedName() + " has null regionLocation."
2691               + " But its table " + tableName + " isn't in ENABLING state.");
2692         }
2693       } else if (!onlineServers.contains(regionLocation)) {
2694         // Region is located on a server that isn't online
2695         List<Pair<HRegionInfo, Result>> offlineRegions =
2696           offlineServers.get(regionLocation);
2697         if (offlineRegions == null) {
2698           offlineRegions = new ArrayList<Pair<HRegionInfo,Result>>(1);
2699           offlineServers.put(regionLocation, offlineRegions);
2700         }
2701         offlineRegions.add(new Pair<HRegionInfo,Result>(regionInfo, result));
2702         disabled = checkIfRegionBelongsToDisabled(regionInfo);
2703         disablingOrEnabling = addTheTablesInPartialState(regionInfo);
2704         // need to enable the table if not disabled or disabling or enabling
2705         // this will be used in rolling restarts
2706         enableTableIfNotDisabledOrDisablingOrEnabling(disabled,
2707             disablingOrEnabling, tableName);
2708       } else {
2709         // If region is in offline and split state check the ZKNode
2710         if (regionInfo.isOffline() && regionInfo.isSplit()) {
2711           String node = ZKAssign.getNodeName(this.watcher, regionInfo
2712               .getEncodedName());
2713           Stat stat = new Stat();
2714           byte[] data = ZKUtil.getDataNoWatch(this.watcher, node, stat);
2715           // If the znode does not exist, don't consider this region.
2716           if (data == null) {
2717             LOG.debug("Region " + regionInfo.getRegionNameAsString() + " split is completed,"
2718                 + " so it is not added to the regions list");
2719             continue;
2720           }
2721         }
2722         // Region is being served and on an active server
2723         // Add the region only if its table is neither disabled nor enabling.
2724         boolean enabling = checkIfRegionsBelongsToEnabling(regionInfo);
2725         disabled = checkIfRegionBelongsToDisabled(regionInfo);
2726         if (!enabling && !disabled) {
2727           synchronized (this.regions) {
2728             regions.put(regionInfo, regionLocation);
2729             addToServers(regionLocation, regionInfo);
2730           }
2731         }
2732         disablingOrEnabling = addTheTablesInPartialState(regionInfo);
2733         if (enabling) {
2734           addToEnablingTableRegions(regionInfo);
2735         }
2736         // need to enable the table if not disabled or disabling or enabling
2737         // this will be used in rolling restarts
2738         enableTableIfNotDisabledOrDisablingOrEnabling(disabled,
2739             disablingOrEnabling, tableName);
2740       }
2741     }
2742     return offlineServers;
2743   }
2744 
2745   private void addToEnablingTableRegions(HRegionInfo regionInfo) {
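         // Precondition: addTheTablesInPartialState() has already created the list
         // for this table in enablingTables; otherwise the get() below returns null.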
2746     String tableName = regionInfo.getTableNameAsString();
2747     List<HRegionInfo> hris = this.enablingTables.get(tableName);
2748     if (!hris.contains(regionInfo)) {
2749       if (LOG.isDebugEnabled()) {
2750         LOG.debug("Adding region " + regionInfo.getRegionNameAsString()
2751             + " to enabling table " + tableName + ".");
2752       }
2753       hris.add(regionInfo);
2754     }
2755   }
2756   
2757   private void enableTableIfNotDisabledOrDisablingOrEnabling(boolean disabled,
2758       boolean disablingOrEnabling, String tableName) {
2759     if (!disabled && !disablingOrEnabling
2760         && !getZKTable().isEnabledTable(tableName)) {
2761       setEnabledTable(tableName);
2762     }
2763   }
2764 
2765   private boolean addTheTablesInPartialState(HRegionInfo regionInfo) {
2766     String tableName = regionInfo.getTableNameAsString();
2767     if (checkIfRegionBelongsToDisabling(regionInfo)) {
2768       this.disablingTables.add(tableName);
2769       return true;
2770     } else if (checkIfRegionsBelongsToEnabling(regionInfo)) {
2771       if (!this.enablingTables.containsKey(tableName)) {
2772         this.enablingTables.put(tableName, new ArrayList<HRegionInfo>());
2773       } 
2774       return true;
2775     } 
2776     return false;
2777   }
2778 
2779   /**
2780    * Recover the tables that were not fully moved to DISABLED state. These
2781    * tables are in DISABLING state when the master restarted/switched.
2782    * 
2783    * @param disablingTables
2784    * @return
2785    * @throws KeeperException
2786    * @throws TableNotFoundException
2787    * @throws IOException
2788    */
2789   private boolean recoverTableInDisablingState(Set<String> disablingTables)
2790       throws KeeperException, TableNotFoundException, IOException {
2791     boolean isWatcherCreated = false;
2792     if (disablingTables.size() != 0) {
2793       // Create a watcher on the zookeeper node
2794       ZKUtil.listChildrenAndWatchForNewChildren(watcher,
2795           watcher.assignmentZNode);
2796       isWatcherCreated = true;
2797       for (String tableName : disablingTables) {
2798         // Recover by calling DisableTableHandler
2799         LOG.info("The table " + tableName
2800             + " is in DISABLING state.  Hence recovering by moving the table"
2801             + " to DISABLED state.");
2802         new DisableTableHandler(this.master, tableName.getBytes(),
2803             catalogTracker, this, true).process();
2804       }
2805     }
2806     return isWatcherCreated;
2807   }
2808 
2809   /**
2810    * Recover the tables that are not fully moved to ENABLED state. These tables
2811    * are in ENABLING state when the master restarted/switched
2812    * 
2813    * @param enablingTables
2814    * @param isWatcherCreated
2815    * @throws KeeperException
2816    * @throws TableNotFoundException
2817    * @throws IOException
2818    */
2819   private void recoverTableInEnablingState(Set<String> enablingTables,
2820       boolean isWatcherCreated) throws KeeperException, TableNotFoundException,
2821       IOException {
2822     if (enablingTables.size() != 0) {
2823       if (!isWatcherCreated) {
2824         ZKUtil.listChildrenAndWatchForNewChildren(watcher,
2825             watcher.assignmentZNode);
2826       }
2827       for (String tableName : enablingTables) {
2828         // Recover by calling EnableTableHandler
2829         LOG.info("The table " + tableName
2830             + " is in ENABLING state.  Hence recovering by moving the table"
2831             + " to ENABLED state.");
2832         // enableTable in sync way during master startup,
2833         // no need to invoke coprocessor
2834         EnableTableHandler eth = null;
2835         try {
2836           eth =
2837               new EnableTableHandler(this.master, tableName.getBytes(), catalogTracker, this, true);
2838         } catch (TableNotFoundException e) {
2839           LOG.warn("Table " + tableName + " not found in .META. to recover.");
2840           continue;
2841         }
2842         if (eth != null) eth.process();
2843       }
2844     }
2845   }
2846 
2847   private boolean checkIfRegionsBelongsToEnabling(HRegionInfo regionInfo) {
2848     String tableName = regionInfo.getTableNameAsString();
2849     return getZKTable().isEnablingTable(tableName);
2850   }
2851 
2852   private boolean checkIfRegionBelongsToDisabled(HRegionInfo regionInfo) {
2853     String tableName = regionInfo.getTableNameAsString();
2854     return getZKTable().isDisabledTable(tableName);
2855   }
2856 
2857   private boolean checkIfRegionBelongsToDisabling(HRegionInfo regionInfo) {
2858     String tableName = regionInfo.getTableNameAsString();
2859     return getZKTable().isDisablingTable(tableName);
2860   }
2861 
2862   /**
2863    * Processes list of dead servers from result of META scan and regions in RIT
2864    * <p>
2865    * This is used for failover to recover the lost regions that belonged to
2866    * RegionServers which failed while there was no active master or regions 
2867    * that were in RIT.
2868    * <p>
2869    * 
2870    * @param deadServers
2871    *          The list of dead servers which failed while there was no active
2872    *          master. Can be null.
2873    * @param nodes
2874    *          The regions in RIT
2875    * @throws IOException
2876    * @throws KeeperException
2877    */
2878   private void processDeadServersAndRecoverLostRegions(
2879       Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers,
2880       List<String> nodes) throws IOException, KeeperException {
2881     if (null != deadServers) {
2882       Set<ServerName> actualDeadServers = this.serverManager.getDeadServers();
2883       for (Map.Entry<ServerName, List<Pair<HRegionInfo, Result>>> deadServer : 
2884         deadServers.entrySet()) {
2885         // skip regions of dead servers because SSH will process regions during rs expiration.
2886         // see HBASE-5916
2887         if (actualDeadServers.contains(deadServer.getKey())) {
2888           for (Pair<HRegionInfo, Result> deadRegion : deadServer.getValue()) {
2889             HRegionInfo hri = deadRegion.getFirst();
2890             // Delete znode of region in transition if table is disabled or disabling. If a region
2891             // server went down during master initialization then SSH cannot handle the regions of
2892             // partially disabled tables because in memory region state information may not be
2893             // available with master.
2894             deleteNodeAndOfflineRegion(hri);
2895             nodes.remove(deadRegion.getFirst().getEncodedName());
2896           }
2897           continue;
2898         }
2899         List<Pair<HRegionInfo, Result>> regions = deadServer.getValue();
2900         for (Pair<HRegionInfo, Result> region : regions) {
2901           HRegionInfo regionInfo = region.getFirst();
2902           Result result = region.getSecond();
2903           // If region was in transition (was in zk) force it offline for
2904           // reassign
2905           try {
2906             RegionTransitionData data = ZKAssign.getData(watcher,
2907                 regionInfo.getEncodedName());
2908 
2909             // If zk node of this region has been updated by a live server,
2910             // we consider that this region is being handled.
2911             // So we should skip it and process it in
2912             // processRegionsInTransition.
2913             if (data != null && data.getOrigin() != null && 
2914                 serverManager.isServerOnline(data.getOrigin())) {
2915               LOG.info("The region " + regionInfo.getEncodedName()
2916                   + " is being handled on " + data.getOrigin());
2917               continue;
2918             }
2919             // Process with existing RS shutdown code
2920             boolean assign = ServerShutdownHandler.processDeadRegion(
2921                 regionInfo, result, this, this.catalogTracker);
2922             if (assign) {
2923               ZKAssign.createOrForceNodeOffline(watcher, regionInfo,
2924                   master.getServerName());
2925               if (!nodes.contains(regionInfo.getEncodedName())) {
2926                 nodes.add(regionInfo.getEncodedName());
2927               }
2928             }
2929           } catch (KeeperException.NoNodeException nne) {
2930             // This is fine
2931           }
2932         }
2933       }
2934     }
2935 
2936     if (!nodes.isEmpty()) {
2937       for (String encodedRegionName : nodes) {
2938         processRegionInTransition(encodedRegionName, null, deadServers);
2939       }
2940     }
2941   }
2942 
2943   /**
2944    * Delete znode of region in transition if table is disabling/disabled and offline the region.
2945    * @param hri region to offline.
2946    */
2947   public void deleteNodeAndOfflineRegion(HRegionInfo hri) {
2948     if (zkTable.isDisablingOrDisabledTable(hri.getTableNameAsString())) {
2949       try {
2950         // If table is partially disabled then delete znode if exists in any state.
2951         ZKAssign.deleteNodeFailSilent(this.master.getZooKeeper(), hri);
2952       } catch (KeeperException ke) {
2953         this.master.abort("Unexpected ZK exception deleting unassigned node " + hri, ke);
2954       }
2955       regionOffline(hri);
2956     }
2957   }
2958 
2959   /*
2960    * Presumes the caller holds whatever locking is needed to modify the servers Map.
2961    * @param sn
2962    * @param hri
2963    */
2964   private void addToServers(final ServerName sn, final HRegionInfo hri) {
2965     Set<HRegionInfo> hris = servers.get(sn);
2966     if (hris == null) {
2967       hris = new ConcurrentSkipListSet<HRegionInfo>();
2968       servers.put(sn, hris);
2969     }
2970     if (!hris.contains(hri)) hris.add(hri);
2971   }
2972 
2973   /**
2974    * @return A copy of the Map of regions currently in transition.
2975    */
2976   public NavigableMap<String, RegionState> getRegionsInTransition() {
2977     synchronized (this.regionsInTransition) {
2978       return new TreeMap<String, RegionState>(this.regionsInTransition);
2979     }
2980   }
2981 
2982   /**
2983    * @return True if regions in transition.
2984    */
2985   public boolean isRegionsInTransition() {
2986     synchronized (this.regionsInTransition) {
2987       return !this.regionsInTransition.isEmpty();
2988     }
2989   }
2990 
2991   /**
2992    * @param hri Region to check.
2993    * @return Returns null if passed region is not in transition else the current
2994    * RegionState
2995    */
2996   public RegionState isRegionInTransition(final HRegionInfo hri) {
2997     synchronized (this.regionsInTransition) {
2998       return this.regionsInTransition.get(hri.getEncodedName());
2999     }
3000   }
3001 
3002   /**
3003    * Clears the specified region from being in transition.
3004    * <p>
3005    * @param hri Region to remove.
3006    * @deprecated This is a dupe of {@link #regionOffline(HRegionInfo)}.
3007    *   Please use that method instead.
3008    */
3009   public void clearRegionFromTransition(HRegionInfo hri) {
3010     synchronized (this.regionsInTransition) {
3011       this.regionsInTransition.remove(hri.getEncodedName());
3012     }
3013     synchronized (this.regions) {
3014       this.regions.remove(hri);
3015       for (Set<HRegionInfo> regions : this.servers.values()) {
3016         regions.remove(hri);
3017       }
3018     }
3019     clearRegionPlan(hri);
3020   }
3021 
3022   /**
3023    * @param region Region whose plan we are to clear.
3024    */
3025   void clearRegionPlan(final HRegionInfo region) {
3026     synchronized (this.regionPlans) {
3027       this.regionPlans.remove(region.getEncodedName());
3028     }
3029   }
3030 
3031   /**
3032    * Wait on region to clear regions-in-transition.
3033    * @param hri Region to wait on.
3034    * @throws IOException
3035    */
3036   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
3037   throws IOException {
3038     if (isRegionInTransition(hri) == null) return;
3039     RegionState rs = null;
3040     // A timeout monitor already watches regions in transition, so no
3041     // additional timeout should be needed here.
3042     while(!this.master.isStopped() && (rs = isRegionInTransition(hri)) != null) {
3043       Threads.sleep(1000);
3044       LOG.info("Waiting on " + rs + " to clear regions-in-transition");
3045     }
3046     if (this.master.isStopped()) {
3047       LOG.info("Giving up wait on regions in " +
3048         "transition because stoppable.isStopped is set");
3049     }
3050   }
3051 
3052 
3053   /**
3054    * Gets the online regions of the specified table.
3055    * This method looks at the in-memory state.  It does not go to <code>.META.</code>.
3056    * Only returns <em>online</em> regions.  If a region on this table has been
3057    * closed during a disable, etc., it will not be included in the returned list.
3058    * So, the returned list is not necessarily ALL regions in this table, it is
3059    * all the ONLINE regions in the table.
3060    * @param tableName
3061    * @return Online regions from <code>tableName</code>
3062    */
3063   public List<HRegionInfo> getRegionsOfTable(byte[] tableName) {
3064     List<HRegionInfo> tableRegions = new ArrayList<HRegionInfo>();
3065     // boundary needs to have table's name but regionID 0 so that it is sorted 
3066     // before all table's regions.
3067     HRegionInfo boundary =
3068       new HRegionInfo(tableName, null, null, false, 0L);
3069     synchronized (this.regions) {
3070       for (HRegionInfo regionInfo: this.regions.tailMap(boundary).keySet()) {
3071         if(Bytes.equals(regionInfo.getTableName(), tableName)) {
3072           tableRegions.add(regionInfo);
3073         } else {
3074           break;
3075         }
3076       }
3077     }
3078     return tableRegions;
3079   }
3080 
3081   /**
3082    * Update timers for all regions in transition whose destination server is
3083    * listed in serversInUpdatingTimer.
3084    */
3085   public class TimerUpdater extends Chore {
3086 
3087     public TimerUpdater(final int period, final Stoppable stopper) {
3088       super("AssignmentTimerUpdater", period, stopper);
3089     }
3090 
3091     @Override
3092     protected void chore() {
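           // Walk the server set in sorted order via first()/higher() rather than
           // an iterator, so entries added or removed while the chore runs can
           // still be handled.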
3093       ServerName serverToUpdateTimer = null;
3094       while (!serversInUpdatingTimer.isEmpty() && !stopper.isStopped()) {
3095         if (serverToUpdateTimer == null) {
3096           serverToUpdateTimer = serversInUpdatingTimer.first();
3097         } else {
3098           serverToUpdateTimer = serversInUpdatingTimer
3099               .higher(serverToUpdateTimer);
3100         }
3101         if (serverToUpdateTimer == null) {
3102           break;
3103         }
3104         updateTimers(serverToUpdateTimer);
3105         serversInUpdatingTimer.remove(serverToUpdateTimer);
3106       }
3107     }
3108   }
3109 
3110   /**
3111    * Monitor to check for time outs on region transition operations
3112    */
3113   public class TimeoutMonitor extends Chore {
3114     private final int timeout;
3115     private boolean bulkAssign = false;
3116     private boolean allRegionServersOffline = false;
3117     private ServerManager serverManager;
3118 
3119     /**
3120      * Creates a periodic monitor to check for time outs on region transition
3121      * operations.  This will deal with retries if for some reason something
3122      * doesn't happen within the specified timeout.
3123      * @param period
3124      * @param stopper When {@link Stoppable#isStopped()} is true, this thread
3125      * will clean up and exit cleanly.
3126      * @param timeout
3127      */
3128     public TimeoutMonitor(final int period, final Stoppable stopper,
3129         ServerManager serverManager,
3130         final int timeout) {
3131       super("AssignmentTimeoutMonitor", period, stopper);
3132       this.timeout = timeout;
3133       this.serverManager = serverManager;
3134     }
3135 
3136     /**
3137      * @param bulkAssign If true, we'll suspend checking regions in transition
3138      * up in zookeeper.  If false, will reenable check.
3139      * @return Old setting for bulkAssign.
3140      */
3141     public boolean bulkAssign(final boolean bulkAssign) {
3142       boolean result = this.bulkAssign;
3143       this.bulkAssign = bulkAssign;
3144       return result;
3145     }
3146 
3147     private synchronized void setAllRegionServersOffline(
3148       boolean allRegionServersOffline) {
3149       this.allRegionServersOffline = allRegionServersOffline;
3150     }
3151 
3152     @Override
3153     protected void chore() {
3154       // If bulkAssign in progress, suspend checks
3155       if (this.bulkAssign) return;
3156       boolean allRSsOffline = this.serverManager.getOnlineServersList().
3157         isEmpty();
3158 
3159       synchronized (regionsInTransition) {
3160         // Iterate all regions in transition checking for time outs
3161         long now = System.currentTimeMillis();
3162         for (RegionState regionState : regionsInTransition.values()) {
3163           if (regionState.getStamp() + timeout <= now) {
3164             // Decide on an action for the timed-out region.
3165             actOnTimeOut(regionState);
3166           } else if (this.allRegionServersOffline && !allRSsOffline) {
3167             RegionPlan existingPlan = regionPlans.get(regionState.getRegion().getEncodedName());
3168             if (existingPlan == null
3169                 || !this.serverManager.isServerOnline(existingPlan.getDestination())) {
3170               // if some RSs just came back online, we can start the
3171               // assignment right away
3172               actOnTimeOut(regionState);
3173             }
3174           }
3175         }
3176       }
3177       setAllRegionServersOffline(allRSsOffline);
3178     }
3179 
3180     private void actOnTimeOut(RegionState regionState) {
3181       HRegionInfo regionInfo = regionState.getRegion();
3182       LOG.info("Regions in transition timed out:  " + regionState);
3183       // Expired! Do a retry.
3184       switch (regionState.getState()) {
3185       case CLOSED:
3186         LOG.info("Region " + regionInfo.getEncodedName()
3187             + " has been CLOSED for too long, waiting on queued "
3188             + "ClosedRegionHandler to run or server shutdown");
3189         // Update our timestamp.
3190         regionState.updateTimestampToNow();
3191         break;
3192       case OFFLINE:
3193         LOG.info("Region has been OFFLINE for too long, " + "reassigning "
3194             + regionInfo.getRegionNameAsString() + " to a random server");
3195         invokeAssign(regionInfo);
3196         break;
3197       case PENDING_OPEN:
3198         LOG.info("Region has been PENDING_OPEN for too "
3199             + "long, reassigning region=" + regionInfo.getRegionNameAsString());
3200         invokeAssign(regionInfo);
3201         break;
3202       case OPENING:
3203         processOpeningState(regionInfo);
3204         break;
3205       case OPEN:
3206         LOG.error("Region has been OPEN for too long, " +
3207             "we don't know where region was opened so can't do anything");
3208         synchronized (regionState) {
3209           regionState.updateTimestampToNow();
3210         }
3211         break;
3212 
3213       case PENDING_CLOSE:
3214         LOG.info("Region has been PENDING_CLOSE for too "
3215             + "long, running forced unassign again on region="
3216             + regionInfo.getRegionNameAsString());
3217         invokeUnassign(regionInfo);
3218         break;
3219       case CLOSING:
3220         LOG.info("Region has been CLOSING for too " +
3221           "long, this should eventually complete or the server will " +
3222           "expire, send RPC again");
3223         invokeUnassign(regionInfo);
3224         break;
3225       }
3226     }
3227   }
3228   
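       /**
        * Handle a region stuck in OPENING past the timeout: check its unassigned
        * znode and reassign unless the regionserver has already moved it to OPENED
        * or the znode is missing or in an unexpected state.
        */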
3229   private void processOpeningState(HRegionInfo regionInfo) {
3230     LOG.info("Region has been OPENING for too " + "long, reassigning region="
3231         + regionInfo.getRegionNameAsString());
3232     // Should have a ZK node in OPENING state
3233     try {
3234       String node = ZKAssign.getNodeName(watcher, regionInfo.getEncodedName());
3235       Stat stat = new Stat();
3236       RegionTransitionData dataInZNode = ZKAssign.getDataNoWatch(watcher, node,
3237           stat);
3238       if (dataInZNode == null) {
3239         LOG.warn("Data is null, node " + node + " no longer exists");
3240         return;
3241       }
3242       if (dataInZNode.getEventType() == EventType.RS_ZK_REGION_OPENED) {
3243         LOG.debug("Region has transitioned to OPENED, allowing "
3244             + "watched event handlers to process");
3245         return;
3246       } else if (dataInZNode.getEventType() != EventType.RS_ZK_REGION_OPENING &&
3247           dataInZNode.getEventType() != EventType.RS_ZK_REGION_FAILED_OPEN ) {
3248         LOG.warn("While timing out a region in state OPENING, "
3249             + "found ZK node in unexpected state: "
3250             + dataInZNode.getEventType());
3251         return;
3252       }
3253       invokeAssign(regionInfo);
3254     } catch (KeeperException ke) {
3255       LOG.error("Unexpected ZK exception timing out OPENING region", ke);
3256       return;
3257     }
3258     return;
3259   }
3260 
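       /** Submit an asynchronous (re)assign of the given region to the executor pool. */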
3261   private void invokeAssign(HRegionInfo regionInfo) {
3262     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
3263   }
3264 
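       /** Submit an asynchronous unassign of the given region to the executor pool. */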
3265   private void invokeUnassign(HRegionInfo regionInfo) {
3266     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
3267   }
3268 
3269   public boolean isCarryingRoot(ServerName serverName) {
3270     return isCarryingRegion(serverName, HRegionInfo.ROOT_REGIONINFO);
3271   }
3272 
3273   public boolean isCarryingMeta(ServerName serverName) {
3274     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
3275   }
3276   /**
3277    * Check if the shutdown server carries the specific region.
3278    * Region locations are stored in several places, and those values are not
3279    * always consistent because notification is delayed.
3280    * The location in the zookeeper unassigned node has the most recent data,
3281    * but the node could be deleted after the region is opened by the AM.
3282    * The AM's in-memory info could be stale if OpenedRegionHandler processing
3283    * hasn't finished yet when the server shutdown occurs.
3284    * @return whether the serverName currently hosts the region
3285    */
3286   public boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
3287     RegionTransitionData data = null;
3288     try {
3289       data = ZKAssign.getData(master.getZooKeeper(), hri.getEncodedName());
3290     } catch (KeeperException e) {
3291       master.abort("Unexpected ZK exception reading unassigned node for region="
3292         + hri.getEncodedName(), e);
3293     }
3294 
3295     ServerName addressFromZK = (data != null && data.getOrigin() != null) ?
3296       data.getOrigin() : null;
3297     if (addressFromZK != null) {
3298       // if we get something from ZK, we will use the data
3299       boolean matchZK = addressFromZK.equals(serverName);
3300       LOG.debug("based on ZK, current region=" + hri.getRegionNameAsString() +
3301           " is on server=" + addressFromZK +
3302           ", server being checked: " + serverName);
3304       return matchZK;
3305     }
3306 
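         // No ZK data for this region; fall back to the AM's in-memory assignment.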
3307     ServerName addressFromAM = getRegionServerOfRegion(hri);
3308     boolean matchAM = (addressFromAM != null &&
3309       addressFromAM.equals(serverName));
3310     LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
3311       " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
3312       " server being checked: " + serverName);
3313 
3314     return matchAM;
3315   }
3316 
3317   /**
3318    * Start processing of shutdown server.
3319    * @param sn Server that went down.
3320    * @return Pair of the regions in transition whose destination is the dead
3321    * server, and the RegionStates of regions in transition hosted on that server.
3322    */
3323   public Pair<Set<HRegionInfo>, List<RegionState>> processServerShutdown(final ServerName sn) {
3324     // Clean out any existing assignment plans for this server
3325     synchronized (this.regionPlans) {
3326       for (Iterator <Map.Entry<String, RegionPlan>> i =
3327           this.regionPlans.entrySet().iterator(); i.hasNext();) {
3328         Map.Entry<String, RegionPlan> e = i.next();
3329         ServerName otherSn = e.getValue().getDestination();
3330         // The name will be null if the region is planned for a random assign.
3331         if (otherSn != null && otherSn.equals(sn)) {
3332           // Use iterator's remove else we'll get CME
3333           i.remove();
3334         }
3335       }
3336     }
3337     // TODO: Do we want to sync on RIT here?
3338     // Remove this server from map of servers to regions, and remove all regions
3339     // of this server from online map of regions.
3340     Set<HRegionInfo> deadRegions = new TreeSet<HRegionInfo>();
3341     synchronized (this.regions) {
3342       Set<HRegionInfo> assignedRegions = this.servers.remove(sn);
3343       if (assignedRegions != null && !assignedRegions.isEmpty()) {
3344         deadRegions.addAll(assignedRegions);
3345         for (HRegionInfo region : deadRegions) {
3346           this.regions.remove(region);
3347         }
3348       }
3349     }
3350     // See if any of the regions that were online on this server were in RIT
3351     // If they are, normal timeouts will deal with them appropriately so
3352     // let's skip a manual re-assignment.
3353     Set<HRegionInfo> ritsGoingToServer = new ConcurrentSkipListSet<HRegionInfo>();
3354     List<RegionState> ritsOnServer = new ArrayList<RegionState>();
3355     synchronized (regionsInTransition) {
3356       for (RegionState state : this.regionsInTransition.values()) {
3357         // If the destination server in RegionState is the dead server, add the
3358         // region to the set of regions to assign. Skip regions in OFFLINE state,
3359         // since their destination server in RegionState is the master's own name,
3360         // and skip regions whose destination is a server other than the dead one.
3361         if ((state.getServerName() != null) && state.getServerName().equals(sn)) {
3362           ritsGoingToServer.add(state.getRegion());
3363         }
3364         if (deadRegions.contains(state.getRegion())) {
3365           ritsOnServer.add(state);
3366         }
3367       }
3368     }
3369     return new Pair<Set<HRegionInfo>, List<RegionState>>(ritsGoingToServer, ritsOnServer);
3370   }
3371 
3372   /**
3373    * Update in-memory structures.
3374    * @param sn Server that reported the split
3375    * @param parent Parent region that was split
3376    * @param a Daughter region A
3377    * @param b Daughter region B
3378    */
3379   public void handleSplitReport(final ServerName sn, final HRegionInfo parent,
3380       final HRegionInfo a, final HRegionInfo b) {
3381     regionOffline(parent);
3382     regionOnline(a, sn);
3383     regionOnline(b, sn);
3384 
3385     // There's a possibility that the region was splitting while a user asked
3386     // the master to disable, we need to make sure we close those regions in
3387     // that case. This is not racing with the region server itself since RS
3388     // report is done after the split transaction completed.
3389     if (this.zkTable.isDisablingOrDisabledTable(
3390         parent.getTableNameAsString())) {
3391       unassign(a);
3392       unassign(b);
3393     }
3394   }
3395 
3396   /**
3397    * This is an EXPENSIVE clone.  Cloning though is the safest thing to do.
3398    * Can't let out original since it can change and at least the loadbalancer
3399    * wants to iterate this exported list.  We need to synchronize on regions
3400    * since all access to this.servers is under a lock on this.regions.
3401    * 
3402    * @return A clone of current assignments by table.
3403    */
3404   Map<String, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable() {
3405     Map<String, Map<ServerName, List<HRegionInfo>>> result = null;
3406     synchronized (this.regions) {
3407       result = new HashMap<String, Map<ServerName,List<HRegionInfo>>>();
3408       if (!this.master.getConfiguration().
3409           getBoolean("hbase.master.loadbalance.bytable", true)) {
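             // Per-table balancing is disabled: group the whole cluster's
             // assignments under a single pseudo-table named "ensemble".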
3410         result.put("ensemble", getAssignments());
3411       } else {
3412         for (Map.Entry<ServerName, Set<HRegionInfo>> e: this.servers.entrySet()) {
3413           for (HRegionInfo hri : e.getValue()) {
3414             if (hri.isMetaRegion() || hri.isRootRegion()) continue;
3415             String tablename = hri.getTableNameAsString();
3416             Map<ServerName, List<HRegionInfo>> svrToRegions = result.get(tablename);
3417             if (svrToRegions == null) {
3418               svrToRegions = new HashMap<ServerName, List<HRegionInfo>>(this.servers.size());
3419               result.put(tablename, svrToRegions);
3420             }
3421             List<HRegionInfo> regions = null;
3422             if (!svrToRegions.containsKey(e.getKey())) {
3423               regions = new ArrayList<HRegionInfo>();
3424               svrToRegions.put(e.getKey(), regions);
3425             } else {
3426               regions = svrToRegions.get(e.getKey());
3427             }
3428             regions.add(hri);
3429           }
3430         }
3431       }
3432     }
3433     Map<ServerName, HServerLoad> onlineSvrs = this.serverManager.getOnlineServers();
3434     List<ServerName> drainingServers = this.serverManager.getDrainingServersList();
3435     // Take care of servers w/o assignments.
3436     for (Map<ServerName,List<HRegionInfo>> map : result.values()) {
3437       for (Map.Entry<ServerName, HServerLoad> svrEntry: onlineSvrs.entrySet()) {
3438         if (!map.containsKey(svrEntry.getKey())) {
3439           map.put(svrEntry.getKey(), new ArrayList<HRegionInfo>());
3440         }
3441       }
3442       map.keySet().removeAll(drainingServers);
3443     }
3444     return result;
3445   }
3446   
3447   /**
3448    * @return A clone of current assignments. Note, this is assignments only.
3449    * If a new server has come in and it has no regions, it will not be included
3450    * in the returned Map.
3451    */
3452   Map<ServerName, List<HRegionInfo>> getAssignments() {
3453     // This is an EXPENSIVE clone.  Cloning though is the safest thing to do.
3454     // Can't let out original since it can change and at least the loadbalancer
3455     // wants to iterate this exported list.  We need to synchronize on regions
3456     // since all access to this.servers is under a lock on this.regions.
3457     Map<ServerName, List<HRegionInfo>> result = null;
3458     synchronized (this.regions) {
3459       result = new HashMap<ServerName, List<HRegionInfo>>(this.servers.size());
3460       for (Map.Entry<ServerName, Set<HRegionInfo>> e: this.servers.entrySet()) {
3461         result.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
3462       }
3463     }
3464     return result;
3465   }
3466 
3467   /**
3468    * @param encodedRegionName Region encoded name.
3469    * @return Null or a {@link Pair} instance that holds the full {@link HRegionInfo}
3470    * and the hosting servers {@link ServerName}.
3471    */
3472   Pair<HRegionInfo, ServerName> getAssignment(final byte [] encodedRegionName) {
3473     String name = Bytes.toString(encodedRegionName);
3474     synchronized(this.regions) {
3475       for (Map.Entry<HRegionInfo, ServerName> e: this.regions.entrySet()) {
3476         if (e.getKey().getEncodedName().equals(name)) {
3477           return new Pair<HRegionInfo, ServerName>(e.getKey(), e.getValue());
3478         }
3479       }
3480     }
3481     return null;
3482   }
3483 
3484   /**
3485    * @param plan Plan to execute.
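        *   The plan is recorded and the region is unassigned; the subsequent
        *   close/open cycle then moves the region to the plan's destination server.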
3486    */
3487   void balance(final RegionPlan plan) {
3488     synchronized (this.regionPlans) {
3489       this.regionPlans.put(plan.getRegionName(), plan);
3490     }
3491     unassign(plan.getRegionInfo());
3492   }
3493 
3494   /**
3495    * Run through remaining regionservers and unassign all catalog regions.
3496    */
3497   void unassignCatalogRegions() {
3498     synchronized (this.regions) {
3499       for (Map.Entry<ServerName, Set<HRegionInfo>> e: this.servers.entrySet()) {
3500         Set<HRegionInfo> regions = e.getValue();
3501         if (regions == null || regions.isEmpty()) continue;
3502         for (HRegionInfo hri: regions) {
3503           if (hri.isMetaRegion()) {
3504             unassign(hri);
3505           }
3506         }
3507       }
3508     }
3509   }
3510 
3511   /**
3512    * State of a Region while undergoing transitions.
3513    */
3514   public static class RegionState implements org.apache.hadoop.io.Writable {
3515     private HRegionInfo region;
3516 
3517     public enum State {
3518       OFFLINE,        // region is in an offline state
3519       PENDING_OPEN,   // sent rpc to server to open but has not begun
3520       OPENING,        // server has begun to open but not yet done
3521       OPEN,           // server opened region and updated meta
3522       PENDING_CLOSE,  // sent rpc to server to close but has not begun
3523       CLOSING,        // server has begun to close but not yet done
3524       CLOSED,         // server closed region and updated meta
3525       SPLITTING,      // server started split of a region
3526       SPLIT           // server completed split of a region
3527     }
3528 
3529     private State state;
3530     // Many threads can update the stamp at the same time.
3531     private final AtomicLong stamp;
3532     private ServerName serverName;
3533 
3534     public RegionState() {
3535       this.stamp = new AtomicLong(System.currentTimeMillis());
3536     }
3537 
3538     RegionState(HRegionInfo region, State state) {
3539       this(region, state, System.currentTimeMillis(), null);
3540     }
3541 
3542     RegionState(HRegionInfo region, State state, long stamp, ServerName serverName) {
3543       this.region = region;
3544       this.state = state;
3545       this.stamp = new AtomicLong(stamp);
3546       this.serverName = serverName;
3547     }
3548 
3549     public void update(State state, long stamp, ServerName serverName) {
3550       this.state = state;
3551       updateTimestamp(stamp);
3552       this.serverName = serverName;
3553     }
3554 
3555     public void update(State state) {
3556       this.state = state;
3557       updateTimestampToNow();
3558       this.serverName = null;
3559     }
3560 
3561     public void updateTimestamp(long stamp) {
3562       this.stamp.set(stamp);
3563     }
3564 
3565     public void updateTimestampToNow() {
3566       this.stamp.set(System.currentTimeMillis());
3567     }
3568 
3569     public State getState() {
3570       return state;
3571     }
3572 
3573     public long getStamp() {
3574       return stamp.get();
3575     }
3576 
3577     public HRegionInfo getRegion() {
3578       return region;
3579     }
3580 
3581     public ServerName getServerName() {
3582       return serverName;
3583     }
3584 
3585     public boolean isClosing() {
3586       return state == State.CLOSING;
3587     }
3588 
3589     public boolean isClosed() {
3590       return state == State.CLOSED;
3591     }
3592 
3593     public boolean isPendingClose() {
3594       return state == State.PENDING_CLOSE;
3595     }
3596 
3597     public boolean isOpening() {
3598       return state == State.OPENING;
3599     }
3600 
3601     public boolean isOpened() {
3602       return state == State.OPEN;
3603     }
3604 
3605     public boolean isPendingOpen() {
3606       return state == State.PENDING_OPEN;
3607     }
3608 
3609     public boolean isOffline() {
3610       return state == State.OFFLINE;
3611     }
3612 
3613     public boolean isSplitting() {
3614       return state == State.SPLITTING;
3615     }
3616  
3617     public boolean isSplit() {
3618       return state == State.SPLIT;
3619     }
3620 
3621     @Override
3622     public String toString() {
3623       return region.getRegionNameAsString()
3624         + " state=" + state
3625         + ", ts=" + stamp
3626         + ", server=" + serverName;
3627     }
3628 
3629     /**
3630      * A slower (but easier-to-read) stringification.
3631      */
3632     public String toDescriptiveString() {
3633       long lstamp = stamp.get();
3634       long relTime = System.currentTimeMillis() - lstamp;
3635       
3636       return region.getRegionNameAsString()
3637         + " state=" + state
3638         + ", ts=" + new Date(lstamp) + " (" + (relTime/1000) + "s ago)"
3639         + ", server=" + serverName;
3640     }
3641 
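         // Note: only region, state and stamp are serialized below; serverName is
         // not written, so it does not survive Writable round-trips.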
3642     @Override
3643     public void readFields(DataInput in) throws IOException {
3644       region = new HRegionInfo();
3645       region.readFields(in);
3646       state = State.valueOf(in.readUTF());
3647       stamp.set(in.readLong());
3648     }
3649 
3650     @Override
3651     public void write(DataOutput out) throws IOException {
3652       region.write(out);
3653       out.writeUTF(state.name());
3654       out.writeLong(stamp.get());
3655     }
3656   }
3657   
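       /**
        * Stop this assignment manager's background threads: shut down the executor
        * service and interrupt the timeout monitor and timer updater chores.
        */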
3658   public void stop() {
3659     shutdown(); // Stop executor service, etc
3660     this.timeoutMonitor.interrupt();
3661     this.timerUpdater.interrupt();
3662   }
3663   
3664   /**
3665    * Check whether the RegionServer is online.
3666    * @param serverName 
3667    * @return True if online.
3668    */
3669   public boolean isServerOnline(ServerName serverName) {
3670     return this.serverManager.isServerOnline(serverName);
3671   }
3672   /**
3673    * Shutdown the threadpool executor service
3674    */
3675   public void shutdown() {
3676     if (null != threadPoolExecutorService) {
3677       this.threadPoolExecutorService.shutdown();
3678     }
3679   }
3680 
3681   protected void setEnabledTable(String tableName) {
3682     try {
3683       this.zkTable.setEnabledTable(tableName);
3684     } catch (KeeperException e) {
3685       // here we can abort as it is the start up flow
3686       String errorMsg = "Unable to ensure that the table " + tableName
3687           + " will be" + " enabled because of a ZooKeeper issue";
3688       LOG.error(errorMsg);
3689       this.master.abort(errorMsg, e);
3690     }
3691   }
3692 
3693 }