
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.NavigableMap;
31  import java.util.Random;
32  import java.util.Set;
33  import java.util.TreeMap;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.CopyOnWriteArrayList;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.locks.Lock;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.fs.FileSystem;
48  import org.apache.hadoop.fs.Path;
49  import org.apache.hadoop.hbase.HBaseIOException;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.HRegionLocation;
53  import org.apache.hadoop.hbase.HTableDescriptor;
54  import org.apache.hadoop.hbase.MetaTableAccessor;
55  import org.apache.hadoop.hbase.NotServingRegionException;
56  import org.apache.hadoop.hbase.RegionLocations;
57  import org.apache.hadoop.hbase.ServerName;
58  import org.apache.hadoop.hbase.TableName;
59  import org.apache.hadoop.hbase.TableNotFoundException;
60  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
61  import org.apache.hadoop.hbase.client.Result;
62  import org.apache.hadoop.hbase.client.TableState;
63  import org.apache.hadoop.hbase.executor.EventHandler;
64  import org.apache.hadoop.hbase.executor.EventType;
65  import org.apache.hadoop.hbase.executor.ExecutorService;
66  import org.apache.hadoop.hbase.ipc.FailedServerException;
67  import org.apache.hadoop.hbase.ipc.RpcClient;
68  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
69  import org.apache.hadoop.hbase.master.RegionState.State;
70  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
71  import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
72  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
73  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
74  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
75  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
76  import org.apache.hadoop.hbase.quotas.RegionStateListener;
77  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
78  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
79  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
80  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
81  import org.apache.hadoop.hbase.util.FSUtils;
82  import org.apache.hadoop.hbase.util.KeyLocker;
83  import org.apache.hadoop.hbase.util.Pair;
84  import org.apache.hadoop.hbase.util.PairOfSameType;
85  import org.apache.hadoop.hbase.util.Threads;
86  import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
87  import org.apache.hadoop.ipc.RemoteException;
88  import org.apache.hadoop.util.StringUtils;
89  import org.apache.zookeeper.KeeperException;
90  
91  import com.google.common.annotations.VisibleForTesting;
92  
93  /**
94   * Manages and performs region assignment.
95   * All related communication with the regionservers is done over RPC.
96   */
97  @InterfaceAudience.Private
98  public class AssignmentManager {
99    private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
100 
101   protected final MasterServices server;
102 
103   private ServerManager serverManager;
104 
105   private boolean shouldAssignRegionsWithFavoredNodes;
106 
107   private LoadBalancer balancer;
108 
109   private final MetricsAssignmentManager metricsAssignmentManager;
110 
111   private final TableLockManager tableLockManager;
112 
113   private AtomicInteger numRegionsOpened = new AtomicInteger(0);
114 
115   final private KeyLocker<String> locker = new KeyLocker<String>();
116 
117   Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
118 
119   /**
120    * Map of regions to reopen after the schema of a table is changed. Key -
121    * encoded region name, value - HRegionInfo
122    */
123   private final Map <String, HRegionInfo> regionsToReopen;
124 
125   /*
126  * Maximum number of times we recurse/retry an assignment or unassignment.
127    * See below in {@link #assign()} and {@link #unassign()}.
128    */
129   private final int maximumAttempts;
130 
131   /**
132  * The time to sleep before retrying an hbase:meta assignment that failed
133  * because no region plan was available or the region plan was bad.
134    */
135   private final long sleepTimeBeforeRetryingMetaAssignment;
136 
137   /** Plans for region movement. Key is the encoded version of a region name*/
138   // TODO: When do plans get cleaned out?  Ever? In server open and in server
139   // shutdown processing -- St.Ack
140   // All access to this Map must be synchronized.
141   final NavigableMap<String, RegionPlan> regionPlans =
142     new TreeMap<String, RegionPlan>();
143 
144   private final TableStateManager tableStateManager;
145 
146   private final ExecutorService executorService;
147 
148   // Thread pool executor service. TODO, consolidate with executorService?
149   private java.util.concurrent.ExecutorService threadPoolExecutorService;
150 
151   private final RegionStates regionStates;
152 
153   // The thresholds for using bulk assignment. Bulk assignment is used
154   // only if assigning at least this many regions to at least this
155   // many servers. If assigning fewer regions to fewer servers,
156   // bulk assigning may not be as efficient.
157   private final int bulkAssignThresholdRegions;
158   private final int bulkAssignThresholdServers;
159 
160   // Should bulk assignment wait till all regions are assigned,
161   // or until it times out?  This is useful for measuring bulk assignment
162   // performance, but not needed in most use cases.
163   private final boolean bulkAssignWaitTillAllAssigned;
164 
165   /**
166    * Indicator that AssignmentManager has recovered the region states so
167    * that ServerShutdownHandler can be fully enabled and re-assign regions
168  * of dead servers, so that when re-assignment happens, AssignmentManager
169  * has proper region states.
170    *
171    * Protected to ease testing.
172    */
173   protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
174 
175   /**
176  * A map to track the number of times a region fails to open in a row.
177    * So that we don't try to open a region forever if the failure is
178    * unrecoverable.  We don't put this information in region states
179    * because we don't expect this to happen frequently; we don't
180    * want to copy this information over during each state transition either.
181    */
182   private final ConcurrentHashMap<String, AtomicInteger>
183     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
184 
185   // When not using ZK for region assignment, region states
186   // are persisted in meta with a state store.
187   private final RegionStateStore regionStateStore;
188 
189   /**
190    * For testing only!  Set to true to skip handling of split.
191    */
192   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
193   public static boolean TEST_SKIP_SPLIT_HANDLING = false;
194 
195   /** Listeners that are called on assignment events. */
196   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
197   
198   private RegionStateListener regionStateListener;
199 
200   /**
201    * Constructs a new assignment manager.
202    *
203  * @param server instance of HMaster this AM is running inside
204    * @param serverManager serverManager for associated HMaster
205    * @param balancer implementation of {@link LoadBalancer}
206    * @param service Executor service
207    * @param metricsMaster metrics manager
208    * @param tableLockManager TableLock manager
209    * @throws IOException
210    */
211   public AssignmentManager(MasterServices server, ServerManager serverManager,
212       final LoadBalancer balancer,
213       final ExecutorService service, MetricsMaster metricsMaster,
214       final TableLockManager tableLockManager,
215       final TableStateManager tableStateManager)
216           throws IOException {
217     this.server = server;
218     this.serverManager = serverManager;
219     this.executorService = service;
220     this.regionStateStore = new RegionStateStore(server);
221     this.regionsToReopen = Collections.synchronizedMap
222                            (new HashMap<String, HRegionInfo> ());
223     Configuration conf = server.getConfiguration();
224     // Only read favored nodes if using the favored nodes load balancer.
225     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
226            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
227            FavoredNodeLoadBalancer.class);
228 
229     this.tableStateManager = tableStateManager;
230 
231     // This is the max attempts, not retries, so it should be at least 1.
232     this.maximumAttempts = Math.max(1,
233       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
234     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
235         "hbase.meta.assignment.retry.sleeptime", 1000l);
236     this.balancer = balancer;
237     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
238     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
239       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
240     this.regionStates = new RegionStates(
241       server, tableStateManager, serverManager, regionStateStore);
242 
243     this.bulkAssignWaitTillAllAssigned =
244       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
245     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
246     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
247 
248     this.metricsAssignmentManager = new MetricsAssignmentManager();
249     this.tableLockManager = tableLockManager;
250   }
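
      // A minimal sketch of how the assignment settings read by this constructor
      // might be overridden before master startup; the values below are
      // illustrative only, not recommendations:
      //
      //   Configuration conf = HBaseConfiguration.create();
      //   conf.setInt("hbase.assignment.maximum.attempts", 5);
      //   conf.setLong("hbase.meta.assignment.retry.sleeptime", 2000L);
      //   conf.setInt("hbase.assignment.threads.max", 16);
      //   conf.setBoolean("hbase.bulk.assignment.waittillallassigned", true);
      //   conf.setInt("hbase.bulk.assignment.threshold.regions", 10);
      //   conf.setInt("hbase.bulk.assignment.threshold.servers", 5);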
251 
252   /**
253    * Add the listener to the notification list.
254    * @param listener The AssignmentListener to register
255    */
256   public void registerListener(final AssignmentListener listener) {
257     this.listeners.add(listener);
258   }
259 
260   /**
261    * Remove the listener from the notification list.
262    * @param listener The AssignmentListener to unregister
263    */
264   public boolean unregisterListener(final AssignmentListener listener) {
265     return this.listeners.remove(listener);
266   }
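
      // A hedged sketch of subscribing to assignment events, where "am" is a
      // placeholder for an AssignmentManager instance. It assumes the
      // AssignmentListener callbacks are regionOpened/regionClosed, matching the
      // opened notification sent from regionOnline() below:
      //
      //   am.registerListener(new AssignmentListener() {
      //     @Override
      //     public void regionOpened(HRegionInfo regionInfo, ServerName serverName) {
      //       LOG.info("Opened " + regionInfo.getRegionNameAsString() + " on " + serverName);
      //     }
      //     @Override
      //     public void regionClosed(HRegionInfo regionInfo) {
      //       LOG.info("Closed " + regionInfo.getRegionNameAsString());
      //     }
      //   });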
267 
268   /**
269  * @return Instance of TableStateManager.
270    */
271   public TableStateManager getTableStateManager() {
272   // These are 'expensive' to make, involving a trip to the zk ensemble, so allow
273     // sharing.
274     return this.tableStateManager;
275   }
276 
277   /**
278    * This SHOULD not be public. It is public now
279    * because of some unit tests.
280    *
281    * TODO: make it package private and keep RegionStates in the master package
282    */
283   public RegionStates getRegionStates() {
284     return regionStates;
285   }
286 
287   /**
288    * Used in some tests to mock up region state in meta
289    */
290   @VisibleForTesting
291   RegionStateStore getRegionStateStore() {
292     return regionStateStore;
293   }
294 
295   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
296     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
297   }
298 
299   /**
300    * Add a regionPlan for the specified region.
301    * @param encodedName
302    * @param plan
303    */
304   public void addPlan(String encodedName, RegionPlan plan) {
305     synchronized (regionPlans) {
306       regionPlans.put(encodedName, plan);
307     }
308   }
309 
310   /**
311    * Add a map of region plans.
312    */
313   public void addPlans(Map<String, RegionPlan> plans) {
314     synchronized (regionPlans) {
315       regionPlans.putAll(plans);
316     }
317   }
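
      // A hedged sketch of seeding a plan before assignment; the
      // (region, source, destination) RegionPlan constructor matches its use
      // elsewhere in this class, and am/hri/src/dest are placeholders:
      //
      //   RegionPlan plan = new RegionPlan(hri, src, dest);
      //   am.addPlan(hri.getEncodedName(), plan);
      //   am.assign(hri);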
318 
319   /**
320    * Set the list of regions that will be reopened
321    * because of an update in table schema
322    *
323    * @param regions
324    *          list of regions that should be tracked for reopen
325    */
326   public void setRegionsToReopen(List <HRegionInfo> regions) {
327     for(HRegionInfo hri : regions) {
328       regionsToReopen.put(hri.getEncodedName(), hri);
329     }
330   }
331 
332   /**
333  * Used by the client to check whether all regions have the schema updates.
334    *
335    * @param tableName
336    * @return Pair indicating the status of the alter command
337    * @throws IOException
338    */
339   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
340       throws IOException {
341     List<HRegionInfo> hris;
342     if (TableName.META_TABLE_NAME.equals(tableName)) {
343       hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
344     } else {
345       hris = MetaTableAccessor.getTableRegions(server.getConnection(), tableName, true);
346     }
347 
348     Integer pending = 0;
349     for (HRegionInfo hri : hris) {
350       String name = hri.getEncodedName();
351       // No lock; concurrent access is ok: sequential consistency is respected.
352       if (regionsToReopen.containsKey(name)
353           || regionStates.isRegionInTransition(name)) {
354         pending++;
355       }
356     }
357     return new Pair<Integer, Integer>(pending, hris.size());
358   }
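
      // A hedged example of how a caller might poll the pair returned above,
      // where getFirst() is the pending count and getSecond() the total
      // (checked exceptions and the am/tableName placeholders elided):
      //
      //   Pair<Integer, Integer> status = am.getReopenStatus(tableName);
      //   while (status.getFirst() > 0) {
      //     Thread.sleep(1000);
      //     status = am.getReopenStatus(tableName);
      //   }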
359 
360   /**
361    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
362  * the failover cleanup before re-assigning regions of dead servers, so that
363    * when re-assignment happens, AssignmentManager has proper region states.
364    */
365   public boolean isFailoverCleanupDone() {
366     return failoverCleanupDone.get();
367   }
368 
369   /**
370    * To avoid racing with AM, external entities may need to lock a region,
371    * for example, when SSH checks what regions to skip re-assigning.
372    */
373   public Lock acquireRegionLock(final String encodedName) {
374     return locker.acquireLock(encodedName);
375   }
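
      // The expected usage pattern for external callers, mirroring how this
      // class uses the locker internally: always release the lock in finally.
      // "am" and "hri" are placeholders.
      //
      //   Lock lock = am.acquireRegionLock(hri.getEncodedName());
      //   try {
      //     // inspect or update state for this region
      //   } finally {
      //     lock.unlock();
      //   }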
376 
377   /**
378    * Now, failover cleanup is completed. Notify server manager to
379  * process queued-up dead servers, if any.
380    */
381   void failoverCleanupDone() {
382     failoverCleanupDone.set(true);
383     serverManager.processQueuedDeadServers();
384   }
385 
386   /**
387    * Called on startup.
388   * Figures whether this is a fresh cluster start or we are joining an extant running cluster.
389    * @throws IOException
390    * @throws KeeperException
391    * @throws InterruptedException
392    */
393   void joinCluster() throws IOException,
394           KeeperException, InterruptedException {
395     long startTime = System.currentTimeMillis();
396     // Concurrency note: In the below the accesses on regionsInTransition are
397     // outside of a synchronization block where usually all accesses to RIT are
398     // synchronized.  The presumption is that in this case it is safe since this
399     // method is being played by a single thread on startup.
400 
401     // TODO: Regions that have a null location and are not in regionsInTransitions
402     // need to be handled.
403 
404     // Scan hbase:meta to build list of existing regions, servers, and assignment
405     // Returns servers that have not checked in (assumed dead) and to which
406     // some regions were assigned (according to meta).
407     Set<ServerName> deadServers = rebuildUserRegions();
408 
409     // This method will assign all user regions if this is a clean server startup;
410     // otherwise it will reconstruct master state and clean up any leftovers from
411     // the previous master process.
412     boolean failover = processDeadServersAndRegionsInTransition(deadServers);
413 
414     recoverTableInDisablingState();
415     recoverTableInEnablingState();
416     LOG.info("Joined the cluster in " + (System.currentTimeMillis()
417       - startTime) + "ms, failover=" + failover);
418   }
419 
420   /**
421  * Process all regions that are in transition and also process the list of
422  * dead servers by scanning hbase:meta.
423  * Used by a master joining a cluster.  If we figure this is a clean cluster
424    * startup, will assign all user regions.
425    * @param deadServers
426  *          Set of dead servers that had regions assigned to them (per hbase:meta). Can be null.
427    * @throws IOException
428    * @throws InterruptedException
429    */
430   boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
431           throws IOException, InterruptedException {
432     boolean failover = !serverManager.getDeadServers().isEmpty();
433     if (failover) {
434       // This may not be a failover actually, especially if meta is on this master.
435       if (LOG.isDebugEnabled()) {
436         LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
437       }
438     } else {
439       // If any one region except meta is assigned, it's a failover.
440       Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
441       for (Map.Entry<HRegionInfo, ServerName> en:
442           regionStates.getRegionAssignments().entrySet()) {
443         HRegionInfo hri = en.getKey();
444         if (!hri.isMetaTable()
445             && onlineServers.contains(en.getValue())) {
446           LOG.debug("Found " + hri + " out on cluster");
447           failover = true;
448           break;
449         }
450       }
451       if (!failover) {
452         // If any region except meta is in transition on a live server, it's a failover.
453         Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
454         if (!regionsInTransition.isEmpty()) {
455           for (RegionState regionState: regionsInTransition.values()) {
456             ServerName serverName = regionState.getServerName();
457             if (!regionState.getRegion().isMetaRegion()
458                 && serverName != null && onlineServers.contains(serverName)) {
459               LOG.debug("Found " + regionState + " in RITs");
460               failover = true;
461               break;
462             }
463           }
464         }
465       }
466     }
467     if (!failover) {
468       // If we get here, we have a full cluster restart. It is a failover only
469       // if there are some WALs that are not split yet. Meta WALs, if any, should
470       // have been split already. We can walk through the queued dead servers;
471       // if they don't have any WALs, this restart should be considered a clean one.
472       Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
473       if (!queuedDeadServers.isEmpty()) {
474         Configuration conf = server.getConfiguration();
475         Path rootdir = FSUtils.getRootDir(conf);
476         FileSystem fs = rootdir.getFileSystem(conf);
477         for (ServerName serverName: queuedDeadServers) {
478           // In the case of a clean exit, the shutdown handler would have presplit any WALs and
479           // removed empty directories.
480           Path logDir = new Path(rootdir,
481               DefaultWALProvider.getWALDirectoryName(serverName.toString()));
482           Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
483           if (fs.exists(logDir) || fs.exists(splitDir)) {
484             LOG.debug("Found queued dead server " + serverName);
485             failover = true;
486             break;
487           }
488         }
489         if (!failover) {
490           // We figured that it's not a failover, so no need to
491           // work on these re-queued dead servers any more.
492           LOG.info("AM figured that it's not a failover and cleaned up "
493             + queuedDeadServers.size() + " queued dead servers");
494           serverManager.removeRequeuedDeadServers();
495         }
496       }
497     }
498 
499     Set<TableName> disabledOrDisablingOrEnabling = null;
500     Map<HRegionInfo, ServerName> allRegions = null;
501 
502     if (!failover) {
503       disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
504         TableState.State.DISABLED, TableState.State.DISABLING,
505         TableState.State.ENABLING);
506 
507       // Clean re/start, mark all user regions closed before reassignment
508       allRegions = regionStates.closeAllUserRegions(
509         disabledOrDisablingOrEnabling);
510     }
511 
512     // Now region states are restored
513     regionStateStore.start();
514 
515     if (failover) {
516       if (deadServers != null && !deadServers.isEmpty()) {
517         for (ServerName serverName: deadServers) {
518           if (!serverManager.isServerDead(serverName)) {
519             serverManager.expireServer(serverName); // Let SSH do region re-assign
520           }
521         }
522       }
523       processRegionsInTransition(regionStates.getRegionsInTransition().values());
524     }
525 
526     // Now we can safely claim failover cleanup completed and enable
527     // ServerShutdownHandler for further processing. The nodes (below)
528     // in transition, if any, are for regions not related to those
529     // dead servers at all, and can be done in parallel to SSH.
530     failoverCleanupDone();
531     if (!failover) {
532       // Fresh cluster startup.
533       LOG.info("Clean cluster startup. Assigning user regions");
534       assignAllUserRegions(allRegions);
535     }
536     // Unassign replicas of the split parents and the merged regions.
537     // The daughter replicas are opened in assignAllUserRegions if they were
538     // not already opened.
539     for (HRegionInfo h : replicasToClose) {
540       unassign(h);
541     }
542     replicasToClose.clear();
543     return failover;
544   }
545 
546   /**
547    * When a region is closed, it should be removed from the regionsToReopen
548    * @param hri HRegionInfo of the region which was closed
549    */
550   public void removeClosedRegion(HRegionInfo hri) {
551     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
552       LOG.debug("Removed region from reopening regions because it was closed");
553     }
554   }
555 
556   // TODO: processFavoredNodes might throw an exception, e.g., if the
557   // meta could not be contacted/updated. We need to decide how seriously to treat
558   // this problem. Should we fail the current assignment? We should be able
559   // to recover from this problem eventually (if the meta couldn't be updated
560   // things should work normally and eventually get fixed up).
561   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
562     if (!shouldAssignRegionsWithFavoredNodes) return;
563     // The AM gets the favored nodes info for each region and updates the meta
564     // table with that info
565     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
566         new HashMap<HRegionInfo, List<ServerName>>();
567     for (HRegionInfo region : regions) {
568       regionToFavoredNodes.put(region,
569           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
570     }
571     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
572       this.server.getConnection());
573   }
574 
575   /**
576    * Marks the region as online.  Removes it from regions in transition and
577    * updates the in-memory assignment information.
578    * <p>
579    * Used when a region has been successfully opened on a region server.
580    * @param regionInfo
581    * @param sn
582    */
583   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
584     regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
585   }
586 
587   void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
588     numRegionsOpened.incrementAndGet();
589     regionStates.regionOnline(regionInfo, sn, openSeqNum);
590 
591     // Remove plan if one.
592     clearRegionPlan(regionInfo);
593     balancer.regionOnline(regionInfo, sn);
594 
595     // Tell our listeners that a region was opened
596     sendRegionOpenedNotification(regionInfo, sn);
597   }
598 
599   /**
600    * Marks the region as offline.  Removes it from regions in transition and
601    * removes in-memory assignment information.
602    * <p>
603    * Used when a region has been closed and should remain closed.
604    * @param regionInfo
605    */
606   public void regionOffline(final HRegionInfo regionInfo) {
607     regionOffline(regionInfo, null);
608   }
609 
610   public void offlineDisabledRegion(HRegionInfo regionInfo) {
611     replicasToClose.remove(regionInfo);
612     regionOffline(regionInfo);
613   }
614 
615   // Assignment methods
616 
617   /**
618    * Assigns the specified region.
619    * <p>
620    * If a RegionPlan is available with a valid destination then it will be used
621  * to determine what server the region is assigned to.  If no RegionPlan is
622    * available, region will be assigned to a random available server.
623    * <p>
624    * Updates the RegionState and sends the OPEN RPC.
625    * <p>
626  * This will only succeed if the region is not in transition, or is in
627  * transition but in a CLOSED or OFFLINE state, and of course, the
628  * chosen server is up and running (it may have just crashed!).
629    *
630  * @param region region to be assigned
631    */
632   public void assign(HRegionInfo region) {
633     assign(region, false);
634   }
635 
636   /**
637    * Use care with forceNewPlan. It could cause double assignment.
638    */
639   public void assign(HRegionInfo region, boolean forceNewPlan) {
640     if (isDisabledorDisablingRegionInRIT(region)) {
641       return;
642     }
643     String encodedName = region.getEncodedName();
644     Lock lock = locker.acquireLock(encodedName);
645     try {
646       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
647       if (state != null) {
648         if (regionStates.wasRegionOnDeadServer(encodedName)) {
649           LOG.info("Skip assigning " + region.getRegionNameAsString()
650             + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
651             + " is dead but not processed yet");
652           return;
653         }
654         assign(state, forceNewPlan);
655       }
656     } finally {
657       lock.unlock();
658     }
659   }
660 
661   /**
662    * Bulk assign regions to <code>destination</code>.
663    * @param destination
664    * @param regions Regions to assign.
665    * @return true if successful
666    */
667   boolean assign(final ServerName destination, final List<HRegionInfo> regions)
668     throws InterruptedException {
669     long startTime = EnvironmentEdgeManager.currentTime();
670     try {
671       int regionCount = regions.size();
672       if (regionCount == 0) {
673         return true;
674       }
675       LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
676       Set<String> encodedNames = new HashSet<String>(regionCount);
677       for (HRegionInfo region : regions) {
678         encodedNames.add(region.getEncodedName());
679       }
680 
681       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
682       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
683       try {
684         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regionCount);
685         List<RegionState> states = new ArrayList<RegionState>(regionCount);
686         for (HRegionInfo region : regions) {
687           String encodedName = region.getEncodedName();
688           if (!isDisabledorDisablingRegionInRIT(region)) {
689             RegionState state = forceRegionStateToOffline(region, false);
690             boolean onDeadServer = false;
691             if (state != null) {
692               if (regionStates.wasRegionOnDeadServer(encodedName)) {
693                 LOG.info("Skip assigning " + region.getRegionNameAsString()
694                   + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
695                   + " is dead but not processed yet");
696                 onDeadServer = true;
697               } else {
698                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
699                 plans.put(encodedName, plan);
700                 states.add(state);
701                 continue;
702               }
703             }
704             // Reassign if the region wasn't on a dead server
705             if (!onDeadServer) {
706               LOG.info("failed to force region state to offline, "
707                 + "will reassign later: " + region);
708               failedToOpenRegions.add(region); // assign individually later
709             }
710           }
711           // Release the lock, this region is excluded from bulk assign because
712           // we can't update its state, or set its znode to offline.
713           Lock lock = locks.remove(encodedName);
714           lock.unlock();
715         }
716 
717         if (server.isStopped()) {
718           return false;
719         }
720 
721         // Add region plans, so we can updateTimers when one region is opened so
722         // that unnecessary timeout on RIT is reduced.
723         this.addPlans(plans);
724 
725         List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos =
726           new ArrayList<Pair<HRegionInfo, List<ServerName>>>(states.size());
727         for (RegionState state: states) {
728           HRegionInfo region = state.getRegion();
729           regionStates.updateRegionState(
730             region, State.PENDING_OPEN, destination);
731           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
732           if (this.shouldAssignRegionsWithFavoredNodes) {
733             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
734           }
735           regionOpenInfos.add(new Pair<HRegionInfo, List<ServerName>>(
736             region, favoredNodes));
737         }
738 
739         // Move on to open regions.
740         try {
741           // Send OPEN RPC. If it fails on an IOE or RemoteException,
742           // regions will be assigned individually.
743           Configuration conf = server.getConfiguration();
744           long maxWaitTime = System.currentTimeMillis() +
745             conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000);
746           for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
747             try {
748               List<RegionOpeningState> regionOpeningStateList = serverManager
749                 .sendRegionOpen(destination, regionOpenInfos);
750               for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
751                 RegionOpeningState openingState = regionOpeningStateList.get(k);
752                 if (openingState != RegionOpeningState.OPENED) {
753                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
754                   LOG.info("Got opening state " + openingState
755                     + ", will reassign later: " + region);
756                   // Failed opening this region, reassign it later
757                   forceRegionStateToOffline(region, true);
758                   failedToOpenRegions.add(region);
759                 }
760               }
761               break;
762             } catch (IOException e) {
763               if (e instanceof RemoteException) {
764                 e = ((RemoteException)e).unwrapRemoteException();
765               }
766               if (e instanceof RegionServerStoppedException) {
767                 LOG.warn("The region server was shut down, ", e);
768                 // No need to retry, the region server is a goner.
769                 return false;
770               } else if (e instanceof ServerNotRunningYetException) {
771                 long now = System.currentTimeMillis();
772                 if (now < maxWaitTime) {
773                   if (LOG.isDebugEnabled()) {
774                     LOG.debug("Server is not yet up; waiting up to " +
775                       (maxWaitTime - now) + "ms", e);
776                   }
777                   Thread.sleep(100);
778                   i--; // reset the try count
779                   continue;
780                 }
781               } else if (e instanceof java.net.SocketTimeoutException
782                   && this.serverManager.isServerOnline(destination)) {
783                 // In case socket is timed out and the region server is still online,
784                 // the openRegion RPC could have been accepted by the server and
785                 // just the response didn't go through.  So we will retry to
786                 // open the region on the same server.
787                 if (LOG.isDebugEnabled()) {
788                   LOG.debug("Bulk assigner openRegion() to " + destination
789                     + " has timed out, but the regions might"
790                     + " already be opened on it.", e);
791                 }
792                 // wait and reset the re-try count, server might be just busy.
793                 Thread.sleep(100);
794                 i--;
795                 continue;
796               } else if (e instanceof FailedServerException && i < maximumAttempts) {
797                 // In case the server is in the failed server list, there is no point
798                 // retrying too soon. Retry after the failed_server_expiry time.
799                 long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
800                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
801                 if (LOG.isDebugEnabled()) {
802                   LOG.debug(destination + " is on failed server list; waiting "
803                     + sleepTime + "ms", e);
804                 }
805                 Thread.sleep(sleepTime);
806                 continue;
807               }
808               throw e;
809             }
810           }
811         } catch (IOException e) {
812           // Can be a socket timeout, EOF, NoRouteToHost, etc
813           LOG.info("Unable to communicate with " + destination
814             + " in order to assign regions, ", e);
815           for (RegionState state: states) {
816             HRegionInfo region = state.getRegion();
817             forceRegionStateToOffline(region, true);
818           }
819           return false;
820         }
821       } finally {
822         for (Lock lock : locks.values()) {
823           lock.unlock();
824         }
825       }
826 
827       if (!failedToOpenRegions.isEmpty()) {
828         for (HRegionInfo region : failedToOpenRegions) {
829           if (!regionStates.isRegionOnline(region)) {
830             invokeAssign(region);
831           }
832         }
833       }
834       LOG.debug("Bulk assigning done for " + destination);
835       return true;
836     } finally {
837       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
838     }
839   }
840 
841   /**
842    * Send CLOSE RPC if the server is online, otherwise, offline the region.
843    *
844  * The RPC will be sent only to the region server found in the region state
845    * if it is passed in, otherwise, to the src server specified. If region
846    * state is not specified, we don't update region state at all, instead
847    * we just send the RPC call. This is useful for some cleanup without
848  * messing around with the region states (see handleRegion, on the region
849  * opened on an unexpected server scenario, for an example).
850    */
851   private void unassign(final HRegionInfo region,
852       final ServerName server, final ServerName dest) {
853     for (int i = 1; i <= this.maximumAttempts; i++) {
854       if (this.server.isStopped() || this.server.isAborted()) {
855         LOG.debug("Server stopped/aborted; skipping unassign of " + region);
856         return;
857       }
858       if (!serverManager.isServerOnline(server)) {
859         LOG.debug("Offline " + region.getRegionNameAsString()
860           + ", no need to unassign since it's on a dead server: " + server);
861         regionStates.updateRegionState(region, State.OFFLINE);
862         return;
863       }
864       try {
865         // Send CLOSE RPC
866         if (serverManager.sendRegionClose(server, region, dest)) {
867           LOG.debug("Sent CLOSE to " + server + " for region " +
868             region.getRegionNameAsString());
869           return;
870         }
871         // This never happens. Currently regionserver close always returns true.
872         // TODO: this can now happen (0.96) if there is an exception in a coprocessor.
873         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
874           region.getRegionNameAsString());
875       } catch (Throwable t) {
876         if (t instanceof RemoteException) {
877           t = ((RemoteException)t).unwrapRemoteException();
878         }
879         if (t instanceof NotServingRegionException
880             || t instanceof RegionServerStoppedException
881             || t instanceof ServerNotRunningYetException) {
882           LOG.debug("Offline " + region.getRegionNameAsString()
883             + ", it is no longer on " + server, t);
884           regionStates.updateRegionState(region, State.OFFLINE);
885           return;
886         } else if (t instanceof FailedServerException && i < maximumAttempts) {
887           // In case the server is in the failed server list, there is no point
888           // retrying too soon. Retry after the failed_server_expiry time.
889           try {
890             Configuration conf = this.server.getConfiguration();
891             long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
892               RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
893             if (LOG.isDebugEnabled()) {
894               LOG.debug(server + " is on failed server list; waiting "
895                 + sleepTime + "ms", t);
896             }
897             Thread.sleep(sleepTime);
898           } catch (InterruptedException ie) {
899             LOG.warn("Failed to unassign "
900               + region.getRegionNameAsString() + " since interrupted", ie);
901             regionStates.updateRegionState(region, State.FAILED_CLOSE);
902             Thread.currentThread().interrupt();
903             return;
904           }
905         }
906 
907         LOG.info("Server " + server + " returned " + t + " for "
908           + region.getRegionNameAsString() + ", try=" + i
909           + " of " + this.maximumAttempts, t);
910       }
911     }
912     // Run out of attempts
913     regionStates.updateRegionState(region, State.FAILED_CLOSE);
914   }
915 
916   /**
917    * Set region to OFFLINE unless it is opening and forceNewPlan is false.
918    */
919   private RegionState forceRegionStateToOffline(
920       final HRegionInfo region, final boolean forceNewPlan) {
921     RegionState state = regionStates.getRegionState(region);
922     if (state == null) {
923       LOG.warn("Assigning a region not in region states: " + region);
924       state = regionStates.createRegionState(region);
925     }
926 
927     if (forceNewPlan && LOG.isDebugEnabled()) {
928       LOG.debug("Force region state offline " + state);
929     }
930 
931     switch (state.getState()) {
932     case OPEN:
933     case OPENING:
934     case PENDING_OPEN:
935     case CLOSING:
936     case PENDING_CLOSE:
937       if (!forceNewPlan) {
938         LOG.debug("Skip assigning " +
939           region + ", it is already " + state);
940         return null;
941       }
942     case FAILED_CLOSE:
943     case FAILED_OPEN:
944       regionStates.updateRegionState(region, State.PENDING_CLOSE);
945       unassign(region, state.getServerName(), null);
946       state = regionStates.getRegionState(region);
947       if (!state.isOffline() && !state.isClosed()) {
948         // If the region isn't offline, we can't re-assign
949         // it now. It will be assigned automatically after
950         // the regionserver reports it's closed.
951         return null;
952       }
953     case OFFLINE:
954     case CLOSED:
955       break;
956     default:
957       LOG.error("Trying to assign region " + region
958         + ", which is " + state);
959       return null;
960     }
961     return state;
962   }
963 
964   /**
965    * Caller must hold lock on the passed <code>state</code> object.
966    * @param state
967    * @param forceNewPlan
968    */
969   private void assign(RegionState state, boolean forceNewPlan) {
970     long startTime = EnvironmentEdgeManager.currentTime();
971     try {
972       Configuration conf = server.getConfiguration();
973       RegionPlan plan = null;
974       long maxWaitTime = -1;
975       HRegionInfo region = state.getRegion();
976       Throwable previousException = null;
977       for (int i = 1; i <= maximumAttempts; i++) {
978         if (server.isStopped() || server.isAborted()) {
979           LOG.info("Skip assigning " + region.getRegionNameAsString()
980             + ", the server is stopped/aborted");
981           return;
982         }
983 
984         if (plan == null) { // Get a server for the region at first
985           try {
986             plan = getRegionPlan(region, forceNewPlan);
987           } catch (HBaseIOException e) {
988             LOG.warn("Failed to get region plan", e);
989           }
990         }
991 
992         if (plan == null) {
993           LOG.warn("Unable to determine a plan to assign " + region);
994 
995           // For meta region, we have to keep retrying until succeeding
996           if (region.isMetaRegion()) {
997             if (i == maximumAttempts) {
998               i = 0; // re-set attempt count to 0 for at least 1 retry
999 
1000               LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
1001                 " after maximumAttempts (" + this.maximumAttempts +
1002                 "). Reset attempts count and continue retrying.");
1003             }
1004             waitForRetryingMetaAssignment();
1005             continue;
1006           }
1007 
1008           regionStates.updateRegionState(region, State.FAILED_OPEN);
1009           return;
1010         }
1011        // In case of assignment from EnableTableHandler the table state is ENABLING. Anyhow,
1012        // EnableTableHandler will set ENABLED after assigning all the table regions. If we
1013        // try to set ENABLED directly then the client API may think the table is enabled.
1014        // When we have a case where all the regions are added directly into hbase:meta and we
1015        // call assignRegion, then we need to make the table ENABLED. In such a case the table
1016        // will not be in the ENABLING or ENABLED state.
1017         TableName tableName = region.getTable();
1018         if (!tableStateManager.isTableState(tableName,
1019           TableState.State.ENABLED, TableState.State.ENABLING)) {
1020           LOG.debug("Setting table " + tableName + " to ENABLED state.");
1021           setEnabledTable(tableName);
1022         }
1023         LOG.info("Assigning " + region.getRegionNameAsString() +
1024             " to " + plan.getDestination().toString());
1025         // Transition RegionState to PENDING_OPEN
1026        regionStates.updateRegionState(region,
1027           State.PENDING_OPEN, plan.getDestination());
1028 
1029         boolean needNewPlan = false;
1030         final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
1031             " to " + plan.getDestination();
1032         try {
1033           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1034           if (this.shouldAssignRegionsWithFavoredNodes) {
1035             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1036           }
1037           serverManager.sendRegionOpen(plan.getDestination(), region, favoredNodes);
1038           return; // we're done
1039         } catch (Throwable t) {
1040           if (t instanceof RemoteException) {
1041             t = ((RemoteException) t).unwrapRemoteException();
1042           }
1043           previousException = t;
1044 
1045          // Should we wait a little before retrying? If the server is starting, yes.
1046           boolean hold = (t instanceof ServerNotRunningYetException);
1047 
1048           // In case socket is timed out and the region server is still online,
1049           // the openRegion RPC could have been accepted by the server and
1050           // just the response didn't go through.  So we will retry to
1051           // open the region on the same server.
1052           boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
1053               && this.serverManager.isServerOnline(plan.getDestination()));
1054 
1055           if (hold) {
1056             LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
1057               "try=" + i + " of " + this.maximumAttempts, t);
1058 
1059             if (maxWaitTime < 0) {
1060               maxWaitTime = EnvironmentEdgeManager.currentTime()
1061                 + this.server.getConfiguration().getLong(
1062                   "hbase.regionserver.rpc.startup.waittime", 60000);
1063             }
1064             try {
1065               long now = EnvironmentEdgeManager.currentTime();
1066               if (now < maxWaitTime) {
1067                 if (LOG.isDebugEnabled()) {
1068                   LOG.debug("Server is not yet up; waiting up to "
1069                     + (maxWaitTime - now) + "ms", t);
1070                 }
1071                 Thread.sleep(100);
1072                 i--; // reset the try count
1073               } else {
1074                 LOG.debug("Server is not up for a while; try a new one", t);
1075                 needNewPlan = true;
1076               }
1077             } catch (InterruptedException ie) {
1078               LOG.warn("Failed to assign "
1079                   + region.getRegionNameAsString() + " since interrupted", ie);
1080               regionStates.updateRegionState(region, State.FAILED_OPEN);
1081               Thread.currentThread().interrupt();
1082               return;
1083             }
1084           } else if (retry) {
1085             i--; // we want to retry as many times as needed as long as the RS is not dead.
1086             if (LOG.isDebugEnabled()) {
1087              LOG.debug(assignMsg + ", trying to assign to the same region server due to a socket timeout", t);
1088             }
1089           } else {
1090             needNewPlan = true;
1091             LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
1092                 " try=" + i + " of " + this.maximumAttempts, t);
1093           }
1094         }
1095 
1096         if (i == this.maximumAttempts) {
1097           // For meta region, we have to keep retrying until succeeding
1098           if (region.isMetaRegion()) {
1099             i = 0; // re-set attempt count to 0 for at least 1 retry
1100             LOG.warn(assignMsg +
1101                ", trying to assign a hbase:meta region reached maximumAttempts (" +
1102                 this.maximumAttempts + ").  Reset attempt counts and continue retrying.");
1103             waitForRetryingMetaAssignment();
1104           }
1105           else {
1106             // Don't reset the region state or get a new plan any more.
1107             // This is the last try.
1108             continue;
1109           }
1110         }
1111 
1112         // If region opened on destination of present plan, reassigning to new
1113         // RS may cause double assignments. In case of RegionAlreadyInTransitionException
1114         // reassigning to same RS.
1115         if (needNewPlan) {
1116           // Force a new plan and reassign. Will return null if no servers.
1117           // The new plan could be the same as the existing plan since we don't
1118           // exclude the server of the original plan, which should not be
1119           // excluded since it could be the only server up now.
1120           RegionPlan newPlan = null;
1121           try {
1122             newPlan = getRegionPlan(region, true);
1123           } catch (HBaseIOException e) {
1124             LOG.warn("Failed to get region plan", e);
1125           }
1126           if (newPlan == null) {
1127             regionStates.updateRegionState(region, State.FAILED_OPEN);
1128             LOG.warn("Unable to find a viable location to assign region " +
1129                 region.getRegionNameAsString());
1130             return;
1131           }
1132 
1133           if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
1134            // Clean out the plan we failed to execute and one that doesn't look like
1135            // it'll succeed anyway; we need a new plan!
1136             // Transition back to OFFLINE
1137             regionStates.updateRegionState(region, State.OFFLINE);
1138             plan = newPlan;
1139           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
1140               previousException instanceof FailedServerException) {
1141             try {
1142               LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
1143                 " to the same failed server.");
1144               Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1145                 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
1146             } catch (InterruptedException ie) {
1147               LOG.warn("Failed to assign "
1148                   + region.getRegionNameAsString() + " since interrupted", ie);
1149               regionStates.updateRegionState(region, State.FAILED_OPEN);
1150               Thread.currentThread().interrupt();
1151               return;
1152             }
1153           }
1154         }
1155       }
1156       // Run out of attempts
1157       regionStates.updateRegionState(region, State.FAILED_OPEN);
1158     } finally {
1159       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
1160     }
1161   }
1162 
1163   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
1164     if (this.tableStateManager.isTableState(region.getTable(),
1165             TableState.State.DISABLED,
1166             TableState.State.DISABLING) || replicasToClose.contains(region)) {
1167       LOG.info("Table " + region.getTable() + " is disabled or disabling;"
1168         + " skipping assign of " + region.getRegionNameAsString());
1169       offlineDisabledRegion(region);
1170       return true;
1171     }
1172     return false;
1173   }
1174 
1175   /**
1176    * @param region the region to assign
1177  * @param forceNewPlan If true, a new plan will be generated even if an
1178  * existing plan exists.
1179  * @return Plan for passed <code>region</code> (if none currently exists, it creates one;
1180  * if there are no servers to assign to, it returns null).
1181    */
1182   private RegionPlan getRegionPlan(final HRegionInfo region,
1183       final boolean forceNewPlan) throws HBaseIOException {
1184     // Pickup existing plan or make a new one
1185     final String encodedName = region.getEncodedName();
1186     final List<ServerName> destServers =
1187       serverManager.createDestinationServersList();
1188 
1189     if (destServers.isEmpty()){
1190       LOG.warn("Can't move " + encodedName +
1191         ", there is no destination server available.");
1192       return null;
1193     }
1194 
1195     RegionPlan randomPlan = null;
1196     boolean newPlan = false;
1197     RegionPlan existingPlan;
1198 
1199     synchronized (this.regionPlans) {
1200       existingPlan = this.regionPlans.get(encodedName);
1201 
1202       if (existingPlan != null && existingPlan.getDestination() != null) {
1203         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
1204           + " destination server is " + existingPlan.getDestination() +
1205             " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
1206       }
1207 
1208       if (forceNewPlan
1209           || existingPlan == null
1210           || existingPlan.getDestination() == null
1211           || !destServers.contains(existingPlan.getDestination())) {
1212         newPlan = true;
1213         randomPlan = new RegionPlan(region, null,
1214             balancer.randomAssignment(region, destServers));
1215         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
1216           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
1217           regions.add(region);
1218           try {
1219             processFavoredNodes(regions);
1220           } catch (IOException ie) {
1221             LOG.warn("Ignoring exception in processFavoredNodes " + ie);
1222           }
1223         }
1224         this.regionPlans.put(encodedName, randomPlan);
1225       }
1226     }
1227 
1228     if (newPlan) {
1229       if (randomPlan.getDestination() == null) {
1230         LOG.warn("Can't find a destination for " + encodedName);
1231         return null;
1232       }
1233       if (LOG.isDebugEnabled()) {
1234         LOG.debug("No previous transition plan found (or ignoring " +
1235           "an existing plan) for " + region.getRegionNameAsString() +
1236           "; generated random plan=" + randomPlan + "; " + destServers.size() +
1237           " (online=" + serverManager.getOnlineServers().size() +
1238           ") available servers, forceNewPlan=" + forceNewPlan);
1239       }
1240       return randomPlan;
1241     }
1242     if (LOG.isDebugEnabled()) {
1243       LOG.debug("Using pre-existing plan for " +
1244         region.getRegionNameAsString() + "; plan=" + existingPlan);
1245     }
1246     return existingPlan;
1247   }
1248 
1249   /**
1250    * Wait for some time before retrying meta table region assignment
1251    */
1252   private void waitForRetryingMetaAssignment() {
1253     try {
1254       Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
1255     } catch (InterruptedException e) {
1256       LOG.error("Got exception while waiting for hbase:meta assignment");
1257       Thread.currentThread().interrupt();
1258     }
1259   }
1260 
1261   /**
1262    * Unassigns the specified region.
1263    * <p>
1264    * Updates the RegionState and sends the CLOSE RPC unless region is being
1265    * split by regionserver; then the unassign fails (silently) because we
1266  * presume the region being unassigned no longer exists (it's been split out
1267    * of existence). TODO: What to do if split fails and is rolled back and
1268    * parent is revivified?
1269    * <p>
1270    * If a RegionPlan is already set, it will remain.
1271    *
1272  * @param region region to be unassigned
1273    */
1274   public void unassign(HRegionInfo region) {
1275     unassign(region, null);
1276   }
1277 
1278 
1279   /**
1280    * Unassigns the specified region.
1281    * <p>
1282    * Updates the RegionState and sends the CLOSE RPC unless the region is being
1283    * split by a regionserver; then the unassign fails (silently) because we
1284    * presume the region being unassigned no longer exists (it's been split out
1285    * of existence). TODO: What to do if split fails and is rolled back and
1286    * parent is revivified?
1287    * <p>
1288    * If a RegionPlan is already set, it will remain.
1289    *
1290    * @param region region to be unassigned
1291    * @param dest the destination server of the region
1292    */
1293   public void unassign(HRegionInfo region, ServerName dest) {
1294     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
1295     LOG.debug("Starting unassign of " + region.getRegionNameAsString()
1296       + " (offlining), current state: " + regionStates.getRegionState(region));
1297 
1298     String encodedName = region.getEncodedName();
1299     // Grab the state of this region and synchronize on it
1300     // We need a lock here as we're going to do a put later and we don't want
1301     // multiple states to be created concurrently
1302     ReentrantLock lock = locker.acquireLock(encodedName);
1303     RegionState state = regionStates.getRegionTransitionState(encodedName);
1304     try {
1305       if (state == null || state.isFailedClose()) {
1306         if (state == null) {
1307           // Region is not in transition.
1308           // We can unassign it only if it's not SPLIT/MERGED.
1309           state = regionStates.getRegionState(encodedName);
1310           if (state != null && state.isUnassignable()) {
1311             LOG.info("Attempting to unassign " + state + ", ignored");
1312             // Offline region will be reassigned below
1313             return;
1314           }
1315           if (state == null || state.getServerName() == null) {
1316             // We don't know where the region is, offline it.
1317             // No need to send CLOSE RPC
1318             LOG.warn("Attempting to unassign a region not in RegionStates: "
1319               + region.getRegionNameAsString() + ", offlined");
1320             regionOffline(region);
1321             return;
1322           }
1323         }
1324         state = regionStates.updateRegionState(
1325           region, State.PENDING_CLOSE);
1326       } else if (state.isFailedOpen()) {
1327         // The region is not open yet
1328         regionOffline(region);
1329         return;
1330       } else {
1331         LOG.debug("Attempting to unassign " +
1332           region.getRegionNameAsString() + " but it is " +
1333           "already in transition (" + state.getState() + ")");
1334         return;
1335       }
1336 
1337       unassign(region, state.getServerName(), dest);
1338     } finally {
1339       lock.unlock();
1340 
1341       // Region is expected to be reassigned afterwards if it ended up OFFLINE
           // and is not a replica already scheduled for closing
1342       if (!replicasToClose.contains(region)
1343           && regionStates.isRegionInState(region, State.OFFLINE)) {
1344         assign(region);
1345       }
1346     }
1347   }
1348 
1349   /**
1350    * Used by unit tests. Return the number of regions opened so far in the life
1351    * of the master. Increases by one every time the master opens a region
1352    * @return the counter value of the number of regions opened so far
1353    */
1354   public int getNumRegionsOpened() {
1355     return numRegionsOpened.get();
1356   }
1357 
1358   /**
1359    * Waits until the specified region has completed assignment.
1360    * <p>
1361    * If the region is already assigned, returns immediately.  Otherwise, this method
1362    * blocks until the region is assigned, and returns false if assignment fails or the server stops.
1363    * @param regionInfo region to wait on assignment for
1364    * @throws InterruptedException
1365    */
1366   public boolean waitForAssignment(HRegionInfo regionInfo)
1367       throws InterruptedException {
1368     while (!regionStates.isRegionOnline(regionInfo)) {
1369       if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN)
1370           || this.server.isStopped()) {
1371         return false;
1372       }
1373 
1374       // We should receive a notification, but it's
1375       //  better to have a timeout to recheck the condition here:
1376       //  it lowers the impact of a race condition if any
1377       regionStates.waitForUpdate(100);
1378     }
1379     return true;
1380   }
1381 
1382   /**
1383    * Assigns the hbase:meta region or a replica.
1384    * <p>
1385    * Assumes that hbase:meta is currently closed and is not being actively served by
1386    * any RegionServer.
1387    * @param hri TODO
1388    */
1389   public void assignMeta(HRegionInfo hri) throws KeeperException {
1390     regionStates.updateRegionState(hri, State.OFFLINE);
1391     assign(hri);
1392   }
1393 
1394   /**
1395    * Assigns the specified regions, retaining their existing assignments where possible.
1396    * <p>
1397    * This is a synchronous call and will return once every region has been
1398    * assigned.  If anything fails, an exception is thrown
1399    * @throws InterruptedException
1400    * @throws IOException
1401    */
1402   public void assign(Map<HRegionInfo, ServerName> regions)
1403         throws IOException, InterruptedException {
1404     if (regions == null || regions.isEmpty()) {
1405       return;
1406     }
1407     List<ServerName> servers = serverManager.createDestinationServersList();
1408     if (servers == null || servers.isEmpty()) {
1409       throw new IOException("Found no destination server to assign region(s)");
1410     }
1411 
1412     // Reuse existing assignment info
1413     Map<ServerName, List<HRegionInfo>> bulkPlan =
1414       balancer.retainAssignment(regions, servers);
1415     if (bulkPlan == null) {
1416       throw new IOException("Unable to determine a plan to assign region(s)");
1417     }
1418 
1419     assign(regions.size(), servers.size(),
1420       "retainAssignment=true", bulkPlan);
1421   }
1422 
1423   /**
1424    * Assigns the specified regions round-robin, if any exist.
1425    * <p>
1426    * This is a synchronous call and will return once every region has been
1427    * assigned.  If anything fails, an exception is thrown
1428    * @throws InterruptedException
1429    * @throws IOException
1430    */
1431   public void assign(List<HRegionInfo> regions)
1432         throws IOException, InterruptedException {
1433     if (regions == null || regions.isEmpty()) {
1434       return;
1435     }
1436 
1437     List<ServerName> servers = serverManager.createDestinationServersList();
1438     if (servers == null || servers.isEmpty()) {
1439       throw new IOException("Found no destination server to assign region(s)");
1440     }
1441 
1442     // Generate a round-robin bulk assignment plan
1443     Map<ServerName, List<HRegionInfo>> bulkPlan
1444       = balancer.roundRobinAssignment(regions, servers);
1445     if (bulkPlan == null) {
1446       throw new IOException("Unable to determine a plan to assign region(s)");
1447     }
1448 
1449     processFavoredNodes(regions);
1450     assign(regions.size(), servers.size(),
1451       "round-robin=true", bulkPlan);
1452   }
1453 
1454   private void assign(int regions, int totalServers,
1455       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
1456           throws InterruptedException, IOException {
1457 
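         // Use the simple per-server path when the plan targets a single server, or
         // when both the region count and the server count are below the bulk-assign
         // thresholds; otherwise hand off to the GeneralBulkAssigner below.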
1458     int servers = bulkPlan.size();
1459     if (servers == 1 || (regions < bulkAssignThresholdRegions
1460         && servers < bulkAssignThresholdServers)) {
1461 
1462       // Don't use bulk assignment.  This can be more efficient in a small
1463       // cluster, especially a mini cluster for testing, so that tests won't time out
1464       if (LOG.isTraceEnabled()) {
1465         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
1466           " region(s) to " + servers + " server(s)");
1467       }
1468       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
1469         if (!assign(plan.getKey(), plan.getValue()) && !server.isStopped()) {
1470           for (HRegionInfo region: plan.getValue()) {
1471             if (!regionStates.isRegionOnline(region)) {
1472               invokeAssign(region);
1473             }
1474           }
1475         }
1476       }
1477     } else {
1478       LOG.info("Bulk assigning " + regions + " region(s) across "
1479         + totalServers + " server(s), " + message);
1480 
1481       // Use fixed count thread pool assigning.
1482       BulkAssigner ba = new GeneralBulkAssigner(
1483         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
1484       ba.bulkAssign();
1485       LOG.info("Bulk assigning done");
1486     }
1487   }
1488 
1489   /**
1490    * Assigns all user regions, if any exist.  Used during cluster startup.
1491    * <p>
1492    * This is a synchronous call and will return once every region has been
1493    * assigned.  If anything fails, an exception is thrown and the cluster
1494    * should be shutdown.
1495    * @throws InterruptedException
1496    * @throws IOException
1497    */
1498   private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
1499       throws IOException, InterruptedException {
1500     if (allRegions == null || allRegions.isEmpty()) return;
1501 
1502     // Determine what type of assignment to do on startup
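         // hbase.master.startup.retainassign (default true): when true, try to place
         // each region back on the server recorded in meta; when false, assign all
         // regions round-robin across the available servers.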
1503     boolean retainAssignment = server.getConfiguration().
1504       getBoolean("hbase.master.startup.retainassign", true);
1505 
1506     Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
1507     if (retainAssignment) {
1508       assign(allRegions);
1509     } else {
1510       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
1511       assign(regions);
1512     }
1513 
1514     for (HRegionInfo hri : regionsFromMetaScan) {
1515       TableName tableName = hri.getTable();
1516       if (!tableStateManager.isTableState(tableName,
1517               TableState.State.ENABLED)) {
1518         setEnabledTable(tableName);
1519       }
1520     }
1521     // assign all the replicas that were not recorded in the meta
1522     assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server));
1523   }
1524 
1525   /**
1526    * Get a list of replica regions that are not recorded in meta yet.
1527    * We might not have recorded the locations for the replicas because the
1528    * replicas may not have been online yet, the master restarted in the
1529    * middle of assigning, ZK was erased, etc.
1530    * @param regionsRecordedInMeta the list of regions we know are recorded in meta,
1531    * either as a default region or as the location of a replica
1532    * @param master the master services, used to look up table descriptors
1533    * @return list of replica regions
1534    * @throws IOException
1535    */
1536   public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
1537       Set<HRegionInfo> regionsRecordedInMeta, MasterServices master) throws IOException {
1538     List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
1539     for (HRegionInfo hri : regionsRecordedInMeta) {
1540       TableName table = hri.getTable();
1541       HTableDescriptor htd = master.getTableDescriptors().get(table);
1542       // look at the HTD for the replica count. That's the source of truth
1543       int desiredRegionReplication = htd.getRegionReplication();
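           // Replica ids run from 0 (the default region) to getRegionReplication() - 1;
           // any replica already recorded in meta is skipped below.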
1544       for (int i = 0; i < desiredRegionReplication; i++) {
1545         HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
1546         if (regionsRecordedInMeta.contains(replica)) continue;
1547         regionsNotRecordedInMeta.add(replica);
1548       }
1549     }
1550     return regionsNotRecordedInMeta;
1551   }
1552 
1553   /**
1554    * Rebuild the list of user regions and assignment information.
1555    * <p>
1556    * Returns the set of servers that hosted regions (per meta) but were not
1557    * found to be online.
1558    * @return set of servers not online that hosted some regions per meta
1559    * @throws IOException
1560    */
1561   Set<ServerName> rebuildUserRegions() throws
1562           IOException, KeeperException {
1563     Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
1564             TableState.State.DISABLED, TableState.State.ENABLING);
1565 
1566     Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
1567             TableState.State.DISABLED,
1568             TableState.State.DISABLING,
1569             TableState.State.ENABLING);
1570 
1571     // Region assignment from META
1572     List<Result> results = MetaTableAccessor.fullScanRegions(server.getConnection());
1573     // Region servers currently known to be online, including any new but slow-to-check-in servers
1574     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
1575     // Set of offline servers to be returned
1576     Set<ServerName> offlineServers = new HashSet<ServerName>();
1577     // Iterate regions in META
1578     for (Result result : results) {
1579       if (result == null) {
1580         LOG.debug("null result from meta - ignoring but this is strange.");
1581         continue;
1582       }
1583       // Keep track of replicas to close. These were the replicas of the originally
1584       // unmerged regions. The master should have closed them before, but it may not
1585       // have, e.g. because it crashed.
1586       PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
1587       if (p.getFirst() != null && p.getSecond() != null) {
1588         int numReplicas = ((MasterServices)server).getTableDescriptors().get(p.getFirst().
1589             getTable()).getRegionReplication();
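             // Schedule the non-default replicas (replica id >= 1) of both originally
             // unmerged regions to be closed.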
1590         for (HRegionInfo merge : p) {
1591           for (int i = 1; i < numReplicas; i++) {
1592             replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
1593           }
1594         }
1595       }
1596       RegionLocations rl =  MetaTableAccessor.getRegionLocations(result);
1597       if (rl == null) continue;
1598       HRegionLocation[] locations = rl.getRegionLocations();
1599       if (locations == null) continue;
1600       for (HRegionLocation hrl : locations) {
1601         HRegionInfo regionInfo = hrl.getRegionInfo();
1602         if (regionInfo == null) continue;
1603         int replicaId = regionInfo.getReplicaId();
1604         State state = RegionStateStore.getRegionState(result, replicaId);
1605         // Keep track of replicas to close. These were the replicas of the split parents
1606         // from the previous life of the master. The master should have closed them before,
1607         // but it may not have, e.g. because it crashed
1608         if (replicaId == 0 && state.equals(State.SPLIT)) {
1609           for (HRegionLocation h : locations) {
1610             replicasToClose.add(h.getRegionInfo());
1611           }
1612         }
1613         ServerName lastHost = hrl.getServerName();
1614         ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
1615         regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
1616         if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
1617           // Region is not open (either offline or in transition), skip
1618           continue;
1619         }
1620         TableName tableName = regionInfo.getTable();
1621         if (!onlineServers.contains(regionLocation)) {
1622           // Region is located on a server that isn't online
1623           offlineServers.add(regionLocation);
1624         } else if (!disabledOrEnablingTables.contains(tableName)) {
1625           // Region is being served and on an active server
1626           // add only if region not in disabled or enabling table
1627           regionStates.regionOnline(regionInfo, regionLocation);
1628           balancer.regionOnline(regionInfo, regionLocation);
1629         }
1630         // need to enable the table if not disabled or disabling or enabling
1631         // this will be used in rolling restarts
1632         if (!disabledOrDisablingOrEnabling.contains(tableName)
1633           && !getTableStateManager().isTableState(tableName,
1634                 TableState.State.ENABLED)) {
1635           setEnabledTable(tableName);
1636         }
1637       }
1638     }
1639     return offlineServers;
1640   }
1641 
1642   /**
1643    * Recover the tables that were not fully moved to DISABLED state. These
1644    * tables were in DISABLING state when the master restarted/switched.
1645    *
1646    * @throws KeeperException
1647    * @throws TableNotFoundException
1648    * @throws IOException
1649    */
1650   private void recoverTableInDisablingState()
1651           throws KeeperException, IOException {
1652     Set<TableName> disablingTables =
1653             tableStateManager.getTablesInStates(TableState.State.DISABLING);
1654     if (disablingTables.size() != 0) {
1655       for (TableName tableName : disablingTables) {
1656         // Recover by calling DisableTableHandler
1657         LOG.info("The table " + tableName
1658             + " is in DISABLING state.  Hence recovering by moving the table"
1659             + " to DISABLED state.");
1660         new DisableTableHandler(this.server, tableName,
1661             this, tableLockManager, true).prepare().process();
1662       }
1663     }
1664   }
1665 
1666   /**
1667    * Recover the tables that were not fully moved to ENABLED state. These tables
1668    * were in ENABLING state when the master restarted/switched.
1669    *
1670    * @throws KeeperException
1671    * @throws org.apache.hadoop.hbase.TableNotFoundException
1672    * @throws IOException
1673    */
1674   private void recoverTableInEnablingState()
1675           throws KeeperException, IOException {
1676     Set<TableName> enablingTables = tableStateManager.
1677             getTablesInStates(TableState.State.ENABLING);
1678     if (enablingTables.size() != 0) {
1679       for (TableName tableName : enablingTables) {
1680         // Recover by calling EnableTableHandler
1681         LOG.info("The table " + tableName
1682             + " is in ENABLING state.  Hence recovering by moving the table"
1683             + " to ENABLED state.");
1684         // enableTable in sync way during master startup,
1685         // no need to invoke coprocessor
1686         EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
1687           this, tableLockManager, true);
1688         try {
1689           eth.prepare();
1690         } catch (TableNotFoundException e) {
1691           LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
1692           continue;
1693         }
1694         eth.process();
1695       }
1696     }
1697   }
1698 
1699   /**
1700    * Processes list of regions in transition at startup
1701    */
1702   void processRegionsInTransition(Collection<RegionState> regionsInTransition) {
1703     // We need to send the RPC call again for PENDING_OPEN/PENDING_CLOSE regions
1704     // in case the RPC call was not sent out before the master was shut down,
1705     // since we update the state before we send the RPC call. We can't update
1706     // the state after the RPC call; otherwise we wouldn't know what happened
1707     // to the region if the master died right after the RPC call went out.
1708     for (RegionState regionState: regionsInTransition) {
1709       LOG.info("Processing " + regionState);
1710       ServerName serverName = regionState.getServerName();
1711       // Server could be null in case of FAILED_OPEN when master cannot find a region plan. In that
1712       // case, try assigning it here.
1713       if (serverName != null && !serverManager.getOnlineServers().containsKey(serverName)) {
1714         LOG.info("Server " + serverName + " isn't online. SSH will handle this");
1715         continue; // SSH will handle it
1716       }
1717       HRegionInfo regionInfo = regionState.getRegion();
1718       RegionState.State state = regionState.getState();
1719       switch (state) {
1720       case CLOSED:
1721         invokeAssign(regionState.getRegion());
1722         break;
1723       case PENDING_OPEN:
1724         retrySendRegionOpen(regionState);
1725         break;
1726       case PENDING_CLOSE:
1727         retrySendRegionClose(regionState);
1728         break;
1729       case FAILED_CLOSE:
1730       case FAILED_OPEN:
1731         invokeUnAssign(regionInfo);
1732         break;
1733       default:
1734         // No process for other states
1735       }
1736     }
1737   }
1738 
1739   /**
1740    * At master failover, for a PENDING_OPEN region, make sure the
1741    * sendRegionOpen RPC call is sent to the target regionserver.
1742    */
1743   private void retrySendRegionOpen(final RegionState regionState) {
1744     this.executorService.submit(
1745       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
1746         @Override
1747         public void process() throws IOException {
1748           HRegionInfo hri = regionState.getRegion();
1749           ServerName serverName = regionState.getServerName();
1750           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
1751           try {
1752             for (int i = 1; i <= maximumAttempts; i++) {
1753               if (!serverManager.isServerOnline(serverName)
1754                   || server.isStopped() || server.isAborted()) {
1755                 return; // No need any more
1756               }
1757               try {
1758                 if (!regionState.equals(regionStates.getRegionState(hri))) {
1759                   return; // Region is not in the expected state any more
1760                 }
1761                 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1762                 if (shouldAssignRegionsWithFavoredNodes) {
1763                   favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
1764                 }
1765                 serverManager.sendRegionOpen(serverName, hri, favoredNodes);
1766                 return; // we're done
1767               } catch (Throwable t) {
1768                 if (t instanceof RemoteException) {
1769                   t = ((RemoteException) t).unwrapRemoteException();
1770                 }
1771                 if (t instanceof FailedServerException && i < maximumAttempts) {
1772                   // If the server is on the failed server list, there is no point
1773                   // retrying too soon. Retry after the failed_server_expiry time
1774                   try {
1775                     Configuration conf = this.server.getConfiguration();
1776                     long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1777                       RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1778                     if (LOG.isDebugEnabled()) {
1779                       LOG.debug(serverName + " is on failed server list; waiting "
1780                         + sleepTime + "ms", t);
1781                     }
1782                     Thread.sleep(sleepTime);
1783                     continue;
1784                   } catch (InterruptedException ie) {
1785                     LOG.warn("Failed to assign "
1786                       + hri.getRegionNameAsString() + " since interrupted", ie);
1787                     regionStates.updateRegionState(hri, State.FAILED_OPEN);
1788                     Thread.currentThread().interrupt();
1789                     return;
1790                   }
1791                 }
1792                 if (serverManager.isServerOnline(serverName)
1793                     && t instanceof java.net.SocketTimeoutException) {
1794                   i--; // don't count this attempt against the retry limit
1795                 } else {
1796                   LOG.info("Got exception in retrying sendRegionOpen for "
1797                     + regionState + "; try=" + i + " of " + maximumAttempts, t);
1798                 }
1799                 Threads.sleep(100);
1800               }
1801             }
1802             // Ran out of attempts
1803             regionStates.updateRegionState(hri, State.FAILED_OPEN);
1804           } finally {
1805             lock.unlock();
1806           }
1807         }
1808       });
1809   }
1810 
1811   /**
1812    * At master failover, for a PENDING_CLOSE region, make sure the
1813    * sendRegionClose RPC call is sent to the target regionserver.
1814    */
1815   private void retrySendRegionClose(final RegionState regionState) {
1816     this.executorService.submit(
1817       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
1818         @Override
1819         public void process() throws IOException {
1820           HRegionInfo hri = regionState.getRegion();
1821           ServerName serverName = regionState.getServerName();
1822           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
1823           try {
1824             for (int i = 1; i <= maximumAttempts; i++) {
1825               if (!serverManager.isServerOnline(serverName)
1826                   || server.isStopped() || server.isAborted()) {
1827                 return; // No need any more
1828               }
1829               try {
1830                 if (!regionState.equals(regionStates.getRegionState(hri))) {
1831                   return; // Region is not in the expected state any more
1832                 }
1833                 serverManager.sendRegionClose(serverName, hri, null);
1834                 return; // Done.
1835               } catch (Throwable t) {
1836                 if (t instanceof RemoteException) {
1837                   t = ((RemoteException) t).unwrapRemoteException();
1838                 }
1839                 if (t instanceof FailedServerException && i < maximumAttempts) {
1840                   // If the server is on the failed server list, there is no point
1841                   // retrying too soon. Retry after the failed_server_expiry time
1842                   try {
1843                     Configuration conf = this.server.getConfiguration();
1844                     long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1845                       RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1846                     if (LOG.isDebugEnabled()) {
1847                       LOG.debug(serverName + " is on failed server list; waiting "
1848                         + sleepTime + "ms", t);
1849                     }
1850                     Thread.sleep(sleepTime);
1851                     continue;
1852                   } catch (InterruptedException ie) {
1853                     LOG.warn("Failed to unassign "
1854                       + hri.getRegionNameAsString() + " since interrupted", ie);
1855                     regionStates.updateRegionState(hri, RegionState.State.FAILED_CLOSE);
1856                     Thread.currentThread().interrupt();
1857                     return;
1858                   }
1859                 }
1860                 if (serverManager.isServerOnline(serverName)
1861                     && t instanceof java.net.SocketTimeoutException) {
1862                   i--; // don't count this attempt against the retry limit
1863                 } else {
1864                   LOG.info("Got exception in retrying sendRegionClose for "
1865                     + regionState + "; try=" + i + " of " + maximumAttempts, t);
1866                 }
1867                 Threads.sleep(100);
1868               }
1869             }
1870             // Ran out of attempts
1871             regionStates.updateRegionState(hri, State.FAILED_CLOSE);
1872           } finally {
1873             lock.unlock();
1874           }
1875         }
1876       });
1877   }
1878 
1879   /**
1880    * Set regions-in-transition metrics.
1881    * This takes an iterator over the RegionInTransition map (CLSM) and is not synchronized.
1882    * The iterator is not fail-fast, which may lead to stale reads; but that's better than
1883    * creating a copy of the map for metrics computation, as this method is invoked
1884    * at a frequent interval.
1885    */
1886   public void updateRegionsInTransitionMetrics() {
1887     long currentTime = System.currentTimeMillis();
1888     int totalRITs = 0;
1889     int totalRITsOverThreshold = 0;
1890     long oldestRITTime = 0;
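         // A region in transition longer than this threshold (default 60000 ms)
         // is counted as over-threshold in the reported metrics.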
1891     int ritThreshold = this.server.getConfiguration().
1892       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
1893     for (RegionState state: regionStates.getRegionsInTransition().values()) {
1894       totalRITs++;
1895       long ritTime = currentTime - state.getStamp();
1896       if (ritTime > ritThreshold) { // more than the threshold
1897         totalRITsOverThreshold++;
1898       }
1899       if (oldestRITTime < ritTime) {
1900         oldestRITTime = ritTime;
1901       }
1902     }
1903     if (this.metricsAssignmentManager != null) {
1904       this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
1905       this.metricsAssignmentManager.updateRITCount(totalRITs);
1906       this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
1907     }
1908   }
1909 
1910   /**
1911    * @param region Region whose plan we are to clear.
1912    */
1913   private void clearRegionPlan(final HRegionInfo region) {
1914     synchronized (this.regionPlans) {
1915       this.regionPlans.remove(region.getEncodedName());
1916     }
1917   }
1918 
1919   /**
1920    * Wait on region to clear regions-in-transition.
1921    * @param hri Region to wait on.
1922    * @throws IOException
1923    */
1924   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
1925       throws IOException, InterruptedException {
1926     waitOnRegionToClearRegionsInTransition(hri, -1L);
1927   }
1928 
1929   /**
1930    * Wait on region to clear regions-in-transition or time out
1931    * @param hri Region to wait on.
1932    * @param timeOut Milliseconds to wait for the region to be out of transition state.
1933    * @return True when a region clears regions-in-transition before timeout otherwise false
1934    * @throws InterruptedException
1935    */
1936   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
1937       throws InterruptedException {
1938     if (!regionStates.isRegionInTransition(hri)) return true;
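         // A non-positive timeout means wait indefinitely (bounded only by server shutdown).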
1939     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
1940         + timeOut;
1941     // There is already a timeout monitor on regions in transition, so we
1942     // should not need another one here.
1943     LOG.info("Waiting for " + hri.getEncodedName() +
1944         " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
1945     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
1946       regionStates.waitForUpdate(100);
1947       if (EnvironmentEdgeManager.currentTime() > end) {
1948         LOG.info("Timed out waiting for " + hri.getEncodedName() + " to leave regions-in-transition.");
1949         return false;
1950       }
1951     }
1952     if (this.server.isStopped()) {
1953       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
1954       return false;
1955     }
1956     return true;
1957   }
1958 
1959   void invokeAssign(HRegionInfo regionInfo) {
1960     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
1961   }
1962 
1963   void invokeUnAssign(HRegionInfo regionInfo) {
1964     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
1965   }
1966 
1967   public boolean isCarryingMeta(ServerName serverName) {
1968     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
1969   }
1970 
1971   public boolean isCarryingMetaReplica(ServerName serverName, int replicaId) {
1972     return isCarryingRegion(serverName,
1973         RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId));
1974   }
1975 
1976   public boolean isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) {
1977     return isCarryingRegion(serverName, metaHri);
1978   }
1979 
1980   /**
1981    * Check if the shutdown server carries the specific region.
1982    * @return whether the serverName currently hosts the region
1983    */
1984   private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
1985     RegionState regionState = regionStates.getRegionTransitionState(hri);
1986     ServerName transitionAddr = regionState != null? regionState.getServerName(): null;
1987     if (transitionAddr != null) {
1988       boolean matchTransitionAddr = transitionAddr.equals(serverName);
1989       LOG.debug("Checking region=" + hri.getRegionNameAsString()
1990         + ", transitioning on server=" + transitionAddr
1991         + ", server being checked: " + serverName
1992         + ", matches=" + matchTransitionAddr);
1993       return matchTransitionAddr;
1994     }
1995 
1996     ServerName assignedAddr = regionStates.getRegionServerOfRegion(hri);
1997     boolean matchAssignedAddr = serverName.equals(assignedAddr);
1998     LOG.debug("Based on AM, current region=" + hri.getRegionNameAsString()
1999       + " is on server=" + assignedAddr + ", server being checked: "
2000       + serverName + ", matches=" + matchAssignedAddr);
2001     return matchAssignedAddr;
2002   }
2003 
2004   /**
2005    * Process shutdown server removing any assignments.
2006    * @param sn Server that went down.
2007    * @return list of regions in transition on this server
2008    */
2009   public List<HRegionInfo> processServerShutdown(final ServerName sn) {
2010     // Clean out any existing assignment plans for this server
2011     synchronized (this.regionPlans) {
2012       for (Iterator <Map.Entry<String, RegionPlan>> i =
2013           this.regionPlans.entrySet().iterator(); i.hasNext();) {
2014         Map.Entry<String, RegionPlan> e = i.next();
2015         ServerName otherSn = e.getValue().getDestination();
2016         // The name will be null if the region is planned for a random assign.
2017         if (otherSn != null && otherSn.equals(sn)) {
2018           // Use iterator's remove else we'll get CME
2019           i.remove();
2020         }
2021       }
2022     }
2023     List<HRegionInfo> rits = regionStates.serverOffline(sn);
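         // Filter the regions in transition: drop those no longer in a relevant state
         // on the dead server, offline those of disabled/disabling tables, and mark
         // the rest OFFLINE so they can be reassigned by server-shutdown handling.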
2024     for (Iterator<HRegionInfo> it = rits.iterator(); it.hasNext(); ) {
2025       HRegionInfo hri = it.next();
2026       String encodedName = hri.getEncodedName();
2027 
2028       // We need a lock on the region as we could update it
2029       Lock lock = locker.acquireLock(encodedName);
2030       try {
2031         RegionState regionState =
2032           regionStates.getRegionTransitionState(encodedName);
2033         if (regionState == null
2034             || (regionState.getServerName() != null && !regionState.isOnServer(sn))
2035             || !RegionStates.isOneOfStates(regionState, State.PENDING_OPEN,
2036                 State.OPENING, State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) {
2037           LOG.info("Skip " + regionState + " since it is no longer in a state"
2038             + " that needs handling on the dead server: " + sn);
2039           it.remove();
2040         } else {
2041           if (tableStateManager.isTableState(hri.getTable(),
2042                   TableState.State.DISABLED, TableState.State.DISABLING)) {
2043             regionStates.regionOffline(hri);
2044             it.remove();
2045             continue;
2046           }
2047           // Mark the region offline and assign it again by SSH
2048           regionStates.updateRegionState(hri, State.OFFLINE);
2049         }
2050       } finally {
2051         lock.unlock();
2052       }
2053     }
2054     return rits;
2055   }
2056 
2057   /**
2058    * @param plan Plan to execute.
2059    */
2060   public void balance(final RegionPlan plan) {
2061     HRegionInfo hri = plan.getRegionInfo();
2062     TableName tableName = hri.getTable();
2063     if (tableStateManager.isTableState(tableName,
2064             TableState.State.DISABLED, TableState.State.DISABLING)) {
2065       LOG.info("Ignored moving region of disabling/disabled table "
2066         + tableName);
2067       return;
2068     }
2069 
2070     // Move the region only if it's assigned
2071     String encodedName = hri.getEncodedName();
2072     ReentrantLock lock = locker.acquireLock(encodedName);
2073     try {
2074       if (!regionStates.isRegionOnline(hri)) {
2075         RegionState state = regionStates.getRegionState(encodedName);
2076         LOG.info("Ignored moving region not assigned: " + hri + ", "
2077           + (state == null ? "not in region states" : state));
2078         return;
2079       }
2080       synchronized (this.regionPlans) {
2081         this.regionPlans.put(plan.getRegionName(), plan);
2082       }
2083       unassign(hri, plan.getDestination());
2084     } finally {
2085       lock.unlock();
2086     }
2087   }
2088 
2089   public void stop() {
2090     // Shut down the thread pool executor service
2091     threadPoolExecutorService.shutdownNow();
2092     regionStateStore.stop();
2093   }
2094 
2095   protected void setEnabledTable(TableName tableName) {
2096     try {
2097       this.tableStateManager.setTableState(tableName,
2098               TableState.State.ENABLED);
2099     } catch (IOException e) {
2100       // here we can abort as it is the start up flow
2101       String errorMsg = "Unable to ensure that the table " + tableName
2102           + " will be enabled because of a ZooKeeper issue";
2103       LOG.error(errorMsg, e);
2104       this.server.abort(errorMsg, e);
2105     }
2106   }
2107 
2108   private String onRegionFailedOpen(final RegionState current,
2109       final HRegionInfo hri, final ServerName serverName) {
2110     // The region must be opening on this server.
2111     // If current state is failed_open on the same server,
2112     // it could be a reportRegionTransition RPC retry.
2113     if (current == null || !current.isOpeningOrFailedOpenOnServer(serverName)) {
2114       return hri.getShortNameToLog() + " is not opening on " + serverName;
2115     }
2116 
2117     // Just return in case of retrying
2118     if (current.isFailedOpen()) {
2119       return null;
2120     }
2121 
2122     String encodedName = hri.getEncodedName();
2123     AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
2124     if (failedOpenCount == null) {
2125       failedOpenCount = new AtomicInteger();
2126       // No need to use putIfAbsent, or extra synchronization since
2127       // this whole handleRegion block is locked on the encoded region
2128       // name, and failedOpenTracker is updated only in this block
2129       failedOpenTracker.put(encodedName, failedOpenCount);
2130     }
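         // Give up (FAILED_OPEN) once a non-meta region has failed to open
         // maximumAttempts times; hbase:meta keeps retrying beyond that, with a warning.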
2131     if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion()) {
2132       regionStates.updateRegionState(hri, State.FAILED_OPEN);
2133       // remove the tracking info to save memory, also reset
2134       // the count for next open initiative
2135       failedOpenTracker.remove(encodedName);
2136     } else {
2137       if (hri.isMetaRegion() && failedOpenCount.get() >= maximumAttempts) {
2138         // Log a warning message if a meta region failedOpenCount exceeds maximumAttempts
2139         // so that we are aware of potential problem if it persists for a long time.
2140         LOG.warn("Failed to open the hbase:meta region " +
2141             hri.getRegionNameAsString() + " after " +
2142             failedOpenCount.get() + " retries. Continue retrying.");
2143       }
2144 
2145       // Handle this the same as if it were opened and then closed.
2146       RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
2147       if (regionState != null) {
2148         // When there is more than one region server, a new RS is selected as the
2149         // destination and the region plan is updated accordingly. (HBASE-5546)
2150         if (getTableStateManager().isTableState(hri.getTable(),
2151                 TableState.State.DISABLED, TableState.State.DISABLING) ||
2152                 replicasToClose.contains(hri)) {
2153           offlineDisabledRegion(hri);
2154           return null;
2155         }
2156         regionStates.updateRegionState(hri, RegionState.State.CLOSED);
2157         // This below has to do w/ online enable/disable of a table
2158         removeClosedRegion(hri);
2159         try {
2160           getRegionPlan(hri, true);
2161         } catch (HBaseIOException e) {
2162           LOG.warn("Failed to get region plan", e);
2163         }
2164         invokeAssign(hri);
2165       }
2166     }
2167     // Null means no error
2168     return null;
2169   }
2170 
2171   private String onRegionOpen(final RegionState current, final HRegionInfo hri,
2172       final ServerName serverName, final RegionStateTransition transition) {
2173     // The region must be opening on this server.
2174     // If current state is already opened on the same server,
2175     // it could be a reportRegionTransition RPC retry.
2176     if (current == null || !current.isOpeningOrOpenedOnServer(serverName)) {
2177       return hri.getShortNameToLog() + " is not opening on " + serverName;
2178     }
2179 
2180     // Just return in case of retrying
2181     if (current.isOpened()) {
2182       return null;
2183     }
2184 
2185     long openSeqNum = transition.hasOpenSeqNum()
2186       ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM;
2187     if (openSeqNum < 0) {
2188       return "Newly opened region has invalid open seq num " + openSeqNum;
2189     }
2190     regionOnline(hri, serverName, openSeqNum);
2191 
2192     // reset the count, if any
2193     failedOpenTracker.remove(hri.getEncodedName());
2194     if (getTableStateManager().isTableState(hri.getTable(),
2195             TableState.State.DISABLED, TableState.State.DISABLING)) {
2196       invokeUnAssign(hri);
2197     }
2198     return null;
2199   }
2200 
2201   private String onRegionClosed(final RegionState current,
2202       final HRegionInfo hri, final ServerName serverName) {
2203     // The region is usually assigned right after it is closed. When an RPC retry comes
2204     // in, the region may already have moved away from the CLOSED state. However, on the
2205     // region server side, we don't care much about the response for this transition.
2206     // We only make sure the master has received and processed this report, either
2207     // successfully or not. So this is fine, not a problem at all.
2208     if (current == null || !current.isClosingOrClosedOnServer(serverName)) {
2209       return hri.getShortNameToLog() + " is not closing on " + serverName;
2210     }
2211 
2212     // Just return in case of retrying
2213     if (current.isClosed()) {
2214       return null;
2215     }
2216 
2217     if (getTableStateManager().isTableState(hri.getTable(), TableState.State.DISABLED,
2218         TableState.State.DISABLING) || replicasToClose.contains(hri)) {
2219       offlineDisabledRegion(hri);
2220       return null;
2221     }
2222 
2223     regionStates.updateRegionState(hri, RegionState.State.CLOSED);
2224     sendRegionClosedNotification(hri);
2225     // This below has to do w/ online enable/disable of a table
2226     removeClosedRegion(hri);
2227     invokeAssign(hri);
2228     return null;
2229   }
2230 
2231   private String onRegionReadyToSplit(final RegionState current, final HRegionInfo hri,
2232       final ServerName serverName, final RegionStateTransition transition) {
2233     // The region must be opened on this server.
2234     // If current state is already splitting on the same server,
2235     // it could be a reportRegionTransition RPC retry.
2236     if (current == null || !current.isSplittingOrOpenedOnServer(serverName)) {
2237       return hri.getShortNameToLog() + " is not opening on " + serverName;
2238     }
2239 
2240     // Just return in case of retrying
2241     if (current.isSplitting()) {
2242       return null;
2243     }
2244 
2245     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2246     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2247     RegionState rs_a = regionStates.getRegionState(a);
2248     RegionState rs_b = regionStates.getRegionState(b);
2249     if (rs_a != null || rs_b != null) {
2250       return "Some daughter region already exists. "
2251         + "a=" + rs_a + ", b=" + rs_b;
2252     }
2253 
2254     // Server holding info is not updated at this stage;
2255     // it is done after the PONR (point of no return).
2256     regionStates.updateRegionState(hri, State.SPLITTING);
2257     regionStates.createRegionState(
2258       a, State.SPLITTING_NEW, serverName, null);
2259     regionStates.createRegionState(
2260       b, State.SPLITTING_NEW, serverName, null);
2261     return null;
2262   }
2263 
2264   private String onRegionSplitPONR(final RegionState current, final HRegionInfo hri,
2265       final ServerName serverName, final RegionStateTransition transition) {
2266     // The region must be splitting on this server, and the daughters must be in
2267     // splitting_new state. To check RPC retry, we use server holding info.
2268     if (current == null || !current.isSplittingOnServer(serverName)) {
2269       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2270     }
2271 
2272     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2273     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2274     RegionState rs_a = regionStates.getRegionState(a);
2275     RegionState rs_b = regionStates.getRegionState(b);
2276 
2277     // The master could have restarted and lost the new region
2278     // states; if so, they must have been lost together
2279     if (rs_a == null && rs_b == null) {
2280       rs_a = regionStates.createRegionState(
2281         a, State.SPLITTING_NEW, serverName, null);
2282       rs_b = regionStates.createRegionState(
2283         b, State.SPLITTING_NEW, serverName, null);
2284     }
2285 
2286     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2287         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2288       return "Some daughter is not known to be splitting on " + serverName
2289         + ", a=" + rs_a + ", b=" + rs_b;
2290     }
2291 
2292     // Just return in case of retrying
2293     if (!regionStates.isRegionOnServer(hri, serverName)) {
2294       return null;
2295     }
2296 
2297     try {
2298       regionStates.splitRegion(hri, a, b, serverName);
2299     } catch (IOException ioe) {
2300       LOG.info("Failed to record split region " + hri.getShortNameToLog(), ioe);
2301       return "Failed to record the splitting in meta";
2302     }
2303     return null;
2304   }
2305 
2306   private String onRegionSplit(final RegionState current, final HRegionInfo hri,
2307       final ServerName serverName, final RegionStateTransition transition) {
2308     // The region must be splitting on this server, and the daughters must be in
2309     // splitting_new state.
2310     // If current state is already split on the same server,
2311     // it could be a reportRegionTransition RPC retry.
2312     if (current == null || !current.isSplittingOrSplitOnServer(serverName)) {
2313       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2314     }
2315 
2316     // Just return in case of retrying
2317     if (current.isSplit()) {
2318       return null;
2319     }
2320 
2321     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2322     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2323     RegionState rs_a = regionStates.getRegionState(a);
2324     RegionState rs_b = regionStates.getRegionState(b);
2325     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2326         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2327       return "Some daughter is not known to be splitting on " + serverName
2328         + ", a=" + rs_a + ", b=" + rs_b;
2329     }
2330 
2331     if (TEST_SKIP_SPLIT_HANDLING) {
2332       return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
2333     }
2334     regionOffline(hri, State.SPLIT);
2335     regionOnline(a, serverName, 1);
2336     regionOnline(b, serverName, 1);
2337 
2338     // User could disable the table before master knows the new region.
2339     if (getTableStateManager().isTableState(hri.getTable(),
2340         TableState.State.DISABLED, TableState.State.DISABLING)) {
2341       invokeUnAssign(a);
2342       invokeUnAssign(b);
2343     } else {
2344       Callable<Object> splitReplicasCallable = new Callable<Object>() {
2345         @Override
2346         public Object call() {
2347           doSplittingOfReplicas(hri, a, b);
2348           return null;
2349         }
2350       };
2351       threadPoolExecutorService.submit(splitReplicasCallable);
2352     }
2353     return null;
2354   }
2355 
2356   private String onRegionSplitReverted(final RegionState current, final HRegionInfo hri,
2357       final ServerName serverName, final RegionStateTransition transition) {
2358     // The region must be splitting on this server, and the daughters must be in
2359     // splitting_new state.
2360     // If the region is in open state, it could be an RPC retry.
2361     if (current == null || !current.isSplittingOrOpenedOnServer(serverName)) {
2362       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2363     }
2364 
2365     // Just return in case of retrying
2366     if (current.isOpened()) {
2367       return null;
2368     }
2369 
2370     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2371     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2372     RegionState rs_a = regionStates.getRegionState(a);
2373     RegionState rs_b = regionStates.getRegionState(b);
2374     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2375         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2376       return "Some daughter is not known to be splitting on " + serverName
2377         + ", a=" + rs_a + ", b=" + rs_b;
2378     }
2379 
2380     regionOnline(hri, serverName);
2381     regionOffline(a);
2382     regionOffline(b);
2383     if (getTableStateManager().isTableState(hri.getTable(),
2384         TableState.State.DISABLED, TableState.State.DISABLING)) {
2385       invokeUnAssign(hri);
2386     }
2387     return null;
2388   }
2389 
2390   private String onRegionReadyToMerge(final RegionState current, final HRegionInfo hri,
2391       final ServerName serverName, final RegionStateTransition transition) {
2392     // The region must be new, and the daughters must be open on this server.
2393     // If the region is in merge_new state, it could be an RPC retry.
2394     if (current != null && !current.isMergingNewOnServer(serverName)) {
2395       return "Merging daughter region already exists, p=" + current;
2396     }
2397 
2398     // Just return in case of retrying
2399     if (current != null) {
2400       return null;
2401     }
2402 
2403     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2404     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2405     Set<String> encodedNames = new HashSet<String>(2);
2406     encodedNames.add(a.getEncodedName());
2407     encodedNames.add(b.getEncodedName());
2408     Map<String, Lock> locks = locker.acquireLocks(encodedNames);
2409     try {
2410       RegionState rs_a = regionStates.getRegionState(a);
2411       RegionState rs_b = regionStates.getRegionState(b);
2412       if (rs_a == null || !rs_a.isOpenedOnServer(serverName)
2413           || rs_b == null || !rs_b.isOpenedOnServer(serverName)) {
2414         return "Some daughter is not in a state to merge on " + serverName
2415           + ", a=" + rs_a + ", b=" + rs_b;
2416       }
2417 
2418       regionStates.updateRegionState(a, State.MERGING);
2419       regionStates.updateRegionState(b, State.MERGING);
2420       regionStates.createRegionState(
2421         hri, State.MERGING_NEW, serverName, null);
2422       return null;
2423     } finally {
2424       for (Lock lock: locks.values()) {
2425         lock.unlock();
2426       }
2427     }
2428   }
2429 
2430   private String onRegionMergePONR(final RegionState current, final HRegionInfo hri,
2431       final ServerName serverName, final RegionStateTransition transition) {
2432     // The region must be in merging_new state, and the daughters must be
2433     // merging. To check RPC retry, we use server holding info.
2434     if (current != null && !current.isMergingNewOnServer(serverName)) {
2435       return hri.getShortNameToLog() + " is not merging on " + serverName;
2436     }
2437 
2438     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2439     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2440     RegionState rs_a = regionStates.getRegionState(a);
2441     RegionState rs_b = regionStates.getRegionState(b);
2442     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2443         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2444       return "Some daughter is not known to be merging on " + serverName
2445         + ", a=" + rs_a + ", b=" + rs_b;
2446     }
2447 
2448     // Master could have restarted and lost the new region state
2449     if (current == null) {
2450       regionStates.createRegionState(
2451         hri, State.MERGING_NEW, serverName, null);
2452     }
2453 
2454     // Just return in case of retrying
2455     if (regionStates.isRegionOnServer(hri, serverName)) {
2456       return null;
2457     }
2458 
2459     try {
2460       regionStates.mergeRegions(hri, a, b, serverName);
2461     } catch (IOException ioe) {
2462       LOG.info("Failed to record merged region " + hri.getShortNameToLog(), ioe);
2463       return "Failed to record the merging in meta";
2464     }
2465     return null;
2466   }
2467 
2468   private String onRegionMerged(final RegionState current, final HRegionInfo hri,
2469       final ServerName serverName, final RegionStateTransition transition) {
2470     // The region must be in merging_new state, and the daughters must be
2471     // merging on this server.
2472     // If current state is already opened on the same server,
2473     // it could be a reportRegionTransition RPC retry.
2474     if (current == null || !current.isMergingNewOrOpenedOnServer(serverName)) {
2475       return hri.getShortNameToLog() + " is not merging on " + serverName;
2476     }
2477 
2478     // Just return in case of retrying
2479     if (current.isOpened()) {
2480       return null;
2481     }
2482 
2483     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2484     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2485     RegionState rs_a = regionStates.getRegionState(a);
2486     RegionState rs_b = regionStates.getRegionState(b);
2487     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2488         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2489       return "Some daughter is not known to be merging on " + serverName
2490         + ", a=" + rs_a + ", b=" + rs_b;
2491     }
2492 
2493     regionOffline(a, State.MERGED);
2494     regionOffline(b, State.MERGED);
2495     regionOnline(hri, serverName, 1);
2496 
2497     // User could disable the table before master knows the new region.
2498     if (getTableStateManager().isTableState(hri.getTable(),
2499         TableState.State.DISABLED, TableState.State.DISABLING)) {
2500       invokeUnAssign(hri);
2501     } else {
2502       Callable<Object> mergeReplicasCallable = new Callable<Object>() {
2503         @Override
2504         public Object call() {
2505           doMergingOfReplicas(hri, a, b);
2506           return null;
2507         }
2508       };
2509       threadPoolExecutorService.submit(mergeReplicasCallable);
2510     }
2511     return null;
2512   }
2513 
2514   private String onRegionMergeReverted(final RegionState current, final HRegionInfo hri,
2515       final ServerName serverName, final RegionStateTransition transition) {
2516     // The region must be in merging_new state, and the daughters must be
2517     // merging on this server.
2518     // If the region is in offline state, it could be an RPC retry.
2519     if (current == null || !current.isMergingNewOrOfflineOnServer(serverName)) {
2520       return hri.getShortNameToLog() + " is not merging on " + serverName;
2521     }
2522 
2523     // Just return in case of retrying
2524     if (current.isOffline()) {
2525       return null;
2526     }
2527 
2528     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2529     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2530     RegionState rs_a = regionStates.getRegionState(a);
2531     RegionState rs_b = regionStates.getRegionState(b);
2532     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2533         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2534       return "Some daughter is not known to be merging on " + serverName
2535         + ", a=" + rs_a + ", b=" + rs_b;
2536     }
2537 
2538     regionOnline(a, serverName);
2539     regionOnline(b, serverName);
2540     regionOffline(hri);
2541 
2542     if (getTableStateManager().isTableState(hri.getTable(),
2543         TableState.State.DISABLED, TableState.State.DISABLING)) {
2544       invokeUnAssign(a);
2545       invokeUnAssign(b);
2546     }
2547     return null;
2548   }
2549 
2550   private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
2551       final HRegionInfo hri_b) {
2552     // Close replicas for the original unmerged regions. Create/assign new replicas
2553     // for the merged parent.
2554     List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
2555     unmergedRegions.add(hri_a);
2556     unmergedRegions.add(hri_b);
2557     Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
2558     Collection<List<HRegionInfo>> c = map.values();
2559     for (List<HRegionInfo> l : c) {
2560       for (HRegionInfo h : l) {
2561         if (!RegionReplicaUtil.isDefaultReplica(h)) {
2562           LOG.debug("Unassigning un-merged replica " + h);
2563           unassign(h);
2564         }
2565       }
2566     }
2567     int numReplicas = 1;
2568     try {
2569       numReplicas = ((MasterServices)server).getTableDescriptors().get(mergedHri.getTable()).
2570           getRegionReplication();
2571     } catch (IOException e) {
2572       LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
2573           " due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
2574           "will not be done");
2575     }
2576     List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2577     for (int i = 1; i < numReplicas; i++) {
2578       regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
2579     }
2580     try {
2581       assign(regions);
2582     } catch (IOException ioe) {
2583       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
2584                 ioe.getMessage());
2585     } catch (InterruptedException ie) {
2586       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
2587                 ie.getMessage());
2588     }
2589   }
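  // A small worked example of the replica handling above (the replication factor
  // of 3 is made up for illustration): with getRegionReplication() == 3, the loop
  // builds
  //
  //   regions = [ RegionReplicaUtil.getRegionInfoForReplica(mergedHri, 1),
  //               RegionReplicaUtil.getRegionInfoForReplica(mergedHri, 2) ]
  //
  // i.e. only the secondary replica ids 1..numReplicas-1; the default replica
  // (id 0) is the merged region itself and is already online. assign(regions)
  // then places those secondary replicas on live region servers.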
2590 
2591   private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
2592       final HRegionInfo hri_b) {
2593     // Create new regions for the replicas, and assign them to match the current
2594     // replica assignments. If replica 1 of the parent is assigned to RS1,
2595     // replica 1 of each daughter will be placed on the same server.
2596     int numReplicas = 1;
2597     try {
2598       numReplicas = ((MasterServices)server).getTableDescriptors().get(parentHri.getTable()).
2599           getRegionReplication();
2600     } catch (IOException e) {
2601       LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
2602           " due to " + e.getMessage() + ". The assignment of daughter replicas " +
2603           "will not be done");
2604     }
2605     // unassign the old replicas
2606     List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
2607     parentRegion.add(parentHri);
2608     Map<ServerName, List<HRegionInfo>> currentAssign =
2609         regionStates.getRegionAssignments(parentRegion);
2610     Collection<List<HRegionInfo>> c = currentAssign.values();
2611     for (List<HRegionInfo> l : c) {
2612       for (HRegionInfo h : l) {
2613         if (!RegionReplicaUtil.isDefaultReplica(h)) {
2614           LOG.debug("Unassigning parent's replica " + h);
2615           unassign(h);
2616         }
2617       }
2618     }
2619     // assign daughter replicas
2620     Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
2621     for (int i = 1; i < numReplicas; i++) {
2622       prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
2623       prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
2624     }
2625     try {
2626       assign(map);
2627     } catch (IOException e) {
2628       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
2629     } catch (InterruptedException e) {
2630       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
2631     }
2632   }
2633 
2634   private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
2635       int replicaId, Map<HRegionInfo, ServerName> map) {
2636     HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
2637     HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
2638         replicaId);
2639     LOG.debug("Created replica region for daughter " + daughterReplica);
2640     ServerName sn;
2641     if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
2642       map.put(daughterReplica, sn);
2643     } else {
2644       List<ServerName> servers = serverManager.getOnlineServersList();
2645       sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
2646       map.put(daughterReplica, sn);
2647     }
2648   }
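  // Worked example of the co-location above (server and region names are
  // hypothetical): if replica 1 of the parent is currently on server rs1, the
  // caller in doSplittingOfReplicas ends up with
  //
  //   map.put(RegionReplicaUtil.getRegionInfoForReplica(daughterA, 1), rs1);
  //   map.put(RegionReplicaUtil.getRegionInfoForReplica(daughterB, 1), rs1);
  //
  // so assign(map) opens both daughter replicas on the same server as the parent
  // replica. If the parent replica has no known location, a random online server
  // is picked instead.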
2649 
2650   public Set<HRegionInfo> getReplicasToClose() {
2651     return replicasToClose;
2652   }
2653 
2654   /**
2655    * Marks a region offline.  The new state is the specified one if it is
2656    * not null; otherwise the new state is Offline.
2657    * The specified state can only be Split, Merged, Offline, or null.
2658    */
2659   private void regionOffline(final HRegionInfo regionInfo, final State state) {
2660     regionStates.regionOffline(regionInfo, state);
2661     removeClosedRegion(regionInfo);
2662     // remove the region plan as well just in case.
2663     clearRegionPlan(regionInfo);
2664     balancer.regionOffline(regionInfo);
2665 
2666     // Tell our listeners that a region was closed
2667     sendRegionClosedNotification(regionInfo);
2668     // also note that all the replicas of the primary should be closed
2669     if (state != null && (state.equals(State.SPLIT) || state.equals(State.MERGED))) {
2670       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
2671       c.add(regionInfo);
2672       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
2673       Collection<List<HRegionInfo>> allReplicas = map.values();
2674       for (List<HRegionInfo> list : allReplicas) {
2675         replicasToClose.addAll(list);
2676       }
2677     }
2687   }
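  // Worked example for the SPLIT/MERGED branch above (region names are
  // hypothetical): if the primary region r is marked SPLIT and has replicas r_1
  // and r_2, getRegionAssignments returns r, r_1 and r_2 grouped by hosting
  // server, and all of them are added to replicasToClose so other parts of the
  // master know those replicas still need to be closed.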
2688 
2689   private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
2690       final ServerName serverName) {
2691     if (!this.listeners.isEmpty()) {
2692       for (AssignmentListener listener : this.listeners) {
2693         listener.regionOpened(regionInfo, serverName);
2694       }
2695     }
2696   }
2697 
2698   private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
2699     if (!this.listeners.isEmpty()) {
2700       for (AssignmentListener listener : this.listeners) {
2701         listener.regionClosed(regionInfo);
2702       }
2703     }
2704   }
2705 
2706   /**
2707    * Try to update some region states. If the state machine prevents
2708    * such an update, an error message is returned to explain the reason.
2709    *
2710    * Each transition is expected to carry just one region for
2711    * opening/closing, and three regions for splitting/merging.
2712    * These regions should be on the server that requested the change.
2713    *
2714    * Region state machine: only these transitions
2715    * are expected to be triggered by a region server.
2716    *
2717    * On the state transition:
2718    *  (1) Open/Close should be initiated by master
2719    *      (a) Master sets the region to pending_open/pending_close
2720    *        in memory and hbase:meta after sending the request
2721    *        to the region server
2722    *      (b) Region server reports back to the master
2723    *        after open/close is done (either success/failure)
2724    *      (c) If the region server has a problem reporting the status
2725    *        to the master, it must be because the master is down or there
2726    *        is a temporary network issue. Otherwise, the region server should
2727    *        abort, since it must be a bug. If the master is not accessible,
2728    *        the region server should keep trying until it is
2729    *        stopped or until the status is reported to the (new) master
2730    *      (d) If a region server dies in the middle of opening/closing
2731    *        a region, the server shutdown handler (SSH) picks it up and finishes it
2732    *      (e) If the master dies in the middle, the new master recovers
2733    *        the state during initialization from hbase:meta. A region server
2734    *        can report any transition that has not yet been reported to
2735    *        the previous active master
2736    *  (2) Split/merge is initiated by region servers
2737    *      (a) To split a region, a region server sends a request
2738    *        to the master to try to set the region to splitting, together with
2739    *        the two daughters (to be created) to splitting_new. If approved
2740    *        by the master, the split can then move ahead
2741    *      (b) To merge two regions, a region server sends a request to
2742    *        the master to try to set the new merged region (to be created) to
2743    *        merging_new, together with the two regions (to be merged) to merging.
2744    *        If approved by the master, the merge can then move ahead
2745    *      (c) Once the split/merge is done, the region server
2746    *        reports the status (success or failure) back to the master.
2747    *      (d) Other scenarios should be handled similarly to
2748    *        region open/close
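   *
   * <p>A rough sketch (variable names are made up for illustration) of the
   * payload a region server sends for a split, and of how the master answers:
   * <pre>
   *   // region_info(0) is the parent; (1) and (2) are the two daughters
   *   RegionStateTransition transition = RegionStateTransition.newBuilder()
   *     .setTransitionCode(TransitionCode.READY_TO_SPLIT)
   *     .addRegionInfo(HRegionInfo.convert(parent))
   *     .addRegionInfo(HRegionInfo.convert(daughterA))
   *     .addRegionInfo(HRegionInfo.convert(daughterB))
   *     .build();
   *   // null means the transition was accepted; a non-null return is an
   *   // error message explaining why the state machine rejected it
   *   String error = onRegionTransition(serverName, transition);
   * </pre>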
2749    */
2750   protected String onRegionTransition(final ServerName serverName,
2751       final RegionStateTransition transition) {
2752     TransitionCode code = transition.getTransitionCode();
2753     HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
2754     Lock lock = locker.acquireLock(hri.getEncodedName());
2755     try {
2756       RegionState current = regionStates.getRegionState(hri);
2757       if (LOG.isDebugEnabled()) {
2758         LOG.debug("Got transition " + code + " for "
2759           + (current != null ? current.toString() : hri.getShortNameToLog())
2760           + " from " + serverName);
2761       }
2762       String errorMsg = null;
2763       switch (code) {
2764       case OPENED:
2765         errorMsg = onRegionOpen(current, hri, serverName, transition);
2766         break;
2767       case FAILED_OPEN:
2768         errorMsg = onRegionFailedOpen(current, hri, serverName);
2769         break;
2770       case CLOSED:
2771         errorMsg = onRegionClosed(current, hri, serverName);
2772         break;
2773       case READY_TO_SPLIT:
2774         try {
2775           regionStateListener.onRegionSplit(hri);
2776           errorMsg = onRegionReadyToSplit(current, hri, serverName, transition);
2777         } catch (IOException exp) {
2778           errorMsg = StringUtils.stringifyException(exp);
2779         }
2780         break;
2781       case SPLIT_PONR:
2782         errorMsg = onRegionSplitPONR(current, hri, serverName, transition);
2783         break;
2784       case SPLIT:
2785         errorMsg = onRegionSplit(current, hri, serverName, transition);
2786         break;
2787       case SPLIT_REVERTED:
2788         errorMsg = onRegionSplitReverted(current, hri, serverName, transition);
2789         if (org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
2790           try {
2791             regionStateListener.onRegionSplitReverted(hri);
2792           } catch (IOException exp) {
2793             LOG.warn(StringUtils.stringifyException(exp));
2794           }
2795         }
2796         break;
2797       case READY_TO_MERGE:
2798         errorMsg = onRegionReadyToMerge(current, hri, serverName, transition);
2799         break;
2800       case MERGE_PONR:
2801         errorMsg = onRegionMergePONR(current, hri, serverName, transition);
2802         break;
2803       case MERGED:
2804         try {
2805           errorMsg = onRegionMerged(current, hri, serverName, transition);
2806           regionStateListener.onRegionMerged(hri);
2807         } catch (IOException exp) {
2808           errorMsg = StringUtils.stringifyException(exp);
2809         }
2810         break;
2811       case MERGE_REVERTED:
2812         errorMsg = onRegionMergeReverted(current, hri, serverName, transition);
2813         break;
2814 
2815       default:
2816         errorMsg = "Unexpected transition code " + code;
2817       }
2818       if (errorMsg != null) {
2819         LOG.info("Could not transition region from " + current + " on "
2820           + code + " by " + serverName + ": " + errorMsg);
2821       }
2822       return errorMsg;
2823     } finally {
2824       lock.unlock();
2825     }
2826   }
2827 
2828   /**
2829    * @return Instance of load balancer
2830    */
2831   public LoadBalancer getBalancer() {
2832     return this.balancer;
2833   }
2834 
2835   public Map<ServerName, List<HRegionInfo>>
2836     getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
2837     return getRegionStates().getRegionAssignments(infos);
2838   }
2839 
2840   void setRegionStateListener(RegionStateListener listener) {
2841     this.regionStateListener = listener;
2842   }
2843 }