View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.NavigableMap;
31  import java.util.Random;
32  import java.util.Set;
33  import java.util.TreeMap;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.CopyOnWriteArrayList;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.locks.Lock;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.fs.FileSystem;
48  import org.apache.hadoop.fs.Path;
49  import org.apache.hadoop.hbase.HBaseIOException;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.HRegionLocation;
53  import org.apache.hadoop.hbase.HTableDescriptor;
54  import org.apache.hadoop.hbase.MetaTableAccessor;
55  import org.apache.hadoop.hbase.NotServingRegionException;
56  import org.apache.hadoop.hbase.RegionLocations;
57  import org.apache.hadoop.hbase.Server;
58  import org.apache.hadoop.hbase.ServerName;
59  import org.apache.hadoop.hbase.TableName;
60  import org.apache.hadoop.hbase.TableNotFoundException;
61  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
62  import org.apache.hadoop.hbase.client.Result;
63  import org.apache.hadoop.hbase.client.TableState;
64  import org.apache.hadoop.hbase.executor.EventHandler;
65  import org.apache.hadoop.hbase.executor.EventType;
66  import org.apache.hadoop.hbase.executor.ExecutorService;
67  import org.apache.hadoop.hbase.ipc.RpcClient;
68  import org.apache.hadoop.hbase.ipc.RpcClient.FailedServerException;
69  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
70  import org.apache.hadoop.hbase.master.RegionState.State;
71  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
72  import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
73  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
74  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
75  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
76  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
77  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
78  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
79  import org.apache.hadoop.hbase.regionserver.wal.HLog;
80  import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
81  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
82  import org.apache.hadoop.hbase.util.FSUtils;
83  import org.apache.hadoop.hbase.util.KeyLocker;
84  import org.apache.hadoop.hbase.util.Pair;
85  import org.apache.hadoop.hbase.util.PairOfSameType;
86  import org.apache.hadoop.hbase.util.Threads;
87  import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
88  import org.apache.hadoop.ipc.RemoteException;
89  import org.apache.zookeeper.KeeperException;
90  
91  import com.google.common.annotations.VisibleForTesting;
92  
93  /**
94   * Manages and performs region assignment.
95   * Related communications with regionserver are all done over RPC.
96   */
97  @InterfaceAudience.Private
98  public class AssignmentManager {
99    private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
100 
101   protected final Server server;
102 
103   private ServerManager serverManager;
104 
105   private boolean shouldAssignRegionsWithFavoredNodes;
106 
107   private LoadBalancer balancer;
108 
109   private final MetricsAssignmentManager metricsAssignmentManager;
110 
111   private final TableLockManager tableLockManager;
112 
113   private AtomicInteger numRegionsOpened = new AtomicInteger(0);
114 
115   final private KeyLocker<String> locker = new KeyLocker<String>();
116 
117   Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
118 
119   /**
120    * Map of regions to reopen after the schema of a table is changed. Key -
121    * encoded region name, value - HRegionInfo
122    */
123   private final Map <String, HRegionInfo> regionsToReopen;
124 
125   /*
126    * Maximum times we recurse an assignment/unassignment.
127    * See below in {@link #assign()} and {@link #unassign()}.
128    */
129   private final int maximumAttempts;
130 
131   /**
132    * The sleep time for which the assignment will wait before retrying in case of hbase:meta assignment
133    * failure due to lack of availability of region plan
134    */
135   private final long sleepTimeBeforeRetryingMetaAssignment;
136 
137   /** Plans for region movement. Key is the encoded version of a region name*/
138   // TODO: When do plans get cleaned out?  Ever? In server open and in server
139   // shutdown processing -- St.Ack
140   // All access to this Map must be synchronized.
141   final NavigableMap<String, RegionPlan> regionPlans =
142     new TreeMap<String, RegionPlan>();
143 
144   private final TableStateManager tableStateManager;
145 
146   private final ExecutorService executorService;
147 
148   // Thread pool executor service. TODO, consolidate with executorService?
149   private java.util.concurrent.ExecutorService threadPoolExecutorService;
150 
151   private final RegionStates regionStates;
152 
153   // The threshold to use bulk assigning. Using bulk assignment
154   // only if assigning at least this many regions to at least this
155   // many servers. If assigning fewer regions to fewer servers,
156   // bulk assigning may be not as efficient.
157   private final int bulkAssignThresholdRegions;
158   private final int bulkAssignThresholdServers;
159 
160   // Should bulk assignment wait till all regions are assigned,
161   // or it is timed out?  This is useful to measure bulk assignment
162   // performance, but not needed in most use cases.
163   private final boolean bulkAssignWaitTillAllAssigned;
164 
165   /**
166    * Indicator that AssignmentManager has recovered the region states so
167    * that ServerShutdownHandler can be fully enabled and re-assign regions
168    * of dead servers. So that when re-assignment happens, AssignmentManager
169    * has proper region states.
170    *
171    * Protected to ease testing.
172    */
173   protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
174 
175   /**
176    * A map to track the count a region fails to open in a row.
177    * So that we don't try to open a region forever if the failure is
178    * unrecoverable.  We don't put this information in region states
179    * because we don't expect this to happen frequently; we don't
180    * want to copy this information over during each state transition either.
181    */
182   private final ConcurrentHashMap<String, AtomicInteger>
183     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
184 
185   // In case not using ZK for region assignment, region states
186   // are persisted in meta with a state store
187   private final RegionStateStore regionStateStore;
188 
189   /**
190    * For testing only!  Set to true to skip handling of split.
191    */
192   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
193   public static boolean TEST_SKIP_SPLIT_HANDLING = false;
194 
195   /** Listeners that are called on assignment events. */
196   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
197 
198   /**
199    * Constructs a new assignment manager.
200    *
201    * @param server instance of HMaster this AM running inside
202    * @param serverManager serverManager for associated HMaster
203    * @param balancer implementation of {@link LoadBalancer}
204    * @param service Executor service
205    * @param metricsMaster metrics manager
206    * @param tableLockManager TableLock manager
207    * @throws IOException
208    */
209   public AssignmentManager(Server server, ServerManager serverManager,
210       final LoadBalancer balancer,
211       final ExecutorService service, MetricsMaster metricsMaster,
212       final TableLockManager tableLockManager,
213       final TableStateManager tableStateManager)
214           throws IOException {
215     this.server = server;
216     this.serverManager = serverManager;
217     this.executorService = service;
218     this.regionStateStore = new RegionStateStore(server);
219     this.regionsToReopen = Collections.synchronizedMap
220                            (new HashMap<String, HRegionInfo> ());
221     Configuration conf = server.getConfiguration();
222     // Only read favored nodes if using the favored nodes load balancer.
223     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
224            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
225            FavoredNodeLoadBalancer.class);
226 
227     this.tableStateManager = tableStateManager;
228 
229     // This is the max attempts, not retries, so it should be at least 1.
230     this.maximumAttempts = Math.max(1,
231       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
232     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
233         "hbase.meta.assignment.retry.sleeptime", 1000l);
234     this.balancer = balancer;
235     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
236     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
237       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
238     this.regionStates = new RegionStates(
239       server, tableStateManager, serverManager, regionStateStore);
240 
241     this.bulkAssignWaitTillAllAssigned =
242       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
243     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
244     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
245 
246     this.metricsAssignmentManager = new MetricsAssignmentManager();
247     this.tableLockManager = tableLockManager;
248   }
249 
250   /**
251    * Add the listener to the notification list.
252    * @param listener The AssignmentListener to register
253    */
254   public void registerListener(final AssignmentListener listener) {
255     this.listeners.add(listener);
256   }
257 
258   /**
259    * Remove the listener from the notification list.
260    * @param listener The AssignmentListener to unregister
261    */
262   public boolean unregisterListener(final AssignmentListener listener) {
263     return this.listeners.remove(listener);
264   }
265 
266   /**
267    * @return Instance of ZKTableStateManager.
268    */
269   public TableStateManager getTableStateManager() {
270     // These are 'expensive' to make involving trip to zk ensemble so allow
271     // sharing.
272     return this.tableStateManager;
273   }
274 
275   /**
276    * This SHOULD not be public. It is public now
277    * because of some unit tests.
278    *
279    * TODO: make it package private and keep RegionStates in the master package
280    */
281   public RegionStates getRegionStates() {
282     return regionStates;
283   }
284 
285   /**
286    * Used in some tests to mock up region state in meta
287    */
288   @VisibleForTesting
289   RegionStateStore getRegionStateStore() {
290     return regionStateStore;
291   }
292 
293   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
294     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
295   }
296 
297   /**
298    * Add a regionPlan for the specified region.
299    * @param encodedName
300    * @param plan
301    */
302   public void addPlan(String encodedName, RegionPlan plan) {
303     synchronized (regionPlans) {
304       regionPlans.put(encodedName, plan);
305     }
306   }
307 
308   /**
309    * Add a map of region plans.
310    */
311   public void addPlans(Map<String, RegionPlan> plans) {
312     synchronized (regionPlans) {
313       regionPlans.putAll(plans);
314     }
315   }
316 
317   /**
318    * Set the list of regions that will be reopened
319    * because of an update in table schema
320    *
321    * @param regions
322    *          list of regions that should be tracked for reopen
323    */
324   public void setRegionsToReopen(List <HRegionInfo> regions) {
325     for(HRegionInfo hri : regions) {
326       regionsToReopen.put(hri.getEncodedName(), hri);
327     }
328   }
329 
330   /**
331    * Used by the client to identify if all regions have the schema updates
332    *
333    * @param tableName
334    * @return Pair indicating the status of the alter command
335    * @throws IOException
336    */
337   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
338       throws IOException {
339     List<HRegionInfo> hris;
340     if (TableName.META_TABLE_NAME.equals(tableName)) {
341       hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
342     } else {
343       hris = MetaTableAccessor.getTableRegions(server.getShortCircuitConnection(), tableName, true);
344     }
345 
346     Integer pending = 0;
347     for (HRegionInfo hri : hris) {
348       String name = hri.getEncodedName();
349       // no lock concurrent access ok: sequential consistency respected.
350       if (regionsToReopen.containsKey(name)
351           || regionStates.isRegionInTransition(name)) {
352         pending++;
353       }
354     }
355     return new Pair<Integer, Integer>(pending, hris.size());
356   }
357 
358   /**
359    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
360    * the failover cleanup before re-assigning regions of dead servers. So that
361    * when re-assignment happens, AssignmentManager has proper region states.
362    */
363   public boolean isFailoverCleanupDone() {
364     return failoverCleanupDone.get();
365   }
366 
367   /**
368    * To avoid racing with AM, external entities may need to lock a region,
369    * for example, when SSH checks what regions to skip re-assigning.
370    */
371   public Lock acquireRegionLock(final String encodedName) {
372     return locker.acquireLock(encodedName);
373   }
374 
375   /**
376    * Now, failover cleanup is completed. Notify server manager to
377    * process queued up dead servers processing, if any.
378    */
379   void failoverCleanupDone() {
380     failoverCleanupDone.set(true);
381     serverManager.processQueuedDeadServers();
382   }
383 
384   /**
385    * Called on startup.
386    * Figures whether a fresh cluster start of we are joining extant running cluster.
387    * @throws IOException
388    * @throws KeeperException
389    * @throws InterruptedException
390    */
391   void joinCluster() throws IOException,
392           KeeperException, InterruptedException {
393     long startTime = System.currentTimeMillis();
394     // Concurrency note: In the below the accesses on regionsInTransition are
395     // outside of a synchronization block where usually all accesses to RIT are
396     // synchronized.  The presumption is that in this case it is safe since this
397     // method is being played by a single thread on startup.
398 
399     // TODO: Regions that have a null location and are not in regionsInTransitions
400     // need to be handled.
401 
402     // Scan hbase:meta to build list of existing regions, servers, and assignment
403     // Returns servers who have not checked in (assumed dead) that some regions
404     // were assigned to (according to the meta)
405     Set<ServerName> deadServers = rebuildUserRegions();
406 
407     // This method will assign all user regions if a clean server startup or
408     // it will reconstruct master state and cleanup any leftovers from
409     // previous master process.
410     boolean failover = processDeadServersAndRegionsInTransition(deadServers);
411 
412     recoverTableInDisablingState();
413     recoverTableInEnablingState();
414     LOG.info("Joined the cluster in " + (System.currentTimeMillis()
415       - startTime) + "ms, failover=" + failover);
416   }
417 
418   /**
419    * Process all regions that are in transition in zookeeper and also
420    * processes the list of dead servers by scanning the META.
421    * Used by master joining an cluster.  If we figure this is a clean cluster
422    * startup, will assign all user regions.
423    * @param deadServers
424    *          Map of dead servers and their regions. Can be null.
425    * @throws IOException
426    * @throws InterruptedException
427    */
428   boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
429           throws IOException, InterruptedException {
430     boolean failover = !serverManager.getDeadServers().isEmpty();
431     if (failover) {
432       // This may not be a failover actually, especially if meta is on this master.
433       if (LOG.isDebugEnabled()) {
434         LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
435       }
436     } else {
437       // If any one region except meta is assigned, it's a failover.
438       Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
439       for (Map.Entry<HRegionInfo, ServerName> en:
440           regionStates.getRegionAssignments().entrySet()) {
441         HRegionInfo hri = en.getKey();
442         if (!hri.isMetaTable()
443             && onlineServers.contains(en.getValue())) {
444           LOG.debug("Found " + hri + " out on cluster");
445           failover = true;
446           break;
447         }
448       }
449       if (!failover) {
450         // If any region except meta is in transition on a live server, it's a failover.
451         Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
452         if (!regionsInTransition.isEmpty()) {
453           for (RegionState regionState: regionsInTransition.values()) {
454             if (!regionState.getRegion().isMetaRegion()
455                 && onlineServers.contains(regionState.getServerName())) {
456               LOG.debug("Found " + regionState + " in RITs");
457               failover = true;
458               break;
459             }
460           }
461         }
462       }
463     }
464     if (!failover) {
465       // If we get here, we have a full cluster restart. It is a failover only
466       // if there are some HLogs are not split yet. For meta HLogs, they should have
467       // been split already, if any. We can walk through those queued dead servers,
468       // if they don't have any HLogs, this restart should be considered as a clean one
469       Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
470       if (!queuedDeadServers.isEmpty()) {
471         Configuration conf = server.getConfiguration();
472         Path rootdir = FSUtils.getRootDir(conf);
473         FileSystem fs = rootdir.getFileSystem(conf);
474         for (ServerName serverName: queuedDeadServers) {
475           Path logDir = new Path(rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
476           Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
477           if (fs.exists(logDir) || fs.exists(splitDir)) {
478             LOG.debug("Found queued dead server " + serverName);
479             failover = true;
480             break;
481           }
482         }
483         if (!failover) {
484           // We figured that it's not a failover, so no need to
485           // work on these re-queued dead servers any more.
486           LOG.info("AM figured that it's not a failover and cleaned up "
487             + queuedDeadServers.size() + " queued dead servers");
488           serverManager.removeRequeuedDeadServers();
489         }
490       }
491     }
492 
493     Set<TableName> disabledOrDisablingOrEnabling = null;
494     Map<HRegionInfo, ServerName> allRegions = null;
495 
496     if (!failover) {
497       disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
498         TableState.State.DISABLED, TableState.State.DISABLING,
499         TableState.State.ENABLING);
500 
501       // Clean re/start, mark all user regions closed before reassignment
502       allRegions = regionStates.closeAllUserRegions(
503         disabledOrDisablingOrEnabling);
504     }
505 
506     // Now region states are restored
507     regionStateStore.start();
508 
509     if (failover) {
510       if (deadServers != null && !deadServers.isEmpty()) {
511         for (ServerName serverName: deadServers) {
512           if (!serverManager.isServerDead(serverName)) {
513             serverManager.expireServer(serverName); // Let SSH do region re-assign
514           }
515         }
516       }
517       processRegionsInTransition(regionStates.getRegionsInTransition().values());
518     }
519 
520     // Now we can safely claim failover cleanup completed and enable
521     // ServerShutdownHandler for further processing. The nodes (below)
522     // in transition, if any, are for regions not related to those
523     // dead servers at all, and can be done in parallel to SSH.
524     failoverCleanupDone();
525     if (!failover) {
526       // Fresh cluster startup.
527       LOG.info("Clean cluster startup. Assigning user regions");
528       assignAllUserRegions(allRegions);
529     }
530     // unassign replicas of the split parents and the merged regions
531     // the daughter replicas are opened in assignAllUserRegions if it was
532     // not already opened.
533     for (HRegionInfo h : replicasToClose) {
534       unassign(h);
535     }
536     replicasToClose.clear();
537     return failover;
538   }
539 
540   /**
541    * When a region is closed, it should be removed from the regionsToReopen
542    * @param hri HRegionInfo of the region which was closed
543    */
544   public void removeClosedRegion(HRegionInfo hri) {
545     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
546       LOG.debug("Removed region from reopening regions because it was closed");
547     }
548   }
549 
550   // TODO: processFavoredNodes might throw an exception, for e.g., if the
551   // meta could not be contacted/updated. We need to see how seriously to treat
552   // this problem as. Should we fail the current assignment. We should be able
553   // to recover from this problem eventually (if the meta couldn't be updated
554   // things should work normally and eventually get fixed up).
555   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
556     if (!shouldAssignRegionsWithFavoredNodes) return;
557     // The AM gets the favored nodes info for each region and updates the meta
558     // table with that info
559     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
560         new HashMap<HRegionInfo, List<ServerName>>();
561     for (HRegionInfo region : regions) {
562       regionToFavoredNodes.put(region,
563           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
564     }
565     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
566       this.server.getShortCircuitConnection());
567   }
568 
569   /**
570    * Marks the region as online.  Removes it from regions in transition and
571    * updates the in-memory assignment information.
572    * <p>
573    * Used when a region has been successfully opened on a region server.
574    * @param regionInfo
575    * @param sn
576    */
577   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
578     regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
579   }
580 
581   void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
582     numRegionsOpened.incrementAndGet();
583     regionStates.regionOnline(regionInfo, sn, openSeqNum);
584 
585     // Remove plan if one.
586     clearRegionPlan(regionInfo);
587     balancer.regionOnline(regionInfo, sn);
588 
589     // Tell our listeners that a region was opened
590     sendRegionOpenedNotification(regionInfo, sn);
591   }
592 
593   /**
594    * Marks the region as offline.  Removes it from regions in transition and
595    * removes in-memory assignment information.
596    * <p>
597    * Used when a region has been closed and should remain closed.
598    * @param regionInfo
599    */
600   public void regionOffline(final HRegionInfo regionInfo) {
601     regionOffline(regionInfo, null);
602   }
603 
604   public void offlineDisabledRegion(HRegionInfo regionInfo) {
605     replicasToClose.remove(regionInfo);
606     regionOffline(regionInfo);
607   }
608 
609   // Assignment methods
610 
611   /**
612    * Assigns the specified region.
613    * <p>
614    * If a RegionPlan is available with a valid destination then it will be used
615    * to determine what server region is assigned to.  If no RegionPlan is
616    * available, region will be assigned to a random available server.
617    * <p>
618    * Updates the RegionState and sends the OPEN RPC.
619    * <p>
620    * This will only succeed if the region is in transition and in a CLOSED or
621    * OFFLINE state or not in transition, and of course, the
622    * chosen server is up and running (It may have just crashed!).
623    *
624    * @param region server to be assigned
625    */
626   public void assign(HRegionInfo region) {
627     assign(region, false);
628   }
629 
630   /**
631    * Use care with forceNewPlan. It could cause double assignment.
632    */
633   public void assign(HRegionInfo region, boolean forceNewPlan) {
634     if (isDisabledorDisablingRegionInRIT(region)) {
635       return;
636     }
637     String encodedName = region.getEncodedName();
638     Lock lock = locker.acquireLock(encodedName);
639     try {
640       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
641       if (state != null) {
642         if (regionStates.wasRegionOnDeadServer(encodedName)) {
643           LOG.info("Skip assigning " + region.getRegionNameAsString()
644             + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
645             + " is dead but not processed yet");
646           return;
647         }
648         assign(state, forceNewPlan);
649       }
650     } finally {
651       lock.unlock();
652     }
653   }
654 
655   /**
656    * Bulk assign regions to <code>destination</code>.
657    * @param destination
658    * @param regions Regions to assign.
659    * @return true if successful
660    */
661   boolean assign(final ServerName destination, final List<HRegionInfo> regions)
662     throws InterruptedException {
663     long startTime = EnvironmentEdgeManager.currentTime();
664     try {
665       int regionCount = regions.size();
666       if (regionCount == 0) {
667         return true;
668       }
669       LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
670       Set<String> encodedNames = new HashSet<String>(regionCount);
671       for (HRegionInfo region : regions) {
672         encodedNames.add(region.getEncodedName());
673       }
674 
675       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
676       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
677       try {
678         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regionCount);
679         List<RegionState> states = new ArrayList<RegionState>(regionCount);
680         for (HRegionInfo region : regions) {
681           String encodedName = region.getEncodedName();
682           if (!isDisabledorDisablingRegionInRIT(region)) {
683             RegionState state = forceRegionStateToOffline(region, false);
684             boolean onDeadServer = false;
685             if (state != null) {
686               if (regionStates.wasRegionOnDeadServer(encodedName)) {
687                 LOG.info("Skip assigning " + region.getRegionNameAsString()
688                   + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
689                   + " is dead but not processed yet");
690                 onDeadServer = true;
691               } else {
692                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
693                 plans.put(encodedName, plan);
694                 states.add(state);
695                 continue;
696               }
697             }
698             // Reassign if the region wasn't on a dead server
699             if (!onDeadServer) {
700               LOG.info("failed to force region state to offline, "
701                 + "will reassign later: " + region);
702               failedToOpenRegions.add(region); // assign individually later
703             }
704           }
705           // Release the lock, this region is excluded from bulk assign because
706           // we can't update its state, or set its znode to offline.
707           Lock lock = locks.remove(encodedName);
708           lock.unlock();
709         }
710 
711         if (server.isStopped()) {
712           return false;
713         }
714 
715         // Add region plans, so we can updateTimers when one region is opened so
716         // that unnecessary timeout on RIT is reduced.
717         this.addPlans(plans);
718 
719         List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos =
720           new ArrayList<Pair<HRegionInfo, List<ServerName>>>(states.size());
721         for (RegionState state: states) {
722           HRegionInfo region = state.getRegion();
723           regionStates.updateRegionState(
724             region, State.PENDING_OPEN, destination);
725           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
726           if (this.shouldAssignRegionsWithFavoredNodes) {
727             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
728           }
729           regionOpenInfos.add(new Pair<HRegionInfo, List<ServerName>>(
730             region, favoredNodes));
731         }
732 
733         // Move on to open regions.
734         try {
735           // Send OPEN RPC. If it fails on a IOE or RemoteException,
736           // regions will be assigned individually.
737           Configuration conf = server.getConfiguration();
738           long maxWaitTime = System.currentTimeMillis() +
739             conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000);
740           for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
741             try {
742               List<RegionOpeningState> regionOpeningStateList = serverManager
743                 .sendRegionOpen(destination, regionOpenInfos);
744               for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
745                 RegionOpeningState openingState = regionOpeningStateList.get(k);
746                 if (openingState != RegionOpeningState.OPENED) {
747                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
748                   LOG.info("Got opening state " + openingState
749                     + ", will reassign later: " + region);
750                   // Failed opening this region, reassign it later
751                   forceRegionStateToOffline(region, true);
752                   failedToOpenRegions.add(region);
753                 }
754               }
755               break;
756             } catch (IOException e) {
757               if (e instanceof RemoteException) {
758                 e = ((RemoteException)e).unwrapRemoteException();
759               }
760               if (e instanceof RegionServerStoppedException) {
761                 LOG.warn("The region server was shut down, ", e);
762                 // No need to retry, the region server is a goner.
763                 return false;
764               } else if (e instanceof ServerNotRunningYetException) {
765                 long now = System.currentTimeMillis();
766                 if (now < maxWaitTime) {
767                   if (LOG.isDebugEnabled()) {
768                     LOG.debug("Server is not yet up; waiting up to " +
769                       (maxWaitTime - now) + "ms", e);
770                   }
771                   Thread.sleep(100);
772                   i--; // reset the try count
773                   continue;
774                 }
775               } else if (e instanceof java.net.SocketTimeoutException
776                   && this.serverManager.isServerOnline(destination)) {
777                 // In case socket is timed out and the region server is still online,
778                 // the openRegion RPC could have been accepted by the server and
779                 // just the response didn't go through.  So we will retry to
780                 // open the region on the same server.
781                 if (LOG.isDebugEnabled()) {
782                   LOG.debug("Bulk assigner openRegion() to " + destination
783                     + " has timed out, but the regions might"
784                     + " already be opened on it.", e);
785                 }
786                 // wait and reset the re-try count, server might be just busy.
787                 Thread.sleep(100);
788                 i--;
789                 continue;
790               } else if (e instanceof FailedServerException && i < maximumAttempts) {
791                 // In case the server is in the failed server list, no point to
792                 // retry too soon. Retry after the failed_server_expiry time
793                 long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
794                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
795                 if (LOG.isDebugEnabled()) {
796                   LOG.debug(destination + " is on failed server list; waiting "
797                     + sleepTime + "ms", e);
798                 }
799                 Thread.sleep(sleepTime);
800                 continue;
801               }
802               throw e;
803             }
804           }
805         } catch (IOException e) {
806           // Can be a socket timeout, EOF, NoRouteToHost, etc
807           LOG.info("Unable to communicate with " + destination
808             + " in order to assign regions, ", e);
809           for (RegionState state: states) {
810             HRegionInfo region = state.getRegion();
811             forceRegionStateToOffline(region, true);
812           }
813           return false;
814         }
815       } finally {
816         for (Lock lock : locks.values()) {
817           lock.unlock();
818         }
819       }
820 
821       if (!failedToOpenRegions.isEmpty()) {
822         for (HRegionInfo region : failedToOpenRegions) {
823           if (!regionStates.isRegionOnline(region)) {
824             invokeAssign(region);
825           }
826         }
827       }
828       LOG.debug("Bulk assigning done for " + destination);
829       return true;
830     } finally {
831       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
832     }
833   }
834 
835   /**
836    * Send CLOSE RPC if the server is online, otherwise, offline the region.
837    *
838    * The RPC will be sent only to the region sever found in the region state
839    * if it is passed in, otherwise, to the src server specified. If region
840    * state is not specified, we don't update region state at all, instead
841    * we just send the RPC call. This is useful for some cleanup without
842    * messing around the region states (see handleRegion, on region opened
843    * on an unexpected server scenario, for an example)
844    */
845   private void unassign(final HRegionInfo region,
846       final ServerName server, final ServerName dest) {
847     for (int i = 1; i <= this.maximumAttempts; i++) {
848       if (this.server.isStopped() || this.server.isAborted()) {
849         LOG.debug("Server stopped/aborted; skipping unassign of " + region);
850         return;
851       }
852       if (!serverManager.isServerOnline(server)) {
853         LOG.debug("Offline " + region.getRegionNameAsString()
854           + ", no need to unassign since it's on a dead server: " + server);
855         regionStates.updateRegionState(region, State.OFFLINE);
856         return;
857       }
858       try {
859         // Send CLOSE RPC
860         if (serverManager.sendRegionClose(server, region, dest)) {
861           LOG.debug("Sent CLOSE to " + server + " for region " +
862             region.getRegionNameAsString());
863           return;
864         }
865         // This never happens. Currently regionserver close always return true.
866         // Todo; this can now happen (0.96) if there is an exception in a coprocessor
867         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
868           region.getRegionNameAsString());
869       } catch (Throwable t) {
870         if (t instanceof RemoteException) {
871           t = ((RemoteException)t).unwrapRemoteException();
872         }
873         if (t instanceof NotServingRegionException
874             || t instanceof RegionServerStoppedException
875             || t instanceof ServerNotRunningYetException) {
876           LOG.debug("Offline " + region.getRegionNameAsString()
877             + ", it's not any more on " + server, t);
878           regionStates.updateRegionState(region, State.OFFLINE);
879           return;
880         } else if (t instanceof FailedServerException && i < maximumAttempts) {
881           // In case the server is in the failed server list, no point to
882           // retry too soon. Retry after the failed_server_expiry time
883           try {
884             Configuration conf = this.server.getConfiguration();
885             long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
886               RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
887             if (LOG.isDebugEnabled()) {
888               LOG.debug(server + " is on failed server list; waiting "
889                 + sleepTime + "ms", t);
890             }
891             Thread.sleep(sleepTime);
892           } catch (InterruptedException ie) {
893             LOG.warn("Failed to unassign "
894               + region.getRegionNameAsString() + " since interrupted", ie);
895             regionStates.updateRegionState(region, State.FAILED_CLOSE);
896             Thread.currentThread().interrupt();
897             return;
898           }
899         }
900 
901         LOG.info("Server " + server + " returned " + t + " for "
902           + region.getRegionNameAsString() + ", try=" + i
903           + " of " + this.maximumAttempts, t);
904       }
905     }
906     // Run out of attempts
907     regionStates.updateRegionState(region, State.FAILED_CLOSE);
908   }
909 
910   /**
911    * Set region to OFFLINE unless it is opening and forceNewPlan is false.
912    */
913   private RegionState forceRegionStateToOffline(
914       final HRegionInfo region, final boolean forceNewPlan) {
915     RegionState state = regionStates.getRegionState(region);
916     if (state == null) {
917       LOG.warn("Assigning a region not in region states: " + region);
918       state = regionStates.createRegionState(region);
919     }
920 
921     if (forceNewPlan && LOG.isDebugEnabled()) {
922       LOG.debug("Force region state offline " + state);
923     }
924 
925     switch (state.getState()) {
926     case OPEN:
927     case OPENING:
928     case PENDING_OPEN:
929     case CLOSING:
930     case PENDING_CLOSE:
931       if (!forceNewPlan) {
932         LOG.debug("Skip assigning " +
933           region + ", it is already " + state);
934         return null;
935       }
936     case FAILED_CLOSE:
937     case FAILED_OPEN:
938       regionStates.updateRegionState(region, State.PENDING_CLOSE);
939       unassign(region, state.getServerName(), null);
940       state = regionStates.getRegionState(region);
941       if (!state.isOffline() && !state.isClosed()) {
942         // If the region isn't offline, we can't re-assign
943         // it now. It will be assigned automatically after
944         // the regionserver reports it's closed.
945         return null;
946       }
947     case OFFLINE:
948     case CLOSED:
949       break;
950     default:
951       LOG.error("Trying to assign region " + region
952         + ", which is " + state);
953       return null;
954     }
955     return state;
956   }
957 
958   /**
959    * Caller must hold lock on the passed <code>state</code> object.
960    * @param state
961    * @param forceNewPlan
962    */
963   private void assign(RegionState state, boolean forceNewPlan) {
964     long startTime = EnvironmentEdgeManager.currentTime();
965     try {
966       Configuration conf = server.getConfiguration();
967       RegionPlan plan = null;
968       long maxWaitTime = -1;
969       HRegionInfo region = state.getRegion();
970       Throwable previousException = null;
971       for (int i = 1; i <= maximumAttempts; i++) {
972         if (server.isStopped() || server.isAborted()) {
973           LOG.info("Skip assigning " + region.getRegionNameAsString()
974             + ", the server is stopped/aborted");
975           return;
976         }
977         if (plan == null) { // Get a server for the region at first
978           try {
979             plan = getRegionPlan(region, forceNewPlan);
980           } catch (HBaseIOException e) {
981             LOG.warn("Failed to get region plan", e);
982           }
983         }
984         if (plan == null) {
985           LOG.warn("Unable to determine a plan to assign " + region);
986           if (region.isMetaRegion()) {
987             try {
988               Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
989               if (i == maximumAttempts) i = 1;
990               continue;
991             } catch (InterruptedException e) {
992               LOG.error("Got exception while waiting for hbase:meta assignment");
993               Thread.currentThread().interrupt();
994             }
995           }
996           regionStates.updateRegionState(region, State.FAILED_OPEN);
997           return;
998         }
999         // In case of assignment from EnableTableHandler table state is ENABLING. Any how
1000         // EnableTableHandler will set ENABLED after assigning all the table regions. If we
1001         // try to set to ENABLED directly then client API may think table is enabled.
1002         // When we have a case such as all the regions are added directly into hbase:meta and we call
1003         // assignRegion then we need to make the table ENABLED. Hence in such case the table
1004         // will not be in ENABLING or ENABLED state.
1005         TableName tableName = region.getTable();
1006         if (!tableStateManager.isTableState(tableName,
1007           TableState.State.ENABLED, TableState.State.ENABLING)) {
1008           LOG.debug("Setting table " + tableName + " to ENABLED state.");
1009           setEnabledTable(tableName);
1010         }
1011         LOG.info("Assigning " + region.getRegionNameAsString() +
1012             " to " + plan.getDestination().toString());
1013         // Transition RegionState to PENDING_OPEN
1014        regionStates.updateRegionState(region,
1015           State.PENDING_OPEN, plan.getDestination());
1016 
1017         boolean needNewPlan = false;
1018         final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
1019             " to " + plan.getDestination();
1020         try {
1021           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1022           if (this.shouldAssignRegionsWithFavoredNodes) {
1023             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1024           }
1025           serverManager.sendRegionOpen(plan.getDestination(), region, favoredNodes);
1026           return; // we're done
1027         } catch (Throwable t) {
1028           if (t instanceof RemoteException) {
1029             t = ((RemoteException) t).unwrapRemoteException();
1030           }
1031           previousException = t;
1032 
1033           // Should we wait a little before retrying? If the server is starting it's yes.
1034           boolean hold = (t instanceof ServerNotRunningYetException);
1035 
1036           // In case socket is timed out and the region server is still online,
1037           // the openRegion RPC could have been accepted by the server and
1038           // just the response didn't go through.  So we will retry to
1039           // open the region on the same server.
1040           boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
1041               && this.serverManager.isServerOnline(plan.getDestination()));
1042 
1043           if (hold) {
1044             LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
1045               "try=" + i + " of " + this.maximumAttempts, t);
1046 
1047             if (maxWaitTime < 0) {
1048               maxWaitTime = EnvironmentEdgeManager.currentTime()
1049                 + this.server.getConfiguration().getLong(
1050                   "hbase.regionserver.rpc.startup.waittime", 60000);
1051             }
1052             try {
1053               long now = EnvironmentEdgeManager.currentTime();
1054               if (now < maxWaitTime) {
1055                 if (LOG.isDebugEnabled()) {
1056                   LOG.debug("Server is not yet up; waiting up to "
1057                     + (maxWaitTime - now) + "ms", t);
1058                 }
1059                 Thread.sleep(100);
1060                 i--; // reset the try count
1061               } else {
1062                 LOG.debug("Server is not up for a while; try a new one", t);
1063                 needNewPlan = true;
1064               }
1065             } catch (InterruptedException ie) {
1066               LOG.warn("Failed to assign "
1067                   + region.getRegionNameAsString() + " since interrupted", ie);
1068               regionStates.updateRegionState(region, State.FAILED_OPEN);
1069               Thread.currentThread().interrupt();
1070               return;
1071             }
1072           } else if (retry) {
1073             i--; // we want to retry as many times as needed as long as the RS is not dead.
1074             if (LOG.isDebugEnabled()) {
1075               LOG.debug(assignMsg + ", trying to assign to the same region server due ", t);
1076             }
1077           } else {
1078             needNewPlan = true;
1079             LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
1080                 " try=" + i + " of " + this.maximumAttempts, t);
1081           }
1082         }
1083 
1084         if (i == this.maximumAttempts) {
1085           // Don't reset the region state or get a new plan any more.
1086           // This is the last try.
1087           continue;
1088         }
1089 
1090         // If region opened on destination of present plan, reassigning to new
1091         // RS may cause double assignments. In case of RegionAlreadyInTransitionException
1092         // reassigning to same RS.
1093         if (needNewPlan) {
1094           // Force a new plan and reassign. Will return null if no servers.
1095           // The new plan could be the same as the existing plan since we don't
1096           // exclude the server of the original plan, which should not be
1097           // excluded since it could be the only server up now.
1098           RegionPlan newPlan = null;
1099           try {
1100             newPlan = getRegionPlan(region, true);
1101           } catch (HBaseIOException e) {
1102             LOG.warn("Failed to get region plan", e);
1103           }
1104           if (newPlan == null) {
1105             regionStates.updateRegionState(region, State.FAILED_OPEN);
1106             LOG.warn("Unable to find a viable location to assign region " +
1107                 region.getRegionNameAsString());
1108             return;
1109           }
1110 
1111           if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
1112             // Clean out plan we failed execute and one that doesn't look like it'll
1113             // succeed anyways; we need a new plan!
1114             // Transition back to OFFLINE
1115             regionStates.updateRegionState(region, State.OFFLINE);
1116             plan = newPlan;
1117           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
1118               previousException instanceof FailedServerException) {
1119             try {
1120               LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
1121                 " to the same failed server.");
1122               Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1123                 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
1124             } catch (InterruptedException ie) {
1125               LOG.warn("Failed to assign "
1126                   + region.getRegionNameAsString() + " since interrupted", ie);
1127               regionStates.updateRegionState(region, State.FAILED_OPEN);
1128               Thread.currentThread().interrupt();
1129               return;
1130             }
1131           }
1132         }
1133       }
1134       // Run out of attempts
1135       regionStates.updateRegionState(region, State.FAILED_OPEN);
1136     } finally {
1137       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
1138     }
1139   }
1140 
1141   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
1142     if (this.tableStateManager.isTableState(region.getTable(),
1143             TableState.State.DISABLED,
1144             TableState.State.DISABLING) || replicasToClose.contains(region)) {
1145       LOG.info("Table " + region.getTable() + " is disabled or disabling;"
1146         + " skipping assign of " + region.getRegionNameAsString());
1147       offlineDisabledRegion(region);
1148       return true;
1149     }
1150     return false;
1151   }
1152 
1153   /**
1154    * @param region the region to assign
1155    * @param forceNewPlan If true, then if an existing plan exists, a new plan
1156    * will be generated.
1157    * @return Plan for passed <code>region</code> (If none currently, it creates one or
1158    * if no servers to assign, it returns null).
1159    */
1160   private RegionPlan getRegionPlan(final HRegionInfo region,
1161       final boolean forceNewPlan) throws HBaseIOException {
1162     // Pickup existing plan or make a new one
1163     final String encodedName = region.getEncodedName();
1164     final List<ServerName> destServers =
1165       serverManager.createDestinationServersList();
1166 
1167     if (destServers.isEmpty()){
1168       LOG.warn("Can't move " + encodedName +
1169         ", there is no destination server available.");
1170       return null;
1171     }
1172 
1173     RegionPlan randomPlan = null;
1174     boolean newPlan = false;
1175     RegionPlan existingPlan;
1176 
1177     synchronized (this.regionPlans) {
1178       existingPlan = this.regionPlans.get(encodedName);
1179 
1180       if (existingPlan != null && existingPlan.getDestination() != null) {
1181         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
1182           + " destination server is " + existingPlan.getDestination() +
1183             " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
1184       }
1185 
1186       if (forceNewPlan
1187           || existingPlan == null
1188           || existingPlan.getDestination() == null
1189           || !destServers.contains(existingPlan.getDestination())) {
1190         newPlan = true;
1191         randomPlan = new RegionPlan(region, null,
1192             balancer.randomAssignment(region, destServers));
1193         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
1194           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
1195           regions.add(region);
1196           try {
1197             processFavoredNodes(regions);
1198           } catch (IOException ie) {
1199             LOG.warn("Ignoring exception in processFavoredNodes " + ie);
1200           }
1201         }
1202         this.regionPlans.put(encodedName, randomPlan);
1203       }
1204     }
1205 
1206     if (newPlan) {
1207       if (randomPlan.getDestination() == null) {
1208         LOG.warn("Can't find a destination for " + encodedName);
1209         return null;
1210       }
1211       if (LOG.isDebugEnabled()) {
1212         LOG.debug("No previous transition plan found (or ignoring " +
1213           "an existing plan) for " + region.getRegionNameAsString() +
1214           "; generated random plan=" + randomPlan + "; " + destServers.size() +
1215           " (online=" + serverManager.getOnlineServers().size() +
1216           ") available servers, forceNewPlan=" + forceNewPlan);
1217       }
1218       return randomPlan;
1219     }
1220     if (LOG.isDebugEnabled()) {
1221       LOG.debug("Using pre-existing plan for " +
1222         region.getRegionNameAsString() + "; plan=" + existingPlan);
1223     }
1224     return existingPlan;
1225   }
1226 
1227   /**
1228    * Unassigns the specified region.
1229    * <p>
1230    * Updates the RegionState and sends the CLOSE RPC unless region is being
1231    * split by regionserver; then the unassign fails (silently) because we
1232    * presume the region being unassigned no longer exists (its been split out
1233    * of existence). TODO: What to do if split fails and is rolled back and
1234    * parent is revivified?
1235    * <p>
1236    * If a RegionPlan is already set, it will remain.
1237    *
1238    * @param region server to be unassigned
1239    */
1240   public void unassign(HRegionInfo region) {
1241     unassign(region, null);
1242   }
1243 
1244 
1245   /**
1246    * Unassigns the specified region.
1247    * <p>
1248    * Updates the RegionState and sends the CLOSE RPC unless region is being
1249    * split by regionserver; then the unassign fails (silently) because we
1250    * presume the region being unassigned no longer exists (its been split out
1251    * of existence). TODO: What to do if split fails and is rolled back and
1252    * parent is revivified?
1253    * <p>
1254    * If a RegionPlan is already set, it will remain.
1255    *
1256    * @param region server to be unassigned
1257    * @param dest the destination server of the region
1258    */
1259   public void unassign(HRegionInfo region, ServerName dest) {
1260     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
1261     LOG.debug("Starting unassign of " + region.getRegionNameAsString()
1262       + " (offlining), current state: " + regionStates.getRegionState(region));
1263 
1264     String encodedName = region.getEncodedName();
1265     // Grab the state of this region and synchronize on it
1266     // We need a lock here as we're going to do a put later and we don't want multiple states
1267     //  creation
1268     ReentrantLock lock = locker.acquireLock(encodedName);
1269     RegionState state = regionStates.getRegionTransitionState(encodedName);
1270     try {
1271       if (state == null || state.isFailedClose()) {
1272         if (state == null) {
1273           // Region is not in transition.
1274           // We can unassign it only if it's not SPLIT/MERGED.
1275           state = regionStates.getRegionState(encodedName);
1276           if (state != null && state.isUnassignable()) {
1277             LOG.info("Attempting to unassign " + state + ", ignored");
1278             // Offline region will be reassigned below
1279             return;
1280           }
1281           if (state == null || state.getServerName() == null) {
1282             // We don't know where the region is, offline it.
1283             // No need to send CLOSE RPC
1284             LOG.warn("Attempting to unassign a region not in RegionStates: "
1285               + region.getRegionNameAsString() + ", offlined");
1286             regionOffline(region);
1287             return;
1288           }
1289         }
1290         state = regionStates.updateRegionState(
1291           region, State.PENDING_CLOSE);
1292       } else if (state.isFailedOpen()) {
1293         // The region is not open yet
1294         regionOffline(region);
1295         return;
1296       } else {
1297         LOG.debug("Attempting to unassign " +
1298           region.getRegionNameAsString() + " but it is " +
1299           "already in transition (" + state.getState() + ")");
1300         return;
1301       }
1302 
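           // The region is now PENDING_CLOSE; send the CLOSE RPC to the server currently hosting it.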
1303       unassign(region, state.getServerName(), dest);
1304     } finally {
1305       lock.unlock();
1306 
1307       // Region is expected to be reassigned afterwards
1308       if (!replicasToClose.contains(region)
1309           && regionStates.isRegionInState(region, State.OFFLINE)) {
1310         assign(region);
1311       }
1312     }
1313   }
1314 
1315   /**
1316    * Used by unit tests. Return the number of regions opened so far in the life
1317    * of the master. Increases by one every time the master opens a region
1318    * @return the counter value of the number of regions opened so far
1319    */
1320   public int getNumRegionsOpened() {
1321     return numRegionsOpened.get();
1322   }
1323 
1324   /**
1325    * Waits until the specified region has completed assignment.
1326    * <p>
1327    * If the region is already assigned, returns immediately.  Otherwise, this method
1328    * blocks until the region is assigned.
1329    * @param regionInfo region to wait on assignment for
     * @return true once the region is assigned; false if it fails to open or the server is stopped
1330    * @throws InterruptedException
1331    */
1332   public boolean waitForAssignment(HRegionInfo regionInfo)
1333       throws InterruptedException {
1334     while (!regionStates.isRegionOnline(regionInfo)) {
1335       if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN)
1336           || this.server.isStopped()) {
1337         return false;
1338       }
1339 
1340       // We should receive a notification, but it's
1341       //  better to have a timeout to recheck the condition here:
1342       //  it lowers the impact of a race condition if any
1343       regionStates.waitForUpdate(100);
1344     }
1345     return true;
1346   }
1347 
1348   /**
1349    * Assigns the hbase:meta region.
1350    * <p>
1351    * Assumes that hbase:meta is currently closed and is not being actively served by
1352    * any RegionServer.
1353    */
1354   public void assignMeta() throws KeeperException {
1355     regionStates.updateRegionState(HRegionInfo.FIRST_META_REGIONINFO, State.OFFLINE);
1356     assign(HRegionInfo.FIRST_META_REGIONINFO);
1357   }
1358 
1359   /**
1360    * Assigns specified regions retaining assignments, if any.
1361    * <p>
1362    * This is a synchronous call and will return once every region has been
1363    * assigned.  If anything fails, an exception is thrown
1364    * @throws InterruptedException
1365    * @throws IOException
1366    */
1367   public void assign(Map<HRegionInfo, ServerName> regions)
1368         throws IOException, InterruptedException {
1369     if (regions == null || regions.isEmpty()) {
1370       return;
1371     }
1372     List<ServerName> servers = serverManager.createDestinationServersList();
1373     if (servers == null || servers.isEmpty()) {
1374       throw new IOException("Found no destination server to assign region(s)");
1375     }
1376 
1377     // Reuse existing assignment info
1378     Map<ServerName, List<HRegionInfo>> bulkPlan =
1379       balancer.retainAssignment(regions, servers);
1380     if (bulkPlan == null) {
1381       throw new IOException("Unable to determine a plan to assign region(s)");
1382     }
1383 
1384     assign(regions.size(), servers.size(),
1385       "retainAssignment=true", bulkPlan);
1386   }
1387 
1388   /**
1389    * Assigns specified regions round robin, if any.
1390    * <p>
1391    * This is a synchronous call and will return once every region has been
1392    * assigned.  If anything fails, an exception is thrown
1393    * @throws InterruptedException
1394    * @throws IOException
1395    */
1396   public void assign(List<HRegionInfo> regions)
1397         throws IOException, InterruptedException {
1398     if (regions == null || regions.isEmpty()) {
1399       return;
1400     }
1401 
1402     List<ServerName> servers = serverManager.createDestinationServersList();
1403     if (servers == null || servers.isEmpty()) {
1404       throw new IOException("Found no destination server to assign region(s)");
1405     }
1406 
1407     // Generate a round-robin bulk assignment plan
1408     Map<ServerName, List<HRegionInfo>> bulkPlan
1409       = balancer.roundRobinAssignment(regions, servers);
1410     if (bulkPlan == null) {
1411       throw new IOException("Unable to determine a plan to assign region(s)");
1412     }
1413 
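         // Set up favored nodes for the regions before handing the bulk plan to the assigners.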
1414     processFavoredNodes(regions);
1415     assign(regions.size(), servers.size(),
1416       "round-robin=true", bulkPlan);
1417   }
1418 
1419   private void assign(int regions, int totalServers,
1420       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
1421           throws InterruptedException, IOException {
1422 
1423     int servers = bulkPlan.size();
1424     if (servers == 1 || (regions < bulkAssignThresholdRegions
1425         && servers < bulkAssignThresholdServers)) {
1426 
1427       // Not using bulk assignment.  This could be more efficient in a small
1428       // cluster, especially a mini cluster for testing, so that tests won't time out.
1429       if (LOG.isTraceEnabled()) {
1430         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
1431           " region(s) to " + servers + " server(s)");
1432       }
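           // Assign synchronously, one server at a time; if a server's batch fails while the
           // master is still running, retry each region that did not come online asynchronously.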
1433       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
1434         if (!assign(plan.getKey(), plan.getValue()) && !server.isStopped()) {
1435           for (HRegionInfo region: plan.getValue()) {
1436             if (!regionStates.isRegionOnline(region)) {
1437               invokeAssign(region);
1438             }
1439           }
1440         }
1441       }
1442     } else {
1443       LOG.info("Bulk assigning " + regions + " region(s) across "
1444         + totalServers + " server(s), " + message);
1445 
1446       // Use a fixed-size thread pool to do the bulk assignment.
1447       BulkAssigner ba = new GeneralBulkAssigner(
1448         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
1449       ba.bulkAssign();
1450       LOG.info("Bulk assigning done");
1451     }
1452   }
1453 
1454   /**
1455    * Assigns all user regions, if any exist.  Used during cluster startup.
1456    * <p>
1457    * This is a synchronous call and will return once every region has been
1458    * assigned.  If anything fails, an exception is thrown and the cluster
1459    * should be shut down.
1460    * @throws InterruptedException
1461    * @throws IOException
1462    */
1463   private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
1464       throws IOException, InterruptedException {
1465     if (allRegions == null || allRegions.isEmpty()) return;
1466 
1467     // Determine what type of assignment to do on startup
1468     boolean retainAssignment = server.getConfiguration().
1469       getBoolean("hbase.master.startup.retainassign", true);
1470 
1471     Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
1472     if (retainAssignment) {
1473       assign(allRegions);
1474     } else {
1475       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
1476       assign(regions);
1477     }
1478 
1479     for (HRegionInfo hri : regionsFromMetaScan) {
1480       TableName tableName = hri.getTable();
1481       if (!tableStateManager.isTableState(tableName,
1482               TableState.State.ENABLED)) {
1483         setEnabledTable(tableName);
1484       }
1485     }
1486     // assign all the replicas that were not recorded in the meta
1487     assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server));
1488   }
1489 
1490   /**
1491    * Get a list of replica regions that are not recorded in meta yet. We might not
1492    * have recorded the locations of the replicas since the replicas may not have been
1493    * online yet, the master restarted in the middle of assigning, ZK was erased, etc.
1494    * @param regionsRecordedInMeta the set of regions we know are recorded in meta,
1495    * either as a default or as the location of a replica
1496    * @param master the master services, used to look up table descriptors
1497    * @return list of replica regions
1498    * @return list of replica regions
1499    * @throws IOException
1500    */
1501   public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
1502       Set<HRegionInfo> regionsRecordedInMeta, MasterServices master) throws IOException {
1503     List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
1504     for (HRegionInfo hri : regionsRecordedInMeta) {
1505       TableName table = hri.getTable();
1506       HTableDescriptor htd = master.getTableDescriptors().get(table);
1507       // look at the HTD for the replica count. That's the source of truth
1508       int desiredRegionReplication = htd.getRegionReplication();
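           // Walk every replica id (including 0, the default); replicas already present in the
           // meta scan results are skipped below.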
1509       for (int i = 0; i < desiredRegionReplication; i++) {
1510         HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
1511         if (regionsRecordedInMeta.contains(replica)) continue;
1512         regionsNotRecordedInMeta.add(replica);
1513       }
1514     }
1515     return regionsNotRecordedInMeta;
1516   }
1517 
1518   /**
1519    * Rebuild the list of user regions and assignment information.
1520    * <p>
1521    * Returns the set of servers that hosted some regions but were not found
1522    * to be online.
1523    * @return set of servers not online that hosted some regions per meta
1524    * @throws IOException
1525    */
1526   Set<ServerName> rebuildUserRegions() throws
1527           IOException, KeeperException {
1528     Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
1529             TableState.State.DISABLED, TableState.State.ENABLING);
1530 
1531     Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
1532             TableState.State.DISABLED,
1533             TableState.State.DISABLING,
1534             TableState.State.ENABLING);
1535 
1536     // Region assignment from META
1537     List<Result> results = MetaTableAccessor.fullScanOfMeta(server.getShortCircuitConnection());
1538     // Get any new but slow-to-check-in region servers that joined the cluster
1539     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
1540     // Set of offline servers to be returned
1541     Set<ServerName> offlineServers = new HashSet<ServerName>();
1542     // Iterate regions in META
1543     for (Result result : results) {
1544       if (result == null) {
1545         LOG.debug("null result from meta - ignoring but this is strange.");
1546         continue;
1547       }
1548       // Keep track of replicas to close. These were the replicas of the originally
1549       // unmerged regions. The master might have closed them before, but may not
1550       // have, perhaps because it crashed.
1551       PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
1552       if (p.getFirst() != null && p.getSecond() != null) {
1553         int numReplicas = ((MasterServices)server).getTableDescriptors().get(p.getFirst().
1554             getTable()).getRegionReplication();
1555         for (HRegionInfo merge : p) {
1556           for (int i = 1; i < numReplicas; i++) {
1557             replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
1558           }
1559         }
1560       }
1561       RegionLocations rl =  MetaTableAccessor.getRegionLocations(result);
1562       if (rl == null) continue;
1563       HRegionLocation[] locations = rl.getRegionLocations();
1564       if (locations == null) continue;
1565       for (HRegionLocation hrl : locations) {
1566         HRegionInfo regionInfo = hrl.getRegionInfo();
1567         if (regionInfo == null) continue;
1568         int replicaId = regionInfo.getReplicaId();
1569         State state = RegionStateStore.getRegionState(result, replicaId);
1570         // Keep track of replicas to close. These were the replicas of the split parents
1571         // from the previous life of the master. The master should have closed them before,
1572         // but may not have been able to, perhaps because it crashed.
1573         if (replicaId == 0 && state.equals(State.SPLIT)) {
1574           for (HRegionLocation h : locations) {
1575             replicasToClose.add(h.getRegionInfo());
1576           }
1577         }
1578         ServerName lastHost = hrl.getServerName();
1579         ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
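             // Seed the in-memory region state with what meta recorded for this replica.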
1580         regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
1581         if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
1582           // Region is not open (either offline or in transition), skip
1583           continue;
1584         }
1585         TableName tableName = regionInfo.getTable();
1586         if (!onlineServers.contains(regionLocation)) {
1587           // Region is located on a server that isn't online
1588           offlineServers.add(regionLocation);
1589         } else if (!disabledOrEnablingTables.contains(tableName)) {
1590           // Region is being served and on an active server
1591           // add only if region not in disabled or enabling table
1592           regionStates.regionOnline(regionInfo, regionLocation);
1593           balancer.regionOnline(regionInfo, regionLocation);
1594         }
1595         // Need to enable the table if it is not disabled, disabling, or enabling;
1596         // this will be used in rolling restarts
1597         if (!disabledOrDisablingOrEnabling.contains(tableName)
1598           && !getTableStateManager().isTableState(tableName,
1599                 TableState.State.ENABLED)) {
1600           setEnabledTable(tableName);
1601         }
1602       }
1603     }
1604     return offlineServers;
1605   }
1606 
1607   /**
1608    * Recover the tables that were not fully moved to DISABLED state. These
1609    * tables were in DISABLING state when the master restarted/switched.
1610    *
1611    * @throws KeeperException
1612    * @throws TableNotFoundException
1613    * @throws IOException
1614    */
1615   private void recoverTableInDisablingState()
1616           throws KeeperException, IOException {
1617     Set<TableName> disablingTables =
1618             tableStateManager.getTablesInStates(TableState.State.DISABLING);
1619     if (disablingTables.size() != 0) {
1620       for (TableName tableName : disablingTables) {
1621         // Recover by calling DisableTableHandler
1622         LOG.info("The table " + tableName
1623             + " is in DISABLING state.  Hence recovering by moving the table"
1624             + " to DISABLED state.");
1625         new DisableTableHandler(this.server, tableName,
1626             this, tableLockManager, true).prepare().process();
1627       }
1628     }
1629   }
1630 
1631   /**
1632    * Recover the tables that were not fully moved to ENABLED state. These tables
1633    * were in ENABLING state when the master restarted/switched.
1634    *
1635    * @throws KeeperException
1636    * @throws org.apache.hadoop.hbase.TableNotFoundException
1637    * @throws IOException
1638    */
1639   private void recoverTableInEnablingState()
1640           throws KeeperException, IOException {
1641     Set<TableName> enablingTables = tableStateManager.
1642             getTablesInStates(TableState.State.ENABLING);
1643     if (enablingTables.size() != 0) {
1644       for (TableName tableName : enablingTables) {
1645         // Recover by calling EnableTableHandler
1646         LOG.info("The table " + tableName
1647             + " is in ENABLING state.  Hence recovering by moving the table"
1648             + " to ENABLED state.");
1649         // enableTable in sync way during master startup,
1650         // no need to invoke coprocessor
1651         EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
1652           this, tableLockManager, true);
1653         try {
1654           eth.prepare();
1655         } catch (TableNotFoundException e) {
1656           LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
1657           continue;
1658         }
1659         eth.process();
1660       }
1661     }
1662   }
1663 
1664   /**
1665    * Processes list of regions in transition at startup
1666    */
1667   void processRegionsInTransition(Collection<RegionState> regionStates) {
1668     // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
1669     // in case the RPC call is not sent out yet before the master was shut down
1670     // since we update the state before we send the RPC call. We can't update
1671     // the state after the RPC call. Otherwise, we don't know what's happened
1672     // to the region if the master dies right after the RPC call is out.
1673     for (RegionState regionState: regionStates) {
1674       if (!serverManager.isServerOnline(regionState.getServerName())) {
1675         continue; // SSH will handle it
1676       }
1677       RegionState.State state = regionState.getState();
1678       LOG.info("Processing " + regionState);
1679       switch (state) {
1680       case CLOSED:
1681         invokeAssign(regionState.getRegion());
1682         break;
1683       case PENDING_OPEN:
1684         retrySendRegionOpen(regionState);
1685         break;
1686       case PENDING_CLOSE:
1687         retrySendRegionClose(regionState);
1688         break;
1689       default:
1690         // No process for other states
1691       }
1692     }
1693   }
1694 
1695   /**
1696    * At master failover, for a pending_open region, make sure the
1697    * sendRegionOpen RPC call is sent to the target regionserver.
1698    */
1699   private void retrySendRegionOpen(final RegionState regionState) {
1700     this.executorService.submit(
1701       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
1702         @Override
1703         public void process() throws IOException {
1704           HRegionInfo hri = regionState.getRegion();
1705           ServerName serverName = regionState.getServerName();
1706           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
1707           try {
1708             for (int i = 1; i <= maximumAttempts; i++) {
1709               if (!serverManager.isServerOnline(serverName)
1710                   || server.isStopped() || server.isAborted()) {
1711                 return; // Nothing more to do
1712               }
1713               try {
1714                 if (!regionState.equals(regionStates.getRegionState(hri))) {
1715                   return; // Region is not in the expected state any more
1716                 }
1717                 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1718                 if (shouldAssignRegionsWithFavoredNodes) {
1719                   favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
1720                 }
1721                 serverManager.sendRegionOpen(serverName, hri, favoredNodes);
1722                 return; // we're done
1723               } catch (Throwable t) {
1724                 if (t instanceof RemoteException) {
1725                   t = ((RemoteException) t).unwrapRemoteException();
1726                 }
1727                 if (t instanceof FailedServerException && i < maximumAttempts) {
1728                   // In case the server is in the failed server list, no point to
1729                   // retry too soon. Retry after the failed_server_expiry time
1730                   try {
1731                     Configuration conf = this.server.getConfiguration();
1732                     long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1733                       RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1734                     if (LOG.isDebugEnabled()) {
1735                       LOG.debug(serverName + " is on failed server list; waiting "
1736                         + sleepTime + "ms", t);
1737                     }
1738                     Thread.sleep(sleepTime);
1739                     continue;
1740                   } catch (InterruptedException ie) {
1741                     LOG.warn("Failed to assign "
1742                       + hri.getRegionNameAsString() + " since interrupted", ie);
1743                     regionStates.updateRegionState(hri, State.FAILED_OPEN);
1744                     Thread.currentThread().interrupt();
1745                     return;
1746                   }
1747                 }
1748                 if (serverManager.isServerOnline(serverName)
1749                     && t instanceof java.net.SocketTimeoutException) {
1750                   i--; // Don't count socket timeouts against the retry limit
1751                 } else {
1752                   LOG.info("Got exception in retrying sendRegionOpen for "
1753                     + regionState + "; try=" + i + " of " + maximumAttempts, t);
1754                 }
1755                 Threads.sleep(100);
1756               }
1757             }
1758             // Run out of attempts
1759             regionStates.updateRegionState(hri, State.FAILED_OPEN);
1760           } finally {
1761             lock.unlock();
1762           }
1763         }
1764       });
1765   }
1766 
1767   /**
1768    * At master failover, for a pending_close region, make sure the
1769    * sendRegionClose RPC call is sent to the target regionserver.
1770    */
1771   private void retrySendRegionClose(final RegionState regionState) {
1772     this.executorService.submit(
1773       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
1774         @Override
1775         public void process() throws IOException {
1776           HRegionInfo hri = regionState.getRegion();
1777           ServerName serverName = regionState.getServerName();
1778           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
1779           try {
1780             for (int i = 1; i <= maximumAttempts; i++) {
1781               if (!serverManager.isServerOnline(serverName)
1782                   || server.isStopped() || server.isAborted()) {
1783                 return; // Nothing more to do
1784               }
1785               try {
1786                 if (!regionState.equals(regionStates.getRegionState(hri))) {
1787                   return; // Region is not in the expected state any more
1788                 }
1789                 serverManager.sendRegionClose(serverName, hri, null);
1790                 return; // Done.
1791               } catch (Throwable t) {
1792                 if (t instanceof RemoteException) {
1793                   t = ((RemoteException) t).unwrapRemoteException();
1794                 }
1795                 if (t instanceof FailedServerException && i < maximumAttempts) {
1796                   // In case the server is in the failed server list, no point to
1797                   // retry too soon. Retry after the failed_server_expiry time
1798                   try {
1799                     Configuration conf = this.server.getConfiguration();
1800                     long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1801                       RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1802                     if (LOG.isDebugEnabled()) {
1803                       LOG.debug(serverName + " is on failed server list; waiting "
1804                         + sleepTime + "ms", t);
1805                     }
1806                     Thread.sleep(sleepTime);
1807                     continue;
1808                   } catch (InterruptedException ie) {
1809                     LOG.warn("Failed to unassign "
1810                       + hri.getRegionNameAsString() + " since interrupted", ie);
1811                     regionStates.updateRegionState(hri, RegionState.State.FAILED_CLOSE);
1812                     Thread.currentThread().interrupt();
1813                     return;
1814                   }
1815                 }
1816                 if (serverManager.isServerOnline(serverName)
1817                     && t instanceof java.net.SocketTimeoutException) {
1818                   i--; // Don't count socket timeouts against the retry limit
1819                 } else {
1820                   LOG.info("Got exception in retrying sendRegionClose for "
1821                     + regionState + "; try=" + i + " of " + maximumAttempts, t);
1822                 }
1823                 Threads.sleep(100);
1824               }
1825             }
1826             // Run out of attempts
1827             regionStates.updateRegionState(hri, State.FAILED_CLOSE);
1828           } finally {
1829             lock.unlock();
1830           }
1831         }
1832       });
1833   }
1834 
1835   /**
1836    * Set regions-in-transition metrics.
1837    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
1838    * This iterator is not fail-fast, which may lead to stale reads; but that's better than
1839    * creating a copy of the map for metrics computation, as this method will be invoked
1840    * on a frequent interval.
1841    */
1842   public void updateRegionsInTransitionMetrics() {
1843     long currentTime = System.currentTimeMillis();
1844     int totalRITs = 0;
1845     int totalRITsOverThreshold = 0;
1846     long oldestRITTime = 0;
1847     int ritThreshold = this.server.getConfiguration().
1848       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
1849     for (RegionState state: regionStates.getRegionsInTransition().values()) {
1850       totalRITs++;
1851       long ritTime = currentTime - state.getStamp();
1852       if (ritTime > ritThreshold) { // more than the threshold
1853         totalRITsOverThreshold++;
1854       }
1855       if (oldestRITTime < ritTime) {
1856         oldestRITTime = ritTime;
1857       }
1858     }
1859     if (this.metricsAssignmentManager != null) {
1860       this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
1861       this.metricsAssignmentManager.updateRITCount(totalRITs);
1862       this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
1863     }
1864   }
1865 
1866   /**
1867    * @param region Region whose plan we are to clear.
1868    */
1869   private void clearRegionPlan(final HRegionInfo region) {
1870     synchronized (this.regionPlans) {
1871       this.regionPlans.remove(region.getEncodedName());
1872     }
1873   }
1874 
1875   /**
1876    * Wait on region to clear regions-in-transition.
1877    * @param hri Region to wait on.
1878    * @throws IOException
1879    */
1880   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
1881       throws IOException, InterruptedException {
1882     waitOnRegionToClearRegionsInTransition(hri, -1L);
1883   }
1884 
1885   /**
1886    * Wait on region to clear regions-in-transition or time out
1887    * @param hri Region to wait on.
1888    * @param timeOut Milliseconds to wait for current region to be out of transition state.
1889    * @return True when a region clears regions-in-transition before timeout otherwise false
1890    * @throws InterruptedException
1891    */
1892   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
1893       throws InterruptedException {
1894     if (!regionStates.isRegionInTransition(hri)) return true;
1895     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
1896         + timeOut;
1897     // There is already a timeout monitor on regions in transition, so we
1898     // should not need another one here.
1899     LOG.info("Waiting for " + hri.getEncodedName() +
1900         " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
1901     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
1902       regionStates.waitForUpdate(100);
1903       if (EnvironmentEdgeManager.currentTime() > end) {
1904         LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
1905         return false;
1906       }
1907     }
1908     if (this.server.isStopped()) {
1909       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
1910       return false;
1911     }
1912     return true;
1913   }
1914 
1915   void invokeAssign(HRegionInfo regionInfo) {
1916     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
1917   }
1918 
1919   void invokeUnAssign(HRegionInfo regionInfo) {
1920     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
1921   }
1922 
1923   public boolean isCarryingMeta(ServerName serverName) {
1924     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
1925   }
1926 
1927   /**
1928    * Check if the shutdown server carries the specific region.
1929    * @return whether the serverName currently hosts the region
1930    */
1931   private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
1932     RegionState regionState = regionStates.getRegionTransitionState(hri);
1933     ServerName transitionAddr = regionState != null? regionState.getServerName(): null;
1934     if (transitionAddr != null) {
1935       boolean matchTransitionAddr = transitionAddr.equals(serverName);
1936       LOG.debug("Checking region=" + hri.getRegionNameAsString()
1937         + ", transitioning on server=" + transitionAddr
1938         + " server being checked: " + serverName
1939         + ", matches=" + matchTransitionAddr);
1940       return matchTransitionAddr;
1941     }
1942 
1943     ServerName assignedAddr = regionStates.getRegionServerOfRegion(hri);
1944     boolean matchAssignedAddr = serverName.equals(assignedAddr);
1945     LOG.debug("based on AM, current region=" + hri.getRegionNameAsString()
1946       + " is on server=" + assignedAddr + ", server being checked: "
1947       + serverName);
1948     return matchAssignedAddr;
1949   }
1950 
1951   /**
1952    * Process shutdown server removing any assignments.
1953    * @param sn Server that went down.
1954    * @return list of regions in transition on this server
1955    */
1956   public List<HRegionInfo> processServerShutdown(final ServerName sn) {
1957     // Clean out any existing assignment plans for this server
1958     synchronized (this.regionPlans) {
1959       for (Iterator <Map.Entry<String, RegionPlan>> i =
1960           this.regionPlans.entrySet().iterator(); i.hasNext();) {
1961         Map.Entry<String, RegionPlan> e = i.next();
1962         ServerName otherSn = e.getValue().getDestination();
1963         // The name will be null if the region is planned for a random assign.
1964         if (otherSn != null && otherSn.equals(sn)) {
1965           // Use iterator's remove else we'll get CME
1966           i.remove();
1967         }
1968       }
1969     }
1970     List<HRegionInfo> rits = regionStates.serverOffline(sn);
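         // Walk the regions that were in transition on the dead server; keep only those that
         // server shutdown handling should reassign.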
1971     for (Iterator<HRegionInfo> it = rits.iterator(); it.hasNext(); ) {
1972       HRegionInfo hri = it.next();
1973       String encodedName = hri.getEncodedName();
1974 
1975       // We need a lock on the region as we could update it
1976       Lock lock = locker.acquireLock(encodedName);
1977       try {
1978         RegionState regionState =
1979           regionStates.getRegionTransitionState(encodedName);
1980         if (regionState == null
1981             || (regionState.getServerName() != null && !regionState.isOnServer(sn))
1982             || !RegionStates.isOneOfStates(regionState, State.PENDING_OPEN,
1983                 State.OPENING, State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) {
1984           LOG.info("Skip " + regionState + " since it is no longer opening/failed_close"
1985             + " on the dead server: " + sn);
1986           it.remove();
1987         } else {
1988           if (tableStateManager.isTableState(hri.getTable(),
1989                   TableState.State.DISABLED, TableState.State.DISABLING)) {
1990             regionStates.regionOffline(hri);
1991             it.remove();
1992             continue;
1993           }
1994           // Mark the region offline and assign it again by SSH
1995           regionStates.updateRegionState(hri, State.OFFLINE);
1996         }
1997       } finally {
1998         lock.unlock();
1999       }
2000     }
2001     return rits;
2002   }
2003 
2004   /**
2005    * @param plan Plan to execute.
2006    */
2007   public void balance(final RegionPlan plan) {
2008     HRegionInfo hri = plan.getRegionInfo();
2009     TableName tableName = hri.getTable();
2010     if (tableStateManager.isTableState(tableName,
2011             TableState.State.DISABLED, TableState.State.DISABLING)) {
2012       LOG.info("Ignored moving region of disabling/disabled table "
2013         + tableName);
2014       return;
2015     }
2016 
2017     // Move the region only if it's assigned
2018     String encodedName = hri.getEncodedName();
2019     ReentrantLock lock = locker.acquireLock(encodedName);
2020     try {
2021       if (!regionStates.isRegionOnline(hri)) {
2022         RegionState state = regionStates.getRegionState(encodedName);
2023         LOG.info("Ignored moving region not assigned: " + hri + ", "
2024           + (state == null ? "not in region states" : state));
2025         return;
2026       }
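           // Register the plan so the unassign below (and the follow-up assign) sends the
           // region to the requested destination.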
2027       synchronized (this.regionPlans) {
2028         this.regionPlans.put(plan.getRegionName(), plan);
2029       }
2030       unassign(hri, plan.getDestination());
2031     } finally {
2032       lock.unlock();
2033     }
2034   }
2035 
2036   public void stop() {
2037     // Shut down the threadpool executor service
2038     threadPoolExecutorService.shutdownNow();
2039     regionStateStore.stop();
2040   }
2041 
2042   protected void setEnabledTable(TableName tableName) {
2043     try {
2044       this.tableStateManager.setTableState(tableName,
2045               TableState.State.ENABLED);
2046     } catch (IOException e) {
2047       // Here we can abort as it is the startup flow
2048       String errorMsg = "Unable to ensure that the table " + tableName
2049           + " will be enabled because of a ZooKeeper issue";
2050       LOG.error(errorMsg);
2051       this.server.abort(errorMsg, e);
2052     }
2053   }
2054 
2055   private String onRegionFailedOpen(final RegionState current,
2056       final HRegionInfo hri, final ServerName serverName) {
2057     // The region must be opening on this server.
2058     // If current state is failed_open on the same server,
2059     // it could be a reportRegionTransition RPC retry.
2060     if (current == null || !current.isOpeningOrFailedOpenOnServer(serverName)) {
2061       return hri.getShortNameToLog() + " is not opening on " + serverName;
2062     }
2063 
2064     // Just return in case of retrying
2065     if (current.isFailedOpen()) {
2066       return null;
2067     }
2068 
2069     String encodedName = hri.getEncodedName();
2070     AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
2071     if (failedOpenCount == null) {
2072       failedOpenCount = new AtomicInteger();
2073       // No need to use putIfAbsent, or extra synchronization since
2074       // this whole handleRegion block is locked on the encoded region
2075       // name, and failedOpenTracker is updated only in this block
2076       failedOpenTracker.put(encodedName, failedOpenCount);
2077     }
2078     if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
2079       regionStates.updateRegionState(hri, State.FAILED_OPEN);
2080       // remove the tracking info to save memory, also reset
2081       // the count for the next open attempt
2082       failedOpenTracker.remove(encodedName);
2083     } else {
2084       // Handle this the same as if it were opened and then closed.
2085       RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
2086       if (regionState != null) {
2087         // When there is more than one region server, a new RS is selected as the
2088         // destination and the region plan is updated accordingly. (HBASE-5546)
2089         if (getTableStateManager().isTableState(hri.getTable(),
2090                 TableState.State.DISABLED, TableState.State.DISABLING) ||
2091                 replicasToClose.contains(hri)) {
2092           offlineDisabledRegion(hri);
2093           return null;
2094         }
2095         regionStates.updateRegionState(hri, RegionState.State.CLOSED);
2096         // This below has to do w/ online enable/disable of a table
2097         removeClosedRegion(hri);
2098         try {
2099           getRegionPlan(hri, true);
2100         } catch (HBaseIOException e) {
2101           LOG.warn("Failed to get region plan", e);
2102         }
2103         invokeAssign(hri);
2104       }
2105     }
2106     // Null means no error
2107     return null;
2108   }
2109 
2110   private String onRegionOpen(final RegionState current, final HRegionInfo hri,
2111       final ServerName serverName, final RegionStateTransition transition) {
2112     // The region must be opening on this server.
2113     // If current state is already opened on the same server,
2114     // it could be a reportRegionTransition RPC retry.
2115     if (current == null || !current.isOpeningOrOpenedOnServer(serverName)) {
2116       return hri.getShortNameToLog() + " is not opening on " + serverName;
2117     }
2118 
2119     // Just return in case of retrying
2120     if (current.isOpened()) {
2121       return null;
2122     }
2123 
2124     long openSeqNum = transition.hasOpenSeqNum()
2125       ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM;
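         // Reject a report that carries no valid open sequence id (NO_SEQNUM is negative).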
2126     if (openSeqNum < 0) {
2127       return "Newly opened region has invalid open seq num " + openSeqNum;
2128     }
2129     regionOnline(hri, serverName, openSeqNum);
2130 
2131     // reset the count, if any
2132     failedOpenTracker.remove(hri.getEncodedName());
2133     if (getTableStateManager().isTableState(hri.getTable(),
2134             TableState.State.DISABLED, TableState.State.DISABLING)) {
2135       invokeUnAssign(hri);
2136     }
2137     return null;
2138   }
2139 
2140   private String onRegionClosed(final RegionState current,
2141       final HRegionInfo hri, final ServerName serverName) {
2142     // A region will usually be assigned right after it is closed. When an RPC retry comes
2143     // in, the region may already have moved away from the closed state. However, on the
2144     // region server side, we don't care much about the response for this transition.
2145     // We only make sure the master has received and processed this report, either
2146     // successfully or not. So this is fine, not a problem at all.
2147     if (current == null || !current.isClosingOrClosedOnServer(serverName)) {
2148       return hri.getShortNameToLog() + " is not closing on " + serverName;
2149     }
2150 
2151     // Just return in case of retrying
2152     if (current.isClosed()) {
2153       return null;
2154     }
2155 
2156     if (getTableStateManager().isTableState(hri.getTable(), TableState.State.DISABLED,
2157         TableState.State.DISABLING) || replicasToClose.contains(hri)) {
2158       offlineDisabledRegion(hri);
2159       return null;
2160     }
2161 
2162     regionStates.updateRegionState(hri, RegionState.State.CLOSED);
2163     sendRegionClosedNotification(hri);
2164     // This below has to do w/ online enable/disable of a table
2165     removeClosedRegion(hri);
2166     invokeAssign(hri);
2167     return null;
2168   }
2169 
2170   private String onRegionReadyToSplit(final RegionState current, final HRegionInfo hri,
2171       final ServerName serverName, final RegionStateTransition transition) {
2172     // The region must be opened on this server.
2173     // If current state is already splitting on the same server,
2174     // it could be a reportRegionTransition RPC retry.
2175     if (current == null || !current.isSplittingOrOpenedOnServer(serverName)) {
2176       return hri.getShortNameToLog() + " is not opened on " + serverName;
2177     }
2178 
2179     // Just return in case of retrying
2180     if (current.isSplitting()) {
2181       return null;
2182     }
2183 
2184     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2185     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2186     RegionState rs_a = regionStates.getRegionState(a);
2187     RegionState rs_b = regionStates.getRegionState(b);
2188     if (rs_a != null || rs_b != null) {
2189       return "Some daughter already exists. "
2190         + "a=" + rs_a + ", b=" + rs_b;
2191     }
2192 
2193     // The server holding the region is not updated at this stage.
2194     // That is done after the PONR (point of no return).
2195     regionStates.updateRegionState(hri, State.SPLITTING);
2196     regionStates.createRegionState(
2197       a, State.SPLITTING_NEW, serverName, null);
2198     regionStates.createRegionState(
2199       b, State.SPLITTING_NEW, serverName, null);
2200     return null;
2201   }
2202 
2203   private String onRegionSplitPONR(final RegionState current, final HRegionInfo hri,
2204       final ServerName serverName, final RegionStateTransition transition) {
2205     // The region must be splitting on this server, and the daughters must be in
2206     // splitting_new state. To check RPC retry, we use server holding info.
2207     if (current == null || !current.isSplittingOnServer(serverName)) {
2208       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2209     }
2210 
2211     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2212     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2213     RegionState rs_a = regionStates.getRegionState(a);
2214     RegionState rs_b = regionStates.getRegionState(b);
2215 
2216     // The master could have restarted and lost the new region
2217     // states; if so, they must have been lost together
2218     if (rs_a == null && rs_b == null) {
2219       rs_a = regionStates.createRegionState(
2220         a, State.SPLITTING_NEW, serverName, null);
2221       rs_b = regionStates.createRegionState(
2222         b, State.SPLITTING_NEW, serverName, null);
2223     }
2224 
2225     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2226         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2227       return "Some daughter is not known to be splitting on " + serverName
2228         + ", a=" + rs_a + ", b=" + rs_b;
2229     }
2230 
2231     // Just return in case of retrying
2232     if (!regionStates.isRegionOnServer(hri, serverName)) {
2233       return null;
2234     }
2235 
2236     try {
2237       regionStates.splitRegion(hri, a, b, serverName);
2238     } catch (IOException ioe) {
2239       LOG.info("Failed to record split region " + hri.getShortNameToLog(), ioe);
2240       return "Failed to record the splitting in meta";
2241     }
2242     return null;
2243   }
2244 
2245   private String onRegionSplit(final RegionState current, final HRegionInfo hri,
2246       final ServerName serverName, final RegionStateTransition transition) {
2247     // The region must be splitting on this server, and the daughters must be in
2248     // splitting_new state.
2249     // If current state is already split on the same server,
2250     // it could be a reportRegionTransition RPC retry.
2251     if (current == null || !current.isSplittingOrSplitOnServer(serverName)) {
2252       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2253     }
2254 
2255     // Just return in case of retrying
2256     if (current.isSplit()) {
2257       return null;
2258     }
2259 
2260     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2261     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2262     RegionState rs_a = regionStates.getRegionState(a);
2263     RegionState rs_b = regionStates.getRegionState(b);
2264     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2265         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2266       return "Some daughter is not known to be splitting on " + serverName
2267         + ", a=" + rs_a + ", b=" + rs_b;
2268     }
2269 
2270     if (TEST_SKIP_SPLIT_HANDLING) {
2271       return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
2272     }
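         // Mark the parent offline as SPLIT and bring both daughters online on the server
         // that performed the split.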
2273     regionOffline(hri, State.SPLIT);
2274     regionOnline(a, serverName, 1);
2275     regionOnline(b, serverName, 1);
2276 
2277     // User could disable the table before master knows the new region.
2278     if (getTableStateManager().isTableState(hri.getTable(),
2279         TableState.State.DISABLED, TableState.State.DISABLING)) {
2280       invokeUnAssign(a);
2281       invokeUnAssign(b);
2282     } else {
2283       Callable<Object> splitReplicasCallable = new Callable<Object>() {
2284         @Override
2285         public Object call() {
2286           doSplittingOfReplicas(hri, a, b);
2287           return null;
2288         }
2289       };
2290       threadPoolExecutorService.submit(splitReplicasCallable);
2291     }
2292     return null;
2293   }
2294 
2295   private String onRegionSplitReverted(final RegionState current, final HRegionInfo hri,
2296       final ServerName serverName, final RegionStateTransition transition) {
2297     // The region must be splitting on this server, and the daughters must be in
2298     // splitting_new state.
2299     // If the region is in open state, it could be an RPC retry.
2300     if (current == null || !current.isSplittingOrOpenedOnServer(serverName)) {
2301       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2302     }
2303 
2304     // Just return in case of retrying
2305     if (current.isOpened()) {
2306       return null;
2307     }
2308 
2309     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2310     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2311     RegionState rs_a = regionStates.getRegionState(a);
2312     RegionState rs_b = regionStates.getRegionState(b);
2313     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2314         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2315       return "Some daughter is not known to be splitting on " + serverName
2316         + ", a=" + rs_a + ", b=" + rs_b;
2317     }
2318 
2319     regionOnline(hri, serverName);
2320     regionOffline(a);
2321     regionOffline(b);
2322     if (getTableStateManager().isTableState(hri.getTable(),
2323         TableState.State.DISABLED, TableState.State.DISABLING)) {
2324       invokeUnAssign(hri);
2325     }
2326     return null;
2327   }
2328 
2329   private String onRegionReadyToMerge(final RegionState current, final HRegionInfo hri,
2330       final ServerName serverName, final RegionStateTransition transition) {
2331     // The region must be new, and the daughters must be open on this server.
2332     // If the region is in merge_new state, it could be an RPC retry.
2333     if (current != null && !current.isMergingNewOnServer(serverName)) {
2334       return "Merging daughter region already exists, p=" + current;
2335     }
2336 
2337     // Just return in case of retrying
2338     if (current != null) {
2339       return null;
2340     }
2341 
2342     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2343     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2344     Set<String> encodedNames = new HashSet<String>(2);
2345     encodedNames.add(a.getEncodedName());
2346     encodedNames.add(b.getEncodedName());
2347     Map<String, Lock> locks = locker.acquireLocks(encodedNames);
2348     try {
2349       RegionState rs_a = regionStates.getRegionState(a);
2350       RegionState rs_b = regionStates.getRegionState(b);
2351       if (rs_a == null || !rs_a.isOpenedOnServer(serverName)
2352           || rs_b == null || !rs_b.isOpenedOnServer(serverName)) {
2353         return "Some daughter is not in a state to merge on " + serverName
2354           + ", a=" + rs_a + ", b=" + rs_b;
2355       }
2356 
2357       regionStates.updateRegionState(a, State.MERGING);
2358       regionStates.updateRegionState(b, State.MERGING);
2359       regionStates.createRegionState(
2360         hri, State.MERGING_NEW, serverName, null);
2361       return null;
2362     } finally {
2363       for (Lock lock: locks.values()) {
2364         lock.unlock();
2365       }
2366     }
2367   }
2368 
2369   private String onRegionMergePONR(final RegionState current, final HRegionInfo hri,
2370       final ServerName serverName, final RegionStateTransition transition) {
2371     // The region must be in merging_new state, and the daughters must be
2372     // merging. To check RPC retry, we use server holding info.
2373     if (current != null && !current.isMergingNewOnServer(serverName)) {
2374       return hri.getShortNameToLog() + " is not merging on " + serverName;
2375     }
2376 
2377     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2378     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2379     RegionState rs_a = regionStates.getRegionState(a);
2380     RegionState rs_b = regionStates.getRegionState(b);
2381     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2382         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2383       return "Some daughter is not known to be merging on " + serverName
2384         + ", a=" + rs_a + ", b=" + rs_b;
2385     }
2386 
2387     // Master could have restarted and lost the new region state
2388     if (current == null) {
2389       regionStates.createRegionState(
2390         hri, State.MERGING_NEW, serverName, null);
2391     }
2392 
2393     // Just return in case of retrying
2394     if (regionStates.isRegionOnServer(hri, serverName)) {
2395       return null;
2396     }
2397 
2398     try {
2399       regionStates.mergeRegions(hri, a, b, serverName);
2400     } catch (IOException ioe) {
2401       LOG.info("Failed to record merged region " + hri.getShortNameToLog(), ioe);
2402       return "Failed to record the merging in meta";
2403     }
2404     return null;
2405   }
2406 
2407   private String onRegionMerged(final RegionState current, final HRegionInfo hri,
2408       final ServerName serverName, final RegionStateTransition transition) {
2409     // The region must be in merging_new state, and the daughters must be
2410     // merging on this server.
2411     // If current state is already opened on the same server,
2412     // it could be a reportRegionTransition RPC retry.
2413     if (current == null || !current.isMergingNewOrOpenedOnServer(serverName)) {
2414       return hri.getShortNameToLog() + " is not merging on " + serverName;
2415     }
2416 
2417     // Just return in case of retrying
2418     if (current.isOpened()) {
2419       return null;
2420     }
2421 
2422     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2423     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2424     RegionState rs_a = regionStates.getRegionState(a);
2425     RegionState rs_b = regionStates.getRegionState(b);
2426     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2427         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2428       return "Some daughter is not known to be merging on " + serverName
2429         + ", a=" + rs_a + ", b=" + rs_b;
2430     }
2431 
2432     regionOffline(a, State.MERGED);
2433     regionOffline(b, State.MERGED);
2434     regionOnline(hri, serverName, 1);
2435 
2436     // User could disable the table before master knows the new region.
2437     if (getTableStateManager().isTableState(hri.getTable(),
2438         TableState.State.DISABLED, TableState.State.DISABLING)) {
2439       invokeUnAssign(hri);
2440     } else {
2441       Callable<Object> mergeReplicasCallable = new Callable<Object>() {
2442         @Override
2443         public Object call() {
2444           doMergingOfReplicas(hri, a, b);
2445           return null;
2446         }
2447       };
2448       threadPoolExecutorService.submit(mergeReplicasCallable);
2449     }
2450     return null;
2451   }
2452 
2453   private String onRegionMergeReverted(final RegionState current, final HRegionInfo hri,
2454       final ServerName serverName, final RegionStateTransition transition) {
2455     // The region must be in merging_new state, and the daughters must be
2456     // merging on this server.
2457     // If the region is in offline state, it could be an RPC retry.
2458     if (current == null || !current.isMergingNewOrOfflineOnServer(serverName)) {
2459       return hri.getShortNameToLog() + " is not merging on " + serverName;
2460     }
2461 
2462     // Just return in case of retrying
2463     if (current.isOffline()) {
2464       return null;
2465     }
2466 
2467     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2468     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2469     RegionState rs_a = regionStates.getRegionState(a);
2470     RegionState rs_b = regionStates.getRegionState(b);
2471     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2472         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2473       return "Some daughter is not known to be merging on " + serverName
2474         + ", a=" + rs_a + ", b=" + rs_b;
2475     }
2476 
2477     regionOnline(a, serverName);
2478     regionOnline(b, serverName);
2479     regionOffline(hri);
2480 
2481     if (getTableStateManager().isTableState(hri.getTable(),
2482         TableState.State.DISABLED, TableState.State.DISABLING)) {
2483       invokeUnAssign(a);
2484       invokeUnAssign(b);
2485     }
2486     return null;
2487   }
2488 
2489   private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
2490       final HRegionInfo hri_b) {
2491     // Close replicas for the original unmerged regions. Create/assign new replicas
2492     // for the merged parent.
2493     List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
2494     unmergedRegions.add(hri_a);
2495     unmergedRegions.add(hri_b);
2496     Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
2497     Collection<List<HRegionInfo>> c = map.values();
2498     for (List<HRegionInfo> l : c) {
2499       for (HRegionInfo h : l) {
2500         if (!RegionReplicaUtil.isDefaultReplica(h)) {
2501           LOG.debug("Unassigning un-merged replica " + h);
2502           unassign(h);
2503         }
2504       }
2505     }
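         // Default to a single replica (no extra copies) if the table descriptor cannot be
         // read below; in that case no merged-region replicas are assigned.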
2506     int numReplicas = 1;
2507     try {
2508       numReplicas = ((MasterServices)server).getTableDescriptors().get(mergedHri.getTable()).
2509           getRegionReplication();
2510     } catch (IOException e) {
2511       LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
2512           " due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
2513           "will not be done");
2514     }
2515     List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2516     for (int i = 1; i < numReplicas; i++) {
2517       regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
2518     }
2519     try {
2520       assign(regions);
2521     } catch (IOException ioe) {
2522       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
2523                 ioe.getMessage());
2524     } catch (InterruptedException ie) {
2525       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri+ " because of " +
2526                 ie.getMessage());
2527     }
2528   }
2529 
2530   private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
2531       final HRegionInfo hri_b) {
2532     // Create new regions for the replicas, and assign them to match the current
2533     // replica assignments: if replica 1 of the parent is assigned to RS1, replica 1
2534     // of each daughter will be on the same server (see the sketch after this method).
2535     int numReplicas = 1;
2536     try {
2537       numReplicas = ((MasterServices)server).getTableDescriptors().get(parentHri.getTable()).
2538           getRegionReplication();
2539     } catch (IOException e) {
2540       LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
2541           " due to " + e.getMessage() + ". The assignment of daughter replicas " +
2542           "will not be done");
2543     }
2544     // unassign the old replicas
2545     List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
2546     parentRegion.add(parentHri);
2547     Map<ServerName, List<HRegionInfo>> currentAssign =
2548         regionStates.getRegionAssignments(parentRegion);
2549     Collection<List<HRegionInfo>> c = currentAssign.values();
2550     for (List<HRegionInfo> l : c) {
2551       for (HRegionInfo h : l) {
2552         if (!RegionReplicaUtil.isDefaultReplica(h)) {
2553           LOG.debug("Unassigning parent's replica " + h);
2554           unassign(h);
2555         }
2556       }
2557     }
2558     // assign daughter replicas
2559     Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
2560     for (int i = 1; i < numReplicas; i++) {
2561       prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
2562       prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
2563     }
2564     try {
2565       assign(map);
2566     } catch (IOException e) {
2567       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
2568     } catch (InterruptedException e) {
2569       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
2570     }
2571   }
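  // Illustrative sketch, assuming replica id 1 and two hypothetical daughters hri_a and
  // hri_b: prepareDaughterReplicaForAssignment (below) pins each daughter replica to the
  // server hosting the corresponding parent replica, falling back to a random online
  // server if the parent replica's location is unknown:
  //
  //   Map<HRegionInfo, ServerName> plan = new HashMap<HRegionInfo, ServerName>();
  //   prepareDaughterReplicaForAssignment(hri_a, parentHri, 1, plan);
  //   prepareDaughterReplicaForAssignment(hri_b, parentHri, 1, plan);
  //   assign(plan);  // may throw IOException/InterruptedException, handled as above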
2572 
2573   private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
2574       int replicaId, Map<HRegionInfo, ServerName> map) {
2575     HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
2576     HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
2577         replicaId);
2578     LOG.debug("Created replica region for daughter " + daughterReplica);
2579     ServerName sn;
2580     if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
2581       map.put(daughterReplica, sn);
2582     } else {
2583       List<ServerName> servers = serverManager.getOnlineServersList();
2584       sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
2585       map.put(daughterReplica, sn);
2586     }
2587   }
2588 
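  /**
   * @return the set of regions, such as replicas of regions that have been split
   *   or merged, that still need to be closed
   */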
2589   public Set<HRegionInfo> getReplicasToClose() {
2590     return replicasToClose;
2591   }
2592 
2593   /**
2594    * Marks a region offline.  The new state is the specified one if it is
2595    * not null; otherwise the new state is Offline.
2596    * The specified state can only be Split, Merged, Offline or null.
2597    */
2598   private void regionOffline(final HRegionInfo regionInfo, final State state) {
2599     regionStates.regionOffline(regionInfo, state);
2600     removeClosedRegion(regionInfo);
2601     // remove the region plan as well just in case.
2602     clearRegionPlan(regionInfo);
2603     balancer.regionOffline(regionInfo);
2604 
2605     // Tell our listeners that a region was closed
2606     sendRegionClosedNotification(regionInfo);
2607     // also note that all the replicas of the primary should be closed
2608     if (state != null && (state.equals(State.SPLIT) || state.equals(State.MERGED))) {
2609       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
2610       c.add(regionInfo);
2611       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
2612       Collection<List<HRegionInfo>> allReplicas = map.values();
2613       for (List<HRegionInfo> list : allReplicas) {
2614         replicasToClose.addAll(list);
2615       }
2616     }
2626   }
2627 
2628   private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
2629       final ServerName serverName) {
2630     if (!this.listeners.isEmpty()) {
2631       for (AssignmentListener listener : this.listeners) {
2632         listener.regionOpened(regionInfo, serverName);
2633       }
2634     }
2635   }
2636 
2637   private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
2638     if (!this.listeners.isEmpty()) {
2639       for (AssignmentListener listener : this.listeners) {
2640         listener.regionClosed(regionInfo);
2641       }
2642     }
2643   }
2644 
2645   /**
2646    * Try to update some region states. If the state machine prevents
2647    * such update, an error message is returned to explain the reason.
2648    *
2649    * Each transition is expected to carry just one region for
2650    * opening/closing, and three regions for splitting/merging.
2651    * These regions should be on the server that requested the change.
2652    *
2653    * Region state machine: only the transitions below
2654    * are expected to be triggered by a region server.
2655    *
2656    * On the state transition:
2657    *  (1) Open/Close should be initiated by the master
2658    *      (a) Master sets the region to pending_open/pending_close
2659    *        in memory and hbase:meta after sending the request
2660    *        to the region server
2661    *      (b) The region server reports back to the master
2662    *        after the open/close is done (success or failure)
2663    *      (c) If the region server cannot report the status to the
2664    *        master, it must be because the master is down or there is a
2665    *        temporary network issue; otherwise the region server should
2666    *        abort, since it must be a bug. If the master is not accessible,
2667    *        the region server should keep trying until it is stopped or
2668    *        the status has been reported to the (new) master
2669    *      (d) If a region server dies in the middle of opening/closing
2670    *        a region, the server shutdown handler (SSH) picks it up and finishes it
2671    *      (e) If the master dies in the middle, the new master recovers
2672    *        the state during initialization from hbase:meta. A region server
2673    *        can report any transition that has not been reported to
2674    *        the previous active master yet
2675    *  (2) Split/merge is initiated by region servers
2676    *      (a) To split a region, a region server sends a request
2677    *        to the master to try to set a region to splitting, together with
2678    *        two daughters (to be created) to splitting_new. If approved
2679    *        by the master, the splitting can then move ahead
2680    *      (b) To merge two regions, a region server sends a request to
2681    *        the master to try to set the new merged region (to be created) to
2682    *        merging_new, together with the two regions (to be merged) to merging.
2683    *        If the master approves, the merge can then move ahead
2684    *      (c) Once the splitting/merging is done, the region server
2685    *        reports the status (success or failure) back to the master
2686    *      (d) Other scenarios are handled similarly to region open/close
2687    *        (see the illustrative sketch after this method)
2688    */
2689   protected String onRegionTransition(final ServerName serverName,
2690       final RegionStateTransition transition) {
2691     TransitionCode code = transition.getTransitionCode();
2692     HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
2693     Lock lock = locker.acquireLock(hri.getEncodedName());
2694     try {
2695       RegionState current = regionStates.getRegionState(hri);
2696       if (LOG.isDebugEnabled()) {
2697         LOG.debug("Got transition " + code + " for "
2698           + (current != null ? current.toString() : hri.getShortNameToLog())
2699           + " from " + serverName);
2700       }
2701       String errorMsg = null;
2702       switch (code) {
2703       case OPENED:
2704         errorMsg = onRegionOpen(current, hri, serverName, transition);
2705         break;
2706       case FAILED_OPEN:
2707         errorMsg = onRegionFailedOpen(current, hri, serverName);
2708         break;
2709       case CLOSED:
2710         errorMsg = onRegionClosed(current, hri, serverName);
2711         break;
2712       case READY_TO_SPLIT:
2713         errorMsg = onRegionReadyToSplit(current, hri, serverName, transition);
2714         break;
2715       case SPLIT_PONR:
2716         errorMsg = onRegionSplitPONR(current, hri, serverName, transition);
2717         break;
2718       case SPLIT:
2719         errorMsg = onRegionSplit(current, hri, serverName, transition);
2720         break;
2721       case SPLIT_REVERTED:
2722         errorMsg = onRegionSplitReverted(current, hri, serverName, transition);
2723         break;
2724       case READY_TO_MERGE:
2725         errorMsg = onRegionReadyToMerge(current, hri, serverName, transition);
2726         break;
2727       case MERGE_PONR:
2728         errorMsg = onRegionMergePONR(current, hri, serverName, transition);
2729         break;
2730       case MERGED:
2731         errorMsg = onRegionMerged(current, hri, serverName, transition);
2732         break;
2733       case MERGE_REVERTED:
2734         errorMsg = onRegionMergeReverted(current, hri, serverName, transition);
2735         break;
2736 
2737       default:
2738         errorMsg = "Unexpected transition code " + code;
2739       }
2740       if (errorMsg != null) {
2741         LOG.info("Could not transition region from " + current + " on "
2742           + code + " by " + serverName + ": " + errorMsg);
2743       }
2744       return errorMsg;
2745     } finally {
2746       lock.unlock();
2747     }
2748   }
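  // Illustrative sketch, assuming a region server reporting a successfully opened
  // region hri from serverName; the report would reach this method roughly as:
  //
  //   RegionStateTransition transition = RegionStateTransition.newBuilder()
  //       .setTransitionCode(TransitionCode.OPENED)
  //       .addRegionInfo(HRegionInfo.convert(hri))
  //       .build();
  //   String error = onRegionTransition(serverName, transition);
  //   // null means the transition was accepted; otherwise the message explains
  //   // why it was rejected (and is logged above).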
2749 
2750   /**
2751    * @return Instance of load balancer
2752    */
2753   public LoadBalancer getBalancer() {
2754     return this.balancer;
2755   }
2756 
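  /**
   * @param infos regions of interest
   * @return a snapshot of the current assignments of the given regions, keyed by server
   */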
2757   public Map<ServerName, List<HRegionInfo>>
2758     getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
2759     return getRegionStates().getRegionAssignments(infos);
2760   }
2761 }