1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Collection;
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.NavigableMap;
32  import java.util.Random;
33  import java.util.Set;
34  import java.util.TreeMap;
35  import java.util.concurrent.Callable;
36  import java.util.concurrent.ConcurrentHashMap;
37  import java.util.concurrent.CopyOnWriteArrayList;
38  import java.util.concurrent.TimeUnit;
39  import java.util.concurrent.atomic.AtomicBoolean;
40  import java.util.concurrent.atomic.AtomicInteger;
41  import java.util.concurrent.locks.Lock;
42  import java.util.concurrent.locks.ReentrantLock;
43  
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.hadoop.classification.InterfaceAudience;
47  import org.apache.hadoop.conf.Configuration;
48  import org.apache.hadoop.fs.FileSystem;
49  import org.apache.hadoop.fs.Path;
50  import org.apache.hadoop.hbase.CoordinatedStateException;
51  import org.apache.hadoop.hbase.HBaseIOException;
52  import org.apache.hadoop.hbase.HConstants;
53  import org.apache.hadoop.hbase.HRegionInfo;
54  import org.apache.hadoop.hbase.HRegionLocation;
55  import org.apache.hadoop.hbase.HTableDescriptor;
56  import org.apache.hadoop.hbase.MetaTableAccessor;
57  import org.apache.hadoop.hbase.NotServingRegionException;
58  import org.apache.hadoop.hbase.RegionLocations;
59  import org.apache.hadoop.hbase.Server;
60  import org.apache.hadoop.hbase.ServerName;
61  import org.apache.hadoop.hbase.TableName;
62  import org.apache.hadoop.hbase.TableNotFoundException;
63  import org.apache.hadoop.hbase.TableStateManager;
64  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
65  import org.apache.hadoop.hbase.client.Result;
66  import org.apache.hadoop.hbase.executor.EventHandler;
67  import org.apache.hadoop.hbase.executor.EventType;
68  import org.apache.hadoop.hbase.executor.ExecutorService;
69  import org.apache.hadoop.hbase.ipc.RpcClient;
70  import org.apache.hadoop.hbase.ipc.RpcClient.FailedServerException;
71  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
72  import org.apache.hadoop.hbase.master.RegionState.State;
73  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
74  import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
75  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
76  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
77  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
78  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
79  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
80  import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
81  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
82  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
83  import org.apache.hadoop.hbase.regionserver.wal.HLog;
84  import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
85  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
86  import org.apache.hadoop.hbase.util.FSUtils;
87  import org.apache.hadoop.hbase.util.KeyLocker;
88  import org.apache.hadoop.hbase.util.Pair;
89  import org.apache.hadoop.hbase.util.PairOfSameType;
90  import org.apache.hadoop.hbase.util.Threads;
91  import org.apache.hadoop.ipc.RemoteException;
92  import org.apache.zookeeper.KeeperException;
93  
94  import com.google.common.annotations.VisibleForTesting;
95  
96  /**
97   * Manages and performs region assignment.
98   * Related communications with regionserver are all done over RPC.
99   */
100 @InterfaceAudience.Private
101 public class AssignmentManager {
102   private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
103 
104   static final String ALREADY_IN_TRANSITION_WAITTIME
105     = "hbase.assignment.already.intransition.waittime";
106   static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute
107 
108   protected final Server server;
109 
110   private ServerManager serverManager;
111 
112   private boolean shouldAssignRegionsWithFavoredNodes;
113 
114   private LoadBalancer balancer;
115 
116   private final MetricsAssignmentManager metricsAssignmentManager;
117 
118   private final TableLockManager tableLockManager;
119 
120   private AtomicInteger numRegionsOpened = new AtomicInteger(0);
121 
122   final private KeyLocker<String> locker = new KeyLocker<String>();
123 
124   Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
125 
126   /**
127    * Map of regions to reopen after the schema of a table is changed. Key -
128    * encoded region name, value - HRegionInfo
129    */
130   private final Map <String, HRegionInfo> regionsToReopen;
131 
132   /*
133    * Maximum number of attempts we make for an assignment/unassignment.
134    * See below in {@link #assign()} and {@link #unassign()}.
135    */
136   private final int maximumAttempts;
137 
138   /**
139    * Map from the region to be created by a merge (keyed by encoded name) to the pair of merging regions.
140    */
141   private final Map<String, PairOfSameType<HRegionInfo>> mergingRegions
142     = new HashMap<String, PairOfSameType<HRegionInfo>>();
143 
144   /**
145    * The time to sleep before retrying an hbase:meta assignment that failed
146    * because no region plan was available.
147    */
148   private final long sleepTimeBeforeRetryingMetaAssignment;
149 
150   /** Plans for region movement. Key is the encoded version of a region name*/
151   // TODO: When do plans get cleaned out?  Ever? In server open and in server
152   // shutdown processing -- St.Ack
153   // All access to this Map must be synchronized.
154   final NavigableMap<String, RegionPlan> regionPlans =
155     new TreeMap<String, RegionPlan>();
156 
157   private final TableStateManager tableStateManager;
158 
159   private final ExecutorService executorService;
160 
161   // Thread pool executor service. TODO, consolidate with executorService?
162   private java.util.concurrent.ExecutorService threadPoolExecutorService;
163 
164   private final RegionStates regionStates;
165 
166   // The thresholds for using bulk assignment. Bulk assignment is used
167   // only when assigning at least this many regions to at least this
168   // many servers. When assigning fewer regions to fewer servers,
169   // bulk assignment may not be as efficient.
170   private final int bulkAssignThresholdRegions;
171   private final int bulkAssignThresholdServers;
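      // With the defaults set in the constructor below (7 regions, 3 servers), bulk
      // assignment is used only for at least 7 regions spread over at least 3 servers.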
172 
173   // Should bulk assignment wait till all regions are assigned,
174   // or until it times out?  This is useful for measuring bulk assignment
175   // performance, but it is not needed in most use cases.
176   private final boolean bulkAssignWaitTillAllAssigned;
177 
178   /**
179    * Indicator that AssignmentManager has recovered the region states so
180    * that ServerShutdownHandler can be fully enabled and re-assign regions
181    * of dead servers, so that when re-assignment happens, AssignmentManager
182    * has the proper region states.
183    *
184    * Protected to ease testing.
185    */
186   protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
187 
188   /**
189    * A map tracking how many times in a row a region has failed to open,
190    * so that we don't try to open a region forever if the failure is
191    * unrecoverable.  We don't put this information in region states
192    * because we don't expect this to happen frequently; we don't
193    * want to copy this information over during each state transition either.
194    */
195   private final ConcurrentHashMap<String, AtomicInteger>
196     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
197 
198   // When not using ZK for region assignment, region states
199   // are persisted in meta via a state store
200   private final RegionStateStore regionStateStore;
201 
202   /**
203    * For testing only!  Set to true to skip handling of split.
204    */
205   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
206   public static boolean TEST_SKIP_SPLIT_HANDLING = false;
207 
208   /** Listeners that are called on assignment events. */
209   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
210 
211   /**
212    * Constructs a new assignment manager.
213    *
214    * @param server instance of HMaster this AM is running inside
215    * @param serverManager serverManager for associated HMaster
216    * @param balancer implementation of {@link LoadBalancer}
217    * @param service Executor service
218    * @param metricsMaster metrics manager
219    * @param tableLockManager TableLock manager
220    * @throws CoordinatedStateException
221    * @throws IOException
222    */
223   public AssignmentManager(Server server, ServerManager serverManager,
224       final LoadBalancer balancer,
225       final ExecutorService service, MetricsMaster metricsMaster,
226       final TableLockManager tableLockManager)
227           throws IOException, CoordinatedStateException {
228     this.server = server;
229     this.serverManager = serverManager;
230     this.executorService = service;
231     this.regionStateStore = new RegionStateStore(server);
232     this.regionsToReopen = Collections.synchronizedMap
233                            (new HashMap<String, HRegionInfo> ());
234     Configuration conf = server.getConfiguration();
235     // Only read favored nodes if using the favored nodes load balancer.
236     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
237            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
238            FavoredNodeLoadBalancer.class);
239     try {
240       if (server.getCoordinatedStateManager() != null) {
241         this.tableStateManager = server.getCoordinatedStateManager().getTableStateManager();
242       } else {
243         this.tableStateManager = null;
244       }
245     } catch (InterruptedException e) {
246       throw new InterruptedIOException();
247     }
248     // This is the max attempts, not retries, so it should be at least 1.
249     this.maximumAttempts = Math.max(1,
250       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
251     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
252         "hbase.meta.assignment.retry.sleeptime", 1000l);
253     this.balancer = balancer;
254     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
255     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
256       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
257     this.regionStates = new RegionStates(
258       server, tableStateManager, serverManager, regionStateStore);
259 
260     this.bulkAssignWaitTillAllAssigned =
261       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
262     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
263     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
264 
265     this.metricsAssignmentManager = new MetricsAssignmentManager();
266     this.tableLockManager = tableLockManager;
267   }
268 
269   /**
270    * Add the listener to the notification list.
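       * <p>Illustrative usage, assuming {@code am} is this AssignmentManager and
       * {@code listener} is some AssignmentListener implementation supplied by the caller:
       * <pre>
       *   am.registerListener(listener);
       *   try {
       *     // ... do work that relies on assignment notifications ...
       *   } finally {
       *     am.unregisterListener(listener);
       *   }
       * </pre>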
271    * @param listener The AssignmentListener to register
272    */
273   public void registerListener(final AssignmentListener listener) {
274     this.listeners.add(listener);
275   }
276 
277   /**
278    * Remove the listener from the notification list.
279    * @param listener The AssignmentListener to unregister
280    */
281   public boolean unregisterListener(final AssignmentListener listener) {
282     return this.listeners.remove(listener);
283   }
284 
285   /**
286    * @return Instance of the TableStateManager.
287    */
288   public TableStateManager getTableStateManager() {
289     // These are 'expensive' to make, involving a trip to the zk ensemble, so
290     // allow sharing.
291     return this.tableStateManager;
292   }
293 
294   /**
295    * This SHOULD not be public. It is public now
296    * because of some unit tests.
297    *
298    * TODO: make it package private and keep RegionStates in the master package
299    */
300   public RegionStates getRegionStates() {
301     return regionStates;
302   }
303 
304   /**
305    * Used in some tests to mock up region state in meta
306    */
307   @VisibleForTesting
308   RegionStateStore getRegionStateStore() {
309     return regionStateStore;
310   }
311 
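      // The reopen plan keeps the region on its current server: the plan's source is
      // left null and its destination is the server currently hosting the region.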
312   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
313     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
314   }
315 
316   /**
317    * Add a regionPlan for the specified region.
318    * @param encodedName
319    * @param plan
320    */
321   public void addPlan(String encodedName, RegionPlan plan) {
322     synchronized (regionPlans) {
323       regionPlans.put(encodedName, plan);
324     }
325   }
326 
327   /**
328    * Add a map of region plans.
329    */
330   public void addPlans(Map<String, RegionPlan> plans) {
331     synchronized (regionPlans) {
332       regionPlans.putAll(plans);
333     }
334   }
335 
336   /**
337    * Set the list of regions that will be reopened
338    * because of an update in table schema
339    *
340    * @param regions
341    *          list of regions that should be tracked for reopen
342    */
343   public void setRegionsToReopen(List <HRegionInfo> regions) {
344     for(HRegionInfo hri : regions) {
345       regionsToReopen.put(hri.getEncodedName(), hri);
346     }
347   }
348 
349   /**
350    * Used by the client to check whether all regions have received the schema updates
351    *
352    * @param tableName
353    * @return Pair of (number of regions still pending reopen or in transition, total number of regions of the table)
354    * @throws IOException
355    */
356   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
357       throws IOException {
358     List <HRegionInfo> hris = MetaTableAccessor.getTableRegions(
359       this.server.getZooKeeper(), this.server.getShortCircuitConnection(),
360       tableName, true);
361     Integer pending = 0;
362     for (HRegionInfo hri : hris) {
363       String name = hri.getEncodedName();
364       // No lock; concurrent access is ok: sequential consistency respected.
365       if (regionsToReopen.containsKey(name)
366           || regionStates.isRegionInTransition(name)) {
367         pending++;
368       }
369     }
370     return new Pair<Integer, Integer>(pending, hris.size());
371   }
372 
373   /**
374    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
375    * the failover cleanup before re-assigning regions of dead servers. So that
376    * when re-assignment happens, AssignmentManager has proper region states.
377    */
378   public boolean isFailoverCleanupDone() {
379     return failoverCleanupDone.get();
380   }
381 
382   /**
383    * To avoid racing with AM, external entities may need to lock a region,
384    * for example, when SSH checks what regions to skip re-assigning.
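       * <p>Illustrative usage, assuming {@code am} is this AssignmentManager; the caller
       * must release the lock, typically in a finally block:
       * <pre>
       *   Lock lock = am.acquireRegionLock(encodedRegionName);
       *   try {
       *     // ... inspect or act on this region's state ...
       *   } finally {
       *     lock.unlock();
       *   }
       * </pre>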
385    */
386   public Lock acquireRegionLock(final String encodedName) {
387     return locker.acquireLock(encodedName);
388   }
389 
390   /**
391    * Now, failover cleanup is completed. Notify server manager to
392    * process queued-up dead servers, if any.
393    */
394   void failoverCleanupDone() {
395     failoverCleanupDone.set(true);
396     serverManager.processQueuedDeadServers();
397   }
398 
399   /**
400    * Called on startup.
401    * Figures out whether this is a fresh cluster start or we are joining an extant running cluster.
402    * @throws IOException
403    * @throws KeeperException
404    * @throws InterruptedException
405    * @throws CoordinatedStateException
406    */
407   void joinCluster() throws IOException,
408       KeeperException, InterruptedException, CoordinatedStateException {
409     long startTime = System.currentTimeMillis();
410     // Concurrency note: In the below the accesses on regionsInTransition are
411     // outside of a synchronization block where usually all accesses to RIT are
412     // synchronized.  The presumption is that in this case it is safe since this
413     // method is run by a single thread on startup.
414 
415     // TODO: Regions that have a null location and are not in regionsInTransitions
416     // need to be handled.
417 
418     // Scan hbase:meta to build list of existing regions, servers, and assignment
419     // Returns servers that have not checked in (assumed dead) but to which some
420     // regions were assigned (according to meta)
421     Set<ServerName> deadServers = rebuildUserRegions();
422 
423     // This method will assign all user regions on a clean cluster startup;
424     // otherwise it will reconstruct master state and clean up any leftovers
425     // from the previous master process.
426     boolean failover = processDeadServersAndRegionsInTransition(deadServers);
427 
428     recoverTableInDisablingState();
429     recoverTableInEnablingState();
430     LOG.info("Joined the cluster in " + (System.currentTimeMillis()
431       - startTime) + "ms, failover=" + failover);
432   }
433 
434   /**
435    * Processes all regions that are in transition in zookeeper and also
436    * processes the list of dead servers by scanning the META.
437    * Used by a master joining a cluster.  If we figure this is a clean cluster
438    * startup, will assign all user regions.
439    * @param deadServers
440    *          Set of dead servers that had regions assigned to them. Can be null.
441    * @throws IOException
442    * @throws InterruptedException
443    * @throws CoordinatedStateException
444    */
445   boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
446       throws IOException, InterruptedException, CoordinatedStateException {
447     boolean failover = !serverManager.getDeadServers().isEmpty();
448     if (failover) {
449       // This may not be a failover actually, especially if meta is on this master.
450       if (LOG.isDebugEnabled()) {
451         LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
452       }
453     } else {
454       // If any one region except meta is assigned, it's a failover.
455       Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
456       for (Map.Entry<HRegionInfo, ServerName> en:
457           regionStates.getRegionAssignments().entrySet()) {
458         HRegionInfo hri = en.getKey();
459         if (!hri.isMetaTable()
460             && onlineServers.contains(en.getValue())) {
461           LOG.debug("Found " + hri + " out on cluster");
462           failover = true;
463           break;
464         }
465       }
466       if (!failover) {
467         // If any region except meta is in transition on a live server, it's a failover.
468         Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
469         if (!regionsInTransition.isEmpty()) {
470           for (RegionState regionState: regionsInTransition.values()) {
471             if (!regionState.getRegion().isMetaRegion()
472                 && onlineServers.contains(regionState.getServerName())) {
473               LOG.debug("Found " + regionState + " in RITs");
474               failover = true;
475               break;
476             }
477           }
478         }
479       }
480     }
481     if (!failover) {
482       // If we get here, we have a full cluster restart. It is a failover only
483       // if there are some HLogs that are not split yet. Any meta HLogs should have
484       // been split already. We can walk through those queued dead servers;
485       // if they don't have any HLogs, this restart should be considered a clean one
486       Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
487       if (!queuedDeadServers.isEmpty()) {
488         Configuration conf = server.getConfiguration();
489         Path rootdir = FSUtils.getRootDir(conf);
490         FileSystem fs = rootdir.getFileSystem(conf);
491         for (ServerName serverName: queuedDeadServers) {
492           Path logDir = new Path(rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
493           Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
494           if (fs.exists(logDir) || fs.exists(splitDir)) {
495             LOG.debug("Found queued dead server " + serverName);
496             failover = true;
497             break;
498           }
499         }
500         if (!failover) {
501           // We figured that it's not a failover, so no need to
502           // work on these re-queued dead servers any more.
503           LOG.info("AM figured that it's not a failover and cleaned up "
504             + queuedDeadServers.size() + " queued dead servers");
505           serverManager.removeRequeuedDeadServers();
506         }
507       }
508     }
509 
510     Set<TableName> disabledOrDisablingOrEnabling = null;
511     Map<HRegionInfo, ServerName> allRegions = null;
512 
513     if (!failover) {
514       disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
515         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING,
516         ZooKeeperProtos.Table.State.ENABLING);
517 
518       // Clean re/start, mark all user regions closed before reassignment
519       allRegions = regionStates.closeAllUserRegions(
520         disabledOrDisablingOrEnabling);
521     }
522 
523     // Now region states are restored
524     regionStateStore.start();
525 
526     if (failover) {
527       processDeadServers(deadServers);
528     }
529 
530     // Now we can safely claim failover cleanup completed and enable
531     // ServerShutdownHandler for further processing. The nodes (below)
532     // in transition, if any, are for regions not related to those
533     // dead servers at all, and can be done in parallel to SSH.
534     failoverCleanupDone();
535     if (!failover) {
536       // Fresh cluster startup.
537       LOG.info("Clean cluster startup. Assigning user regions");
538       assignAllUserRegions(allRegions);
539     }
540     // unassign replicas of the split parents and the merged regions
541     // the daughter replicas are opened in assignAllUserRegions if they were
542     // not already opened.
543     for (HRegionInfo h : replicasToClose) {
544       unassign(h);
545     }
546     replicasToClose.clear();
547     return failover;
548   }
549 
550   /**
551    * When a region is closed, it should be removed from the regionsToReopen
552    * @param hri HRegionInfo of the region which was closed
553    */
554   public void removeClosedRegion(HRegionInfo hri) {
555     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
556       LOG.debug("Removed region from reopening regions because it was closed");
557     }
558   }
559 
560   // TODO: processFavoredNodes might throw an exception, e.g., if the
561   // meta could not be contacted/updated. We need to decide how seriously to treat
562   // this problem. Should we fail the current assignment? We should be able
563   // to recover from this problem eventually (if the meta couldn't be updated,
564   // things should work normally and eventually get fixed up).
565   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
566     if (!shouldAssignRegionsWithFavoredNodes) return;
567     // The AM gets the favored nodes info for each region and updates the meta
568     // table with that info
569     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
570         new HashMap<HRegionInfo, List<ServerName>>();
571     for (HRegionInfo region : regions) {
572       regionToFavoredNodes.put(region,
573           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
574     }
575     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
576       this.server.getShortCircuitConnection());
577   }
578 
579   /**
580    * Marks the region as online.  Removes it from regions in transition and
581    * updates the in-memory assignment information.
582    * <p>
583    * Used when a region has been successfully opened on a region server.
584    * @param regionInfo
585    * @param sn
586    */
587   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
588     regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
589   }
590 
591   void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
592     numRegionsOpened.incrementAndGet();
593     regionStates.regionOnline(regionInfo, sn, openSeqNum);
594 
595     // Remove plan if one.
596     clearRegionPlan(regionInfo);
597     balancer.regionOnline(regionInfo, sn);
598 
599     // Tell our listeners that a region was opened
600     sendRegionOpenedNotification(regionInfo, sn);
601   }
602 
603   /**
604    * Marks the region as offline.  Removes it from regions in transition and
605    * removes in-memory assignment information.
606    * <p>
607    * Used when a region has been closed and should remain closed.
608    * @param regionInfo
609    */
610   public void regionOffline(final HRegionInfo regionInfo) {
611     regionOffline(regionInfo, null);
612   }
613 
614   public void offlineDisabledRegion(HRegionInfo regionInfo) {
615     replicasToClose.remove(regionInfo);
616     regionOffline(regionInfo);
617   }
618 
619   // Assignment methods
620 
621   /**
622    * Assigns the specified region.
623    * <p>
624    * If a RegionPlan is available with a valid destination then it will be used
625    * to determine what server the region is assigned to.  If no RegionPlan is
626    * available, region will be assigned to a random available server.
627    * <p>
628    * Updates the RegionState and sends the OPEN RPC.
629    * <p>
630    * This will only succeed if the region is either in transition and in a CLOSED or
631    * OFFLINE state, or not in transition at all, and of course, the
632    * chosen server is up and running (it may have just crashed!).
633    *
634    * @param region region to be assigned
635    */
636   public void assign(HRegionInfo region) {
637     assign(region, false);
638   }
639 
640   /**
641    * Use care with forceNewPlan. It could cause double assignment.
642    */
643   public void assign(HRegionInfo region, boolean forceNewPlan) {
644     if (isDisabledorDisablingRegionInRIT(region)) {
645       return;
646     }
647     String encodedName = region.getEncodedName();
648     Lock lock = locker.acquireLock(encodedName);
649     try {
650       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
651       if (state != null) {
652         if (regionStates.wasRegionOnDeadServer(encodedName)) {
653           LOG.info("Skip assigning " + region.getRegionNameAsString()
654             + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
655             + " is dead but not processed yet");
656           return;
657         }
658         assign(state, forceNewPlan);
659       }
660     } finally {
661       lock.unlock();
662     }
663   }
664 
665   /**
666    * Bulk assign regions to <code>destination</code>.
667    * @param destination
668    * @param regions Regions to assign.
669    * @return true if successful
670    */
671   boolean assign(final ServerName destination, final List<HRegionInfo> regions)
672     throws InterruptedException {
673     long startTime = EnvironmentEdgeManager.currentTimeMillis();
674     try {
675       int regionCount = regions.size();
676       if (regionCount == 0) {
677         return true;
678       }
679       LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
680       Set<String> encodedNames = new HashSet<String>(regionCount);
681       for (HRegionInfo region : regions) {
682         encodedNames.add(region.getEncodedName());
683       }
684 
685       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
686       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
687       try {
688         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regionCount);
689         List<RegionState> states = new ArrayList<RegionState>(regionCount);
690         for (HRegionInfo region : regions) {
691           String encodedName = region.getEncodedName();
692           if (!isDisabledorDisablingRegionInRIT(region)) {
693             RegionState state = forceRegionStateToOffline(region, false);
694             boolean onDeadServer = false;
695             if (state != null) {
696               if (regionStates.wasRegionOnDeadServer(encodedName)) {
697                 LOG.info("Skip assigning " + region.getRegionNameAsString()
698                   + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
699                   + " is dead but not processed yet");
700                 onDeadServer = true;
701               } else {
702                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
703                 plans.put(encodedName, plan);
704                 states.add(state);
705                 continue;
706               }
707             }
708             // Reassign if the region wasn't on a dead server
709             if (!onDeadServer) {
710               LOG.info("failed to force region state to offline, "
711                 + "will reassign later: " + region);
712               failedToOpenRegions.add(region); // assign individually later
713             }
714           }
715           // Release the lock, this region is excluded from bulk assign because
716           // we can't update its state, or set its znode to offline.
717           Lock lock = locks.remove(encodedName);
718           lock.unlock();
719         }
720 
721         if (server.isStopped()) {
722           return false;
723         }
724 
725         // Add region plans, so we can updateTimers when one region is opened so
726         // that unnecessary timeout on RIT is reduced.
727         this.addPlans(plans);
728 
729         List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos =
730           new ArrayList<Pair<HRegionInfo, List<ServerName>>>(states.size());
731         for (RegionState state: states) {
732           HRegionInfo region = state.getRegion();
733           regionStates.updateRegionState(
734             region, State.PENDING_OPEN, destination);
735           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
736           if (this.shouldAssignRegionsWithFavoredNodes) {
737             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
738           }
739           regionOpenInfos.add(new Pair<HRegionInfo, List<ServerName>>(
740             region, favoredNodes));
741         }
742 
743         // Move on to open regions.
744         try {
745           // Send OPEN RPC. If it fails on an IOE or RemoteException,
746           // regions will be assigned individually.
747           long maxWaitTime = System.currentTimeMillis() +
748             this.server.getConfiguration().
749               getLong("hbase.regionserver.rpc.startup.waittime", 60000);
750           for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
751             try {
752               List<RegionOpeningState> regionOpeningStateList = serverManager
753                 .sendRegionOpen(destination, regionOpenInfos);
754               if (regionOpeningStateList == null) {
755                 // Failed getting RPC connection to this server
756                 return false;
757               }
758               for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
759                 RegionOpeningState openingState = regionOpeningStateList.get(k);
760                 if (openingState != RegionOpeningState.OPENED) {
761                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
762                   // Failed opening this region, reassign it later
763                   failedToOpenRegions.add(region);
764                 }
765               }
766               break;
767             } catch (IOException e) {
768               if (e instanceof RemoteException) {
769                 e = ((RemoteException)e).unwrapRemoteException();
770               }
771               if (e instanceof RegionServerStoppedException) {
772                 LOG.warn("The region server was shut down, ", e);
773                 // No need to retry, the region server is a goner.
774                 return false;
775               } else if (e instanceof ServerNotRunningYetException) {
776                 long now = System.currentTimeMillis();
777                 if (now < maxWaitTime) {
778                   LOG.debug("Server is not yet up; waiting up to " +
779                     (maxWaitTime - now) + "ms", e);
780                   Thread.sleep(100);
781                   i--; // reset the try count
782                   continue;
783                 }
784               } else if (e instanceof java.net.SocketTimeoutException
785                   && this.serverManager.isServerOnline(destination)) {
786                 // In case socket is timed out and the region server is still online,
787                 // the openRegion RPC could have been accepted by the server and
788                 // just the response didn't go through.  So we will retry to
789                 // open the region on the same server.
790                 if (LOG.isDebugEnabled()) {
791                   LOG.debug("Bulk assigner openRegion() to " + destination
792                     + " has timed out, but the regions might"
793                     + " already be opened on it.", e);
794                 }
795                 // wait and reset the re-try count, server might be just busy.
796                 Thread.sleep(100);
797                 i--;
798                 continue;
799               }
800               throw e;
801             }
802           }
803         } catch (IOException e) {
804           // Can be a socket timeout, EOF, NoRouteToHost, etc
805           LOG.info("Unable to communicate with " + destination
806             + " in order to assign regions, ", e);
807           return false;
808         }
809       } finally {
810         for (Lock lock : locks.values()) {
811           lock.unlock();
812         }
813       }
814 
815       if (!failedToOpenRegions.isEmpty()) {
816         for (HRegionInfo region : failedToOpenRegions) {
817           if (!regionStates.isRegionOnline(region)) {
818             invokeAssign(region);
819           }
820         }
821       }
822       LOG.debug("Bulk assigning done for " + destination);
823       return true;
824     } finally {
825       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
826     }
827   }
828 
829   /**
830    * Send CLOSE RPC if the server is online, otherwise, offline the region.
831    *
832    * The RPC will be sent only to the region server found in the region state
833    * if it is passed in, otherwise, to the src server specified. If region
834    * state is not specified, we don't update region state at all, instead
835    * we just send the RPC call. This is useful for some cleanup without
836    * messing around with the region states (see handleRegion, for example,
837    * the scenario where a region is opened on an unexpected server)
838    */
839   private void unassign(final HRegionInfo region,
840       final RegionState state, final ServerName dest) {
841     ServerName server = state.getServerName();
842     long maxWaitTime = -1;
843     for (int i = 1; i <= this.maximumAttempts; i++) {
844       if (this.server.isStopped() || this.server.isAborted()) {
845         LOG.debug("Server stopped/aborted; skipping unassign of " + region);
846         return;
847       }
848       if (!serverManager.isServerOnline(server)) {
849         LOG.debug("Offline " + region.getRegionNameAsString()
850           + ", no need to unassign since it's on a dead server: " + server);
851         regionStates.updateRegionState(region, State.OFFLINE);
852         return;
853       }
854       try {
855         // Send CLOSE RPC
856         if (serverManager.sendRegionClose(server, region, dest)) {
857           LOG.debug("Sent CLOSE to " + server + " for region " +
858             region.getRegionNameAsString());
859           return;
860         }
861         // This used to never happen: regionserver close always returned true.
862         // TODO: this can now happen (0.96) if there is an exception in a coprocessor
863         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
864           region.getRegionNameAsString());
865       } catch (Throwable t) {
866         if (t instanceof RemoteException) {
867           t = ((RemoteException)t).unwrapRemoteException();
868         }
869         boolean logRetries = true;
870         if (t instanceof NotServingRegionException
871             || t instanceof RegionServerStoppedException
872             || t instanceof ServerNotRunningYetException) {
873           LOG.debug("Offline " + region.getRegionNameAsString()
874             + ", it is no longer on " + server, t);
875           regionStates.updateRegionState(region, State.OFFLINE);
876           return;
877         } else if (t instanceof FailedServerException
878             || t instanceof RegionAlreadyInTransitionException) {
879           long sleepTime = 0;
880           Configuration conf = this.server.getConfiguration();
881           if(t instanceof FailedServerException) {
882             sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
883                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
884           } else {
885             if (maxWaitTime < 0) {
886               maxWaitTime =
887                   EnvironmentEdgeManager.currentTimeMillis()
888                       + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
889                         DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
890             }
891             long now = EnvironmentEdgeManager.currentTimeMillis();
892             if (now < maxWaitTime) {
893               LOG.debug("Region is already in transition; "
894                 + "waiting up to " + (maxWaitTime - now) + "ms", t);
895               sleepTime = 100;
896               i--; // reset the try count
897               logRetries = false;
898             }
899           }
900           try {
901             if (sleepTime > 0) {
902               Thread.sleep(sleepTime);
903             }
904           } catch (InterruptedException ie) {
905             LOG.warn("Failed to unassign "
906               + region.getRegionNameAsString() + " since interrupted", ie);
907             Thread.currentThread().interrupt();
908             if (state != null) {
909               regionStates.updateRegionState(region, State.FAILED_CLOSE);
910             }
911             return;
912           }
913         }
914 
915         if (logRetries) {
916           LOG.info("Server " + server + " returned " + t + " for "
917             + region.getRegionNameAsString() + ", try=" + i
918             + " of " + this.maximumAttempts, t);
919           // Presume retry or server will expire.
920         }
921       }
922     }
923     // Run out of attempts
924     if (state != null) {
925       regionStates.updateRegionState(region, State.FAILED_CLOSE);
926     }
927   }
928 
929   /**
930    * Set region to OFFLINE unless it is opening and forceNewPlan is false.
931    */
932   private RegionState forceRegionStateToOffline(
933       final HRegionInfo region, final boolean forceNewPlan) {
934     RegionState state = regionStates.getRegionState(region);
935     if (state == null) {
936       LOG.warn("Assigning a region not in region states: " + region);
937       state = regionStates.createRegionState(region);
938     }
939 
940     if (forceNewPlan && LOG.isDebugEnabled()) {
941       LOG.debug("Force region state offline " + state);
942     }
943 
944     switch (state.getState()) {
945     case OPEN:
946     case OPENING:
947     case PENDING_OPEN:
948     case CLOSING:
949     case PENDING_CLOSE:
950       if (!forceNewPlan) {
951         LOG.debug("Skip assigning " +
952           region + ", it is already " + state);
953         return null;
954       }
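        // Deliberate fall-through when forceNewPlan is set: the in-transition states
        // above are unassigned first, just like FAILED_CLOSE and FAILED_OPEN.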
955     case FAILED_CLOSE:
956     case FAILED_OPEN:
957       unassign(region, state, null);
958       state = regionStates.getRegionState(region);
959       if (state.isFailedClose()) {
960         // If we can't close the region, we can't re-assign
961         // it so as to avoid possible double assignment/data loss.
962         LOG.info("Skip assigning " +
963           region + ", we couldn't close it: " + state);
964         return null;
965       }
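        // Deliberate fall-through: OFFLINE and CLOSED regions are ready to be assigned.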
966     case OFFLINE:
967     case CLOSED:
968       break;
969     default:
970       LOG.error("Trying to assign region " + region
971         + ", which is " + state);
972       return null;
973     }
974     return state;
975   }
976 
977   /**
978    * Caller must hold lock on the passed <code>state</code> object.
979    * @param state
980    * @param forceNewPlan
981    */
982   private void assign(RegionState state, boolean forceNewPlan) {
983     long startTime = EnvironmentEdgeManager.currentTimeMillis();
984     try {
985       Configuration conf = server.getConfiguration();
986       RegionPlan plan = null;
987       long maxWaitTime = -1;
988       HRegionInfo region = state.getRegion();
989       RegionOpeningState regionOpenState;
990       Throwable previousException = null;
991       for (int i = 1; i <= maximumAttempts; i++) {
992         if (server.isStopped() || server.isAborted()) {
993           LOG.info("Skip assigning " + region.getRegionNameAsString()
994             + ", the server is stopped/aborted");
995           return;
996         }
997         if (plan == null) { // Get a server for the region at first
998           try {
999             plan = getRegionPlan(region, forceNewPlan);
1000           } catch (HBaseIOException e) {
1001             LOG.warn("Failed to get region plan", e);
1002           }
1003         }
1004         if (plan == null) {
1005           LOG.warn("Unable to determine a plan to assign " + region);
1006           if (region.isMetaRegion()) {
1007             try {
1008               Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
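                   // hbase:meta must eventually be assigned, so reset the attempt
                   // counter and keep retrying indefinitely.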
1009               if (i == maximumAttempts) i = 1;
1010               continue;
1011             } catch (InterruptedException e) {
1012               LOG.error("Got exception while waiting for hbase:meta assignment");
1013               Thread.currentThread().interrupt();
1014             }
1015           }
1016           regionStates.updateRegionState(region, State.FAILED_OPEN);
1017           return;
1018         }
1019         // In case of assignment from EnableTableHandler, the table state is ENABLING. Anyhow,
1020         // EnableTableHandler will set it to ENABLED after assigning all the table regions. If we
1021         // try to set it to ENABLED directly, then the client API may think the table is enabled.
1022         // When all the regions are added directly into hbase:meta and we call
1023         // assignRegion, then we need to make the table ENABLED. Hence in such a case the table
1024         // will not be in ENABLING or ENABLED state.
1025         TableName tableName = region.getTable();
1026         if (!tableStateManager.isTableState(tableName,
1027           ZooKeeperProtos.Table.State.ENABLED, ZooKeeperProtos.Table.State.ENABLING)) {
1028           LOG.debug("Setting table " + tableName + " to ENABLED state.");
1029           setEnabledTable(tableName);
1030         }
1031         LOG.info("Assigning " + region.getRegionNameAsString() +
1032             " to " + plan.getDestination().toString());
1033         // Transition RegionState to PENDING_OPEN
1034         regionStates.updateRegionState(region,
1035           State.PENDING_OPEN, plan.getDestination());
1036 
1037         boolean needNewPlan;
1038         final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
1039             " to " + plan.getDestination();
1040         try {
1041           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1042           if (this.shouldAssignRegionsWithFavoredNodes) {
1043             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1044           }
1045           regionOpenState = serverManager.sendRegionOpen(
1046               plan.getDestination(), region, favoredNodes);
1047 
1048           if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
1049             // Failed opening this region, looping again on a new server.
1050             needNewPlan = true;
1051             LOG.warn(assignMsg + ", regionserver says 'FAILED_OPENING', " +
1052                 " trying to assign elsewhere instead; " +
1053                 "try=" + i + " of " + this.maximumAttempts);
1054           } else {
1055             // we're done
1056             return;
1057           }
1058 
1059         } catch (Throwable t) {
1060           if (t instanceof RemoteException) {
1061             t = ((RemoteException) t).unwrapRemoteException();
1062           }
1063           previousException = t;
1064 
1065           // Should we wait a little before retrying? If the server is starting, the answer is yes.
1066           // If the region is already in transition, it's yes as well: we want to be sure that
1067           //  the region will get opened but we don't want a double assignment.
1068           boolean hold = (t instanceof RegionAlreadyInTransitionException ||
1069               t instanceof ServerNotRunningYetException);
1070 
1071           // In case socket is timed out and the region server is still online,
1072           // the openRegion RPC could have been accepted by the server and
1073           // just the response didn't go through.  So we will retry to
1074           // open the region on the same server to avoid possible
1075           // double assignment.
1076           boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
1077               && this.serverManager.isServerOnline(plan.getDestination()));
1078 
1079 
1080           if (hold) {
1081             LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
1082               "try=" + i + " of " + this.maximumAttempts, t);
1083 
1084             if (maxWaitTime < 0) {
1085               if (t instanceof RegionAlreadyInTransitionException) {
1086                 maxWaitTime = EnvironmentEdgeManager.currentTimeMillis()
1087                   + this.server.getConfiguration().getLong(ALREADY_IN_TRANSITION_WAITTIME,
1088                     DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
1089               } else {
1090                 maxWaitTime = EnvironmentEdgeManager.currentTimeMillis()
1091                   + this.server.getConfiguration().getLong(
1092                     "hbase.regionserver.rpc.startup.waittime", 60000);
1093               }
1094             }
1095             try {
1096               needNewPlan = false;
1097               long now = EnvironmentEdgeManager.currentTimeMillis();
1098               if (now < maxWaitTime) {
1099                 LOG.debug("Server is not yet up or region is already in transition; "
1100                   + "waiting up to " + (maxWaitTime - now) + "ms", t);
1101                 Thread.sleep(100);
1102                 i--; // reset the try count
1103               } else if (!(t instanceof RegionAlreadyInTransitionException)) {
1104                 LOG.debug("Server is not up for a while; try a new one", t);
1105                 needNewPlan = true;
1106               }
1107             } catch (InterruptedException ie) {
1108               LOG.warn("Failed to assign "
1109                   + region.getRegionNameAsString() + " since interrupted", ie);
1110               regionStates.updateRegionState(region, State.FAILED_OPEN);
1111               Thread.currentThread().interrupt();
1112               return;
1113             }
1114           } else if (retry) {
1115             needNewPlan = false;
1116             i--; // we want to retry as many times as needed as long as the RS is not dead.
1117             LOG.warn(assignMsg + ", trying to assign to the same region server due to ", t);
1118           } else {
1119             needNewPlan = true;
1120             LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
1121                 " try=" + i + " of " + this.maximumAttempts, t);
1122           }
1123         }
1124 
1125         if (i == this.maximumAttempts) {
1126           // Don't reset the region state or get a new plan any more.
1127           // This is the last try.
1128           continue;
1129         }
1130 
1131         // If the region opened on the destination of the present plan, reassigning to a new
1132         // RS may cause double assignments. In case of RegionAlreadyInTransitionException,
1133         // we reassign to the same RS.
1134         if (needNewPlan) {
1135           // Force a new plan and reassign. Will return null if no servers.
1136           // The new plan could be the same as the existing plan since we don't
1137           // exclude the server of the original plan, which should not be
1138           // excluded since it could be the only server up now.
1139           RegionPlan newPlan = null;
1140           try {
1141             newPlan = getRegionPlan(region, true);
1142           } catch (HBaseIOException e) {
1143             LOG.warn("Failed to get region plan", e);
1144           }
1145           if (newPlan == null) {
1146             regionStates.updateRegionState(region, State.FAILED_OPEN);
1147             LOG.warn("Unable to find a viable location to assign region " +
1148                 region.getRegionNameAsString());
1149             return;
1150           }
1151 
1152           if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
1153             // Clean out the plan we failed to execute and one that doesn't look like
1154             // it'll succeed anyway; we need a new plan!
1155             // Transition back to OFFLINE
1156             regionStates.updateRegionState(region, State.OFFLINE);
1157             plan = newPlan;
1158           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
1159               previousException instanceof FailedServerException) {
1160             try {
1161               LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
1162                 " to the same failed server.");
1163               Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1164                 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
1165             } catch (InterruptedException ie) {
1166               LOG.warn("Failed to assign "
1167                   + region.getRegionNameAsString() + " since interrupted", ie);
1168               regionStates.updateRegionState(region, State.FAILED_OPEN);
1169               Thread.currentThread().interrupt();
1170               return;
1171             }
1172           }
1173         }
1174       }
1175       // Run out of attempts
1176       regionStates.updateRegionState(region, State.FAILED_OPEN);
1177     } finally {
1178       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTimeMillis() - startTime);
1179     }
1180   }
1181 
1182   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
1183     if (this.tableStateManager.isTableState(region.getTable(),
1184         ZooKeeperProtos.Table.State.DISABLED,
1185         ZooKeeperProtos.Table.State.DISABLING) || replicasToClose.contains(region)) {
1186       LOG.info("Table " + region.getTable() + " is disabled or disabling;"
1187         + " skipping assign of " + region.getRegionNameAsString());
1188       offlineDisabledRegion(region);
1189       return true;
1190     }
1191     return false;
1192   }
1193 
1194   /**
1195    * @param region the region to assign
1196    * @return Plan for the passed <code>region</code> (if none exists currently, one is
1197    * created; if there are no servers to assign to, null is returned).
1198    */
1199   private RegionPlan getRegionPlan(final HRegionInfo region,
1200       final boolean forceNewPlan)  throws HBaseIOException {
1201     return getRegionPlan(region, null, forceNewPlan);
1202   }
1203 
1204   /**
1205    * @param region the region to assign
1206    * @param serverToExclude Server to exclude (we know it's bad). Pass null if
1207    * all servers are thought to be assignable.
1208    * @param forceNewPlan If true, then if an existing plan exists, a new plan
1209    * will be generated.
1210    * @return Plan for the passed <code>region</code> (if none exists currently, one is
1211    * created; if there are no servers to assign to, null is returned).
1212    */
1213   private RegionPlan getRegionPlan(final HRegionInfo region,
1214       final ServerName serverToExclude, final boolean forceNewPlan) throws HBaseIOException {
1215     // Pick up an existing plan or make a new one
1216     final String encodedName = region.getEncodedName();
1217     final List<ServerName> destServers =
1218       serverManager.createDestinationServersList(serverToExclude);
1219 
1220     if (destServers.isEmpty()){
1221       LOG.warn("Can't move " + encodedName +
1222         ", there is no destination server available.");
1223       return null;
1224     }
1225 
1226     RegionPlan randomPlan = null;
1227     boolean newPlan = false;
1228     RegionPlan existingPlan;
1229 
1230     synchronized (this.regionPlans) {
1231       existingPlan = this.regionPlans.get(encodedName);
1232 
1233       if (existingPlan != null && existingPlan.getDestination() != null) {
1234         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
1235           + " destination server is " + existingPlan.getDestination() +
1236             " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
1237       }
1238 
1239       if (forceNewPlan
1240           || existingPlan == null
1241           || existingPlan.getDestination() == null
1242           || !destServers.contains(existingPlan.getDestination())) {
1243         newPlan = true;
1244         randomPlan = new RegionPlan(region, null,
1245             balancer.randomAssignment(region, destServers));
1246         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
1247           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
1248           regions.add(region);
1249           try {
1250             processFavoredNodes(regions);
1251           } catch (IOException ie) {
1252             LOG.warn("Ignoring exception in processFavoredNodes " + ie);
1253           }
1254         }
1255         this.regionPlans.put(encodedName, randomPlan);
1256       }
1257     }
1258 
1259     if (newPlan) {
1260       if (randomPlan.getDestination() == null) {
1261         LOG.warn("Can't find a destination for " + encodedName);
1262         return null;
1263       }
1264       LOG.debug("No previous transition plan found (or ignoring " +
1265         "an existing plan) for " + region.getRegionNameAsString() +
1266         "; generated random plan=" + randomPlan + "; " + destServers.size() +
1267         " (online=" + serverManager.getOnlineServers().size() +
1268         ") available servers, forceNewPlan=" + forceNewPlan);
1269       return randomPlan;
1270     }
1271     LOG.debug("Using pre-existing plan for " +
1272       region.getRegionNameAsString() + "; plan=" + existingPlan);
1273     return existingPlan;
1274   }
1275 
1276   /**
1277    * Unassigns the specified region.
1278    * <p>
1279    * Updates the RegionState and sends the CLOSE RPC unless region is being
1280    * split by regionserver; then the unassign fails (silently) because we
1281    * presume the region being unassigned no longer exists (it's been split out
1282    * of existence). TODO: What to do if split fails and is rolled back and
1283    * parent is revivified?
1284    * <p>
1285    * If a RegionPlan is already set, it will remain.
1286    *
1287    * @param region the region to be unassigned
1288    */
1289   public void unassign(HRegionInfo region) {
1290     unassign(region, null);
1291   }
1292 
1293 
1294   /**
1295    * Unassigns the specified region.
1296    * <p>
1297    * Updates the RegionState and sends the CLOSE RPC unless region is being
1298    * split by regionserver; then the unassign fails (silently) because we
1299    * presume the region being unassigned no longer exists (it's been split out
1300    * of existence). TODO: What to do if split fails and is rolled back and
1301    * parent is revivified?
1302    * <p>
1303    * If a RegionPlan is already set, it will remain.
1304    *
1305    * @param region the region to be unassigned
1306    * @param dest the destination server of the region
1307    */
1308   public void unassign(HRegionInfo region, ServerName dest) {
1309     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
1310     LOG.debug("Starting unassign of " + region.getRegionNameAsString()
1311       + " (offlining), current state: " + regionStates.getRegionState(region));
1312 
1313     String encodedName = region.getEncodedName();
1314     // Grab the state of this region and synchronize on it
1315     // We need a lock here as we're going to do a put later and we don't want
1316     // multiple states to be created concurrently
1317     ReentrantLock lock = locker.acquireLock(encodedName);
1318     RegionState state = regionStates.getRegionTransitionState(encodedName);
1319     try {
1320       if (state == null || state.isFailedClose()) {
1321         if (state == null) {
1322           // Region is not in transition.
1323           // We can unassign it only if it's not SPLIT/MERGED.
1324           state = regionStates.getRegionState(encodedName);
1325           if (state != null && state.isUnassignable()) {
1326             LOG.info("Attempting to unassign " + state + ", ignored");
1327             // Offline region will be reassigned below
1328             return;
1329           }
1330           if (state == null || state.getServerName() == null) {
1331             // We don't know where the region is, offline it.
1332             // No need to send CLOSE RPC
1333           LOG.warn("Attempting to unassign a region not in RegionStates "
1334               + region.getRegionNameAsString() + ", offlined");
1335             regionOffline(region);
1336             return;
1337           }
1338         }
1339         state = regionStates.updateRegionState(
1340           region, State.PENDING_CLOSE);
1341       } else if (state.isFailedOpen()) {
1342         // The region is not open yet
1343         regionOffline(region);
1344         return;
1345       } else {
1346         LOG.debug("Attempting to unassign " +
1347           region.getRegionNameAsString() + " but it is " +
1348           "already in transition (" + state.getState() + ")");
1349         return;
1350       }
1351 
1352       unassign(region, state, dest);
1353     } finally {
1354       lock.unlock();
1355 
1356       // Region is expected to be reassigned afterwards
1357       if (!replicasToClose.contains(region)
1358           && regionStates.isRegionInState(region, State.OFFLINE)) {
1359         assign(region);
1360       }
1361     }
1362   }
1363 
1364   /**
1365    * Used by unit tests. Return the number of regions opened so far in the life
1366    * of the master. Increases by one every time the master opens a region.
1367    * @return the counter value of the number of regions opened so far
1368    */
1369   public int getNumRegionsOpened() {
1370     return numRegionsOpened.get();
1371   }
1372 
1373   /**
1374    * Waits until the specified region has completed assignment.
1375    * <p>
1376    * If the region is already assigned, returns immediately.  Otherwise, method
1377    * blocks until the region is assigned.
1378    * @param regionInfo region to wait on assignment for
1379    * @throws InterruptedException
1380    */
1381   public boolean waitForAssignment(HRegionInfo regionInfo)
1382       throws InterruptedException {
1383     while (!regionStates.isRegionOnline(regionInfo)) {
1384       if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN)
1385           || this.server.isStopped()) {
1386         return false;
1387       }
1388 
1389       // We should receive a notification, but it's
1390       //  better to have a timeout to recheck the condition here:
1391       //  it lowers the impact of a race condition if any
1392       regionStates.waitForUpdate(100);
1393     }
1394     return true;
1395   }
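       // Illustrative usage (not part of the original source): callers that need a region
       // online before proceeding typically pair assign() with waitForAssignment():
       //   assignmentManager.assign(hri);                  // queue the assignment
       //   if (!assignmentManager.waitForAssignment(hri)) {
       //     // the region failed to open or the master is stopping
       //   }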
1396 
1397   /**
1398    * Assigns the hbase:meta region.
1399    * <p>
1400    * Assumes that hbase:meta is currently closed and is not being actively served by
1401    * any RegionServer.
1402    * <p>
1403    * Forcibly unsets the current meta region location in ZooKeeper and assigns
1404    * hbase:meta to a random RegionServer.
1405    * @throws KeeperException
1406    */
1407   public void assignMeta() throws KeeperException {
1408     this.server.getMetaTableLocator().deleteMetaLocation(this.server.getZooKeeper());
1409     assign(HRegionInfo.FIRST_META_REGIONINFO);
1410   }
1411 
1412   /**
1413    * Assigns the specified regions, retaining their existing assignments where possible.
1414    * <p>
1415    * This is a synchronous call and will return once every region has been
1416    * assigned.  If anything fails, an exception is thrown.
1417    * @throws InterruptedException
1418    * @throws IOException
1419    */
1420   public void assign(Map<HRegionInfo, ServerName> regions)
1421         throws IOException, InterruptedException {
1422     if (regions == null || regions.isEmpty()) {
1423       return;
1424     }
1425     List<ServerName> servers = serverManager.createDestinationServersList();
1426     if (servers == null || servers.isEmpty()) {
1427       throw new IOException("Found no destination server to assign region(s)");
1428     }
1429 
1430     // Reuse existing assignment info
1431     Map<ServerName, List<HRegionInfo>> bulkPlan =
1432       balancer.retainAssignment(regions, servers);
1433 
1434     assign(regions.size(), servers.size(),
1435       "retainAssignment=true", bulkPlan);
1436   }
1437 
1438   /**
1439    * Assigns the specified regions round-robin across the available servers.
1440    * <p>
1441    * This is a synchronous call and will return once every region has been
1442    * assigned.  If anything fails, an exception is thrown.
1443    * @throws InterruptedException
1444    * @throws IOException
1445    */
1446   public void assign(List<HRegionInfo> regions)
1447         throws IOException, InterruptedException {
1448     if (regions == null || regions.isEmpty()) {
1449       return;
1450     }
1451 
1452     List<ServerName> servers = serverManager.createDestinationServersList();
1453     if (servers == null || servers.isEmpty()) {
1454       throw new IOException("Found no destination server to assign region(s)");
1455     }
1456 
1457     // Generate a round-robin bulk assignment plan
1458     Map<ServerName, List<HRegionInfo>> bulkPlan
1459       = balancer.roundRobinAssignment(regions, servers);
1460     processFavoredNodes(regions);
1461 
1462     assign(regions.size(), servers.size(),
1463       "round-robin=true", bulkPlan);
1464   }
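       // Illustrative usage (not part of the original source): a caller hands a list of
       // regions to assign() and the balancer spreads them round-robin, e.g.
       //   List<HRegionInfo> regions = Arrays.asList(hriA, hriB);
       //   assignmentManager.assign(regions);  // blocks until every region is assigned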
1465 
1466   private void assign(int regions, int totalServers,
1467       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
1468           throws InterruptedException, IOException {
1469 
1470     int servers = bulkPlan.size();
1471     if (servers == 1 || (regions < bulkAssignThresholdRegions
1472         && servers < bulkAssignThresholdServers)) {
1473 
1474       // Don't use bulk assignment.  This can be more efficient in a small
1475       // cluster, especially a mini cluster used for testing, so that tests won't time out
1476       if (LOG.isTraceEnabled()) {
1477         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
1478           " region(s) to " + servers + " server(s)");
1479       }
1480       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
1481         if (!assign(plan.getKey(), plan.getValue())) {
1482           for (HRegionInfo region: plan.getValue()) {
1483             if (!regionStates.isRegionOnline(region)) {
1484               invokeAssign(region);
1485             }
1486           }
1487         }
1488       }
1489     } else {
1490       LOG.info("Bulk assigning " + regions + " region(s) across "
1491         + totalServers + " server(s), " + message);
1492 
1493       // Use a fixed-count thread pool for bulk assigning.
1494       BulkAssigner ba = new GeneralBulkAssigner(
1495         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
1496       ba.bulkAssign();
1497       LOG.info("Bulk assigning done");
1498     }
1499   }
1500 
1501   /**
1502    * Assigns all user regions, if any exist.  Used during cluster startup.
1503    * <p>
1504    * This is a synchronous call and will return once every region has been
1505    * assigned.  If anything fails, an exception is thrown and the cluster
1506    * should be shut down.
1507    * @throws InterruptedException
1508    * @throws IOException
1509    */
1510   private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
1511       throws IOException, InterruptedException {
1512     if (allRegions == null || allRegions.isEmpty()) return;
1513 
1514     // Determine what type of assignment to do on startup
1515     boolean retainAssignment = server.getConfiguration().
1516       getBoolean("hbase.master.startup.retainassign", true);
1517 
1518     Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
1519     if (retainAssignment) {
1520       assign(allRegions);
1521     } else {
1522       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
1523       assign(regions);
1524     }
1525 
1526     for (HRegionInfo hri : regionsFromMetaScan) {
1527       TableName tableName = hri.getTable();
1528       if (!tableStateManager.isTableState(tableName,
1529           ZooKeeperProtos.Table.State.ENABLED)) {
1530         setEnabledTable(tableName);
1531       }
1532     }
1533     // assign all the replicas that were not recorded in the meta
1534     assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server));
1535   }
1536 
1537   /**
1538    * Get a list of replica regions that are not recorded in meta yet. We might not
1539    * have recorded the locations for the replicas because the replicas may not have
1540    * been online yet, the master restarted in the middle of assigning, ZK was
1541    * erased, etc.
1542    * @param regionsRecordedInMeta the list of regions we know are recorded in meta
1543    * either as a default, or, as the location of a replica
1544    * @param master the master services used to look up table descriptors
1545    * @return list of replica regions
1546    * @throws IOException
1547    */
1548   public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
1549       Set<HRegionInfo> regionsRecordedInMeta, MasterServices master) throws IOException {
1550     List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
1551     for (HRegionInfo hri : regionsRecordedInMeta) {
1552       TableName table = hri.getTable();
1553       HTableDescriptor htd = master.getTableDescriptors().get(table);
1554       // look at the HTD for the replica count. That's the source of truth
1555       int desiredRegionReplication = htd.getRegionReplication();
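           // For each replica id up to the desired replication, skip any replica already
           // recorded in meta; the remaining ones are returned so the caller can assign them.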
1556       for (int i = 0; i < desiredRegionReplication; i++) {
1557         HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
1558         if (regionsRecordedInMeta.contains(replica)) continue;
1559         regionsNotRecordedInMeta.add(replica);
1560       }
1561     }
1562     return regionsNotRecordedInMeta;
1563   }
1564 
1565   /**
1566    * Rebuild the list of user regions and assignment information.
1567    * <p>
1568    * Returns a set of servers that hosted some regions but are not found
1569    * to be online.
1570    * @return set of servers not online that hosted some regions per meta
1571    * @throws IOException
1572    */
1573   Set<ServerName> rebuildUserRegions() throws
1574       IOException, KeeperException, CoordinatedStateException {
1575     Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
1576       ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.ENABLING);
1577 
1578     Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
1579       ZooKeeperProtos.Table.State.DISABLED,
1580       ZooKeeperProtos.Table.State.DISABLING,
1581       ZooKeeperProtos.Table.State.ENABLING);
1582 
1583     // Region assignment from META
1584     List<Result> results = MetaTableAccessor.fullScanOfMeta(server.getShortCircuitConnection());
1585     // Get any new but slow-to-check-in region servers that joined the cluster
1586     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
1587     // Set of offline servers to be returned
1588     Set<ServerName> offlineServers = new HashSet<ServerName>();
1589     // Iterate regions in META
1590     for (Result result : results) {
1591       if (result == null) {
1592         LOG.debug("null result from meta - ignoring but this is strange.");
1593         continue;
1594       }
1595       // Keep track of replicas to close. These were the replicas of the originally
1596       // unmerged regions. The master might have closed them before, but it might
1597       // not have, perhaps because it crashed.
1598       PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
1599       if (p.getFirst() != null && p.getSecond() != null) {
1600         int numReplicas = ((MasterServices)server).getTableDescriptors().get(p.getFirst().
1601             getTable()).getRegionReplication();
1602         for (HRegionInfo merge : p) {
1603           for (int i = 1; i < numReplicas; i++) {
1604             replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
1605           }
1606         }
1607       }
1608       RegionLocations rl = MetaTableAccessor.getRegionLocations(result);
1609       if (rl == null) continue;
1610       HRegionLocation[] locations = rl.getRegionLocations();
1611       if (locations == null) continue;
1612       for (HRegionLocation hrl : locations) {
1613         HRegionInfo regionInfo = hrl.getRegionInfo();
1614         if (regionInfo == null) continue;
1615         int replicaId = regionInfo.getReplicaId();
1616         State state = RegionStateStore.getRegionState(result, replicaId);
1617         // Keep track of replicas to close. These were the replicas of the split parents
1618         // from the previous life of the master. The master should have closed them before,
1619         // but may not have, perhaps because it crashed.
1620         if (replicaId == 0 && state.equals(State.SPLIT)) {
1621           for (HRegionLocation h : locations) {
1622             replicasToClose.add(h.getRegionInfo());
1623           }
1624         }
1625         ServerName lastHost = hrl.getServerName();
1626         ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
1627         regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
1628         if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
1629           // Region is not open (either offline or in transition), skip
1630           continue;
1631         }
1632         TableName tableName = regionInfo.getTable();
1633         if (!onlineServers.contains(regionLocation)) {
1634           // Region is located on a server that isn't online
1635           offlineServers.add(regionLocation);
1636         } else if (!disabledOrEnablingTables.contains(tableName)) {
1637           // Region is being served and on an active server
1638           // add only if region not in disabled or enabling table
1639           regionStates.regionOnline(regionInfo, regionLocation);
1640           balancer.regionOnline(regionInfo, regionLocation);
1641         }
1642         // Need to enable the table if it is not disabled, disabling, or enabling;
1643         // this is used during rolling restarts
1644         if (!disabledOrDisablingOrEnabling.contains(tableName)
1645           && !getTableStateManager().isTableState(tableName,
1646             ZooKeeperProtos.Table.State.ENABLED)) {
1647           setEnabledTable(tableName);
1648         }
1649       }
1650     }
1651     return offlineServers;
1652   }
1653 
1654   /**
1655    * Recover the tables that were not fully moved to DISABLED state. These
1656    * tables were in DISABLING state when the master restarted or switched over.
1657    *
1658    * @throws KeeperException
1659    * @throws TableNotFoundException
1660    * @throws IOException
1661    */
1662   private void recoverTableInDisablingState()
1663       throws KeeperException, IOException, CoordinatedStateException {
1664     Set<TableName> disablingTables =
1665       tableStateManager.getTablesInStates(ZooKeeperProtos.Table.State.DISABLING);
1666     if (disablingTables.size() != 0) {
1667       for (TableName tableName : disablingTables) {
1668         // Recover by calling DisableTableHandler
1669         LOG.info("The table " + tableName
1670             + " is in DISABLING state.  Hence recovering by moving the table"
1671             + " to DISABLED state.");
1672         new DisableTableHandler(this.server, tableName,
1673             this, tableLockManager, true).prepare().process();
1674       }
1675     }
1676   }
1677 
1678   /**
1679    * Recover the tables that were not fully moved to ENABLED state. These tables
1680    * were in ENABLING state when the master restarted or switched over.
1681    *
1682    * @throws KeeperException
1683    * @throws org.apache.hadoop.hbase.TableNotFoundException
1684    * @throws IOException
1685    */
1686   private void recoverTableInEnablingState()
1687       throws KeeperException, IOException, CoordinatedStateException {
1688     Set<TableName> enablingTables = tableStateManager.
1689       getTablesInStates(ZooKeeperProtos.Table.State.ENABLING);
1690     if (enablingTables.size() != 0) {
1691       for (TableName tableName : enablingTables) {
1692         // Recover by calling EnableTableHandler
1693         LOG.info("The table " + tableName
1694             + " is in ENABLING state.  Hence recovering by moving the table"
1695             + " to ENABLED state.");
1696         // Enable the table synchronously during master startup;
1697         // no need to invoke coprocessors
1698         EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
1699           this, tableLockManager, true);
1700         try {
1701           eth.prepare();
1702         } catch (TableNotFoundException e) {
1703           LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
1704           continue;
1705         }
1706         eth.process();
1707       }
1708     }
1709   }
1710 
1711   /**
1712    * Processes the list of dead servers from the hbase:meta scan result, and the regions in transition (RIT).
1713    *
1714    * @param deadServers
1715    *          The list of dead servers which failed while there was no active
1716    *          master. Can be null.
1717    */
1718   private void processDeadServers(Set<ServerName> deadServers) {
1719     if (deadServers != null && !deadServers.isEmpty()) {
1720       for (ServerName serverName: deadServers) {
1721         if (!serverManager.isServerDead(serverName)) {
1722           serverManager.expireServer(serverName); // Let SSH do region re-assign
1723         }
1724       }
1725     }
1726 
1727     // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
1728     // in case the RPC call was not sent out before the master was shut down
1729     // since we update the state before we send the RPC call. We can't update
1730     // the state after the RPC call. Otherwise, we don't know what's happened
1731     // to the region if the master dies right after the RPC call is out.
1732     Map<String, RegionState> rits = regionStates.getRegionsInTransition();
1733     for (RegionState regionState: rits.values()) {
1734       if (!serverManager.isServerOnline(regionState.getServerName())) {
1735         continue; // SSH will handle it
1736       }
1737       State state = regionState.getState();
1738       LOG.info("Processing " + regionState);
1739       switch (state) {
1740       case PENDING_OPEN:
1741         retrySendRegionOpen(regionState);
1742         break;
1743       case PENDING_CLOSE:
1744         retrySendRegionClose(regionState);
1745         break;
1746       default:
1747         // No process for other states
1748       }
1749     }
1750   }
1751 
1752   /**
1753    * At master failover, for a region in PENDING_OPEN state, make sure the
1754    * sendRegionOpen RPC call is sent to the target region server.
1755    */
1756   private void retrySendRegionOpen(final RegionState regionState) {
1757     this.executorService.submit(
1758       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
1759         @Override
1760         public void process() throws IOException {
1761           HRegionInfo hri = regionState.getRegion();
1762           ServerName serverName = regionState.getServerName();
1763           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
1764           try {
1765             if (!regionState.equals(regionStates.getRegionState(hri))) {
1766               return; // Region is not in the expected state any more
1767             }
1768             while (serverManager.isServerOnline(serverName)
1769                 && !server.isStopped() && !server.isAborted()) {
1770               try {
1771                 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1772                 if (shouldAssignRegionsWithFavoredNodes) {
1773                   favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
1774                 }
1775                 RegionOpeningState regionOpenState = serverManager.sendRegionOpen(
1776                   serverName, hri, favoredNodes);
1777 
1778                 if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
1779                   // Failed opening this region, this means the target server didn't get
1780                   // the original region open RPC, so re-assign it with a new plan
1781                   LOG.debug("Got failed_opening in retry sendRegionOpen for "
1782                     + regionState + ", re-assign it");
1783                   invokeAssign(hri, true);
1784                 }
1785                 return; // Done.
1786               } catch (Throwable t) {
1787                 if (t instanceof RemoteException) {
1788                   t = ((RemoteException) t).unwrapRemoteException();
1789                 }
1790                 // In case SocketTimeoutException/FailedServerException, retry
1791                 if (t instanceof java.net.SocketTimeoutException
1792                     || t instanceof FailedServerException) {
1793                   Threads.sleep(100);
1794                   continue;
1795                 }
1796                 // For other exceptions, re-assign it
1797                 LOG.debug("Got exception in retry sendRegionOpen for "
1798                   + regionState + ", re-assign it", t);
1799                 invokeAssign(hri);
1800                 return; // Done.
1801               }
1802             }
1803           } finally {
1804             lock.unlock();
1805           }
1806         }
1807       });
1808   }
1809 
1810   /**
1811    * At master failover, for a region in PENDING_CLOSE state, make sure the
1812    * sendRegionClose RPC call is sent to the target region server.
1813    */
1814   private void retrySendRegionClose(final RegionState regionState) {
1815     this.executorService.submit(
1816       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
1817         @Override
1818         public void process() throws IOException {
1819           HRegionInfo hri = regionState.getRegion();
1820           ServerName serverName = regionState.getServerName();
1821           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
1822           try {
1823             if (!regionState.equals(regionStates.getRegionState(hri))) {
1824               return; // Region is not in the expected state any more
1825             }
1826             while (serverManager.isServerOnline(serverName)
1827                 && !server.isStopped() && !server.isAborted()) {
1828               try {
1829                 if (!serverManager.sendRegionClose(serverName, hri, null)) {
1830                   // This means the region is still on the target server
1831                   LOG.debug("Got false in retry sendRegionClose for "
1832                     + regionState + ", re-close it");
1833                   invokeUnAssign(hri);
1834                 }
1835                 return; // Done.
1836               } catch (Throwable t) {
1837                 if (t instanceof RemoteException) {
1838                   t = ((RemoteException) t).unwrapRemoteException();
1839                 }
1840                 // In case SocketTimeoutException/FailedServerException, retry
1841                 if (t instanceof java.net.SocketTimeoutException
1842                     || t instanceof FailedServerException) {
1843                   Threads.sleep(100);
1844                   continue;
1845                 }
1846                 if (!(t instanceof NotServingRegionException
1847                     || t instanceof RegionAlreadyInTransitionException)) {
1848                   // NotServingRegionException/RegionAlreadyInTransitionException
1849                   // means the target server got the original region close request.
1850                   // For other exceptions, re-close it
1851                   LOG.debug("Got exception in retry sendRegionClose for "
1852                     + regionState + ", re-close it", t);
1853                   invokeUnAssign(hri);
1854                 }
1855                 return; // Done.
1856               }
1857             }
1858           } finally {
1859             lock.unlock();
1860           }
1861         }
1862       });
1863   }
1864 
1865   /**
1866    * Set regions-in-transition metrics.
1867    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
1868    * This iterator is not fail-fast, which may lead to stale reads; but that's better than
1869    * creating a copy of the map for metrics computation, as this method is invoked
1870    * at a frequent interval.
1871    */
1872   public void updateRegionsInTransitionMetrics() {
1873     long currentTime = System.currentTimeMillis();
1874     int totalRITs = 0;
1875     int totalRITsOverThreshold = 0;
1876     long oldestRITTime = 0;
1877     int ritThreshold = this.server.getConfiguration().
1878       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
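         // Note: the threshold and the RIT ages computed below are both in milliseconds
         // (the default of 60000 is one minute).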
1879     for (RegionState state: regionStates.getRegionsInTransition().values()) {
1880       totalRITs++;
1881       long ritTime = currentTime - state.getStamp();
1882       if (ritTime > ritThreshold) { // more than the threshold
1883         totalRITsOverThreshold++;
1884       }
1885       if (oldestRITTime < ritTime) {
1886         oldestRITTime = ritTime;
1887       }
1888     }
1889     if (this.metricsAssignmentManager != null) {
1890       this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
1891       this.metricsAssignmentManager.updateRITCount(totalRITs);
1892       this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
1893     }
1894   }
1895 
1896   /**
1897    * @param region Region whose plan we are to clear.
1898    */
1899   void clearRegionPlan(final HRegionInfo region) {
1900     synchronized (this.regionPlans) {
1901       this.regionPlans.remove(region.getEncodedName());
1902     }
1903   }
1904 
1905   /**
1906    * Wait on region to clear regions-in-transition.
1907    * @param hri Region to wait on.
1908    * @throws IOException
1909    */
1910   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
1911       throws IOException, InterruptedException {
1912     waitOnRegionToClearRegionsInTransition(hri, -1L);
1913   }
1914 
1915   /**
1916    * Wait on region to clear regions-in-transition or time out
1917    * @param hri Region to wait on.
1918    * @param timeOut Milliseconds to wait for current region to be out of transition state.
1919    * @return True when a region clears regions-in-transition before timeout otherwise false
1920    * @throws InterruptedException
1921    */
1922   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
1923       throws InterruptedException {
1924     if (!regionStates.isRegionInTransition(hri)) return true;
1925     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTimeMillis()
1926         + timeOut;
1927     // There is already a timeout monitor on regions in transition so I
1928     // should not have to have one here too?
1929     LOG.info("Waiting for " + hri.getEncodedName() +
1930         " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
1931     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
1932       regionStates.waitForUpdate(100);
1933       if (EnvironmentEdgeManager.currentTimeMillis() > end) {
1934         LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
1935         return false;
1936       }
1937     }
1938     if (this.server.isStopped()) {
1939       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
1940       return false;
1941     }
1942     return true;
1943   }
1944 
1945   void invokeAssign(HRegionInfo regionInfo) {
1946     invokeAssign(regionInfo, true);
1947   }
1948 
1949   void invokeAssign(HRegionInfo regionInfo, boolean newPlan) {
1950     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, newPlan));
1951   }
1952 
1953   void invokeUnAssign(HRegionInfo regionInfo) {
1954     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
1955   }
1956 
1957   public boolean isCarryingMeta(ServerName serverName) {
1958     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
1959   }
1960 
1961   /**
1962    * Check if the shutdown server carries the specific region.
1963    * @return whether the serverName currently hosts the region
1964    */
1965   private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
1966     RegionState regionState = regionStates.getRegionTransitionState(hri);
1967     ServerName transitionAddr = regionState != null? regionState.getServerName(): null;
1968     if (transitionAddr != null) {
1969       boolean matchTransitionAddr = transitionAddr.equals(serverName);
1970       LOG.debug("Checking region=" + hri.getRegionNameAsString()
1971         + ", transitioning on server=" + transitionAddr
1972         + " server being checked: " + serverName
1973         + ", matches=" + matchTransitionAddr);
1974       return matchTransitionAddr;
1975     }
1976 
1977     ServerName assignedAddr = regionStates.getRegionServerOfRegion(hri);
1978     boolean matchAssignedAddr = serverName.equals(assignedAddr);
1979     LOG.debug("based on AM, current region=" + hri.getRegionNameAsString()
1980       + " is on server=" + assignedAddr + ", server being checked: "
1981       + serverName);
1982     return matchAssignedAddr;
1983   }
1984 
1985   /**
1986    * Process shutdown server removing any assignments.
1987    * @param sn Server that went down.
1988    * @return list of regions in transition on this server
1989    */
1990   public List<HRegionInfo> processServerShutdown(final ServerName sn) {
1991     // Clean out any existing assignment plans for this server
1992     synchronized (this.regionPlans) {
1993       for (Iterator <Map.Entry<String, RegionPlan>> i =
1994           this.regionPlans.entrySet().iterator(); i.hasNext();) {
1995         Map.Entry<String, RegionPlan> e = i.next();
1996         ServerName otherSn = e.getValue().getDestination();
1997         // The name will be null if the region is planned for a random assign.
1998         if (otherSn != null && otherSn.equals(sn)) {
1999           // Use the iterator's remove() or else we'll get a ConcurrentModificationException
2000           i.remove();
2001         }
2002       }
2003     }
2004     List<HRegionInfo> regions = regionStates.serverOffline(sn);
2005     for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
2006       HRegionInfo hri = it.next();
2007       String encodedName = hri.getEncodedName();
2008 
2009       // We need a lock on the region as we could update it
2010       Lock lock = locker.acquireLock(encodedName);
2011       try {
2012         RegionState regionState =
2013           regionStates.getRegionTransitionState(encodedName);
2014         if (regionState == null
2015             || (regionState.getServerName() != null && !regionState.isOnServer(sn))
2016             || !(regionState.isFailedClose() || regionState.isOffline()
2017               || regionState.isPendingOpenOrOpening())) {
2018           LOG.info("Skip " + regionState + " since it is not opening/failed_close"
2019             + " on the dead server any more: " + sn);
2020           it.remove();
2021         } else {
2022           if (tableStateManager.isTableState(hri.getTable(),
2023               ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
2024             regionStates.regionOffline(hri);
2025             it.remove();
2026             continue;
2027           }
2028           // Mark the region offline and assign it again by SSH
2029           regionStates.updateRegionState(hri, State.OFFLINE);
2030         }
2031       } finally {
2032         lock.unlock();
2033       }
2034     }
2035     return regions;
2036   }
2037 
2038   /**
2039    * @param plan Plan to execute.
2040    */
2041   public void balance(final RegionPlan plan) {
2042     HRegionInfo hri = plan.getRegionInfo();
2043     TableName tableName = hri.getTable();
2044     if (tableStateManager.isTableState(tableName,
2045       ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
2046       LOG.info("Ignored moving region of disabling/disabled table "
2047         + tableName);
2048       return;
2049     }
2050 
2051     // Move the region only if it's assigned
2052     String encodedName = hri.getEncodedName();
2053     ReentrantLock lock = locker.acquireLock(encodedName);
2054     try {
2055       if (!regionStates.isRegionOnline(hri)) {
2056         RegionState state = regionStates.getRegionState(encodedName);
2057         LOG.info("Ignored moving region not assigned: " + hri + ", "
2058           + (state == null ? "not in region states" : state));
2059         return;
2060       }
2061       synchronized (this.regionPlans) {
2062         this.regionPlans.put(plan.getRegionName(), plan);
2063       }
2064       unassign(hri, plan.getDestination());
2065     } finally {
2066       lock.unlock();
2067     }
2068   }
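       // Illustrative sketch (not part of the original source): a move is expressed as a
       // RegionPlan and handed to balance(), which unassigns the region and keeps the
       // destination in regionPlans so the follow-up assign lands on the requested server:
       //   RegionPlan plan = new RegionPlan(hri, currentServer, targetServer);
       //   assignmentManager.balance(plan);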
2069 
2070   public void stop() {
2071     // Shutdown the threadpool executor service
2072     threadPoolExecutorService.shutdownNow();
2073     regionStateStore.stop();
2074   }
2075 
2076   protected void setEnabledTable(TableName tableName) {
2077     try {
2078       this.tableStateManager.setTableState(tableName,
2079         ZooKeeperProtos.Table.State.ENABLED);
2080     } catch (CoordinatedStateException e) {
2081       // Here we can abort as it is the startup flow
2082       String errorMsg = "Unable to ensure that the table " + tableName
2083           + " will be enabled because of a ZooKeeper issue";
2084       LOG.error(errorMsg);
2085       this.server.abort(errorMsg, e);
2086     }
2087   }
2088 
2089   private void onRegionFailedOpen(
2090       final HRegionInfo hri, final ServerName sn) {
2091     String encodedName = hri.getEncodedName();
2092     AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
2093     if (failedOpenCount == null) {
2094       failedOpenCount = new AtomicInteger();
2095       // No need to use putIfAbsent, or extra synchronization since
2096       // this whole handleRegion block is locked on the encoded region
2097       // name, and failedOpenTracker is updated only in this block
2098       failedOpenTracker.put(encodedName, failedOpenCount);
2099     }
2100     if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
2101       regionStates.updateRegionState(hri, State.FAILED_OPEN);
2102       // Remove the tracking info to save memory, and also reset
2103       // the count for the next open attempt
2104       failedOpenTracker.remove(encodedName);
2105     } else {
2106       // Handle this the same as if it were opened and then closed.
2107       RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
2108       if (regionState != null) {
2109         // When there is more than one region server, a new RS is selected as the
2110         // destination and the region plan is updated accordingly. (HBASE-5546)
2111         if (getTableStateManager().isTableState(hri.getTable(),
2112             ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
2113             replicasToClose.contains(hri)) {
2114           offlineDisabledRegion(hri);
2115           return;
2116         }
2117         // ZK Node is in CLOSED state, assign it.
2118         regionStates.updateRegionState(hri, RegionState.State.CLOSED);
2119         // This below has to do w/ online enable/disable of a table
2120         removeClosedRegion(hri);
2121         try {
2122           getRegionPlan(hri, sn, true);
2123         } catch (HBaseIOException e) {
2124           LOG.warn("Failed to get region plan", e);
2125         }
2126         invokeAssign(hri, false);
2127       }
2128     }
2129   }
2130 
2131   private void onRegionOpen(
2132       final HRegionInfo hri, final ServerName sn, long openSeqNum) {
2133     regionOnline(hri, sn, openSeqNum);
2134 
2135     // reset the count, if any
2136     failedOpenTracker.remove(hri.getEncodedName());
2137     if (getTableStateManager().isTableState(hri.getTable(),
2138         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
2139       invokeUnAssign(hri);
2140     }
2141   }
2142 
2143   private void onRegionClosed(final HRegionInfo hri) {
2144     if (getTableStateManager().isTableState(hri.getTable(),
2145         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING) ||
2146         replicasToClose.contains(hri)) {
2147       offlineDisabledRegion(hri);
2148       return;
2149     }
2150     regionStates.updateRegionState(hri, RegionState.State.CLOSED);
2151     sendRegionClosedNotification(hri);
2152     // This below has to do w/ online enable/disable of a table
2153     removeClosedRegion(hri);
2154     invokeAssign(hri, false);
2155   }
2156 
2157   private String onRegionSplit(ServerName sn, TransitionCode code,
2158       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
2159     final RegionState rs_p = regionStates.getRegionState(p);
2160     RegionState rs_a = regionStates.getRegionState(a);
2161     RegionState rs_b = regionStates.getRegionState(b);
2162     if (!(rs_p.isOpenOrSplittingOnServer(sn)
2163         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
2164         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
2165       return "Not in state good for split";
2166     }
2167 
2168     regionStates.updateRegionState(a, State.SPLITTING_NEW, sn);
2169     regionStates.updateRegionState(b, State.SPLITTING_NEW, sn);
2170     regionStates.updateRegionState(p, State.SPLITTING);
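         // The states above are set for every split transition code; the branches below
         // handle SPLIT, SPLIT_PONR and SPLIT_REVERTED specifically.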
2171 
2172     if (code == TransitionCode.SPLIT) {
2173       if (TEST_SKIP_SPLIT_HANDLING) {
2174         return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
2175       }
2176       regionOffline(p, State.SPLIT);
2177       regionOnline(a, sn, 1);
2178       regionOnline(b, sn, 1);
2179 
2180       // User could disable the table before master knows the new region.
2181       if (getTableStateManager().isTableState(p.getTable(),
2182           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
2183         invokeUnAssign(a);
2184         invokeUnAssign(b);
2185       } else {
2186         Callable<Object> splitReplicasCallable = new Callable<Object>() {
2187           @Override
2188           public Object call() {
2189             doSplittingOfReplicas(p, a, b);
2190             return null;
2191           }
2192         };
2193         threadPoolExecutorService.submit(splitReplicasCallable);
2194       }
2195     } else if (code == TransitionCode.SPLIT_PONR) {
2196       try {
2197         regionStates.splitRegion(p, a, b, sn);
2198       } catch (IOException ioe) {
2199         LOG.info("Failed to record split region " + p.getShortNameToLog());
2200         return "Failed to record the splitting in meta";
2201       }
2202     } else if (code == TransitionCode.SPLIT_REVERTED) {
2203       regionOnline(p, sn);
2204       regionOffline(a);
2205       regionOffline(b);
2206 
2207       if (getTableStateManager().isTableState(p.getTable(),
2208           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
2209         invokeUnAssign(p);
2210       }
2211     }
2212     return null;
2213   }
2214 
2215   private String onRegionMerge(ServerName sn, TransitionCode code,
2216       final HRegionInfo p, final HRegionInfo a, final HRegionInfo b) {
2217     RegionState rs_p = regionStates.getRegionState(p);
2218     RegionState rs_a = regionStates.getRegionState(a);
2219     RegionState rs_b = regionStates.getRegionState(b);
2220     if (!(rs_a.isOpenOrMergingOnServer(sn) && rs_b.isOpenOrMergingOnServer(sn)
2221         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
2222       return "Not in state good for merge";
2223     }
2224 
2225     regionStates.updateRegionState(a, State.MERGING);
2226     regionStates.updateRegionState(b, State.MERGING);
2227     regionStates.updateRegionState(p, State.MERGING_NEW, sn);
2228 
2229     String encodedName = p.getEncodedName();
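         // The merging pair is tracked from READY_TO_MERGE until the merge either
         // completes (MERGED) or is reverted, at which point the entry is removed.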
2230     if (code == TransitionCode.READY_TO_MERGE) {
2231       mergingRegions.put(encodedName,
2232         new PairOfSameType<HRegionInfo>(a, b));
2233     } else if (code == TransitionCode.MERGED) {
2234       mergingRegions.remove(encodedName);
2235       regionOffline(a, State.MERGED);
2236       regionOffline(b, State.MERGED);
2237       regionOnline(p, sn, 1);
2238 
2239       // User could disable the table before master knows the new region.
2240       if (getTableStateManager().isTableState(p.getTable(),
2241           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
2242         invokeUnAssign(p);
2243       } else {
2244         Callable<Object> mergeReplicasCallable = new Callable<Object>() {
2245           @Override
2246           public Object call() {
2247             doMergingOfReplicas(p, a, b);
2248             return null;
2249           }
2250         };
2251         threadPoolExecutorService.submit(mergeReplicasCallable);
2252       }
2253     } else if (code == TransitionCode.MERGE_PONR) {
2254       try {
2255         regionStates.mergeRegions(p, a, b, sn);
2256       } catch (IOException ioe) {
2257         LOG.info("Failed to record merged region " + p.getShortNameToLog());
2258         return "Failed to record the merging in meta";
2259       }
2260     } else {
2261       mergingRegions.remove(encodedName);
2262       regionOnline(a, sn);
2263       regionOnline(b, sn);
2264       regionOffline(p);
2265 
2266       if (getTableStateManager().isTableState(p.getTable(),
2267           ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
2268         invokeUnAssign(a);
2269         invokeUnAssign(b);
2270       }
2271     }
2272     return null;
2273   }
2274 
2275   private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
2276       final HRegionInfo hri_b) {
2277     // Close replicas for the original unmerged regions. Create/assign new replicas
2278     // for the merged parent.
2279     List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
2280     unmergedRegions.add(hri_a);
2281     unmergedRegions.add(hri_b);
2282     Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
2283     Collection<List<HRegionInfo>> c = map.values();
2284     for (List<HRegionInfo> l : c) {
2285       for (HRegionInfo h : l) {
2286         if (!RegionReplicaUtil.isDefaultReplica(h)) {
2287           LOG.debug("Unassigning un-merged replica " + h);
2288           unassign(h);
2289         }
2290       }
2291     }
2292     int numReplicas = 1;
2293     try {
2294       numReplicas = ((MasterServices)server).getTableDescriptors().get(mergedHri.getTable()).
2295           getRegionReplication();
2296     } catch (IOException e) {
2297       LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
2298           " due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
2299           "will not be done");
2300     }
2301     List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2302     for (int i = 1; i < numReplicas; i++) {
2303       regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
2304     }
2305     try {
2306       assign(regions);
2307     } catch (IOException ioe) {
2308       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
2309                 ioe.getMessage());
2310     } catch (InterruptedException ie) {
2311       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
2312                 ie.getMessage());
2313     }
2314   }
2315 
2316   private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
2317       final HRegionInfo hri_b) {
2318     // create new regions for the replica, and assign them to match with the
2319     // current replica assignments. If replica1 of parent is assigned to RS1,
2320     // the replica1s of daughters will be on the same machine
2321     int numReplicas = 1;
2322     try {
2323       numReplicas = ((MasterServices)server).getTableDescriptors().get(parentHri.getTable()).
2324           getRegionReplication();
2325     } catch (IOException e) {
2326       LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
2327           " due to " + e.getMessage() + ". The assignment of daughter region " +
2328           "replicas will not be done");
2329     }
2330     // unassign the old replicas
2331     List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
2332     parentRegion.add(parentHri);
2333     Map<ServerName, List<HRegionInfo>> currentAssign =
2334         regionStates.getRegionAssignments(parentRegion);
2335     Collection<List<HRegionInfo>> c = currentAssign.values();
2336     for (List<HRegionInfo> l : c) {
2337       for (HRegionInfo h : l) {
2338         if (!RegionReplicaUtil.isDefaultReplica(h)) {
2339           LOG.debug("Unassigning parent's replica " + h);
2340           unassign(h);
2341         }
2342       }
2343     }
2344     // assign daughter replicas
2345     Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
2346     for (int i = 1; i < numReplicas; i++) {
2347       prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
2348       prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
2349     }
2350     try {
2351       assign(map);
2352     } catch (IOException e) {
2353       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
2354     } catch (InterruptedException e) {
2355       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
2356     }
2357   }
2358 
2359   private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
2360       int replicaId, Map<HRegionInfo, ServerName> map) {
2361     HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
2362     HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
2363         replicaId);
2364     LOG.debug("Created replica region for daughter " + daughterReplica);
2365     ServerName sn;
2366     if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
2367       map.put(daughterReplica, sn);
2368     } else {
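           // The parent replica has no known location; fall back to a random online server.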
2369       List<ServerName> servers = serverManager.getOnlineServersList();
2370       sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
2371       map.put(daughterReplica, sn);
2372     }
2373   }
2374 
2375   public Set<HRegionInfo> getReplicasToClose() {
2376     return replicasToClose;
2377   }
2378 
2379   /**
2380    * Marks a region offline.  The new state is the specified one,
2381    * if not null; if the specified state is null, the new state is Offline.
2382    * The specified state can only be Split, Merged, Offline, or null.
2383    */
2384   private void regionOffline(final HRegionInfo regionInfo, final State state) {
2385     regionStates.regionOffline(regionInfo, state);
2386     removeClosedRegion(regionInfo);
2387     // remove the region plan as well just in case.
2388     clearRegionPlan(regionInfo);
2389     balancer.regionOffline(regionInfo);
2390 
2391     // Tell our listeners that a region was closed
2392     sendRegionClosedNotification(regionInfo);
2393     // also note that all the replicas of the primary should be closed
2394     if (state != null && state.equals(State.SPLIT)) {
2395       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
2396       c.add(regionInfo);
2397       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
2398       Collection<List<HRegionInfo>> allReplicas = map.values();
2399       for (List<HRegionInfo> list : allReplicas) {
2400         replicasToClose.addAll(list);
2401       }
2402     }
2403     else if (state != null && state.equals(State.MERGED)) {
2404       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
2405       c.add(regionInfo);
2406       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
2407       Collection<List<HRegionInfo>> allReplicas = map.values();
2408       for (List<HRegionInfo> list : allReplicas) {
2409         replicasToClose.addAll(list);
2410       }
2411     }
2412   }
2413 
2414   private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
2415       final ServerName serverName) {
2416     if (!this.listeners.isEmpty()) {
2417       for (AssignmentListener listener : this.listeners) {
2418         listener.regionOpened(regionInfo, serverName);
2419       }
2420     }
2421   }
2422 
2423   private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
2424     if (!this.listeners.isEmpty()) {
2425       for (AssignmentListener listener : this.listeners) {
2426         listener.regionClosed(regionInfo);
2427       }
2428     }
2429   }
2430 
2431   /**
2432    * Try to update some region states. If the state machine prevents
2433    * such update, an error message is returned to explain the reason.
2434    *
2435    * It's expected that each transition should have just one
2436    * region for opening/closing, and 3 regions for splitting/merging.
2437    * These regions should be on the server that requested the change.
2438    *
2439    * Region state machine. Only these transitions
2440    * are expected to be triggered by a region server.
2441    *
2442    * On the state transition:
2443    *  (1) Open/Close should be initiated by master
2444    *      (a) Master sets the region to pending_open/pending_close
2445    *        in memory and hbase:meta after sending the request
2446    *        to the region server
2447    *      (b) Region server reports back to the master
2448    *        after open/close is done (either success/failure)
2449    *      (c) If the region server has a problem reporting the status
2450    *        to the master, it must be because the master is down or there is some
2451    *        temporary network issue. Otherwise, the region server should
2452    *        abort since it must be a bug. If the master is not accessible,
2453    *        the region server should keep trying until the server is
2454    *        stopped or until the status is reported to the (new) master
2455    *      (d) If region server dies in the middle of opening/closing
2456    *        a region, SSH picks it up and finishes it
2457    *      (e) If master dies in the middle, the new master recovers
2458    *        the state during initialization from hbase:meta. Region server
2459    *        can report any transition that has not been reported to
2460    *        the previous active master yet
2461    *  (2) Split/merge is initiated by region servers
2462    *      (a) To split a region, a region server sends a request
2463    *        to master to try to set a region to splitting, together with
2464    *        two daughters (to be created) to splitting new. If approved
2465    *        by the master, the splitting can then move ahead
2466    *      (b) To merge two regions, a region server sends a request to
2467    *        master to try to set the new merged region (to be created) to
2468    *        merging_new, together with two regions (to be merged) to merging.
2469    *        If the master approves, the merge can then move ahead
2470    *      (c) Once the splitting/merging is done, the region server
2471    *        reports the status back to the master either success/failure.
2472    *      (d) Other scenarios should be handled similarly as for
2473    *        region open/close
2474    */
2475   protected String onRegionTransition(final ServerName serverName,
2476       final RegionStateTransition transition) {
2477     TransitionCode code = transition.getTransitionCode();
2478     HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
2479     RegionState current = regionStates.getRegionState(hri);
2480     if (LOG.isDebugEnabled()) {
2481       LOG.debug("Got transition " + code + " for "
2482         + (current != null ? current.toString() : hri.getShortNameToLog())
2483         + " from " + serverName);
2484     }
2485     String errorMsg = null;
2486     switch (code) {
2487     case OPENED:
2488       if (current != null && current.isOpened() && current.isOnServer(serverName)) {
2489         LOG.info("Region " + hri.getShortNameToLog() + " is already " + current.getState() + " on "
2490             + serverName);
2491         break;
2492       }
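           // Intentional fall-through: a non-duplicate OPENED report shares the
           // pending-open validation and handling with FAILED_OPEN below.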
2493     case FAILED_OPEN:
2494       if (current == null
2495           || !current.isPendingOpenOrOpeningOnServer(serverName)) {
2496         errorMsg = hri.getShortNameToLog()
2497           + " is not pending open on " + serverName;
2498       } else if (code == TransitionCode.FAILED_OPEN) {
2499         onRegionFailedOpen(hri, serverName);
2500       } else {
2501         long openSeqNum = HConstants.NO_SEQNUM;
2502         if (transition.hasOpenSeqNum()) {
2503           openSeqNum = transition.getOpenSeqNum();
2504         }
2505         if (openSeqNum < 0) {
2506           errorMsg = "Newly opened region has invalid open seq num " + openSeqNum;
2507         } else {
2508           onRegionOpen(hri, serverName, openSeqNum);
2509         }
2510       }
2511       break;
2512 
2513     case CLOSED:
2514       if (current == null
2515           || !current.isPendingCloseOrClosingOnServer(serverName)) {
2516         errorMsg = hri.getShortNameToLog()
2517           + " is not pending close on " + serverName;
2518       } else {
2519         onRegionClosed(hri);
2520       }
2521       break;
2522 
2523     case READY_TO_SPLIT:
2524     case SPLIT_PONR:
2525     case SPLIT:
2526     case SPLIT_REVERTED:
2527       errorMsg = onRegionSplit(serverName, code, hri,
2528         HRegionInfo.convert(transition.getRegionInfo(1)),
2529         HRegionInfo.convert(transition.getRegionInfo(2)));
2530       break;
2531 
2532     case READY_TO_MERGE:
2533     case MERGE_PONR:
2534     case MERGED:
2535     case MERGE_REVERTED:
2536       errorMsg = onRegionMerge(serverName, code, hri,
2537         HRegionInfo.convert(transition.getRegionInfo(1)),
2538         HRegionInfo.convert(transition.getRegionInfo(2)));
2539       break;
2540 
2541     default:
2542       errorMsg = "Unexpected transition code " + code;
2543     }
2544     if (errorMsg != null) {
2545       LOG.error("Failed to transition region from " + current + " to "
2546         + code + " by " + serverName + ": " + errorMsg);
2547     }
2548     return errorMsg;
2549   }
2550 
2551   /**
2552    * @return Instance of load balancer
2553    */
2554   public LoadBalancer getBalancer() {
2555     return this.balancer;
2556   }
2557 
2558   public Map<ServerName, List<HRegionInfo>>
2559     getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
2560     return getRegionStates().getRegionAssignments(infos);
2561   }
2562 }