1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Arrays;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.NavigableMap;
31  import java.util.Set;
32  import java.util.TreeMap;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentSkipListSet;
35  import java.util.concurrent.ThreadFactory;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicBoolean;
38  import java.util.concurrent.atomic.AtomicInteger;
39  import java.util.concurrent.locks.Lock;
40  import java.util.concurrent.locks.ReentrantLock;
41  
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  import org.apache.hadoop.classification.InterfaceAudience;
45  import org.apache.hadoop.conf.Configuration;
46  import org.apache.hadoop.hbase.Chore;
47  import org.apache.hadoop.hbase.HConstants;
48  import org.apache.hadoop.hbase.HRegionInfo;
49  import org.apache.hadoop.hbase.RegionTransition;
50  import org.apache.hadoop.hbase.Server;
51  import org.apache.hadoop.hbase.ServerName;
52  import org.apache.hadoop.hbase.Stoppable;
53  import org.apache.hadoop.hbase.catalog.CatalogTracker;
54  import org.apache.hadoop.hbase.catalog.MetaReader;
55  import org.apache.hadoop.hbase.client.Result;
56  import org.apache.hadoop.hbase.exceptions.DeserializationException;
57  import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
58  import org.apache.hadoop.hbase.exceptions.RegionAlreadyInTransitionException;
59  import org.apache.hadoop.hbase.exceptions.RegionServerStoppedException;
60  import org.apache.hadoop.hbase.exceptions.ServerNotRunningYetException;
61  import org.apache.hadoop.hbase.exceptions.TableNotFoundException;
62  import org.apache.hadoop.hbase.executor.EventHandler;
63  import org.apache.hadoop.hbase.executor.EventType;
64  import org.apache.hadoop.hbase.executor.ExecutorService;
65  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
66  import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
67  import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
68  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
69  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
70  import org.apache.hadoop.hbase.master.handler.MergedRegionHandler;
71  import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
72  import org.apache.hadoop.hbase.master.handler.SplitRegionHandler;
73  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
74  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
75  import org.apache.hadoop.hbase.util.KeyLocker;
76  import org.apache.hadoop.hbase.util.Pair;
77  import org.apache.hadoop.hbase.util.Threads;
78  import org.apache.hadoop.hbase.util.Triple;
79  import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
80  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
81  import org.apache.hadoop.hbase.zookeeper.ZKTable;
82  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
83  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
84  import org.apache.hadoop.ipc.RemoteException;
85  import org.apache.zookeeper.AsyncCallback;
86  import org.apache.zookeeper.KeeperException;
87  import org.apache.zookeeper.KeeperException.NoNodeException;
88  import org.apache.zookeeper.KeeperException.NodeExistsException;
89  import org.apache.zookeeper.data.Stat;
90  
91  import com.google.common.base.Preconditions;
92  import com.google.common.collect.LinkedHashMultimap;
93  
94  /**
95   * Manages and performs region assignment.
96   * <p>
97   * Monitors ZooKeeper for events related to regions in transition.
98   * <p>
99   * Handles existing regions in transition during master failover.
100  */
101 @InterfaceAudience.Private
102 public class AssignmentManager extends ZooKeeperListener {
  private static final Log LOG = LogFactory.getLog(AssignmentManager.class);

  /**
   * Server name stamped on region transitions written by HBCK; handleRegion
   * routes transitions carrying this server name to handleHBCK.
   */
  public static final ServerName HBCK_CODE_SERVERNAME = new ServerName(HConstants.HBCK_CODE_NAME,
      -1, -1L);

  /** The master hosting this manager; source of the configuration and zk watcher. */
  protected final Server server;

  /** Queried for server liveness (isServerOnline, dead server lists). */
  private ServerManager serverManager;

  /** True when the configured load balancer class is FavoredNodeLoadBalancer. */
  private boolean shouldAssignRegionsWithFavoredNodes;

  private CatalogTracker catalogTracker;

  /** Null unless "hbase.assignment.timeout.management" is enabled. */
  protected final TimeoutMonitor timeoutMonitor;

  /** Null unless "hbase.assignment.timeout.management" is enabled. */
  private final TimerUpdater timerUpdater;

  private LoadBalancer balancer;

  private final TableLockManager tableLockManager;

  /**
   * Per-region locks keyed by encoded region name, so that two threads do not
   * work on the same region at the same time.
   */
  final private KeyLocker<String> locker = new KeyLocker<String>();

  /**
   * Map of regions to reopen after the schema of a table is changed. Key -
   * encoded region name, value - HRegionInfo. Wrapped with
   * Collections.synchronizedMap in the constructor.
   */
  private final Map <String, HRegionInfo> regionsToReopen;

  /*
   * Maximum times we recurse an assignment/unassignment
   * ("hbase.assignment.maximum.attempts", default 10).
   * See the assign and unassign methods of this class.
   */
  private final int maximumAttempts;

  /** Plans for region movement. Key is the encoded version of a region name*/
  // TODO: When do plans get cleaned out?  Ever? In server open and in server
  // shutdown processing -- St.Ack
  // All access to this Map must be synchronized (addPlan/addPlans lock on the
  // map itself).
  final NavigableMap<String, RegionPlan> regionPlans =
    new TreeMap<String, RegionPlan>();

  /** Shared ZKTable instance, created once in the constructor; see getZKTable(). */
  private final ZKTable zkTable;

  /**
   * Contains the server which need to update timer, these servers will be
   * handled by {@link TimerUpdater}. Null unless timeout management is enabled.
   */
  private final ConcurrentSkipListSet<ServerName> serversInUpdatingTimer;

  /** Executor to which region event handlers (e.g. M_MASTER_RECOVERY) are submitted. */
  private final ExecutorService executorService;

  // Thread pool executor service for timeout monitor. Bounded cached pool sized
  // by "hbase.assignment.threads.max" (default 30).
  private java.util.concurrent.ExecutorService threadPoolExecutorService;

  // Pool of ZK event workers: a bounded cached thread pool sized by
  // "hbase.assignment.zkevent.workers" (default 20).
  private final java.util.concurrent.ExecutorService zkEventWorkers;

  // Event types that handleRegion still processes even when the reporting
  // server is not online.
  private List<EventType> ignoreStatesRSOffline = Arrays.asList(
      EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);

  // metrics instance to send metrics for RITs; can be null only with tests.
  MetricsMaster metricsMaster;

  private final RegionStates regionStates;

  // The threshold to use bulk assigning. Using bulk assignment
  // only if assigning at least this many regions to at least this
  // many servers. If assigning fewer regions to fewer servers,
  // bulk assigning may be not as efficient.
  // Configured by "hbase.bulk.assignment.threshold.regions" (default 7) and
  // "hbase.bulk.assignment.threshold.servers" (default 3).
  private final int bulkAssignThresholdRegions;
  private final int bulkAssignThresholdServers;

  // Should bulk assignment wait till all regions are assigned,
  // or it is timed out?  This is useful to measure bulk assignment
  // performance, but not needed in most use cases.
  // Configured by "hbase.bulk.assignment.waittillallassigned" (default false).
  private final boolean bulkAssignWaitTillAllAssigned;

  /**
   * Indicator that AssignmentManager has recovered the region states so
   * that ServerShutdownHandler can be fully enabled and re-assign regions
   * of dead servers. So that when re-assignment happens, AssignmentManager
   * has proper region states.
   *
   * Protected to ease testing.
   */
  protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);

  /**
   * Is the TimeOutManagement activated ("hbase.assignment.timeout.management",
   * default false).
   **/
  private final boolean tomActivated;

  /**
   * A map to track the count a region fails to open in a row.
   * So that we don't try to open a region forever if the failure is
   * unrecoverable.  We don't put this information in region states
   * because we don't expect this to happen frequently; we don't
   * want to copy this information over during each state transition either.
   * NOTE(review): key is presumably the encoded region name — confirm at use sites.
   */
  private final ConcurrentHashMap<String, AtomicInteger>
    failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
203 
204   /**
205    * Constructs a new assignment manager.
206    *
207    * @param server
208    * @param serverManager
209    * @param catalogTracker
210    * @param service
211    * @throws KeeperException
212    * @throws IOException
213    */
214   public AssignmentManager(Server server, ServerManager serverManager,
215       CatalogTracker catalogTracker, final LoadBalancer balancer,
216       final ExecutorService service, MetricsMaster metricsMaster,
217       final TableLockManager tableLockManager) throws KeeperException, IOException {
218     super(server.getZooKeeper());
219     this.server = server;
220     this.serverManager = serverManager;
221     this.catalogTracker = catalogTracker;
222     this.executorService = service;
223     this.regionsToReopen = Collections.synchronizedMap
224                            (new HashMap<String, HRegionInfo> ());
225     Configuration conf = server.getConfiguration();
226     // Only read favored nodes if using the favored nodes load balancer.
227     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
228            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
229            FavoredNodeLoadBalancer.class);
230     this.tomActivated = conf.getBoolean("hbase.assignment.timeout.management", false);
231     if (tomActivated){
232       this.serversInUpdatingTimer =  new ConcurrentSkipListSet<ServerName>();
233       this.timeoutMonitor = new TimeoutMonitor(
234         conf.getInt("hbase.master.assignment.timeoutmonitor.period", 30000),
235         server, serverManager,
236         conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 600000));
237       this.timerUpdater = new TimerUpdater(conf.getInt(
238         "hbase.master.assignment.timerupdater.period", 10000), server);
239       Threads.setDaemonThreadRunning(timerUpdater.getThread(),
240         server.getServerName() + ".timerUpdater");
241     } else {
242       this.serversInUpdatingTimer =  null;
243       this.timeoutMonitor = null;
244       this.timerUpdater = null;
245     }
246     this.zkTable = new ZKTable(this.watcher);
247     this.maximumAttempts =
248       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
249     this.balancer = balancer;
250     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
251     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
252       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("hbase-am"));
253     this.metricsMaster = metricsMaster;// can be null only with tests.
254     this.regionStates = new RegionStates(server, serverManager);
255 
256     this.bulkAssignWaitTillAllAssigned =
257       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
258     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
259     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
260 
261     int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
262     ThreadFactory threadFactory = Threads.newDaemonThreadFactory("hbase-am-zkevent-worker");
263     zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
264             TimeUnit.SECONDS, threadFactory);
265     this.tableLockManager = tableLockManager;
266   }
267 
268   void startTimeOutMonitor() {
269     if (tomActivated) {
270       Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), server.getServerName()
271           + ".timeoutMonitor");
272     }
273   }
274 
275   /**
276    * @return Instance of ZKTable.
277    */
278   public ZKTable getZKTable() {
279     // These are 'expensive' to make involving trip to zk ensemble so allow
280     // sharing.
281     return this.zkTable;
282   }
283 
284   /**
285    * This SHOULD not be public. It is public now
286    * because of some unit tests.
287    *
288    * TODO: make it package private and keep RegionStates in the master package
289    */
290   public RegionStates getRegionStates() {
291     return regionStates;
292   }
293 
294   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
295     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
296   }
297 
298   /**
299    * Add a regionPlan for the specified region.
300    * @param encodedName
301    * @param plan
302    */
303   public void addPlan(String encodedName, RegionPlan plan) {
304     synchronized (regionPlans) {
305       regionPlans.put(encodedName, plan);
306     }
307   }
308 
309   /**
310    * Add a map of region plans.
311    */
312   public void addPlans(Map<String, RegionPlan> plans) {
313     synchronized (regionPlans) {
314       regionPlans.putAll(plans);
315     }
316   }
317 
318   /**
319    * Set the list of regions that will be reopened
320    * because of an update in table schema
321    *
322    * @param regions
323    *          list of regions that should be tracked for reopen
324    */
325   public void setRegionsToReopen(List <HRegionInfo> regions) {
326     for(HRegionInfo hri : regions) {
327       regionsToReopen.put(hri.getEncodedName(), hri);
328     }
329   }
330 
331   /**
332    * Used by the client to identify if all regions have the schema updates
333    *
334    * @param tableName
335    * @return Pair indicating the status of the alter command
336    * @throws IOException
337    */
338   public Pair<Integer, Integer> getReopenStatus(byte[] tableName)
339       throws IOException {
340     List <HRegionInfo> hris =
341       MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName, true);
342     Integer pending = 0;
343     for (HRegionInfo hri : hris) {
344       String name = hri.getEncodedName();
345       // no lock concurrent access ok: sequential consistency respected.
346       if (regionsToReopen.containsKey(name)
347           || regionStates.isRegionInTransition(name)) {
348         pending++;
349       }
350     }
351     return new Pair<Integer, Integer>(pending, hris.size());
352   }
353 
354   /**
355    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
356    * the failover cleanup before re-assigning regions of dead servers. So that
357    * when re-assignment happens, AssignmentManager has proper region states.
358    */
359   public boolean isFailoverCleanupDone() {
360     return failoverCleanupDone.get();
361   }
362 
363   /**
364    * Now, failover cleanup is completed. Notify server manager to
365    * process queued up dead servers processing, if any.
366    */
367   void failoverCleanupDone() {
368     failoverCleanupDone.set(true);
369     serverManager.processQueuedDeadServers();
370   }
371 
372   /**
373    * Called on startup.
374    * Figures whether a fresh cluster start of we are joining extant running cluster.
375    * @throws IOException
376    * @throws KeeperException
377    * @throws InterruptedException
378    */
379   void joinCluster() throws IOException,
380       KeeperException, InterruptedException {
381     // Concurrency note: In the below the accesses on regionsInTransition are
382     // outside of a synchronization block where usually all accesses to RIT are
383     // synchronized.  The presumption is that in this case it is safe since this
384     // method is being played by a single thread on startup.
385 
386     // TODO: Regions that have a null location and are not in regionsInTransitions
387     // need to be handled.
388 
389     // Scan META to build list of existing regions, servers, and assignment
390     // Returns servers who have not checked in (assumed dead) and their regions
391     Map<ServerName, List<HRegionInfo>> deadServers = rebuildUserRegions();
392 
393     // This method will assign all user regions if a clean server startup or
394     // it will reconstruct master state and cleanup any leftovers from
395     // previous master process.
396     processDeadServersAndRegionsInTransition(deadServers);
397 
398     recoverTableInDisablingState();
399     recoverTableInEnablingState();
400   }
401 
402   /**
403    * Process all regions that are in transition in zookeeper and also
404    * processes the list of dead servers by scanning the META.
405    * Used by master joining an cluster.  If we figure this is a clean cluster
406    * startup, will assign all user regions.
407    * @param deadServers
408    *          Map of dead servers and their regions. Can be null.
409    * @throws KeeperException
410    * @throws IOException
411    * @throws InterruptedException
412    */
413   void processDeadServersAndRegionsInTransition(
414       final Map<ServerName, List<HRegionInfo>> deadServers)
415           throws KeeperException, IOException, InterruptedException {
416     List<String> nodes = ZKUtil.listChildrenNoWatch(watcher,
417       watcher.assignmentZNode);
418 
419     if (nodes == null) {
420       String errorMessage = "Failed to get the children from ZK";
421       server.abort(errorMessage, new IOException(errorMessage));
422       return;
423     }
424 
425     boolean failover = (!serverManager.getDeadServers().isEmpty() || !serverManager
426         .getRequeuedDeadServers().isEmpty());
427 
428     if (!failover) {
429       // Run through all regions.  If they are not assigned and not in RIT, then
430       // its a clean cluster startup, else its a failover.
431       Map<HRegionInfo, ServerName> regions = regionStates.getRegionAssignments();
432       for (Map.Entry<HRegionInfo, ServerName> e: regions.entrySet()) {
433         if (!e.getKey().isMetaTable() && e.getValue() != null) {
434           LOG.debug("Found " + e + " out on cluster");
435           failover = true;
436           break;
437         }
438         if (nodes.contains(e.getKey().getEncodedName())) {
439           LOG.debug("Found " + e.getKey().getRegionNameAsString() + " in RITs");
440           // Could be a meta region.
441           failover = true;
442           break;
443         }
444       }
445     }
446 
447     // If we found user regions out on cluster, its a failover.
448     if (failover) {
449       LOG.info("Found regions out on cluster or in RIT; failover");
450       // Process list of dead servers and regions in RIT.
451       // See HBASE-4580 for more information.
452       processDeadServersAndRecoverLostRegions(deadServers);
453     } else {
454       // Fresh cluster startup.
455       LOG.info("Clean cluster startup. Assigning userregions");
456       assignAllUserRegions();
457     }
458   }
459 
460   /**
461    * If region is up in zk in transition, then do fixup and block and wait until
462    * the region is assigned and out of transition.  Used on startup for
463    * catalog regions.
464    * @param hri Region to look for.
465    * @return True if we processed a region in transition else false if region
466    * was not up in zk in transition.
467    * @throws InterruptedException
468    * @throws KeeperException
469    * @throws IOException
470    */
471   boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
472       throws InterruptedException, KeeperException, IOException {
473     boolean intransistion = processRegionInTransition(hri.getEncodedName(), hri);
474     if (!intransistion) return intransistion;
475     LOG.debug("Waiting on " + HRegionInfo.prettyPrint(hri.getEncodedName()));
476     while (!this.server.isStopped() &&
477       this.regionStates.isRegionInTransition(hri.getEncodedName())) {
478       // We put a timeout because we may have the region getting in just between the test
479       //  and the waitForUpdate
480       this.regionStates.waitForUpdate(100);
481     }
482     return intransistion;
483   }
484 
485   /**
486    * Process failover of new master for region <code>encodedRegionName</code>
487    * up in zookeeper.
488    * @param encodedRegionName Region to process failover for.
489    * @param regionInfo If null we'll go get it from meta table.
490    * @return True if we processed <code>regionInfo</code> as a RIT.
491    * @throws KeeperException
492    * @throws IOException
493    */
494   boolean processRegionInTransition(final String encodedRegionName,
495       final HRegionInfo regionInfo) throws KeeperException, IOException {
496     // We need a lock here to ensure that we will not put the same region twice
497     // It has no reason to be a lock shared with the other operations.
498     // We can do the lock on the region only, instead of a global lock: what we want to ensure
499     // is that we don't have two threads working on the same region.
500     Lock lock = locker.acquireLock(encodedRegionName);
501     try {
502       Stat stat = new Stat();
503       byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
504       if (data == null) return false;
505       RegionTransition rt;
506       try {
507         rt = RegionTransition.parseFrom(data);
508       } catch (DeserializationException e) {
509         LOG.warn("Failed parse znode data", e);
510         return false;
511       }
512       HRegionInfo hri = regionInfo;
513       if (hri == null) {
514         hri = regionStates.getRegionInfo(rt.getRegionName());
515         if (hri == null) return false;
516       }
517       processRegionsInTransition(rt, hri, stat.getVersion());
518       return true;
519     } finally {
520       lock.unlock();
521     }
522   }
523 
524   /**
525    * This call is invoked only (1) master assign meta;
526    * (2) during failover mode startup, zk assignment node processing.
527    * The locker is set in the caller.
528    *
529    * It should be private but it is used by some test too.
530    */
531   void processRegionsInTransition(
532       final RegionTransition rt, final HRegionInfo regionInfo,
533       final int expectedVersion) throws KeeperException {
534     EventType et = rt.getEventType();
535     // Get ServerName.  Could not be null.
536     final ServerName sn = rt.getServerName();
537     String encodedRegionName = regionInfo.getEncodedName();
538     LOG.info("Processing region " + regionInfo.getRegionNameAsString() + " in state " + et);
539 
540 
541     if (regionStates.isRegionInTransition(encodedRegionName)) {
542       // Just return
543       return;
544     }
545     switch (et) {
546       case M_ZK_REGION_CLOSING:
547         // If zk node of the region was updated by a live server skip this
548         // region and just add it into RIT.
549         if (!serverManager.isServerOnline(sn)) {
550           // If was not online, its closed now. Force to OFFLINE and this
551           // will get it reassigned if appropriate
552           forceOffline(regionInfo, rt);
553         } else {
554           // Insert into RIT & resend the query to the region server: may be the previous master
555           // died before sending the query the first time.
556           regionStates.updateRegionState(rt, RegionState.State.CLOSING);
557           final RegionState rs = regionStates.getRegionState(regionInfo);
558           this.executorService.submit(
559               new EventHandler(server, EventType.M_MASTER_RECOVERY) {
560                 @Override
561                 public void process() throws IOException {
562                   ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
563                   try {
564                     unassign(regionInfo, rs, expectedVersion, null, true, null);
565                   } finally {
566                     lock.unlock();
567                   }
568                 }
569               });
570         }
571         break;
572 
573       case RS_ZK_REGION_CLOSED:
574       case RS_ZK_REGION_FAILED_OPEN:
575         // Region is closed, insert into RIT and handle it
576         addToRITandCallClose(regionInfo, RegionState.State.CLOSED, rt);
577         break;
578 
579       case M_ZK_REGION_OFFLINE:
580         // If zk node of the region was updated by a live server skip this
581         // region and just add it into RIT.
582         if (!serverManager.isServerOnline(sn)) {
583           // Region is offline, insert into RIT and handle it like a closed
584           addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
585         } else {
586           // Insert in RIT and resend to the regionserver
587           regionStates.updateRegionState(rt, RegionState.State.PENDING_OPEN);
588           final RegionState rs = regionStates.getRegionState(regionInfo);
589           this.executorService.submit(
590               new EventHandler(server, EventType.M_MASTER_RECOVERY) {
591                 @Override
592                 public void process() throws IOException {
593                   ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
594                   try {
595                     assign(rs, false, false);
596                   } finally {
597                     lock.unlock();
598                   }
599                 }
600               });
601         }
602         break;
603 
604       case RS_ZK_REGION_OPENING:
605         if (!serverManager.isServerOnline(sn)) {
606           forceOffline(regionInfo, rt);
607         } else {
608           regionStates.updateRegionState(rt, RegionState.State.OPENING);
609         }
610         break;
611 
612       case RS_ZK_REGION_OPENED:
613         if (!serverManager.isServerOnline(sn)) {
614           forceOffline(regionInfo, rt);
615         } else {
616           // Region is opened, insert into RIT and handle it
617           // This could be done asynchronously, we would need then to acquire the lock in the
618           //  handler.
619           regionStates.updateRegionState(rt, RegionState.State.OPEN);
620           new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process();
621         }
622         break;
623       case RS_ZK_REGION_SPLITTING:
624         if (!serverManager.isServerOnline(sn)) {
625           // The regionserver started the split, but died before updating the status.
626           // It means (hopefully) that the split was not finished
627           // TBD - to study. In the meantime, do nothing as in the past.
628           LOG.warn("Processed region " + regionInfo.getEncodedName() + " in state : " + et +
629               " on a dead regionserver: " + sn + " doing nothing");
630         } else {
631           LOG.info("Processed region " + regionInfo.getEncodedName() + " in state : " +
632               et + " nothing to do.");
633           // We don't do anything. The way the code is written in RS_ZK_REGION_SPLIT management,
634           //  it adds the RS_ZK_REGION_SPLITTING state if needed. So we don't have to do it here.
635         }
636         break;
637       case RS_ZK_REGION_SPLIT:
638         if (!serverManager.isServerOnline(sn)) {
639           forceOffline(regionInfo, rt);
640         } else {
641           LOG.info("Processed region " + regionInfo.getEncodedName() + " in state : " +
642               et + " nothing to do.");
643           // We don't do anything. The regionserver is supposed to update the znode
644           // multiple times so if it's still up we will receive an update soon.
645         }
646         break;
647       case RS_ZK_REGION_MERGING:
648         // nothing to do
649         LOG.info("Processed region " + regionInfo.getEncodedName()
650             + " in state : " + et + " nothing to do.");
651         break;
652       case RS_ZK_REGION_MERGE:
653         if (!serverManager.isServerOnline(sn)) {
654           // ServerShutdownHandler would handle this region
655           LOG.warn("Processed region " + regionInfo.getEncodedName()
656               + " in state : " + et + " on a dead regionserver: " + sn
657               + " doing nothing");
658         } else {
659           LOG.info("Processed region " + regionInfo.getEncodedName() + " in state : " +
660               et + " nothing to do.");
661           // We don't do anything. The regionserver is supposed to update the znode
662           // multiple times so if it's still up we will receive an update soon.
663         }
664         break;
665       default:
666         throw new IllegalStateException("Received region in state :" + et + " is not valid.");
667     }
668   }
669 
670   /**
671    * Put the region <code>hri</code> into an offline state up in zk.
672    *
673    * You need to have lock on the region before calling this method.
674    *
675    * @param hri
676    * @param oldRt
677    * @throws KeeperException
678    */
679   private void forceOffline(final HRegionInfo hri, final RegionTransition oldRt)
680       throws KeeperException {
681     // If was on dead server, its closed now.  Force to OFFLINE and then
682     // handle it like a close; this will get it reassigned if appropriate
683     LOG.debug("RIT " + hri.getEncodedName() + " in state=" + oldRt.getEventType() +
684       " was on deadserver; forcing offline");
685     ZKAssign.createOrForceNodeOffline(this.watcher, hri, oldRt.getServerName());
686     addToRITandCallClose(hri, RegionState.State.OFFLINE, oldRt);
687   }
688 
689   /**
690    * Add to the in-memory copy of regions in transition and then call close
691    * handler on passed region <code>hri</code>
692    * @param hri
693    * @param state
694    * @param oldData
695    */
696   private void addToRITandCallClose(final HRegionInfo hri,
697       final RegionState.State state, final RegionTransition oldData) {
698     regionStates.updateRegionState(oldData, state);
699     new ClosedRegionHandler(this.server, this, hri).process();
700   }
701 
702   /**
703    * When a region is closed, it should be removed from the regionsToReopen
704    * @param hri HRegionInfo of the region which was closed
705    */
706   public void removeClosedRegion(HRegionInfo hri) {
707     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
708       LOG.debug("Removed region from reopening regions because it was closed");
709     }
710   }
711 
712   /**
713    * Handles various states an unassigned node can be in.
714    * <p>
715    * Method is called when a state change is suspected for an unassigned node.
716    * <p>
717    * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
718    * yet).
719    * @param rt
720    * @param expectedVersion
721    */
722   private void handleRegion(final RegionTransition rt, int expectedVersion) {
723     if (rt == null) {
724       LOG.warn("Unexpected NULL input for RegionTransition rt");
725       return;
726     }
727     final ServerName sn = rt.getServerName();
728     // Check if this is a special HBCK transition
729     if (sn.equals(HBCK_CODE_SERVERNAME)) {
730       handleHBCK(rt);
731       return;
732     }
733     final long createTime = rt.getCreateTime();
734     final byte[] regionName = rt.getRegionName();
735     String encodedName = HRegionInfo.encodeRegionName(regionName);
736     String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
737     // Verify this is a known server
738     if (!serverManager.isServerOnline(sn)
739       && !ignoreStatesRSOffline.contains(rt.getEventType())) {
740       LOG.warn("Attempted to handle region transition for server but " +
741         "server is not online: " + prettyPrintedRegionName);
742       return;
743     }
744 
745     RegionState regionState =
746       regionStates.getRegionTransitionState(encodedName);
747     long startTime = System.currentTimeMillis();
748     if (LOG.isDebugEnabled()) {
749       boolean lateEvent = createTime < (startTime - 15000);
750       LOG.debug("Handling transition=" + rt.getEventType() +
751         ", server=" + sn + ", region=" +
752         (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
753         (lateEvent ? ", which is more than 15 seconds late" : "") +
754         ", current state from region state map =" + regionState);
755     }
756     // We don't do anything for this event,
757     // so separate it out, no need to lock/unlock anything
758     if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
759       return;
760     }
761 
762     // We need a lock on the region as we could update it
763     Lock lock = locker.acquireLock(encodedName);
764     try {
765       RegionState latestState =
766         regionStates.getRegionTransitionState(encodedName);
767       if ((regionState == null && latestState != null)
768           || (regionState != null && latestState == null)
769           || (regionState != null && latestState != null
770             && latestState.getState() != regionState.getState())) {
771         LOG.warn("Region state changed from " + regionState + " to "
772           + latestState + ", while acquiring lock");
773       }
774       long waitedTime = System.currentTimeMillis() - startTime;
775       if (waitedTime > 5000) {
776         LOG.warn("Took " + waitedTime + "ms to acquire the lock");
777       }
778       regionState = latestState;
779       switch (rt.getEventType()) {
780         case RS_ZK_REGION_SPLITTING:
781           if (!isInStateForSplitting(regionState)) break;
782           regionStates.updateRegionState(rt, RegionState.State.SPLITTING);
783           break;
784 
785         case RS_ZK_REGION_SPLIT:
786           // RegionState must be null, or SPLITTING or PENDING_CLOSE.
787           if (!isInStateForSplitting(regionState)) break;
788           // If null, add SPLITTING state before going to SPLIT
789           if (regionState == null) {
790             regionState = regionStates.updateRegionState(rt,
791               RegionState.State.SPLITTING);
792 
793             String message = "Received SPLIT for region " + prettyPrintedRegionName +
794               " from server " + sn;
795             // If still null, it means we cannot find it and it was already processed
796             if (regionState == null) {
797               LOG.warn(message + " but it doesn't exist anymore," +
798                   " probably already processed its split");
799               break;
800             }
801             LOG.info(message +
802                 " but region was not first in SPLITTING state; continuing");
803           }
804           // Check it has daughters.
805           byte [] payload = rt.getPayload();
806           List<HRegionInfo> daughters;
807           try {
808             daughters = HRegionInfo.parseDelimitedFrom(payload, 0, payload.length);
809           } catch (IOException e) {
810             LOG.error("Dropped split! Failed reading split payload for " +
811               prettyPrintedRegionName);
812             break;
813           }
814           assert daughters.size() == 2;
815           // Assert that we can get a serverinfo for this server.
816           if (!this.serverManager.isServerOnline(sn)) {
817             LOG.error("Dropped split! ServerName=" + sn + " unknown.");
818             break;
819           }
820           // Run handler to do the rest of the SPLIT handling.
821           this.executorService.submit(new SplitRegionHandler(server, this,
822             regionState.getRegion(), sn, daughters));
823           break;
824 
825         case RS_ZK_REGION_MERGING:
826           // Merged region is a new region, we can't find it in the region states now.
827           // Do nothing.
828           break;
829 
830         case RS_ZK_REGION_MERGE:
831           // Assert that we can get a serverinfo for this server.
832           if (!this.serverManager.isServerOnline(sn)) {
833             LOG.error("Dropped merge! ServerName=" + sn + " unknown.");
834             break;
835           }
836           // Get merged and merging regions.
837           byte[] payloadOfMerge = rt.getPayload();
838           List<HRegionInfo> mergeRegions;
839           try {
840             mergeRegions = HRegionInfo.parseDelimitedFrom(payloadOfMerge, 0,
841                 payloadOfMerge.length);
842           } catch (IOException e) {
843             LOG.error("Dropped merge! Failed reading merge payload for " +
844               prettyPrintedRegionName);
845             break;
846           }
847           assert mergeRegions.size() == 3;
848           // Run handler to do the rest of the MERGE handling.
849           this.executorService.submit(new MergedRegionHandler(server, this, sn,
850               mergeRegions));
851           break;
852 
853         case M_ZK_REGION_CLOSING:
854           // Should see CLOSING after we have asked it to CLOSE or additional
855           // times after already being in state of CLOSING
856           if (regionState != null
857               && !regionState.isPendingCloseOrClosingOnServer(sn)) {
858             LOG.warn("Received CLOSING for region " + prettyPrintedRegionName
859               + " from server " + sn + " but region was in the state " + regionState
860               + " and not in expected PENDING_CLOSE or CLOSING states,"
861               + " or not on the expected server");
862             return;
863           }
864           // Transition to CLOSING (or update stamp if already CLOSING)
865           regionStates.updateRegionState(rt, RegionState.State.CLOSING);
866           break;
867 
868         case RS_ZK_REGION_CLOSED:
869           // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
870           if (regionState != null
871               && !regionState.isPendingCloseOrClosingOnServer(sn)) {
872             LOG.warn("Received CLOSED for region " + prettyPrintedRegionName
873               + " from server " + sn + " but region was in the state " + regionState
874               + " and not in expected PENDING_CLOSE or CLOSING states,"
875               + " or not on the expected server");
876             return;
877           }
878           // Handle CLOSED by assigning elsewhere or stopping if a disable
879           // If we got here all is good.  Need to update RegionState -- else
880           // what follows will fail because not in expected state.
881           regionState = regionStates.updateRegionState(rt, RegionState.State.CLOSED);
882           if (regionState != null) {
883             removeClosedRegion(regionState.getRegion());
884             this.executorService.submit(new ClosedRegionHandler(server,
885               this, regionState.getRegion()));
886           }
887           break;
888 
889         case RS_ZK_REGION_FAILED_OPEN:
890           if (regionState != null
891               && !regionState.isPendingOpenOrOpeningOnServer(sn)) {
892             LOG.warn("Received FAILED_OPEN for region " + prettyPrintedRegionName
893               + " from server " + sn + " but region was in the state " + regionState
894               + " and not in expected PENDING_OPEN or OPENING states,"
895               + " or not on the expected server");
896             return;
897           }
898           // Handle this the same as if it were opened and then closed.
899           regionState = regionStates.updateRegionState(rt, RegionState.State.CLOSED);
900           // When there are more than one region server a new RS is selected as the
901           // destination and the same is updated in the regionplan. (HBASE-5546)
902           if (regionState != null) {
903             AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
904             if (failedOpenCount == null) {
905               failedOpenCount = new AtomicInteger();
906               // No need to use putIfAbsent, or extra synchronization since
907               // this whole handleRegion block is locked on the encoded region
908               // name, and failedOpenTracker is updated only in this block
909               failedOpenTracker.put(encodedName, failedOpenCount);
910             }
911             if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
912               regionStates.updateRegionState(
913                 regionState.getRegion(), RegionState.State.FAILED_OPEN);
914               // remove the tracking info to save memory, also reset
915               // the count for next open initiative
916               failedOpenTracker.remove(encodedName);
917             } else {
918               getRegionPlan(regionState.getRegion(), sn, true);
919               this.executorService.submit(new ClosedRegionHandler(server,
920                 this, regionState.getRegion()));
921             }
922           }
923           break;
924 
925         case RS_ZK_REGION_OPENING:
926           // Should see OPENING after we have asked it to OPEN or additional
927           // times after already being in state of OPENING
928           if (regionState != null
929               && !regionState.isPendingOpenOrOpeningOnServer(sn)) {
930             LOG.warn("Received OPENING for region " + prettyPrintedRegionName
931               + " from server " + sn + " but region was in the state " + regionState
932               + " and not in expected PENDING_OPEN or OPENING states,"
933               + " or not on the expected server");
934             return;
935           }
936           // Transition to OPENING (or update stamp if already OPENING)
937           regionStates.updateRegionState(rt, RegionState.State.OPENING);
938           break;
939 
940         case RS_ZK_REGION_OPENED:
941           // Should see OPENED after OPENING but possible after PENDING_OPEN
942           if (regionState != null
943               && !regionState.isPendingOpenOrOpeningOnServer(sn)) {
944             LOG.warn("Received OPENED for region " + prettyPrintedRegionName
945               + " from server " + sn + " but region was in the state " + regionState
946               + " and not in expected PENDING_OPEN or OPENING states,"
947               + " or not on the expected server");
948             // Close it without updating the internal region states,
949             // so as not to create double assignments in unlucky scenarios
950             // mentioned in OpenRegionHandler#process
951             unassign(regionState.getRegion(), null, -1, null, false, sn);
952             return;
953           }
954           // Handle OPENED by removing from transition and deleted zk node
955           regionState = regionStates.updateRegionState(rt, RegionState.State.OPEN);
956           if (regionState != null) {
957             failedOpenTracker.remove(encodedName); // reset the count, if any
958             this.executorService.submit(new OpenedRegionHandler(
959               server, this, regionState.getRegion(), sn, expectedVersion));
960           }
961           break;
962 
963         default:
964           throw new IllegalStateException("Received event is not valid.");
965       }
966     } finally {
967       lock.unlock();
968     }
969   }
970 
971   /**
972    * @return Returns true if this RegionState is splittable; i.e. the
973    * RegionState is currently in splitting state or pending_close or
974    * null (Anything else will return false). (Anything else will return false).
975    */
976   private boolean isInStateForSplitting(final RegionState rs) {
977     if (rs == null) return true;
978     if (rs.isSplitting()) return true;
979     if (convertPendingCloseToSplitting(rs)) return true;
980     LOG.warn("Dropped region split! Not in state good for SPLITTING; rs=" + rs);
981     return false;
982   }
983 
984   // TODO: processFavoredNodes might throw an exception, for e.g., if the
985   // meta could not be contacted/updated. We need to see how seriously to treat
986   // this problem as. Should we fail the current assignment. We should be able
987   // to recover from this problem eventually (if the meta couldn't be updated
988   // things should work normally and eventually get fixed up).
989   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
990     if (!shouldAssignRegionsWithFavoredNodes) return;
991     // The AM gets the favored nodes info for each region and updates the meta
992     // table with that info
993     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
994         new HashMap<HRegionInfo, List<ServerName>>();
995     for (HRegionInfo region : regions) {
996       regionToFavoredNodes.put(region,
997           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
998     }
999     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes, catalogTracker);
1000   }
1001 
1002   /**
1003    * If the passed regionState is in PENDING_CLOSE, clean up PENDING_CLOSE
1004    * state and convert it to SPLITTING instead.
1005    * This can happen in case where master wants to close a region at same time
1006    * a regionserver starts a split.  The split won.  Clean out old PENDING_CLOSE
1007    * state.
1008    * @param rs
1009    * @return True if we converted from PENDING_CLOSE to SPLITTING
1010    */
1011   private boolean convertPendingCloseToSplitting(final RegionState rs) {
1012     if (!rs.isPendingClose()) return false;
1013     LOG.debug("Converting PENDING_CLOSE to SPLITTING; rs=" + rs);
1014     regionStates.updateRegionState(
1015       rs.getRegion(), RegionState.State.SPLITTING);
1016     // Clean up existing state.  Clear from region plans seems all we
1017     // have to do here by way of clean up of PENDING_CLOSE.
1018     clearRegionPlan(rs.getRegion());
1019     return true;
1020   }
1021 
1022   /**
1023    * Handle a ZK unassigned node transition triggered by HBCK repair tool.
1024    * <p>
1025    * This is handled in a separate code path because it breaks the normal rules.
1026    * @param rt
1027    */
1028   private void handleHBCK(RegionTransition rt) {
1029     String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
1030     LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
1031       ", server=" + rt.getServerName() + ", region=" +
1032       HRegionInfo.prettyPrint(encodedName));
1033     RegionState regionState = regionStates.getRegionTransitionState(encodedName);
1034     switch (rt.getEventType()) {
1035       case M_ZK_REGION_OFFLINE:
1036         HRegionInfo regionInfo;
1037         if (regionState != null) {
1038           regionInfo = regionState.getRegion();
1039         } else {
1040           try {
1041             byte [] name = rt.getRegionName();
1042             Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(catalogTracker, name);
1043             regionInfo = p.getFirst();
1044           } catch (IOException e) {
1045             LOG.info("Exception reading META doing HBCK repair operation", e);
1046             return;
1047           }
1048         }
1049         LOG.info("HBCK repair is triggering assignment of region=" +
1050             regionInfo.getRegionNameAsString());
1051         // trigger assign, node is already in OFFLINE so don't need to update ZK
1052         assign(regionInfo, false);
1053         break;
1054 
1055       default:
1056         LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
1057         break;
1058     }
1059 
1060   }
1061 
1062   // ZooKeeper events
1063 
1064   /**
1065    * New unassigned node has been created.
1066    *
1067    * <p>This happens when an RS begins the OPENING or CLOSING of a region by
1068    * creating an unassigned node.
1069    *
1070    * <p>When this happens we must:
1071    * <ol>
1072    *   <li>Watch the node for further events</li>
1073    *   <li>Read and handle the state in the node</li>
1074    * </ol>
1075    */
1076   @Override
1077   public void nodeCreated(String path) {
1078     handleAssignmentEvent(path);
1079   }
1080 
1081   /**
1082    * Existing unassigned node has had data changed.
1083    *
1084    * <p>This happens when an RS transitions from OFFLINE to OPENING, or between
1085    * OPENING/OPENED and CLOSING/CLOSED.
1086    *
1087    * <p>When this happens we must:
1088    * <ol>
1089    *   <li>Watch the node for further events</li>
1090    *   <li>Read and handle the state in the node</li>
1091    * </ol>
1092    */
1093   @Override
1094   public void nodeDataChanged(String path) {
1095     handleAssignmentEvent(path);
1096   }
1097 
1098 
  // We don't want two events on the same region to be handled simultaneously.
  // An event therefore has to wait while another event on the same region is in progress:
  // we track the region names of the events in progress, and keep a waiting list.
  // Accessed while synchronized on this set (see zkEventWorkersSubmit).
  private final Set<String> regionsInProgress = new HashSet<String>();
  // Events parked behind an in-progress event on the same region, keyed by region name.
  // A LinkedHashMultimap keeps put order when the values are read back, which we need so
  // that events are handled in the order they were received.
  // Accessed while synchronized on this multimap (see zkEventWorkersSubmit).
  private final LinkedHashMultimap <String, RegionRunnable>
      zkEventWorkerWaitingList = LinkedHashMultimap.create();
1107 
1108   /**
1109    * A specific runnable that works only on a region.
1110    */
1111   private static interface RegionRunnable extends Runnable{
1112     /**
1113      * @return - the name of the region it works on.
1114      */
1115     public String getRegionName();
1116   }
1117 
1118   /**
1119    * Submit a task, ensuring that there is only one task at a time that working on a given region.
1120    * Order is respected.
1121    */
1122   protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {
1123 
1124     synchronized (regionsInProgress) {
1125       // If we're there is already a task with this region, we add it to the
1126       //  waiting list and return.
1127       if (regionsInProgress.contains(regRunnable.getRegionName())) {
1128         synchronized (zkEventWorkerWaitingList){
1129           zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
1130         }
1131         return;
1132       }
1133 
1134       // No event in progress on this region => we can submit a new task immediately.
1135       regionsInProgress.add(regRunnable.getRegionName());
1136       zkEventWorkers.submit(new Runnable() {
1137         @Override
1138         public void run() {
1139           try {
1140             regRunnable.run();
1141           } finally {
1142             // now that we have finished, let's see if there is an event for the same region in the
1143             //  waiting list. If it's the case, we can now submit it to the pool.
1144             synchronized (regionsInProgress) {
1145               regionsInProgress.remove(regRunnable.getRegionName());
1146               synchronized (zkEventWorkerWaitingList) {
1147                 java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
1148                     regRunnable.getRegionName());
1149                 if (!waiting.isEmpty()) {
1150                   // We want the first object only. The only way to get it is through an iterator.
1151                   RegionRunnable toSubmit = waiting.iterator().next();
1152                   zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
1153                   zkEventWorkersSubmit(toSubmit);
1154                 }
1155               }
1156             }
1157           }
1158         }
1159       });
1160     }
1161   }
1162 
1163   @Override
1164   public void nodeDeleted(final String path) {
1165     if (path.startsWith(watcher.assignmentZNode)) {
1166       final String regionName = ZKAssign.getRegionName(watcher, path);
1167       zkEventWorkersSubmit(new RegionRunnable() {
1168         @Override
1169         public String getRegionName() {
1170           return regionName;
1171         }
1172 
1173         @Override
1174         public void run() {
1175           Lock lock = locker.acquireLock(regionName);
1176           try {
1177             RegionState rs = regionStates.getRegionTransitionState(regionName);
1178             if (rs == null) return;
1179 
1180             HRegionInfo regionInfo = rs.getRegion();
1181             if (rs.isSplit()) {
1182               LOG.debug("Ephemeral node deleted, regionserver crashed?, " +
1183                 "clearing from RIT; rs=" + rs);
1184               regionOffline(rs.getRegion());
1185             } else {
1186               String regionNameStr = regionInfo.getRegionNameAsString();
1187               LOG.debug("The znode of region " + regionNameStr
1188                 + " has been deleted.");
1189               if (rs.isOpened()) {
1190                 ServerName serverName = rs.getServerName();
1191                 regionOnline(regionInfo, serverName);
1192                 LOG.info("The master has opened the region "
1193                   + regionNameStr + " that was online on " + serverName);
1194                 boolean disabled = getZKTable().isDisablingOrDisabledTable(
1195                   regionInfo.getTableNameAsString());
1196                 if (!serverManager.isServerOnline(serverName) && !disabled) {
1197                   LOG.info("Opened region " + regionNameStr
1198                     + "but the region server is offline, reassign the region");
1199                   assign(regionInfo, true);
1200                 } else if (disabled) {
1201                   // if server is offline, no hurt to unassign again
1202                   LOG.info("Opened region " + regionNameStr
1203                     + "but this table is disabled, triggering close of region");
1204                   unassign(regionInfo);
1205                 }
1206               }
1207             }
1208           } finally {
1209             lock.unlock();
1210           }
1211         }
1212       });
1213     }
1214   }
1215 
1216   /**
1217    * New unassigned node has been created.
1218    *
1219    * <p>This happens when an RS begins the OPENING, SPLITTING or CLOSING of a
1220    * region by creating a znode.
1221    *
1222    * <p>When this happens we must:
1223    * <ol>
1224    *   <li>Watch the node for further children changed events</li>
1225    *   <li>Watch all new children for changed events</li>
1226    * </ol>
1227    */
1228   @Override
1229   public void nodeChildrenChanged(String path) {
1230     if (path.equals(watcher.assignmentZNode)) {
1231       zkEventWorkers.submit(new Runnable() {
1232         @Override
1233         public void run() {
1234           try {
1235             // Just make sure we see the changes for the new znodes
1236             List<String> children =
1237               ZKUtil.listChildrenAndWatchForNewChildren(
1238                 watcher, watcher.assignmentZNode);
1239             if (children != null) {
1240               Stat stat = new Stat();
1241               for (String child : children) {
1242                 // if region is in transition, we already have a watch
1243                 // on it, so no need to watch it again. So, as I know for now,
1244                 // this is needed to watch splitting nodes only.
1245                 if (!regionStates.isRegionInTransition(child)) {
1246                   stat.setVersion(0);
1247                   byte[] data = ZKAssign.getDataAndWatch(watcher,
1248                     ZKUtil.joinZNode(watcher.assignmentZNode, child), stat);
1249                   if (data != null && stat.getVersion() > 0) {
1250                     try {
1251                       RegionTransition rt = RegionTransition.parseFrom(data);
1252 
1253                       //See HBASE-7551, handle splitting too, in case we miss the node change event
1254                       if (rt.getEventType() == EventType.RS_ZK_REGION_SPLITTING) {
1255                         handleRegion(rt, stat.getVersion());
1256                       }
1257                     } catch (DeserializationException de) {
1258                       LOG.error("error getting data for " + child, de);
1259                     }
1260                   }
1261                 }
1262               }
1263             }
1264           } catch (KeeperException e) {
1265             server.abort("Unexpected ZK exception reading unassigned children", e);
1266           }
1267         }
1268       });
1269     }
1270   }
1271 
1272   /**
1273    * Marks the region as online.  Removes it from regions in transition and
1274    * updates the in-memory assignment information.
1275    * <p>
1276    * Used when a region has been successfully opened on a region server.
1277    * @param regionInfo
1278    * @param sn
1279    */
1280   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
1281     if (!serverManager.isServerOnline(sn)) {
1282       LOG.warn("A region was opened on a dead server, ServerName=" +
1283         sn + ", region=" + regionInfo.getEncodedName());
1284     }
1285 
1286     regionStates.regionOnline(regionInfo, sn);
1287 
1288     // Remove plan if one.
1289     clearRegionPlan(regionInfo);
1290     // Add the server to serversInUpdatingTimer
1291     addToServersInUpdatingTimer(sn);
1292   }
1293 
1294   /**
1295    * Pass the assignment event to a worker for processing.
1296    * Each worker is a single thread executor service.  The reason
1297    * for just one thread is to make sure all events for a given
1298    * region are processed in order.
1299    *
1300    * @param path
1301    */
1302   private void handleAssignmentEvent(final String path) {
1303     if (path.startsWith(watcher.assignmentZNode)) {
1304       final String regionName = ZKAssign.getRegionName(watcher, path);
1305 
1306       zkEventWorkersSubmit(new RegionRunnable() {
1307         @Override
1308         public String getRegionName() {
1309           return regionName;
1310         }
1311 
1312         @Override
1313         public void run() {
1314           try {
1315             Stat stat = new Stat();
1316             byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
1317             if (data == null) return;
1318 
1319             RegionTransition rt = RegionTransition.parseFrom(data);
1320             handleRegion(rt, stat.getVersion());
1321           } catch (KeeperException e) {
1322             server.abort("Unexpected ZK exception reading unassigned node data", e);
1323           } catch (DeserializationException e) {
1324             server.abort("Unexpected exception deserializing node data", e);
1325           }
1326         }
1327       });
1328     }
1329   }
1330 
1331   /**
1332    * Add the server to the set serversInUpdatingTimer, then {@link TimerUpdater}
1333    * will update timers for this server in background
1334    * @param sn
1335    */
1336   private void addToServersInUpdatingTimer(final ServerName sn) {
1337     if (tomActivated){
1338       this.serversInUpdatingTimer.add(sn);
1339     }
1340   }
1341 
1342   /**
1343    * Touch timers for all regions in transition that have the passed
1344    * <code>sn</code> in common.
1345    * Call this method whenever a server checks in.  Doing so helps the case where
1346    * a new regionserver has joined the cluster and its been given 1k regions to
1347    * open.  If this method is tickled every time the region reports in a
1348    * successful open then the 1k-th region won't be timed out just because its
1349    * sitting behind the open of 999 other regions.  This method is NOT used
1350    * as part of bulk assign -- there we have a different mechanism for extending
1351    * the regions in transition timer (we turn it off temporarily -- because
1352    * there is no regionplan involved when bulk assigning.
1353    * @param sn
1354    */
1355   private void updateTimers(final ServerName sn) {
1356     Preconditions.checkState(tomActivated);
1357     if (sn == null) return;
1358 
1359     // This loop could be expensive.
1360     // First make a copy of current regionPlan rather than hold sync while
1361     // looping because holding sync can cause deadlock.  Its ok in this loop
1362     // if the Map we're going against is a little stale
1363     List<Map.Entry<String, RegionPlan>> rps;
1364     synchronized(this.regionPlans) {
1365       rps = new ArrayList<Map.Entry<String, RegionPlan>>(regionPlans.entrySet());
1366     }
1367 
1368     for (Map.Entry<String, RegionPlan> e : rps) {
1369       if (e.getValue() != null && e.getKey() != null && sn.equals(e.getValue().getDestination())) {
1370         RegionState regionState = regionStates.getRegionTransitionState(e.getKey());
1371         if (regionState != null) {
1372           regionState.updateTimestampToNow();
1373         }
1374       }
1375     }
1376   }
1377 
1378   /**
1379    * Marks the region as offline.  Removes it from regions in transition and
1380    * removes in-memory assignment information.
1381    * <p>
1382    * Used when a region has been closed and should remain closed.
1383    * @param regionInfo
1384    */
1385   public void regionOffline(final HRegionInfo regionInfo) {
1386     regionStates.regionOffline(regionInfo);
1387     removeClosedRegion(regionInfo);
1388     // remove the region plan as well just in case.
1389     clearRegionPlan(regionInfo);
1390   }
1391 
1392   public void offlineDisabledRegion(HRegionInfo regionInfo) {
1393     // Disabling so should not be reassigned, just delete the CLOSED node
1394     LOG.debug("Table being disabled so deleting ZK node and removing from " +
1395         "regions in transition, skipping assignment of region " +
1396           regionInfo.getRegionNameAsString());
1397     try {
1398       if (!ZKAssign.deleteClosedNode(watcher, regionInfo.getEncodedName())) {
1399         // Could also be in OFFLINE mode
1400         ZKAssign.deleteOfflineNode(watcher, regionInfo.getEncodedName());
1401       }
1402     } catch (KeeperException.NoNodeException nne) {
1403       LOG.debug("Tried to delete closed node for " + regionInfo + " but it " +
1404           "does not exist so just offlining");
1405     } catch (KeeperException e) {
1406       this.server.abort("Error deleting CLOSED node in ZK", e);
1407     }
1408     regionOffline(regionInfo);
1409   }
1410 
1411   // Assignment methods
1412 
1413   /**
1414    * Assigns the specified region.
1415    * <p>
1416    * If a RegionPlan is available with a valid destination then it will be used
1417    * to determine what server region is assigned to.  If no RegionPlan is
1418    * available, region will be assigned to a random available server.
1419    * <p>
1420    * Updates the RegionState and sends the OPEN RPC.
1421    * <p>
1422    * This will only succeed if the region is in transition and in a CLOSED or
1423    * OFFLINE state or not in transition (in-memory not zk), and of course, the
1424    * chosen server is up and running (It may have just crashed!).  If the
1425    * in-memory checks pass, the zk node is forced to OFFLINE before assigning.
1426    *
1427    * @param region server to be assigned
1428    * @param setOfflineInZK whether ZK node should be created/transitioned to an
1429    *                       OFFLINE state before assigning the region
1430    */
1431   public void assign(HRegionInfo region, boolean setOfflineInZK) {
1432     assign(region, setOfflineInZK, false);
1433   }
1434 
1435   /**
1436    * Use care with forceNewPlan. It could cause double assignment.
1437    */
1438   public void assign(HRegionInfo region,
1439       boolean setOfflineInZK, boolean forceNewPlan) {
1440     if (!setOfflineInZK && isDisabledorDisablingRegionInRIT(region)) {
1441       return;
1442     }
1443     if (this.serverManager.isClusterShutdown()) {
1444       LOG.info("Cluster shutdown is set; skipping assign of " +
1445         region.getRegionNameAsString());
1446       return;
1447     }
1448     String encodedName = region.getEncodedName();
1449     Lock lock = locker.acquireLock(encodedName);
1450     try {
1451       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
1452       if (state != null) {
1453         assign(state, setOfflineInZK, forceNewPlan);
1454       }
1455     } finally {
1456       lock.unlock();
1457     }
1458   }
1459 
1460   /**
1461    * Bulk assign regions to <code>destination</code>.
1462    * @param destination
1463    * @param regions Regions to assign.
1464    * @return true if successful
1465    */
1466   boolean assign(final ServerName destination,
1467       final List<HRegionInfo> regions) {
1468     int regionCount = regions.size();
1469     if (regionCount == 0) {
1470       return true;
1471     }
1472     LOG.debug("Bulk assigning " + regionCount + " region(s) to " +
1473       destination.toString());
1474 
1475     Set<String> encodedNames = new HashSet<String>(regionCount);
1476     for (HRegionInfo region : regions) {
1477       encodedNames.add(region.getEncodedName());
1478     }
1479 
1480     List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
1481     Map<String, Lock> locks = locker.acquireLocks(encodedNames);
1482     try {
1483       AtomicInteger counter = new AtomicInteger(0);
1484       Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
1485       OfflineCallback cb = new OfflineCallback(
1486         watcher, destination, counter, offlineNodesVersions);
1487       Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
1488       List<RegionState> states = new ArrayList<RegionState>(regions.size());
1489       for (HRegionInfo region : regions) {
1490         String encodedRegionName = region.getEncodedName();
1491         RegionState state = forceRegionStateToOffline(region, true);
1492         if (state != null && asyncSetOfflineInZooKeeper(state, cb, destination)) {
1493           RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
1494           plans.put(encodedRegionName, plan);
1495           states.add(state);
1496         } else {
1497           LOG.warn("failed to force region state to offline or "
1498             + "failed to set it offline in ZK, will reassign later: " + region);
1499           failedToOpenRegions.add(region); // assign individually later
1500           Lock lock = locks.remove(encodedRegionName);
1501           lock.unlock();
1502         }
1503       }
1504 
1505       // Wait until all unassigned nodes have been put up and watchers set.
1506       int total = states.size();
1507       for (int oldCounter = 0; !server.isStopped();) {
1508         int count = counter.get();
1509         if (oldCounter != count) {
1510           LOG.info(destination.toString() + " unassigned znodes=" + count +
1511             " of total=" + total);
1512           oldCounter = count;
1513         }
1514         if (count >= total) break;
1515         Threads.sleep(5);
1516       }
1517 
1518       if (server.isStopped()) {
1519         return false;
1520       }
1521 
1522       // Add region plans, so we can updateTimers when one region is opened so
1523       // that unnecessary timeout on RIT is reduced.
1524       this.addPlans(plans);
1525 
1526       List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
1527         new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
1528       for (RegionState state: states) {
1529         HRegionInfo region = state.getRegion();
1530         String encodedRegionName = region.getEncodedName();
1531         Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
1532         if (nodeVersion == null || nodeVersion == -1) {
1533           LOG.warn("failed to offline in zookeeper: " + region);
1534           failedToOpenRegions.add(region); // assign individually later
1535           Lock lock = locks.remove(encodedRegionName);
1536           lock.unlock();
1537         } else {
1538           regionStates.updateRegionState(region,
1539             RegionState.State.PENDING_OPEN, destination);
1540           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1541           if (this.shouldAssignRegionsWithFavoredNodes) {
1542             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1543           }
1544           regionOpenInfos.add(new Triple<HRegionInfo, Integer,  List<ServerName>>(
1545             region, nodeVersion, favoredNodes));
1546         }
1547       }
1548 
1549       // Move on to open regions.
1550       try {
1551         // Send OPEN RPC. If it fails on a IOE or RemoteException, the
1552         // TimeoutMonitor will pick up the pieces.
1553         long maxWaitTime = System.currentTimeMillis() +
1554           this.server.getConfiguration().
1555             getLong("hbase.regionserver.rpc.startup.waittime", 60000);
1556         for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
1557           try {
1558             List<RegionOpeningState> regionOpeningStateList = serverManager
1559               .sendRegionOpen(destination, regionOpenInfos);
1560             if (regionOpeningStateList == null) {
1561               // Failed getting RPC connection to this server
1562               return false;
1563             }
1564             for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
1565               RegionOpeningState openingState = regionOpeningStateList.get(k);
1566               if (openingState != RegionOpeningState.OPENED) {
1567                 HRegionInfo region = regionOpenInfos.get(k).getFirst();
1568                 if (openingState == RegionOpeningState.ALREADY_OPENED) {
1569                   processAlreadyOpenedRegion(region, destination);
1570                 } else if (openingState == RegionOpeningState.FAILED_OPENING) {
1571                   // Failed opening this region, reassign it later
1572                   failedToOpenRegions.add(region);
1573                 } else {
1574                   LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
1575                     + openingState + " in assigning region " + region);
1576                 }
1577               }
1578             }
1579             break;
1580           } catch (IOException e) {
1581             if (e instanceof RemoteException) {
1582               e = ((RemoteException)e).unwrapRemoteException();
1583             }
1584             if (e instanceof RegionServerStoppedException) {
1585               LOG.warn("The region server was shut down, ", e);
1586               // No need to retry, the region server is a goner.
1587               return false;
1588             } else if (e instanceof ServerNotRunningYetException) {
1589               long now = System.currentTimeMillis();
1590               if (now < maxWaitTime) {
1591                 LOG.debug("Server is not yet up; waiting up to " +
1592                   (maxWaitTime - now) + "ms", e);
1593                 Thread.sleep(100);
1594                 i--; // reset the try count
1595                 continue;
1596               }
1597             } else if (e instanceof java.net.SocketTimeoutException
1598                 && this.serverManager.isServerOnline(destination)) {
1599               // In case socket is timed out and the region server is still online,
1600               // the openRegion RPC could have been accepted by the server and
1601               // just the response didn't go through.  So we will retry to
1602               // open the region on the same server.
1603               if (LOG.isDebugEnabled()) {
1604                 LOG.debug("Bulk assigner openRegion() to " + destination
1605                   + " has timed out, but the regions might"
1606                   + " already be opened on it.", e);
1607               }
1608               continue;
1609             }
1610             throw e;
1611           }
1612         }
1613       } catch (IOException e) {
1614         // Can be a socket timeout, EOF, NoRouteToHost, etc
1615         LOG.info("Unable to communicate with the region server in order" +
1616           " to assign regions", e);
1617         return false;
1618       } catch (InterruptedException e) {
1619         throw new RuntimeException(e);
1620       }
1621     } finally {
1622       for (Lock lock : locks.values()) {
1623         lock.unlock();
1624       }
1625     }
1626 
1627     if (!failedToOpenRegions.isEmpty()) {
1628       for (HRegionInfo region : failedToOpenRegions) {
1629         invokeAssign(region);
1630       }
1631     }
1632     LOG.debug("Bulk assigning done for " + destination.toString());
1633     return true;
1634   }
1635 
1636   /**
1637    * Send CLOSE RPC if the server is online, otherwise, offline the region.
1638    *
1639    * The RPC will be sent only to the region sever found in the region state
1640    * if it is passed in, otherwise, to the src server specified. If region
1641    * state is not specified, we don't update region state at all, instead
1642    * we just send the RPC call. This is useful for some cleanup without
1643    * messing around the region states (see handleRegion, on region opened
1644    * on an unexpected server scenario, for an example)
1645    */
1646   private void unassign(final HRegionInfo region,
1647       final RegionState state, final int versionOfClosingNode,
1648       final ServerName dest, final boolean transitionInZK,
1649       final ServerName src) {
1650     ServerName server = src;
1651     if (state != null) {
1652       server = state.getServerName();
1653     }
1654     for (int i = 1; i <= this.maximumAttempts; i++) {
1655       // ClosedRegionhandler can remove the server from this.regions
1656       if (!serverManager.isServerOnline(server)) {
1657         if (transitionInZK) {
1658           // delete the node. if no node exists need not bother.
1659           deleteClosingOrClosedNode(region);
1660         }
1661         if (state != null) {
1662           regionOffline(region);
1663         }
1664         return;
1665       }
1666       try {
1667         // Send CLOSE RPC
1668         if (serverManager.sendRegionClose(server, region,
1669           versionOfClosingNode, dest, transitionInZK)) {
1670           LOG.debug("Sent CLOSE to " + server + " for region " +
1671             region.getRegionNameAsString());
1672           return;
1673         }
1674         // This never happens. Currently regionserver close always return true.
1675         // Todo; this can now happen (0.96) if there is an exception in a coprocessor
1676         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
1677           region.getRegionNameAsString());
1678       } catch (Throwable t) {
1679         if (t instanceof RemoteException) {
1680           t = ((RemoteException)t).unwrapRemoteException();
1681         }
1682         if (t instanceof NotServingRegionException
1683             || t instanceof RegionServerStoppedException) {
1684           if (transitionInZK) {
1685             deleteClosingOrClosedNode(region);
1686           }
1687           if (state != null) {
1688             regionOffline(region);
1689           }
1690           return;
1691         } else if (state != null
1692             && t instanceof RegionAlreadyInTransitionException) {
1693           // RS is already processing this region, only need to update the timestamp
1694           LOG.debug("update " + state + " the timestamp.");
1695           state.updateTimestampToNow();
1696         }
1697         LOG.info("Server " + server + " returned " + t + " for "
1698           + region.getRegionNameAsString() + ", try=" + i
1699           + " of " + this.maximumAttempts, t);
1700         // Presume retry or server will expire.
1701       }
1702     }
1703     // Run out of attempts
1704     if (!tomActivated && state != null) {
1705       regionStates.updateRegionState(region, RegionState.State.FAILED_CLOSE);
1706     }
1707   }
1708 
1709   /**
1710    * Set region to OFFLINE unless it is opening and forceNewPlan is false.
1711    */
1712   private RegionState forceRegionStateToOffline(
1713       final HRegionInfo region, final boolean forceNewPlan) {
1714     RegionState state = regionStates.getRegionState(region);
1715     if (state == null) {
1716       LOG.warn("Assigning a region not in region states: " + region);
1717       state = regionStates.createRegionState(region);
1718     } else {
1719       switch (state.getState()) {
1720       case OPEN:
1721       case OPENING:
1722       case PENDING_OPEN:
1723         if (!forceNewPlan) {
1724           LOG.debug("Attempting to assign region " +
1725             region + " but it is already in transition: " + state);
1726           return null;
1727         }
1728       case CLOSING:
1729       case PENDING_CLOSE:
1730       case FAILED_CLOSE:
1731         unassign(region, state, -1, null, false, null);
1732         state = regionStates.getRegionState(region);
1733         if (state.isOffline()) break;
1734       case FAILED_OPEN:
1735       case CLOSED:
1736         LOG.debug("Forcing OFFLINE; was=" + state);
1737         state = regionStates.updateRegionState(
1738           region, RegionState.State.OFFLINE);
1739       case OFFLINE:
1740         break;
1741       default:
1742         LOG.error("Trying to assign region " + region
1743           + ", which is in state " + state);
1744         return null;
1745       }
1746     }
1747     return state;
1748   }
1749 
1750   /**
1751    * Caller must hold lock on the passed <code>state</code> object.
1752    * @param state
1753    * @param setOfflineInZK
1754    * @param forceNewPlan
1755    */
1756   private void assign(RegionState state,
1757       final boolean setOfflineInZK, final boolean forceNewPlan) {
1758     RegionState currentState = state;
1759     int versionOfOfflineNode = -1;
1760     RegionPlan plan = null;
1761     long maxRegionServerStartupWaitTime = -1;
1762     HRegionInfo region = state.getRegion();
1763     RegionOpeningState regionOpenState;
1764     for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
1765       if (plan == null) { // Get a server for the region at first
1766         plan = getRegionPlan(region, forceNewPlan);
1767       }
1768       if (plan == null) {
1769         LOG.warn("Unable to determine a plan to assign " + region);
1770         if (tomActivated){
1771           this.timeoutMonitor.setAllRegionServersOffline(true);
1772         } else {
1773           regionStates.updateRegionState(region, RegionState.State.FAILED_OPEN);
1774         }
1775         return;
1776       }
1777       if (setOfflineInZK && versionOfOfflineNode == -1) {
1778         // get the version of the znode after setting it to OFFLINE.
1779         // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
1780         versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
1781         if (versionOfOfflineNode != -1) {
1782           if (isDisabledorDisablingRegionInRIT(region)) {
1783             return;
1784           }
1785           // In case of assignment from EnableTableHandler table state is ENABLING. Any how
1786           // EnableTableHandler will set ENABLED after assigning all the table regions. If we
1787           // try to set to ENABLED directly then client API may think table is enabled.
1788           // When we have a case such as all the regions are added directly into .META. and we call
1789           // assignRegion then we need to make the table ENABLED. Hence in such case the table
1790           // will not be in ENABLING or ENABLED state.
1791           String tableName = region.getTableNameAsString();
1792           if (!zkTable.isEnablingTable(tableName) && !zkTable.isEnabledTable(tableName)) {
1793             LOG.debug("Setting table " + tableName + " to ENABLED state.");
1794             setEnabledTable(tableName);
1795           }
1796         }
1797       }
1798       if (setOfflineInZK && versionOfOfflineNode == -1) {
1799         LOG.info("Unable to set offline in ZooKeeper to assign " + region);
1800         // Setting offline in ZK must have been failed due to ZK racing or some
1801         // exception which may make the server to abort. If it is ZK racing,
1802         // we should retry since we already reset the region state,
1803         // existing (re)assignment will fail anyway.
1804         if (!server.isAborted()) {
1805           continue;
1806         }
1807       }
1808       if (this.server.isStopped() || this.server.isAborted()) {
1809         LOG.debug("Server stopped/aborted; skipping assign of " + region);
1810         return;
1811       }
1812       LOG.info("Assigning region " + region.getRegionNameAsString() +
1813           " to " + plan.getDestination().toString());
1814       // Transition RegionState to PENDING_OPEN
1815       currentState = regionStates.updateRegionState(region,
1816           RegionState.State.PENDING_OPEN, plan.getDestination());
1817 
1818       boolean needNewPlan;
1819       final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
1820           " to " + plan.getDestination();
1821       try {
1822         List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1823         if (this.shouldAssignRegionsWithFavoredNodes) {
1824           favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1825         }
1826         regionOpenState = serverManager.sendRegionOpen(
1827             plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
1828 
1829         if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
1830           // Failed opening this region, looping again on a new server.
1831           needNewPlan = true;
1832           LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
1833               " trying to assign elsewhere instead; " +
1834               "try=" + i + " of " + this.maximumAttempts);
1835         } else {
1836           // we're done
1837           if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
1838             processAlreadyOpenedRegion(region, plan.getDestination());
1839           }
1840           return;
1841         }
1842 
1843       } catch (Throwable t) {
1844         if (t instanceof RemoteException) {
1845           t = ((RemoteException) t).unwrapRemoteException();
1846         }
1847 
1848         // Should we wait a little before retrying? If the server is starting it's yes.
1849         // If the region is already in transition, it's yes as well: we want to be sure that
1850         //  the region will get opened but we don't want a double assignment.
1851         boolean hold = (t instanceof RegionAlreadyInTransitionException ||
1852             t instanceof ServerNotRunningYetException);
1853 
1854         // In case socket is timed out and the region server is still online,
1855         // the openRegion RPC could have been accepted by the server and
1856         // just the response didn't go through.  So we will retry to
1857         // open the region on the same server to avoid possible
1858         // double assignment.
1859         boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
1860             && this.serverManager.isServerOnline(plan.getDestination()));
1861 
1862 
1863         if (hold) {
1864           LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
1865               "try=" + i + " of " + this.maximumAttempts, t);
1866 
1867           if (maxRegionServerStartupWaitTime < 0) {
1868             maxRegionServerStartupWaitTime = EnvironmentEdgeManager.currentTimeMillis() +
1869                 this.server.getConfiguration().
1870                     getLong("hbase.regionserver.rpc.startup.waittime", 60000);
1871           }
1872           try {
1873             long now = EnvironmentEdgeManager.currentTimeMillis();
1874             if (now < maxRegionServerStartupWaitTime) {
1875               LOG.debug("Server is not yet up; waiting up to " +
1876                   (maxRegionServerStartupWaitTime - now) + "ms", t);
1877               Thread.sleep(100);
1878               i--; // reset the try count
1879               needNewPlan = false;
1880             } else {
1881               LOG.debug("Server is not up for a while; try a new one", t);
1882               needNewPlan = true;
1883             }
1884           } catch (InterruptedException ie) {
1885             LOG.warn("Failed to assign "
1886                 + region.getRegionNameAsString() + " since interrupted", ie);
1887             Thread.currentThread().interrupt();
1888             if (!tomActivated) {
1889               regionStates.updateRegionState(region, RegionState.State.FAILED_OPEN);
1890             }
1891             return;
1892           }
1893         } else if (retry) {
1894           needNewPlan = false;
1895           LOG.warn(assignMsg + ", trying to assign to the same region server " +
1896               "try=" + i + " of " + this.maximumAttempts, t);
1897         } else {
1898           needNewPlan = true;
1899           LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
1900               " try=" + i + " of " + this.maximumAttempts, t);
1901         }
1902       }
1903 
1904       if (i == this.maximumAttempts) {
1905         // Don't reset the region state or get a new plan any more.
1906         // This is the last try.
1907         continue;
1908       }
1909 
1910       // If region opened on destination of present plan, reassigning to new
1911       // RS may cause double assignments. In case of RegionAlreadyInTransitionException
1912       // reassigning to same RS.
1913       if (needNewPlan) {
1914         // Force a new plan and reassign. Will return null if no servers.
1915         // The new plan could be the same as the existing plan since we don't
1916         // exclude the server of the original plan, which should not be
1917         // excluded since it could be the only server up now.
1918         RegionPlan newPlan = getRegionPlan(region, true);
1919 
1920         if (newPlan == null) {
1921           if (tomActivated) {
1922             this.timeoutMonitor.setAllRegionServersOffline(true);
1923           } else {
1924             regionStates.updateRegionState(region, RegionState.State.FAILED_OPEN);
1925           }
1926           LOG.warn("Unable to find a viable location to assign region " +
1927               region.getRegionNameAsString());
1928           return;
1929         }
1930 
1931         if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
1932           // Clean out plan we failed execute and one that doesn't look like it'll
1933           // succeed anyways; we need a new plan!
1934           // Transition back to OFFLINE
1935           currentState = regionStates.updateRegionState(region, RegionState.State.OFFLINE);
1936           versionOfOfflineNode = -1;
1937           plan = newPlan;
1938         }
1939       }
1940     }
1941     // Run out of attempts
1942     if (!tomActivated) {
1943       regionStates.updateRegionState(region, RegionState.State.FAILED_OPEN);
1944     }
1945   }
1946 
1947   private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
1948     // Remove region from in-memory transition and unassigned node from ZK
1949     // While trying to enable the table the regions of the table were
1950     // already enabled.
1951     LOG.debug("ALREADY_OPENED region " + region.getRegionNameAsString()
1952         + " to " + sn);
1953     String encodedRegionName = region.getEncodedName();
1954     try {
1955       ZKAssign.deleteOfflineNode(watcher, encodedRegionName);
1956     } catch (KeeperException.NoNodeException e) {
1957       if (LOG.isDebugEnabled()) {
1958         LOG.debug("The unassigned node " + encodedRegionName
1959             + " does not exist.");
1960       }
1961     } catch (KeeperException e) {
1962       server.abort(
1963           "Error deleting OFFLINED node in ZK for transition ZK node ("
1964               + encodedRegionName + ")", e);
1965     }
1966 
1967     regionStates.regionOnline(region, sn);
1968   }
1969 
1970   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
1971     String tableName = region.getTableNameAsString();
1972     boolean disabled = this.zkTable.isDisabledTable(tableName);
1973     if (disabled || this.zkTable.isDisablingTable(tableName)) {
1974       LOG.info("Table " + tableName + (disabled ? " disabled;" : " disabling;") +
1975         " skipping assign of " + region.getRegionNameAsString());
1976       offlineDisabledRegion(region);
1977       return true;
1978     }
1979     return false;
1980   }
1981 
1982   /**
1983    * Set region as OFFLINED up in zookeeper
1984    *
1985    * @param state
1986    * @return the version of the offline node if setting of the OFFLINE node was
1987    *         successful, -1 otherwise.
1988    */
1989   private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
1990     if (!state.isClosed() && !state.isOffline()) {
1991       String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE.";
1992       this.server.abort(msg, new IllegalStateException(msg));
1993       return -1;
1994     }
1995     regionStates.updateRegionState(state.getRegion(),
1996       RegionState.State.OFFLINE);
1997     int versionOfOfflineNode;
1998     try {
1999       // get the version after setting the znode to OFFLINE
2000       versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
2001         state.getRegion(), destination);
2002       if (versionOfOfflineNode == -1) {
2003         LOG.warn("Attempted to create/force node into OFFLINE state before "
2004             + "completing assignment but failed to do so for " + state);
2005         return -1;
2006       }
2007     } catch (KeeperException e) {
2008       server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
2009       return -1;
2010     }
2011     return versionOfOfflineNode;
2012   }
2013 
2014   /**
2015    * @param region the region to assign
2016    * @return Plan for passed <code>region</code> (If none currently, it creates one or
2017    * if no servers to assign, it returns null).
2018    */
2019   private RegionPlan getRegionPlan(final HRegionInfo region,
2020       final boolean forceNewPlan) {
2021     return getRegionPlan(region, null, forceNewPlan);
2022   }
2023 
2024   /**
2025    * @param region the region to assign
2026    * @param serverToExclude Server to exclude (we know its bad). Pass null if
2027    * all servers are thought to be assignable.
2028    * @param forceNewPlan If true, then if an existing plan exists, a new plan
2029    * will be generated.
2030    * @return Plan for passed <code>region</code> (If none currently, it creates one or
2031    * if no servers to assign, it returns null).
2032    */
2033   private RegionPlan getRegionPlan(final HRegionInfo region,
2034       final ServerName serverToExclude, final boolean forceNewPlan) {
2035     // Pickup existing plan or make a new one
2036     final String encodedName = region.getEncodedName();
2037     final List<ServerName> destServers =
2038       serverManager.createDestinationServersList(serverToExclude);
2039 
2040     if (destServers.isEmpty()){
2041       LOG.warn("Can't move the region " + encodedName +
2042         ", there is no destination server available.");
2043       return null;
2044     }
2045 
2046     RegionPlan randomPlan = null;
2047     boolean newPlan = false;
2048     RegionPlan existingPlan;
2049 
2050     synchronized (this.regionPlans) {
2051       existingPlan = this.regionPlans.get(encodedName);
2052 
2053       if (existingPlan != null && existingPlan.getDestination() != null) {
2054         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
2055           + " destination server is " + existingPlan.getDestination() +
2056             " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
2057       }
2058 
2059       if (forceNewPlan
2060           || existingPlan == null
2061           || existingPlan.getDestination() == null
2062           || !destServers.contains(existingPlan.getDestination())) {
2063         newPlan = true;
2064         randomPlan = new RegionPlan(region, null,
2065             balancer.randomAssignment(region, destServers));
2066         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
2067           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
2068           regions.add(region);
2069           try {
2070             processFavoredNodes(regions);
2071           } catch (IOException ie) {
2072             LOG.warn("Ignoring exception in processFavoredNodes " + ie);
2073           }
2074         }
2075         this.regionPlans.put(encodedName, randomPlan);
2076       }
2077     }
2078 
2079     if (newPlan) {
2080       if (randomPlan.getDestination() == null) {
2081         LOG.warn("Can't find a destination for region" + encodedName);
2082         return null;
2083       }
2084       LOG.debug("No previous transition plan was found (or we are ignoring " +
2085         "an existing plan) for " + region.getRegionNameAsString() +
2086         " so generated a random one; " + randomPlan + "; " +
2087         serverManager.countOfRegionServers() +
2088                " (online=" + serverManager.getOnlineServers().size() +
2089                ", available=" + destServers.size() + ") available servers" +
2090                ", forceNewPlan=" + forceNewPlan);
2091         return randomPlan;
2092       }
2093     LOG.debug("Using pre-existing plan for region " +
2094       region.getRegionNameAsString() + "; plan=" + existingPlan);
2095     return existingPlan;
2096   }
2097 
2098   /**
2099    * Unassign the list of regions. Configuration knobs:
2100    * hbase.bulk.waitbetween.reopen indicates the number of milliseconds to
2101    * wait before unassigning another region from this region server
2102    *
2103    * @param regions
2104    * @throws InterruptedException
2105    */
2106   public void unassign(List<HRegionInfo> regions) {
2107     int waitTime = this.server.getConfiguration().getInt(
2108         "hbase.bulk.waitbetween.reopen", 0);
2109     for (HRegionInfo region : regions) {
2110       if (regionStates.isRegionInTransition(region))
2111         continue;
2112       unassign(region, false);
2113       while (regionStates.isRegionInTransition(region)) {
2114         try {
2115           Thread.sleep(10);
2116         } catch (InterruptedException e) {
2117           // Do nothing, continue
2118         }
2119       }
2120       if (waitTime > 0)
2121         try {
2122           Thread.sleep(waitTime);
2123         } catch (InterruptedException e) {
2124           // Do nothing, continue
2125         }
2126     }
2127   }
2128 
2129   /**
2130    * Unassigns the specified region.
2131    * <p>
2132    * Updates the RegionState and sends the CLOSE RPC unless region is being
2133    * split by regionserver; then the unassign fails (silently) because we
2134    * presume the region being unassigned no longer exists (its been split out
2135    * of existence). TODO: What to do if split fails and is rolled back and
2136    * parent is revivified?
2137    * <p>
2138    * If a RegionPlan is already set, it will remain.
2139    *
2140    * @param region server to be unassigned
2141    */
2142   public void unassign(HRegionInfo region) {
2143     unassign(region, false);
2144   }
2145 
2146 
2147   /**
2148    * Unassigns the specified region.
2149    * <p>
2150    * Updates the RegionState and sends the CLOSE RPC unless region is being
2151    * split by regionserver; then the unassign fails (silently) because we
2152    * presume the region being unassigned no longer exists (its been split out
2153    * of existence). TODO: What to do if split fails and is rolled back and
2154    * parent is revivified?
2155    * <p>
2156    * If a RegionPlan is already set, it will remain.
2157    *
2158    * @param region server to be unassigned
2159    * @param force if region should be closed even if already closing
2160    */
2161   public void unassign(HRegionInfo region, boolean force, ServerName dest) {
2162     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
2163     LOG.debug("Starting unassignment of region " +
2164       region.getRegionNameAsString() + " (offlining)");
2165 
2166     String encodedName = region.getEncodedName();
2167     // Grab the state of this region and synchronize on it
2168     int versionOfClosingNode = -1;
2169     // We need a lock here as we're going to do a put later and we don't want multiple states
2170     //  creation
2171     ReentrantLock lock = locker.acquireLock(encodedName);
2172     RegionState state = regionStates.getRegionTransitionState(encodedName);
2173     try {
2174       if (state == null) {
2175         // Create the znode in CLOSING state
2176         try {
2177           state = regionStates.getRegionState(region);
2178           if (state == null || state.getServerName() == null) {
2179             // We don't know where the region is, offline it.
2180             // No need to send CLOSE RPC
2181             regionOffline(region);
2182             return;
2183           }
2184           versionOfClosingNode = ZKAssign.createNodeClosing(
2185             watcher, region, state.getServerName());
2186           if (versionOfClosingNode == -1) {
2187             LOG.debug("Attempting to unassign region " +
2188                 region.getRegionNameAsString() + " but ZK closing node "
2189                 + "can't be created.");
2190             return;
2191           }
2192         } catch (KeeperException e) {
2193           if (e instanceof NodeExistsException) {
2194             // Handle race between master initiated close and regionserver
2195             // orchestrated splitting. See if existing node is in a
2196             // SPLITTING or SPLIT state.  If so, the regionserver started
2197             // an op on node before we could get our CLOSING in.  Deal.
2198             NodeExistsException nee = (NodeExistsException)e;
2199             String path = nee.getPath();
2200             try {
2201               if (isSplitOrSplittingOrMergeOrMerging(path)) {
2202                 LOG.debug(path + " is SPLIT or SPLITTING or MERGE or MERGING; " +
2203                   "skipping unassign because region no longer exists -- its split or merge");
2204                 return;
2205               }
2206             } catch (KeeperException.NoNodeException ke) {
2207               LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
2208                 "; presuming split and that the region to unassign, " +
2209                 encodedName + ", no longer exists -- confirm", ke);
2210               return;
2211             } catch (KeeperException ke) {
2212               LOG.error("Unexpected zk state", ke);
2213             } catch (DeserializationException de) {
2214               LOG.error("Failed parse", de);
2215             }
2216           }
2217           // If we get here, don't understand whats going on -- abort.
2218           server.abort("Unexpected ZK exception creating node CLOSING", e);
2219           return;
2220         }
2221         state = regionStates.updateRegionState(region, RegionState.State.PENDING_CLOSE);
2222       } else if (state.isFailedOpen()) {
2223         // The region is not open yet
2224         regionOffline(region);
2225         return;
2226       } else if (force && (state.isPendingClose()
2227           || state.isClosing() || state.isFailedClose())) {
2228         LOG.debug("Attempting to unassign region " + region.getRegionNameAsString() +
2229           " which is already " + state.getState()  +
2230           " but forcing to send a CLOSE RPC again ");
2231         if (state.isFailedClose()) {
2232           state = regionStates.updateRegionState(region, RegionState.State.PENDING_CLOSE);
2233         }
2234         state.updateTimestampToNow();
2235       } else {
2236         LOG.debug("Attempting to unassign region " +
2237           region.getRegionNameAsString() + " but it is " +
2238           "already in transition (" + state.getState() + ", force=" + force + ")");
2239         return;
2240       }
2241 
2242       unassign(region, state, versionOfClosingNode, dest, true, null);
2243     } finally {
2244       lock.unlock();
2245     }
2246   }
2247 
2248   public void unassign(HRegionInfo region, boolean force){
2249      unassign(region, force, null);
2250   }
2251 
2252   /**
2253    * @param region regioninfo of znode to be deleted.
2254    */
2255   public void deleteClosingOrClosedNode(HRegionInfo region) {
2256     String encodedName = region.getEncodedName();
2257     try {
2258       if (!ZKAssign.deleteNode(watcher, encodedName,
2259           EventType.M_ZK_REGION_CLOSING)) {
2260         boolean deleteNode = ZKAssign.deleteNode(watcher,
2261           encodedName, EventType.RS_ZK_REGION_CLOSED);
2262         // TODO : We don't abort if the delete node returns false. Is there any
2263         // such corner case?
2264         if (!deleteNode) {
2265           LOG.error("The deletion of the CLOSED node for the region "
2266             + encodedName + " returned " + deleteNode);
2267         }
2268       }
2269     } catch (NoNodeException e) {
2270       LOG.debug("CLOSING/CLOSED node for the region " + encodedName
2271         + " already deleted");
2272     } catch (KeeperException ke) {
2273       server.abort(
2274         "Unexpected ZK exception deleting node CLOSING/CLOSED for the region "
2275           + encodedName, ke);
2276     }
2277   }
2278 
2279   /**
2280    * @param path
2281    * @return True if znode is in SPLIT or SPLITTING or MERGE or MERGING state.
2282    * @throws KeeperException Can happen if the znode went away in meantime.
2283    * @throws DeserializationException
2284    */
2285   private boolean isSplitOrSplittingOrMergeOrMerging(final String path)
2286       throws KeeperException, DeserializationException {
2287     boolean result = false;
2288     // This may fail if the SPLIT or SPLITTING or MERGE or MERGING znode gets
2289     // cleaned up before we can get data from it.
2290     byte [] data = ZKAssign.getData(watcher, path);
2291     if (data == null) return false;
2292     RegionTransition rt = RegionTransition.parseFrom(data);
2293     switch (rt.getEventType()) {
2294     case RS_ZK_REGION_SPLIT:
2295     case RS_ZK_REGION_SPLITTING:
2296     case RS_ZK_REGION_MERGE:
2297     case RS_ZK_REGION_MERGING:
2298       result = true;
2299       break;
2300     default:
2301       break;
2302     }
2303     return result;
2304   }
2305 
2306   /**
2307    * Waits until the specified region has completed assignment.
2308    * <p>
2309    * If the region is already assigned, returns immediately.  Otherwise, method
2310    * blocks until the region is assigned.
2311    * @param regionInfo region to wait on assignment for
2312    * @throws InterruptedException
2313    */
2314   public boolean waitForAssignment(HRegionInfo regionInfo)
2315       throws InterruptedException {
2316     while (!regionStates.isRegionAssigned(regionInfo)) {
2317       if (regionStates.isRegionFailedToOpen(regionInfo)
2318           || this.server.isStopped()) {
2319         return false;
2320       }
2321 
2322       // We should receive a notification, but it's
2323       //  better to have a timeout to recheck the condition here:
2324       //  it lowers the impact of a race condition if any
2325       regionStates.waitForUpdate(100);
2326     }
2327     return true;
2328   }
2329 
2330   /**
2331    * Assigns the META region.
2332    * <p>
2333    * Assumes that META is currently closed and is not being actively served by
2334    * any RegionServer.
2335    * <p>
2336    * Forcibly unsets the current meta region location in ZooKeeper and assigns
2337    * META to a random RegionServer.
2338    * @throws KeeperException
2339    */
2340   public void assignMeta() throws KeeperException {
2341     MetaRegionTracker.deleteMetaLocation(this.watcher);
2342     assign(HRegionInfo.FIRST_META_REGIONINFO, true);
2343   }
2344 
2345   /**
2346    * Assigns specified regions retaining assignments, if any.
2347    * <p>
2348    * This is a synchronous call and will return once every region has been
2349    * assigned.  If anything fails, an exception is thrown
2350    * @throws InterruptedException
2351    * @throws IOException
2352    */
2353   public void assign(Map<HRegionInfo, ServerName> regions)
2354         throws IOException, InterruptedException {
2355     if (regions == null || regions.isEmpty()) {
2356       return;
2357     }
2358     List<ServerName> servers = serverManager.createDestinationServersList();
2359     if (servers == null || servers.isEmpty()) {
2360       throw new IOException("Found no destination server to assign region(s)");
2361     }
2362 
2363     // Reuse existing assignment info
2364     Map<ServerName, List<HRegionInfo>> bulkPlan =
2365       balancer.retainAssignment(regions, servers);
2366 
2367     assign(regions.size(), servers.size(),
2368       "retainAssignment=true", bulkPlan);
2369   }
2370 
2371   /**
2372    * Assigns specified regions round robin, if any.
2373    * <p>
2374    * This is a synchronous call and will return once every region has been
2375    * assigned.  If anything fails, an exception is thrown
2376    * @throws InterruptedException
2377    * @throws IOException
2378    */
2379   public void assign(List<HRegionInfo> regions)
2380         throws IOException, InterruptedException {
2381     if (regions == null || regions.isEmpty()) {
2382       return;
2383     }
2384 
2385     List<ServerName> servers = serverManager.createDestinationServersList();
2386     if (servers == null || servers.isEmpty()) {
2387       throw new IOException("Found no destination server to assign region(s)");
2388     }
2389 
2390     // Generate a round-robin bulk assignment plan
2391     Map<ServerName, List<HRegionInfo>> bulkPlan
2392       = balancer.roundRobinAssignment(regions, servers);
2393     processFavoredNodes(regions);
2394 
2395     assign(regions.size(), servers.size(),
2396       "round-robin=true", bulkPlan);
2397   }
2398 
2399   private void assign(int regions, int totalServers,
2400       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
2401           throws InterruptedException, IOException {
2402 
2403     int servers = bulkPlan.size();
2404     if (servers == 1 || (regions < bulkAssignThresholdRegions
2405         && servers < bulkAssignThresholdServers)) {
2406 
2407       // Not use bulk assignment.  This could be more efficient in small
2408       // cluster, especially mini cluster for testing, so that tests won't time out
2409       LOG.info("Not use bulk assigning since we are assigning only "
2410         + regions + " region(s) to " + servers + " server(s)");
2411 
2412       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
2413         assign(plan.getKey(), plan.getValue());
2414       }
2415     } else {
2416       LOG.info("Bulk assigning " + regions + " region(s) across "
2417         + totalServers + " server(s), " + message);
2418 
2419       // Use fixed count thread pool assigning.
2420       BulkAssigner ba = new GeneralBulkAssigner(
2421         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
2422       ba.bulkAssign();
2423       LOG.info("Bulk assigning done");
2424     }
2425   }
2426 
2427   /**
2428    * Assigns all user regions, if any exist.  Used during cluster startup.
2429    * <p>
2430    * This is a synchronous call and will return once every region has been
2431    * assigned.  If anything fails, an exception is thrown and the cluster
2432    * should be shutdown.
2433    * @throws InterruptedException
2434    * @throws IOException
2435    * @throws KeeperException
2436    */
2437   private void assignAllUserRegions()
2438       throws IOException, InterruptedException, KeeperException {
2439     // Cleanup any existing ZK nodes and start watching
2440     ZKAssign.deleteAllNodes(watcher);
2441     ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
2442       this.watcher.assignmentZNode);
2443     failoverCleanupDone();
2444 
2445     // Skip assignment for regions of tables in DISABLING state because during clean cluster startup
2446     // no RS is alive and regions map also doesn't have any information about the regions.
2447     // See HBASE-6281.
2448     Set<String> disabledOrDisablingOrEnabling = ZKTable.getDisabledOrDisablingTables(watcher);
2449     disabledOrDisablingOrEnabling.addAll(ZKTable.getEnablingTables(watcher));
2450     // Scan META for all user regions, skipping any disabled tables
2451     Map<HRegionInfo, ServerName> allRegions = null;
2452     if (this.shouldAssignRegionsWithFavoredNodes) {
2453       allRegions = FavoredNodeAssignmentHelper.fullScan(
2454         catalogTracker, disabledOrDisablingOrEnabling, true, (FavoredNodeLoadBalancer)balancer);
2455     } else {
2456       allRegions = MetaReader.fullScan(
2457         catalogTracker, disabledOrDisablingOrEnabling, true);
2458     }
2459     if (allRegions == null || allRegions.isEmpty()) return;
2460 
2461     // Determine what type of assignment to do on startup
2462     boolean retainAssignment = server.getConfiguration().
2463       getBoolean("hbase.master.startup.retainassign", true);
2464 
2465     if (retainAssignment) {
2466       assign(allRegions);
2467     } else {
2468       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(allRegions.keySet());
2469       assign(regions);
2470     }
2471 
2472     for (HRegionInfo hri : allRegions.keySet()) {
2473       String tableName = hri.getTableNameAsString();
2474       if (!zkTable.isEnabledTable(tableName)) {
2475         setEnabledTable(tableName);
2476       }
2477     }
2478   }
2479 
2480   /**
2481    * Wait until no regions in transition.
2482    * @param timeout How long to wait.
2483    * @return True if nothing in regions in transition.
2484    * @throws InterruptedException
2485    */
2486   boolean waitUntilNoRegionsInTransition(final long timeout)
2487       throws InterruptedException {
2488     // Blocks until there are no regions in transition. It is possible that
2489     // there
2490     // are regions in transition immediately after this returns but guarantees
2491     // that if it returns without an exception that there was a period of time
2492     // with no regions in transition from the point-of-view of the in-memory
2493     // state of the Master.
2494     final long endTime = System.currentTimeMillis() + timeout;
2495 
2496     while (!this.server.isStopped() && regionStates.isRegionsInTransition()
2497         && endTime > System.currentTimeMillis()) {
2498       regionStates.waitForUpdate(100);
2499     }
2500 
2501     return !regionStates.isRegionsInTransition();
2502   }
2503 
2504   /**
2505    * Rebuild the list of user regions and assignment information.
2506    * <p>
2507    * Returns a map of servers that are not found to be online and the regions
2508    * they were hosting.
2509    * @return map of servers not online to their assigned regions, as stored
2510    *         in META
2511    * @throws IOException
2512    */
2513   Map<ServerName, List<HRegionInfo>> rebuildUserRegions() throws IOException, KeeperException {
2514     Set<String> enablingTables = ZKTable.getEnablingTables(watcher);
2515     Set<String> disabledOrEnablingTables = ZKTable.getDisabledTables(watcher);
2516     disabledOrEnablingTables.addAll(enablingTables);
2517     Set<String> disabledOrDisablingOrEnabling = ZKTable.getDisablingTables(watcher);
2518     disabledOrDisablingOrEnabling.addAll(disabledOrEnablingTables);
2519 
2520     // Region assignment from META
2521     List<Result> results = MetaReader.fullScan(this.catalogTracker);
2522     // Get any new but slow to checkin region server that joined the cluster
2523     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
2524     // Map of offline servers and their regions to be returned
2525     Map<ServerName, List<HRegionInfo>> offlineServers =
2526       new TreeMap<ServerName, List<HRegionInfo>>();
2527     // Iterate regions in META
2528     for (Result result : results) {
2529       Pair<HRegionInfo, ServerName> region = HRegionInfo.getHRegionInfoAndServerName(result);
2530       if (region == null) continue;
2531       HRegionInfo regionInfo = region.getFirst();
2532       ServerName regionLocation = region.getSecond();
2533       if (regionInfo == null) continue;
2534       regionStates.createRegionState(regionInfo);
2535       String tableName = regionInfo.getTableNameAsString();
2536       if (regionLocation == null) {
2537         // regionLocation could be null if createTable didn't finish properly.
2538         // When createTable is in progress, HMaster restarts.
2539         // Some regions have been added to .META., but have not been assigned.
2540         // When this happens, the region's table must be in ENABLING state.
2541         // It can't be in ENABLED state as that is set when all regions are
2542         // assigned.
2543         // It can't be in DISABLING state, because DISABLING state transitions
2544         // from ENABLED state when application calls disableTable.
2545         // It can't be in DISABLED state, because DISABLED states transitions
2546         // from DISABLING state.
2547         if (!enablingTables.contains(tableName)) {
2548           LOG.warn("Region " + regionInfo.getEncodedName() +
2549             " has null regionLocation." + " But its table " + tableName +
2550             " isn't in ENABLING state.");
2551         }
2552       } else if (!onlineServers.contains(regionLocation)) {
2553         // Region is located on a server that isn't online
2554         List<HRegionInfo> offlineRegions = offlineServers.get(regionLocation);
2555         if (offlineRegions == null) {
2556           offlineRegions = new ArrayList<HRegionInfo>(1);
2557           offlineServers.put(regionLocation, offlineRegions);
2558         }
2559         offlineRegions.add(regionInfo);
2560         // need to enable the table if not disabled or disabling or enabling
2561         // this will be used in rolling restarts
2562         if (!disabledOrDisablingOrEnabling.contains(tableName)
2563             && !getZKTable().isEnabledTable(tableName)) {
2564           setEnabledTable(tableName);
2565         }
2566       } else {
2567         // If region is in offline and split state check the ZKNode
2568         if (regionInfo.isOffline() && regionInfo.isSplit()) {
2569           String node = ZKAssign.getNodeName(this.watcher, regionInfo
2570               .getEncodedName());
2571           Stat stat = new Stat();
2572           byte[] data = ZKUtil.getDataNoWatch(this.watcher, node, stat);
2573           // If znode does not exist, don't consider this region
2574           if (data == null) {
2575             LOG.debug("Region "	+  regionInfo.getRegionNameAsString()
2576                + " split is completed. Hence need not add to regions list");
2577             continue;
2578           }
2579         }
2580         // Region is being served and on an active server
2581         // add only if region not in disabled or enabling table
2582         if (!disabledOrEnablingTables.contains(tableName)) {
2583           regionStates.regionOnline(regionInfo, regionLocation);
2584         }
2585         // need to enable the table if not disabled or disabling or enabling
2586         // this will be used in rolling restarts
2587         if (!disabledOrDisablingOrEnabling.contains(tableName)
2588             && !getZKTable().isEnabledTable(tableName)) {
2589           setEnabledTable(tableName);
2590         }
2591       }
2592     }
2593     return offlineServers;
2594   }
2595 
2596   /**
2597    * Recover the tables that were not fully moved to DISABLED state. These
2598    * tables are in DISABLING state when the master restarted/switched.
2599    *
2600    * @throws KeeperException
2601    * @throws TableNotFoundException
2602    * @throws IOException
2603    */
2604   private void recoverTableInDisablingState()
2605       throws KeeperException, TableNotFoundException, IOException {
2606     Set<String> disablingTables = ZKTable.getDisablingTables(watcher);
2607     if (disablingTables.size() != 0) {
2608       for (String tableName : disablingTables) {
2609         // Recover by calling DisableTableHandler
2610         LOG.info("The table " + tableName
2611             + " is in DISABLING state.  Hence recovering by moving the table"
2612             + " to DISABLED state.");
2613         new DisableTableHandler(this.server, tableName.getBytes(), catalogTracker,
2614             this, tableLockManager, true).prepare().process();
2615       }
2616     }
2617   }
2618 
2619   /**
2620    * Recover the tables that are not fully moved to ENABLED state. These tables
2621    * are in ENABLING state when the master restarted/switched
2622    *
2623    * @throws KeeperException
2624    * @throws org.apache.hadoop.hbase.exceptions.TableNotFoundException
2625    * @throws IOException
2626    */
2627   private void recoverTableInEnablingState()
2628       throws KeeperException, TableNotFoundException, IOException {
2629     Set<String> enablingTables = ZKTable.getEnablingTables(watcher);
2630     if (enablingTables.size() != 0) {
2631       for (String tableName : enablingTables) {
2632         // Recover by calling EnableTableHandler
2633         LOG.info("The table " + tableName
2634             + " is in ENABLING state.  Hence recovering by moving the table"
2635             + " to ENABLED state.");
2636         // enableTable in sync way during master startup,
2637         // no need to invoke coprocessor
2638         new EnableTableHandler(this.server, tableName.getBytes(),
2639             catalogTracker, this, tableLockManager, true).prepare().process();
2640       }
2641     }
2642   }
2643 
2644   /**
2645    * Processes list of dead servers from result of META scan and regions in RIT
2646    * <p>
2647    * This is used for failover to recover the lost regions that belonged to
2648    * RegionServers which failed while there was no active master or regions
2649    * that were in RIT.
2650    * <p>
2651    *
2652    *
2653    * @param deadServers
2654    *          The list of dead servers which failed while there was no active
2655    *          master. Can be null.
2656    * @throws IOException
2657    * @throws KeeperException
2658    */
2659   private void processDeadServersAndRecoverLostRegions(
2660       Map<ServerName, List<HRegionInfo>> deadServers)
2661           throws IOException, KeeperException {
2662     if (deadServers != null) {
2663       for (Map.Entry<ServerName, List<HRegionInfo>> server: deadServers.entrySet()) {
2664         ServerName serverName = server.getKey();
2665         if (!serverManager.isServerDead(serverName)) {
2666           serverManager.expireServer(serverName); // Let SSH do region re-assign
2667         }
2668       }
2669     }
2670     List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(
2671       this.watcher, this.watcher.assignmentZNode);
2672     if (!nodes.isEmpty()) {
2673       for (String encodedRegionName : nodes) {
2674         processRegionInTransition(encodedRegionName, null);
2675       }
2676     }
2677 
2678     // Now we can safely claim failover cleanup completed and enable
2679     // ServerShutdownHandler for further processing. The nodes (below)
2680     // in transition, if any, are for regions not related to those
2681     // dead servers at all, and can be done in parallel to SSH.
2682     failoverCleanupDone();
2683   }
2684 
2685   /**
2686    * Set Regions in transitions metrics.
2687    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
2688    * This iterator is not fail fast, which may lead to stale read; but that's better than
2689    * creating a copy of the map for metrics computation, as this method will be invoked
2690    * on a frequent interval.
2691    */
2692   public void updateRegionsInTransitionMetrics() {
2693     long currentTime = System.currentTimeMillis();
2694     int totalRITs = 0;
2695     int totalRITsOverThreshold = 0;
2696     long oldestRITTime = 0;
2697     int ritThreshold = this.server.getConfiguration().
2698       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
2699     for (RegionState state: regionStates.getRegionsInTransition().values()) {
2700       totalRITs++;
2701       long ritTime = currentTime - state.getStamp();
2702       if (ritTime > ritThreshold) { // more than the threshold
2703         totalRITsOverThreshold++;
2704       }
2705       if (oldestRITTime < ritTime) {
2706         oldestRITTime = ritTime;
2707       }
2708     }
2709     if (this.metricsMaster != null) {
2710       this.metricsMaster.updateRITOldestAge(oldestRITTime);
2711       this.metricsMaster.updateRITCount(totalRITs);
2712       this.metricsMaster.updateRITCountOverThreshold(totalRITsOverThreshold);
2713     }
2714   }
2715 
2716   /**
2717    * @param region Region whose plan we are to clear.
2718    */
2719   void clearRegionPlan(final HRegionInfo region) {
2720     synchronized (this.regionPlans) {
2721       this.regionPlans.remove(region.getEncodedName());
2722     }
2723   }
2724 
2725   /**
2726    * Wait on region to clear regions-in-transition.
2727    * @param hri Region to wait on.
2728    * @throws IOException
2729    */
2730   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
2731       throws IOException, InterruptedException {
2732     waitOnRegionToClearRegionsInTransition(hri, -1L);
2733   }
2734 
2735   /**
2736    * Wait on region to clear regions-in-transition or time out
2737    * @param hri
2738    * @param timeOut Milliseconds to wait for current region to be out of transition state.
2739    * @return True when a region clears regions-in-transition before timeout otherwise false
2740    * @throws IOException
2741    * @throws InterruptedException
2742    */
2743   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
2744       throws IOException, InterruptedException {
2745     if (!regionStates.isRegionInTransition(hri)) return true;
2746     RegionState rs = null;
2747     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTimeMillis()
2748         + timeOut;
2749     // There is already a timeout monitor on regions in transition so I
2750     // should not have to have one here too?
2751     LOG.info("Waiting on " + rs + " to clear regions-in-transition");
2752     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
2753       regionStates.waitForUpdate(100);
2754       if (EnvironmentEdgeManager.currentTimeMillis() > end) {
2755         LOG.info("Timed out on waiting for region:" + hri.getEncodedName() + " to be assigned.");
2756         return false;
2757       }
2758     }
2759     if (this.server.isStopped()) {
2760       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
2761       return false;
2762     }
2763     return true;
2764   }
2765 
2766   /**
2767    * Update timers for all regions in transition going against the server in the
2768    * serversInUpdatingTimer.
2769    */
2770   public class TimerUpdater extends Chore {
2771 
2772     public TimerUpdater(final int period, final Stoppable stopper) {
2773       super("AssignmentTimerUpdater", period, stopper);
2774     }
2775 
2776     @Override
2777     protected void chore() {
2778       Preconditions.checkState(tomActivated);
2779       ServerName serverToUpdateTimer = null;
2780       while (!serversInUpdatingTimer.isEmpty() && !stopper.isStopped()) {
2781         if (serverToUpdateTimer == null) {
2782           serverToUpdateTimer = serversInUpdatingTimer.first();
2783         } else {
2784           serverToUpdateTimer = serversInUpdatingTimer
2785               .higher(serverToUpdateTimer);
2786         }
2787         if (serverToUpdateTimer == null) {
2788           break;
2789         }
2790         updateTimers(serverToUpdateTimer);
2791         serversInUpdatingTimer.remove(serverToUpdateTimer);
2792       }
2793     }
2794   }
2795 
2796   /**
2797    * Monitor to check for time outs on region transition operations
2798    */
2799   public class TimeoutMonitor extends Chore {
2800     private boolean allRegionServersOffline = false;
2801     private ServerManager serverManager;
2802     private final int timeout;
2803 
2804     /**
2805      * Creates a periodic monitor to check for time outs on region transition
2806      * operations.  This will deal with retries if for some reason something
2807      * doesn't happen within the specified timeout.
2808      * @param period
2809    * @param stopper When {@link Stoppable#isStopped()} is true, this thread will
2810    * cleanup and exit cleanly.
2811      * @param timeout
2812      */
2813     public TimeoutMonitor(final int period, final Stoppable stopper,
2814         ServerManager serverManager,
2815         final int timeout) {
2816       super("AssignmentTimeoutMonitor", period, stopper);
2817       this.timeout = timeout;
2818       this.serverManager = serverManager;
2819     }
2820 
2821     private synchronized void setAllRegionServersOffline(
2822       boolean allRegionServersOffline) {
2823       this.allRegionServersOffline = allRegionServersOffline;
2824     }
2825 
2826     @Override
2827     protected void chore() {
2828       Preconditions.checkState(tomActivated);
2829       boolean noRSAvailable = this.serverManager.createDestinationServersList().isEmpty();
2830 
2831       // Iterate all regions in transition checking for time outs
2832       long now = System.currentTimeMillis();
2833       // no lock concurrent access ok: we will be working on a copy, and it's java-valid to do
2834       //  a copy while another thread is adding/removing items
2835       for (String regionName : regionStates.getRegionsInTransition().keySet()) {
2836         RegionState regionState = regionStates.getRegionTransitionState(regionName);
2837         if (regionState == null) continue;
2838 
2839         if (regionState.getStamp() + timeout <= now) {
2840           // decide on action upon timeout
2841           actOnTimeOut(regionState);
2842         } else if (this.allRegionServersOffline && !noRSAvailable) {
2843           RegionPlan existingPlan = regionPlans.get(regionName);
2844           if (existingPlan == null
2845               || !this.serverManager.isServerOnline(existingPlan
2846                   .getDestination())) {
2847             // if some RSs just came back online, we can start the assignment
2848             // right away
2849             actOnTimeOut(regionState);
2850           }
2851         }
2852       }
2853       setAllRegionServersOffline(noRSAvailable);
2854     }
2855 
2856     private void actOnTimeOut(RegionState regionState) {
2857       HRegionInfo regionInfo = regionState.getRegion();
2858       LOG.info("Regions in transition timed out:  " + regionState);
2859       // Expired! Do a retry.
2860       switch (regionState.getState()) {
2861       case CLOSED:
2862         LOG.info("Region " + regionInfo.getEncodedName()
2863             + " has been CLOSED for too long, waiting on queued "
2864             + "ClosedRegionHandler to run or server shutdown");
2865         // Update our timestamp.
2866         regionState.updateTimestampToNow();
2867         break;
2868       case OFFLINE:
2869         LOG.info("Region has been OFFLINE for too long, " + "reassigning "
2870             + regionInfo.getRegionNameAsString() + " to a random server");
2871         invokeAssign(regionInfo);
2872         break;
2873       case PENDING_OPEN:
2874         LOG.info("Region has been PENDING_OPEN for too "
2875             + "long, reassigning region=" + regionInfo.getRegionNameAsString());
2876         invokeAssign(regionInfo);
2877         break;
2878       case OPENING:
2879         processOpeningState(regionInfo);
2880         break;
2881       case OPEN:
2882         LOG.error("Region has been OPEN for too long, " +
2883             "we don't know where region was opened so can't do anything");
2884         regionState.updateTimestampToNow();
2885         break;
2886 
2887       case PENDING_CLOSE:
2888         LOG.info("Region has been PENDING_CLOSE for too "
2889             + "long, running forced unassign again on region="
2890             + regionInfo.getRegionNameAsString());
2891         invokeUnassign(regionInfo);
2892         break;
2893       case CLOSING:
2894         LOG.info("Region has been CLOSING for too " +
2895           "long, this should eventually complete or the server will " +
2896           "expire, send RPC again");
2897         invokeUnassign(regionInfo);
2898         break;
2899 
2900       case SPLIT:
2901       case SPLITTING:
2902       case FAILED_OPEN:
2903       case FAILED_CLOSE:
2904         break;
2905 
2906       default:
2907         throw new IllegalStateException("Received event is not valid.");
2908       }
2909     }
2910   }
2911 
2912   private void processOpeningState(HRegionInfo regionInfo) {
2913     LOG.info("Region has been OPENING for too long, reassigning region="
2914         + regionInfo.getRegionNameAsString());
2915     // Should have a ZK node in OPENING state
2916     try {
2917       String node = ZKAssign.getNodeName(watcher, regionInfo.getEncodedName());
2918       Stat stat = new Stat();
2919       byte [] data = ZKAssign.getDataNoWatch(watcher, node, stat);
2920       if (data == null) {
2921         LOG.warn("Data is null, node " + node + " no longer exists");
2922         return;
2923       }
2924       RegionTransition rt = RegionTransition.parseFrom(data);
2925       EventType et = rt.getEventType();
2926       if (et == EventType.RS_ZK_REGION_OPENED) {
2927         LOG.debug("Region has transitioned to OPENED, allowing "
2928             + "watched event handlers to process");
2929         return;
2930       } else if (et != EventType.RS_ZK_REGION_OPENING && et != EventType.RS_ZK_REGION_FAILED_OPEN ) {
2931         LOG.warn("While timing out a region, found ZK node in unexpected state: " + et);
2932         return;
2933       }
2934       invokeAssign(regionInfo);
2935     } catch (KeeperException ke) {
2936       LOG.error("Unexpected ZK exception timing out CLOSING region", ke);
2937     } catch (DeserializationException e) {
2938       LOG.error("Unexpected exception parsing CLOSING region", e);
2939     }
2940   }
2941 
2942   void invokeAssign(HRegionInfo regionInfo) {
2943     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
2944   }
2945 
2946   private void invokeUnassign(HRegionInfo regionInfo) {
2947     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
2948   }
2949 
2950   public boolean isCarryingMeta(ServerName serverName) {
2951     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
2952   }
2953 
2954   /**
2955    * Check if the shutdown server carries the specific region.
2956    * We have a bunch of places that store region location
2957    * Those values aren't consistent. There is a delay of notification.
2958    * The location from zookeeper unassigned node has the most recent data;
2959    * but the node could be deleted after the region is opened by AM.
2960    * The AM's info could be old when OpenedRegionHandler
2961    * processing hasn't finished yet when server shutdown occurs.
2962    * @return whether the serverName currently hosts the region
2963    */
2964   private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
2965     RegionTransition rt = null;
2966     try {
2967       byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
2968       // This call can legitimately come by null
2969       rt = data == null? null: RegionTransition.parseFrom(data);
2970     } catch (KeeperException e) {
2971       server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
2972     } catch (DeserializationException e) {
2973       server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
2974     }
2975 
2976     ServerName addressFromZK = rt != null? rt.getServerName():  null;
2977     if (addressFromZK != null) {
2978       // if we get something from ZK, we will use the data
2979       boolean matchZK = addressFromZK.equals(serverName);
2980       LOG.debug("based on ZK, current region=" + hri.getRegionNameAsString() +
2981           " is on server=" + addressFromZK +
2982           " server being checked=: " + serverName);
2983       return matchZK;
2984     }
2985 
2986     ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
2987     boolean matchAM = (addressFromAM != null &&
2988       addressFromAM.equals(serverName));
2989     LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
2990       " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
2991       " server being checked: " + serverName);
2992 
2993     return matchAM;
2994   }
2995 
2996   /**
2997    * Process shutdown server removing any assignments.
2998    * @param sn Server that went down.
2999    * @return list of regions in transition on this server
3000    */
3001   public List<HRegionInfo> processServerShutdown(final ServerName sn) {
3002     // Clean out any existing assignment plans for this server
3003     synchronized (this.regionPlans) {
3004       for (Iterator <Map.Entry<String, RegionPlan>> i =
3005           this.regionPlans.entrySet().iterator(); i.hasNext();) {
3006         Map.Entry<String, RegionPlan> e = i.next();
3007         ServerName otherSn = e.getValue().getDestination();
3008         // The name will be null if the region is planned for a random assign.
3009         if (otherSn != null && otherSn.equals(sn)) {
3010           // Use iterator's remove else we'll get CME
3011           i.remove();
3012         }
3013       }
3014     }
3015     List<HRegionInfo> regions = regionStates.serverOffline(sn);
3016     for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
3017       HRegionInfo hri = it.next();
3018       String encodedName = hri.getEncodedName();
3019 
3020       // We need a lock on the region as we could update it
3021       Lock lock = locker.acquireLock(encodedName);
3022       try {
3023         RegionState regionState =
3024           regionStates.getRegionTransitionState(encodedName);
3025         if (regionState == null
3026             || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
3027           LOG.info("Skip region " + hri
3028             + " since it is not opening on the dead server any more: " + sn);
3029           it.remove();
3030         } else {
3031           try{
3032             // Delete the ZNode if exists
3033             ZKAssign.deleteNodeFailSilent(watcher, hri);
3034           } catch (KeeperException ke) {
3035             server.abort("Unexpected ZK exception deleting node " + hri, ke);
3036           }
3037           if (zkTable.isDisablingOrDisabledTable(hri.getTableNameAsString())) {
3038             it.remove();
3039             regionStates.regionOffline(hri);
3040             continue;
3041           }
3042           // Mark the region closed and assign it again by SSH
3043           regionStates.updateRegionState(hri, RegionState.State.CLOSED);
3044         }
3045       } finally {
3046         lock.unlock();
3047       }
3048     }
3049     return regions;
3050   }
3051 
3052   /**
3053    * Update inmemory structures.
3054    * @param sn Server that reported the split
3055    * @param parent Parent region that was split
3056    * @param a Daughter region A
3057    * @param b Daughter region B
3058    */
3059   public void handleSplitReport(final ServerName sn, final HRegionInfo parent,
3060       final HRegionInfo a, final HRegionInfo b) {
3061     regionOffline(parent);
3062     regionOnline(a, sn);
3063     regionOnline(b, sn);
3064 
3065     // There's a possibility that the region was splitting while a user asked
3066     // the master to disable, we need to make sure we close those regions in
3067     // that case. This is not racing with the region server itself since RS
3068     // report is done after the split transaction completed.
3069     if (this.zkTable.isDisablingOrDisabledTable(
3070         parent.getTableNameAsString())) {
3071       unassign(a);
3072       unassign(b);
3073     }
3074   }
3075 
3076   /**
3077    * Update inmemory structures.
3078    * @param sn Server that reported the merge
3079    * @param merged regioninfo of merged
3080    * @param a region a
3081    * @param b region b
3082    */
3083   public void handleRegionsMergeReport(final ServerName sn,
3084       final HRegionInfo merged, final HRegionInfo a, final HRegionInfo b) {
3085     regionOffline(a);
3086     regionOffline(b);
3087     regionOnline(merged, sn);
3088 
3089     // There's a possibility that the region was merging while a user asked
3090     // the master to disable, we need to make sure we close those regions in
3091     // that case. This is not racing with the region server itself since RS
3092     // report is done after the regions merge transaction completed.
3093     if (this.zkTable.isDisablingOrDisabledTable(merged.getTableNameAsString())) {
3094       unassign(merged);
3095     }
3096   }
3097 
3098   /**
3099    * @param plan Plan to execute.
3100    */
3101   public void balance(final RegionPlan plan) {
3102     synchronized (this.regionPlans) {
3103       this.regionPlans.put(plan.getRegionName(), plan);
3104     }
3105     unassign(plan.getRegionInfo(), false, plan.getDestination());
3106   }
3107 
3108   public void stop() {
3109     if (tomActivated){
3110       this.timeoutMonitor.interrupt();
3111       this.timerUpdater.interrupt();
3112     }
3113   }
3114 
3115   /**
3116    * Shutdown the threadpool executor service
3117    */
3118   public void shutdown() {
3119     // It's an immediate shutdown, so we're clearing the remaining tasks.
3120     synchronized (zkEventWorkerWaitingList){
3121       zkEventWorkerWaitingList.clear();
3122     }
3123     threadPoolExecutorService.shutdownNow();
3124     zkEventWorkers.shutdownNow();
3125   }
3126 
3127   protected void setEnabledTable(String tableName) {
3128     try {
3129       this.zkTable.setEnabledTable(tableName);
3130     } catch (KeeperException e) {
3131       // here we can abort as it is the start up flow
3132       String errorMsg = "Unable to ensure that the table " + tableName
3133           + " will be" + " enabled because of a ZooKeeper issue";
3134       LOG.error(errorMsg);
3135       this.server.abort(errorMsg, e);
3136     }
3137   }
3138 
3139   /**
3140    * Set region as OFFLINED up in zookeeper asynchronously.
3141    * @param state
3142    * @return True if we succeeded, false otherwise (State was incorrect or failed
3143    * updating zk).
3144    */
3145   private boolean asyncSetOfflineInZooKeeper(final RegionState state,
3146       final AsyncCallback.StringCallback cb, final ServerName destination) {
3147     if (!state.isClosed() && !state.isOffline()) {
3148       this.server.abort("Unexpected state trying to OFFLINE; " + state,
3149         new IllegalStateException());
3150       return false;
3151     }
3152     regionStates.updateRegionState(
3153       state.getRegion(), RegionState.State.OFFLINE);
3154     try {
3155       ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
3156         destination, cb, state);
3157     } catch (KeeperException e) {
3158       if (e instanceof NodeExistsException) {
3159         LOG.warn("Node for " + state.getRegion() + " already exists");
3160       } else {
3161         server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
3162       }
3163       return false;
3164     }
3165     return true;
3166   }
3167 }