1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.NavigableMap;
31  import java.util.Random;
32  import java.util.Set;
33  import java.util.TreeMap;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.CopyOnWriteArrayList;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.locks.Lock;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.fs.FileSystem;
48  import org.apache.hadoop.fs.Path;
49  import org.apache.hadoop.hbase.CoordinatedStateException;
50  import org.apache.hadoop.hbase.HBaseIOException;
51  import org.apache.hadoop.hbase.HConstants;
52  import org.apache.hadoop.hbase.HRegionInfo;
53  import org.apache.hadoop.hbase.HRegionLocation;
54  import org.apache.hadoop.hbase.HTableDescriptor;
55  import org.apache.hadoop.hbase.MetaTableAccessor;
56  import org.apache.hadoop.hbase.NotServingRegionException;
57  import org.apache.hadoop.hbase.RegionLocations;
58  import org.apache.hadoop.hbase.RegionStateListener;
59  import org.apache.hadoop.hbase.ServerName;
60  import org.apache.hadoop.hbase.TableName;
61  import org.apache.hadoop.hbase.TableNotFoundException;
62  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
63  import org.apache.hadoop.hbase.client.Result;
64  import org.apache.hadoop.hbase.client.TableState;
65  import org.apache.hadoop.hbase.executor.EventHandler;
66  import org.apache.hadoop.hbase.executor.EventType;
67  import org.apache.hadoop.hbase.executor.ExecutorService;
68  import org.apache.hadoop.hbase.ipc.FailedServerException;
69  import org.apache.hadoop.hbase.ipc.RpcClient;
70  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
71  import org.apache.hadoop.hbase.master.RegionState.State;
72  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
73  import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
74  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
75  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
76  import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
77  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
78  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
79  import org.apache.hadoop.hbase.quotas.QuotaExceededException;
80  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
81  import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
82  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
83  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
84  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
85  import org.apache.hadoop.hbase.util.FSUtils;
86  import org.apache.hadoop.hbase.util.KeyLocker;
87  import org.apache.hadoop.hbase.util.Pair;
88  import org.apache.hadoop.hbase.util.PairOfSameType;
89  import org.apache.hadoop.hbase.util.Threads;
90  import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
91  import org.apache.hadoop.ipc.RemoteException;
92  import org.apache.hadoop.util.StringUtils;
93  import org.apache.zookeeper.KeeperException;
94  
95  import com.google.common.annotations.VisibleForTesting;
96  
97  /**
98   * Manages and performs region assignment.
99   * Related communications with regionserver are all done over RPC.
100  */
101 @InterfaceAudience.Private
102 public class AssignmentManager {
103   private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
104 
105   protected final MasterServices server;
106 
107   private ServerManager serverManager;
108 
109   private boolean shouldAssignRegionsWithFavoredNodes;
110 
111   private LoadBalancer balancer;
112 
113   private final MetricsAssignmentManager metricsAssignmentManager;
114 
115   private final TableLockManager tableLockManager;
116 
117   private AtomicInteger numRegionsOpened = new AtomicInteger(0);
118 
119   final private KeyLocker<String> locker = new KeyLocker<String>();
120 
121   Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
122 
123   /**
124    * Map of regions to reopen after the schema of a table is changed. Key -
125    * encoded region name, value - HRegionInfo
126    */
127   private final Map <String, HRegionInfo> regionsToReopen;
128 
129   /*
130    * Maximum times we recurse an assignment/unassignment.
131    * See below in {@link #assign()} and {@link #unassign()}.
132    */
133   private final int maximumAttempts;
134 
135   /**
136    * The sleep time (in ms) that assignment waits before retrying an hbase:meta
137    * assignment that failed because no region plan was available or the plan was bad
138    */
139   private final long sleepTimeBeforeRetryingMetaAssignment;
140 
141   /** Plans for region movement. Key is the encoded version of a region name*/
142   // TODO: When do plans get cleaned out?  Ever? In server open and in server
143   // shutdown processing -- St.Ack
144   // All access to this Map must be synchronized.
145   final NavigableMap<String, RegionPlan> regionPlans =
146     new TreeMap<String, RegionPlan>();
147 
148   private final TableStateManager tableStateManager;
149 
150   private final ExecutorService executorService;
151 
152   // Thread pool executor service. TODO, consolidate with executorService?
153   private java.util.concurrent.ExecutorService threadPoolExecutorService;
154 
155   private final RegionStates regionStates;
156 
157   // The thresholds for using bulk assignment. Bulk assignment is used
158   // only if assigning at least this many regions to at least this
159   // many servers. If assigning fewer regions to fewer servers,
160   // bulk assigning may not be as efficient.
161   private final int bulkAssignThresholdRegions;
162   private final int bulkAssignThresholdServers;
163   private final int bulkPerRegionOpenTimeGuesstimate;
164 
165   // Should bulk assignment wait till all regions are assigned,
166   // or until it times out?  This is useful for measuring bulk assignment
167   // performance, but not needed in most use cases.
168   private final boolean bulkAssignWaitTillAllAssigned;
169 
170   /**
171    * Indicator that AssignmentManager has recovered the region states so
172    * that ServerShutdownHandler can be fully enabled and re-assign regions
173    * of dead servers; when re-assignment happens, AssignmentManager then
174    * has proper region states.
175    *
176    * Protected to ease testing.
177    */
178   protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
179 
180   /**
181    * A map to track the number of times a region has failed to open in a row,
182    * so that we don't try to open a region forever if the failure is
183    * unrecoverable.  We don't put this information in region states
184    * because we don't expect this to happen frequently; we don't
185    * want to copy this information over during each state transition either.
186    */
187   private final ConcurrentHashMap<String, AtomicInteger>
188     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
189 
190   // When not using ZK for region assignment, region states
191   // are persisted in meta with a state store
192   private final RegionStateStore regionStateStore;
193 
194   /**
195    * For testing only!  Set to true to skip handling of split.
196    */
197   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
198   public static boolean TEST_SKIP_SPLIT_HANDLING = false;
199 
200   /** Listeners that are called on assignment events. */
201   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
202 
203   private RegionStateListener regionStateListener;
204 
205   /**
206    * Constructs a new assignment manager.
207    *
208    * @param server instance of HMaster this AM is running inside
209    * @param serverManager serverManager for associated HMaster
210    * @param balancer implementation of {@link LoadBalancer}
211    * @param service Executor service
212    * @param metricsMaster metrics manager
213    * @param tableLockManager TableLock manager
214    * @throws IOException
215    */
216   public AssignmentManager(MasterServices server, ServerManager serverManager,
217       final LoadBalancer balancer,
218       final ExecutorService service, MetricsMaster metricsMaster,
219       final TableLockManager tableLockManager,
220       final TableStateManager tableStateManager)
221           throws IOException {
222     this.server = server;
223     this.serverManager = serverManager;
224     this.executorService = service;
225     this.regionStateStore = new RegionStateStore(server);
226     this.regionsToReopen = Collections.synchronizedMap
227                            (new HashMap<String, HRegionInfo> ());
228     Configuration conf = server.getConfiguration();
229     // Only read favored nodes if using the favored nodes load balancer.
230     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
231            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
232            FavoredNodeLoadBalancer.class);
233 
234     this.tableStateManager = tableStateManager;
235 
236     // This is the max attempts, not retries, so it should be at least 1.
237     this.maximumAttempts = Math.max(1,
238       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
239     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
240         "hbase.meta.assignment.retry.sleeptime", 1000l);
241     this.balancer = balancer;
242     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
243     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
244       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
245     this.regionStates = new RegionStates(
246       server, tableStateManager, serverManager, regionStateStore);
247 
248     this.bulkAssignWaitTillAllAssigned =
249       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
250     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
251     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
252     this.bulkPerRegionOpenTimeGuesstimate =
253       conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
254 
255     this.metricsAssignmentManager = new MetricsAssignmentManager();
256     this.tableLockManager = tableLockManager;
257   }
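
      // Illustrative sketch only, not part of the original class: the assignment
      // settings read by this constructor come from the server Configuration and
      // could be tuned in hbase-site.xml or programmatically, for example:
      //
      //   Configuration conf = HBaseConfiguration.create();
      //   conf.setInt("hbase.assignment.maximum.attempts", 10);
      //   conf.setLong("hbase.meta.assignment.retry.sleeptime", 1000L);
      //   conf.setInt("hbase.assignment.threads.max", 30);
      //   conf.setInt("hbase.bulk.assignment.threshold.regions", 7);
      //   conf.setInt("hbase.bulk.assignment.threshold.servers", 3);
      //   conf.setBoolean("hbase.bulk.assignment.waittillallassigned", false);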
258 
259   /**
260    * Add the listener to the notification list.
261    * @param listener The AssignmentListener to register
262    */
263   public void registerListener(final AssignmentListener listener) {
264     this.listeners.add(listener);
265   }
266 
267   /**
268    * Remove the listener from the notification list.
269    * @param listener The AssignmentListener to unregister
270    */
271   public boolean unregisterListener(final AssignmentListener listener) {
272     return this.listeners.remove(listener);
273   }
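
      // Illustrative usage sketch, names hypothetical: an application-provided
      // AssignmentListener can be hooked into the notification list and later removed;
      // it is invoked on the assignment events this class publishes (see
      // sendRegionOpenedNotification further below).
      //
      //   AssignmentListener myListener = ...; // caller-supplied implementation
      //   assignmentManager.registerListener(myListener);
      //   ...
      //   assignmentManager.unregisterListener(myListener);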
274 
275   /**
276    * @return Instance of TableStateManager.
277    */
278   public TableStateManager getTableStateManager() {
279     // These are 'expensive' to create, involving a trip to the zk ensemble, so allow
280     // sharing.
281     return this.tableStateManager;
282   }
283 
284   /**
285    * This SHOULD not be public. It is public now
286    * because of some unit tests.
287    *
288    * TODO: make it package private and keep RegionStates in the master package
289    */
290   public RegionStates getRegionStates() {
291     return regionStates;
292   }
293 
294   /**
295    * Used in some tests to mock up region state in meta
296    */
297   @VisibleForTesting
298   RegionStateStore getRegionStateStore() {
299     return regionStateStore;
300   }
301 
302   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
303     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
304   }
305 
306   /**
307    * Add a regionPlan for the specified region.
308    * @param encodedName encoded name of the region the plan is for
309    * @param plan the region plan to add
310    */
311   public void addPlan(String encodedName, RegionPlan plan) {
312     synchronized (regionPlans) {
313       regionPlans.put(encodedName, plan);
314     }
315   }
316 
317   /**
318    * Add a map of region plans.
319    */
320   public void addPlans(Map<String, RegionPlan> plans) {
321     synchronized (regionPlans) {
322       regionPlans.putAll(plans);
323     }
324   }
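
      // Illustrative sketch, names hypothetical: a RegionPlan pairs a region with its
      // current server and an intended destination, keyed here by the encoded region
      // name, mirroring how the bulk-assign path builds its plans further below.
      //
      //   RegionPlan plan = new RegionPlan(hri, currentServer, destinationServer);
      //   assignmentManager.addPlan(hri.getEncodedName(), plan);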
325 
326   /**
327    * Set the list of regions that will be reopened
328    * because of an update in table schema
329    *
330    * @param regions
331    *          list of regions that should be tracked for reopen
332    */
333   public void setRegionsToReopen(List <HRegionInfo> regions) {
334     for(HRegionInfo hri : regions) {
335       regionsToReopen.put(hri.getEncodedName(), hri);
336     }
337   }
338 
339   /**
340    * Used by the client to check whether all regions have received the schema updates
341    *
342    * @param tableName
343    * @return Pair indicating the status of the alter command
344    * @throws IOException
345    */
346   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
347       throws IOException {
348     List<HRegionInfo> hris;
349     if (TableName.META_TABLE_NAME.equals(tableName)) {
350       hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
351     } else {
352       hris = MetaTableAccessor.getTableRegions(server.getConnection(), tableName, true);
353     }
354 
355     Integer pending = 0;
356     for (HRegionInfo hri : hris) {
357       String name = hri.getEncodedName();
358       // no lock concurrent access ok: sequential consistency respected.
359       if (regionsToReopen.containsKey(name)
360           || regionStates.isRegionInTransition(name)) {
361         pending++;
362       }
363     }
364     return new Pair<Integer, Integer>(pending, hris.size());
365   }
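
      // Illustrative sketch of a hypothetical caller: the returned pair is
      // (regions still pending reopen or in transition, total regions of the table),
      // so alter progress can be derived directly from it.
      //
      //   Pair<Integer, Integer> status = assignmentManager.getReopenStatus(tableName);
      //   int pending = status.getFirst();
      //   int total = status.getSecond();
      //   boolean done = (pending == 0);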
366 
367   /**
368    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
369    * the failover cleanup before re-assigning regions of dead servers, so that
370    * when re-assignment happens, AssignmentManager has proper region states.
371    */
372   public boolean isFailoverCleanupDone() {
373     return failoverCleanupDone.get();
374   }
375 
376   /**
377    * To avoid racing with AM, external entities may need to lock a region,
378    * for example, when SSH checks what regions to skip re-assigning.
379    */
380   public Lock acquireRegionLock(final String encodedName) {
381     return locker.acquireLock(encodedName);
382   }
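
      // Illustrative usage sketch: callers such as SSH should release the lock in a
      // finally block, as the assignment code in this class does.
      //
      //   Lock lock = assignmentManager.acquireRegionLock(encodedRegionName);
      //   try {
      //     // inspect or update region state without racing the AssignmentManager
      //   } finally {
      //     lock.unlock();
      //   }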
383 
384   /**
385    * Called once failover cleanup is completed. Notifies the server manager to
386    * process any queued-up dead servers.
387    */
388   void failoverCleanupDone() {
389     failoverCleanupDone.set(true);
390     serverManager.processQueuedDeadServers();
391   }
392 
393   /**
394    * Called on startup.
395    * Figures out whether this is a fresh cluster start or we are joining an extant running cluster.
396    * @throws IOException
397    * @throws KeeperException
398    * @throws InterruptedException
399    * @throws CoordinatedStateException
400    */
401   void joinCluster()
402   throws IOException, KeeperException, InterruptedException, CoordinatedStateException {
403     long startTime = System.currentTimeMillis();
404     // Concurrency note: In the below, the accesses on regionsInTransition are
405     // outside of a synchronization block, where usually all accesses to RIT are
406     // synchronized.  The presumption is that in this case it is safe since this
407     // method is run by a single thread on startup.
408 
409     // TODO: Regions that have a null location and are not in regionsInTransitions
410     // need to be handled.
411 
412     // Scan hbase:meta to build list of existing regions, servers, and assignment
413     // Returns servers that have not checked in (assumed dead) to which some
414     // regions were assigned (according to meta)
415     Set<ServerName> deadServers = rebuildUserRegions();
416 
417     // This method will assign all user regions if a clean server startup or
418     // it will reconstruct master state and cleanup any leftovers from previous master process.
419     boolean failover = processDeadServersAndRegionsInTransition(deadServers);
420 
421     recoverTableInDisablingState();
422     recoverTableInEnablingState();
423     LOG.info("Joined the cluster in " + (System.currentTimeMillis()
424       - startTime) + "ms, failover=" + failover);
425   }
426 
427   /**
428    * Process all regions that are in transition in zookeeper and also
429    * process the list of dead servers.
430    * Used by a master joining a cluster.  If we figure this is a clean cluster
431    * startup, all user regions will be assigned.
432    * @param deadServers Set of servers that are offline (probably legitimately) but were
433    * carrying regions according to a scan of hbase:meta. Can be null.
434    * @throws IOException
435    * @throws InterruptedException
436    */
437   boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
438   throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
439     // TODO Needed? List<String> nodes = ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
440     boolean failover = !serverManager.getDeadServers().isEmpty();
441     if (failover) {
442       // This may not be a failover actually, especially if meta is on this master.
443       if (LOG.isDebugEnabled()) {
444         LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
445       }
446     } else {
447       // If any one region except meta is assigned, it's a failover.
448       Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
449       for (Map.Entry<HRegionInfo, ServerName> en:
450           regionStates.getRegionAssignments().entrySet()) {
451         HRegionInfo hri = en.getKey();
452         if (!hri.isMetaTable()
453             && onlineServers.contains(en.getValue())) {
454           LOG.debug("Found " + hri + " out on cluster");
455           failover = true;
456           break;
457         }
458       }
459       if (!failover) {
460         // If any region except meta is in transition on a live server, it's a failover.
461         Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
462         if (!regionsInTransition.isEmpty()) {
463           for (RegionState regionState: regionsInTransition.values()) {
464             ServerName serverName = regionState.getServerName();
465             if (!regionState.getRegion().isMetaRegion()
466                 && serverName != null && onlineServers.contains(serverName)) {
467               LOG.debug("Found " + regionState + " in RITs");
468               failover = true;
469               break;
470             }
471           }
472         }
473       }
474     }
475     if (!failover) {
476       // If we get here, we have a full cluster restart. It is a failover only
477       // if some WALs are not split yet. Meta WALs, if any, should have
478       // been split already. We can walk through the queued dead servers;
479       // if they don't have any WALs, this restart should be considered a clean one
480       Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
481       if (!queuedDeadServers.isEmpty()) {
482         Configuration conf = server.getConfiguration();
483         Path rootdir = FSUtils.getRootDir(conf);
484         FileSystem fs = rootdir.getFileSystem(conf);
485         for (ServerName serverName: queuedDeadServers) {
486           // In the case of a clean exit, the shutdown handler would have presplit any WALs and
487           // removed empty directories.
488           Path logDir = new Path(rootdir,
489               DefaultWALProvider.getWALDirectoryName(serverName.toString()));
490           Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
491           if (fs.exists(logDir) || fs.exists(splitDir)) {
492             LOG.debug("Found queued dead server " + serverName);
493             failover = true;
494             break;
495           }
496         }
497         if (!failover) {
498           // We figured that it's not a failover, so no need to
499           // work on these re-queued dead servers any more.
500           LOG.info("AM figured that it's not a failover and cleaned up "
501             + queuedDeadServers.size() + " queued dead servers");
502           serverManager.removeRequeuedDeadServers();
503         }
504       }
505     }
506 
507     Set<TableName> disabledOrDisablingOrEnabling = null;
508     Map<HRegionInfo, ServerName> allRegions = null;
509 
510     if (!failover) {
511       disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
512         TableState.State.DISABLED, TableState.State.DISABLING,
513         TableState.State.ENABLING);
514 
515       // Clean re/start, mark all user regions closed before reassignment
516       allRegions = regionStates.closeAllUserRegions(
517         disabledOrDisablingOrEnabling);
518     }
519 
520     // Now region states are restored
521     regionStateStore.start();
522 
523     if (failover) {
524       if (deadServers != null && !deadServers.isEmpty()) {
525         for (ServerName serverName: deadServers) {
526           if (!serverManager.isServerDead(serverName)) {
527             serverManager.expireServer(serverName); // Let SSH do region re-assign
528           }
529         }
530       }
531       processRegionsInTransition(regionStates.getRegionsInTransition().values());
532     }
533 
534     // Now we can safely claim failover cleanup completed and enable
535     // ServerShutdownHandler for further processing. The nodes (below)
536     // in transition, if any, are for regions not related to those
537     // dead servers at all, and can be done in parallel to SSH.
538     failoverCleanupDone();
539     if (!failover) {
540       // Fresh cluster startup.
541       LOG.info("Clean cluster startup. Assigning user regions");
542       assignAllUserRegions(allRegions);
543     }
544     // Unassign replicas of the split parents and the merged regions.
545     // The daughter replicas are opened in assignAllUserRegions if they were
546     // not already opened.
547     for (HRegionInfo h : replicasToClose) {
548       unassign(h);
549     }
550     replicasToClose.clear();
551     return failover;
552   }
553 
554   /**
555    * When a region is closed, it should be removed from the regionsToReopen map.
556    * @param hri HRegionInfo of the region which was closed
557    */
558   public void removeClosedRegion(HRegionInfo hri) {
559     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
560       LOG.debug("Removed region from reopening regions because it was closed");
561     }
562   }
563 
564   // TODO: processFavoredNodes might throw an exception, e.g., if the
565   // meta could not be contacted/updated. We need to decide how seriously to treat
566   // this problem: should we fail the current assignment? We should be able
567   // to recover from this problem eventually (if the meta couldn't be updated,
568   // things should still work normally and eventually get fixed up).
569   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
570     if (!shouldAssignRegionsWithFavoredNodes) return;
571     // The AM gets the favored nodes info for each region and updates the meta
572     // table with that info
573     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
574         new HashMap<HRegionInfo, List<ServerName>>();
575     for (HRegionInfo region : regions) {
576       regionToFavoredNodes.put(region,
577           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
578     }
579     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
580       this.server.getConnection());
581   }
582 
583   /**
584    * Marks the region as online.  Removes it from regions in transition and
585    * updates the in-memory assignment information.
586    * <p>
587    * Used when a region has been successfully opened on a region server.
588    * @param regionInfo
589    * @param sn
590    */
591   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
592     regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
593   }
594 
595   void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
596     numRegionsOpened.incrementAndGet();
597     regionStates.regionOnline(regionInfo, sn, openSeqNum);
598 
599     // Remove plan if one.
600     clearRegionPlan(regionInfo);
601     balancer.regionOnline(regionInfo, sn);
602 
603     // Tell our listeners that a region was opened
604     sendRegionOpenedNotification(regionInfo, sn);
605   }
606 
607   /**
608    * Marks the region as offline.  Removes it from regions in transition and
609    * removes in-memory assignment information.
610    * <p>
611    * Used when a region has been closed and should remain closed.
612    * @param regionInfo
613    */
614   public void regionOffline(final HRegionInfo regionInfo) {
615     regionOffline(regionInfo, null);
616   }
617 
618   public void offlineDisabledRegion(HRegionInfo regionInfo) {
619     replicasToClose.remove(regionInfo);
620     regionOffline(regionInfo);
621   }
622 
623   // Assignment methods
624 
625   /**
626    * Assigns the specified region.
627    * <p>
628    * If a RegionPlan is available with a valid destination then it will be used
629    * to determine which server the region is assigned to.  If no RegionPlan is
630    * available, region will be assigned to a random available server.
631    * <p>
632    * Updates the RegionState and sends the OPEN RPC.
633    * <p>
634    * This will only succeed if the region is in transition and in a CLOSED or
635    * OFFLINE state or not in transition, and of course, the
636    * chosen server is up and running (It may have just crashed!).
637    *
638    * @param region region to be assigned
639    */
640   public void assign(HRegionInfo region) {
641     assign(region, false);
642   }
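
      // Illustrative sketch: external callers normally use this single-argument form,
      // which reuses an existing RegionPlan when one is still valid; passing true for
      // forceNewPlan to the overload below discards any existing plan and, as its
      // javadoc warns, can cause double assignment if used carelessly.
      //
      //   assignmentManager.assign(hri);        // reuse an existing plan if still valid
      //   assignmentManager.assign(hri, true);  // force a brand-new plan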
643 
644   /**
645    * Use care with forceNewPlan. It could cause double assignment.
646    */
647   public void assign(HRegionInfo region, boolean forceNewPlan) {
648     if (isDisabledorDisablingRegionInRIT(region)) {
649       return;
650     }
651     String encodedName = region.getEncodedName();
652     Lock lock = locker.acquireLock(encodedName);
653     try {
654       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
655       if (state != null) {
656         if (regionStates.wasRegionOnDeadServer(encodedName)) {
657           LOG.info("Skip assigning " + region.getRegionNameAsString()
658             + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
659             + " is dead but not processed yet");
660           return;
661         }
662         assign(state, forceNewPlan);
663       }
664     } finally {
665       lock.unlock();
666     }
667   }
668 
669   /**
670    * Bulk assign regions to <code>destination</code>.
671    * @param destination
672    * @param regions Regions to assign.
673    * @return true if successful
674    */
675   boolean assign(final ServerName destination, final List<HRegionInfo> regions)
676     throws InterruptedException {
677     long startTime = EnvironmentEdgeManager.currentTime();
678     try {
679       int regionCount = regions.size();
680       if (regionCount == 0) {
681         return true;
682       }
683       LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
684       Set<String> encodedNames = new HashSet<String>(regionCount);
685       for (HRegionInfo region : regions) {
686         encodedNames.add(region.getEncodedName());
687       }
688 
689       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
690       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
691       try {
692         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regionCount);
693         List<RegionState> states = new ArrayList<RegionState>(regionCount);
694         for (HRegionInfo region : regions) {
695           String encodedName = region.getEncodedName();
696           if (!isDisabledorDisablingRegionInRIT(region)) {
697             RegionState state = forceRegionStateToOffline(region, false);
698             boolean onDeadServer = false;
699             if (state != null) {
700               if (regionStates.wasRegionOnDeadServer(encodedName)) {
701                 LOG.info("Skip assigning " + region.getRegionNameAsString()
702                   + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
703                   + " is dead but not processed yet");
704                 onDeadServer = true;
705               } else {
706                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
707                 plans.put(encodedName, plan);
708                 states.add(state);
709                 continue;
710               }
711             }
712             // Reassign if the region wasn't on a dead server
713             if (!onDeadServer) {
714               LOG.info("failed to force region state to offline, "
715                 + "will reassign later: " + region);
716               failedToOpenRegions.add(region); // assign individually later
717             }
718           }
719           // Release the lock, this region is excluded from bulk assign because
720           // we can't update its state, or set its znode to offline.
721           Lock lock = locks.remove(encodedName);
722           lock.unlock();
723         }
724 
725         if (server.isStopped()) {
726           return false;
727         }
728 
729         // Add region plans, so we can updateTimers when one region is opened so
730         // that unnecessary timeout on RIT is reduced.
731         this.addPlans(plans);
732 
733         List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos =
734           new ArrayList<Pair<HRegionInfo, List<ServerName>>>(states.size());
735         for (RegionState state: states) {
736           HRegionInfo region = state.getRegion();
737           regionStates.updateRegionState(
738             region, State.PENDING_OPEN, destination);
739           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
740           if (this.shouldAssignRegionsWithFavoredNodes) {
741             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
742           }
743           regionOpenInfos.add(new Pair<HRegionInfo, List<ServerName>>(
744             region, favoredNodes));
745         }
746 
747         // Move on to open regions.
748         try {
749           // Send OPEN RPC. If it fails on an IOE or RemoteException,
750           // regions will be assigned individually.
751           Configuration conf = server.getConfiguration();
752           long maxWaitTime = System.currentTimeMillis() +
753             conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000);
754           for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
755             try {
756               List<RegionOpeningState> regionOpeningStateList = serverManager
757                 .sendRegionOpen(destination, regionOpenInfos);
758               for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
759                 RegionOpeningState openingState = regionOpeningStateList.get(k);
760                 if (openingState != RegionOpeningState.OPENED) {
761                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
762                   LOG.info("Got opening state " + openingState
763                     + ", will reassign later: " + region);
764                   // Failed opening this region, reassign it later
765                   forceRegionStateToOffline(region, true);
766                   failedToOpenRegions.add(region);
767                 }
768               }
769               break;
770             } catch (IOException e) {
771               if (e instanceof RemoteException) {
772                 e = ((RemoteException)e).unwrapRemoteException();
773               }
774               if (e instanceof RegionServerStoppedException) {
775                 LOG.warn("The region server was shut down, ", e);
776                 // No need to retry, the region server is a goner.
777                 return false;
778               } else if (e instanceof ServerNotRunningYetException) {
779                 long now = System.currentTimeMillis();
780                 if (now < maxWaitTime) {
781                   if (LOG.isDebugEnabled()) {
782                     LOG.debug("Server is not yet up; waiting up to " +
783                       (maxWaitTime - now) + "ms", e);
784                   }
785                   Thread.sleep(100);
786                   i--; // reset the try count
787                   continue;
788                 }
789               } else if (e instanceof java.net.SocketTimeoutException
790                   && this.serverManager.isServerOnline(destination)) {
791                 // In case socket is timed out and the region server is still online,
792                 // the openRegion RPC could have been accepted by the server and
793                 // just the response didn't go through.  So we will retry to
794                 // open the region on the same server.
795                 if (LOG.isDebugEnabled()) {
796                   LOG.debug("Bulk assigner openRegion() to " + destination
797                     + " has timed out, but the regions might"
798                     + " already be opened on it.", e);
799                 }
800                 // wait and reset the re-try count, server might be just busy.
801                 Thread.sleep(100);
802                 i--;
803                 continue;
804               } else if (e instanceof FailedServerException && i < maximumAttempts) {
805                 // In case the server is in the failed server list, there is no point
806                 // retrying too soon. Retry after the failed_server_expiry time
807                 long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
808                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
809                 if (LOG.isDebugEnabled()) {
810                   LOG.debug(destination + " is on failed server list; waiting "
811                     + sleepTime + "ms", e);
812                 }
813                 Thread.sleep(sleepTime);
814                 continue;
815               }
816               throw e;
817             }
818           }
819         } catch (IOException e) {
820           // Can be a socket timeout, EOF, NoRouteToHost, etc
821           LOG.info("Unable to communicate with " + destination
822             + " in order to assign regions, ", e);
823           for (RegionState state: states) {
824             HRegionInfo region = state.getRegion();
825             forceRegionStateToOffline(region, true);
826           }
827           return false;
828         }
829       } finally {
830         for (Lock lock : locks.values()) {
831           lock.unlock();
832         }
833       }
834 
835       if (!failedToOpenRegions.isEmpty()) {
836         for (HRegionInfo region : failedToOpenRegions) {
837           if (!regionStates.isRegionOnline(region)) {
838             invokeAssign(region);
839           }
840         }
841       }
842 
843       // wait for assignment completion
844       ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
845       for (HRegionInfo region: regions) {
846         if (!region.getTable().isSystemTable()) {
847           userRegionSet.add(region);
848         }
849       }
850       if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
851             System.currentTimeMillis())) {
852         LOG.debug("some user regions are still in transition: " + userRegionSet);
853       }
854       LOG.debug("Bulk assigning done for " + destination);
855       return true;
856     } finally {
857       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
858     }
859   }
860 
861   /**
862    * Send CLOSE RPC if the server is online, otherwise, offline the region.
863    *
864    * The RPC will be sent only to the region server found in the region state
865    * if it is passed in, otherwise, to the src server specified. If region
866    * state is not specified, we don't update region state at all; instead
867    * we just send the RPC call. This is useful for some cleanup without
868    * messing around with the region states (see handleRegion, on the region
869    * opened on an unexpected server scenario, for an example).
870    */
871   private void unassign(final HRegionInfo region,
872       final ServerName server, final ServerName dest) {
873     for (int i = 1; i <= this.maximumAttempts; i++) {
874       if (this.server.isStopped() || this.server.isAborted()) {
875         LOG.debug("Server stopped/aborted; skipping unassign of " + region);
876         return;
877       }
878       if (!serverManager.isServerOnline(server)) {
879         LOG.debug("Offline " + region.getRegionNameAsString()
880           + ", no need to unassign since it's on a dead server: " + server);
881         regionStates.updateRegionState(region, State.OFFLINE);
882         return;
883       }
884       try {
885         // Send CLOSE RPC
886         if (serverManager.sendRegionClose(server, region, dest)) {
887           LOG.debug("Sent CLOSE to " + server + " for region " +
888             region.getRegionNameAsString());
889           return;
890         }
891         // This should never happen. Currently regionserver close always returns true.
892         // TODO: this can now happen (0.96) if there is an exception in a coprocessor
893         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
894           region.getRegionNameAsString());
895       } catch (Throwable t) {
896         long sleepTime = 0;
897         Configuration conf = this.server.getConfiguration();
898         if (t instanceof RemoteException) {
899           t = ((RemoteException)t).unwrapRemoteException();
900         }
901         if (t instanceof RegionServerAbortedException
902             || t instanceof RegionServerStoppedException
903             || t instanceof ServerNotRunningYetException) {
904           // RS is aborting, we cannot offline the region since the region may need to do WAL
905           // recovery. Until we see  the RS expiration, we should retry.
906           sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
907             RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
908 
909         } else if (t instanceof NotServingRegionException) {
910           LOG.debug("Offline " + region.getRegionNameAsString()
911             + ", it is no longer on " + server, t);
912           regionStates.updateRegionState(region, State.OFFLINE);
913           return;
914         } else if (t instanceof FailedServerException && i < maximumAttempts) {
915           // In case the server is in the failed server list, there is no point
916           // retrying too soon. Retry after the failed_server_expiry time
917           sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
918           RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
919           if (LOG.isDebugEnabled()) {
920             LOG.debug(server + " is on failed server list; waiting " + sleepTime + "ms", t);
921           }
922        }
923        try {
924          if (sleepTime > 0) {
925            Thread.sleep(sleepTime);
926          }
927        } catch (InterruptedException ie) {
928          LOG.warn("Interrupted unassign " + region.getRegionNameAsString(), ie);
929          Thread.currentThread().interrupt();
930          regionStates.updateRegionState(region, State.FAILED_CLOSE);
931          return;
932        }
933        LOG.info("Server " + server + " returned " + t + " for "
934          + region.getRegionNameAsString() + ", try=" + i
935          + " of " + this.maximumAttempts, t);
936       }
937     }
938     // Run out of attempts
939     regionStates.updateRegionState(region, State.FAILED_CLOSE);
940   }
941 
942   /**
943    * Set region to OFFLINE unless it is opening and forceNewPlan is false.
944    */
945   private RegionState forceRegionStateToOffline(
946       final HRegionInfo region, final boolean forceNewPlan) {
947     RegionState state = regionStates.getRegionState(region);
948     if (state == null) {
949       LOG.warn("Assigning but not in region states: " + region);
950       state = regionStates.createRegionState(region);
951     }
952 
953     if (forceNewPlan && LOG.isDebugEnabled()) {
954       LOG.debug("Force region state offline " + state);
955     }
956 
957     switch (state.getState()) {
958     case OPEN:
959     case OPENING:
960     case PENDING_OPEN:
961     case CLOSING:
962     case PENDING_CLOSE:
963       if (!forceNewPlan) {
964         LOG.debug("Skip assigning " +
965           region + ", it is already " + state);
966         return null;
967       }
968     case FAILED_CLOSE:
969     case FAILED_OPEN:
970       regionStates.updateRegionState(region, State.PENDING_CLOSE);
971       unassign(region, state.getServerName(), null);
972       state = regionStates.getRegionState(region);
973       if (!state.isOffline() && !state.isClosed()) {
974         // If the region isn't offline, we can't re-assign
975         // it now. It will be assigned automatically after
976         // the regionserver reports it's closed.
977         return null;
978       }
979     case OFFLINE:
980     case CLOSED:
981       break;
982     default:
983       LOG.error("Trying to assign region " + region
984         + ", which is " + state);
985       return null;
986     }
987     return state;
988   }
989 
990   /**
991    * Caller must hold lock on the passed <code>state</code> object.
992    * @param state
993    * @param forceNewPlan
994    */
995   private void assign(RegionState state, boolean forceNewPlan) {
996     long startTime = EnvironmentEdgeManager.currentTime();
997     try {
998       Configuration conf = server.getConfiguration();
999       RegionPlan plan = null;
1000       long maxWaitTime = -1;
1001       HRegionInfo region = state.getRegion();
1002       Throwable previousException = null;
1003       for (int i = 1; i <= maximumAttempts; i++) {
1004         if (server.isStopped() || server.isAborted()) {
1005           LOG.info("Skip assigning " + region.getRegionNameAsString()
1006             + ", the server is stopped/aborted");
1007           return;
1008         }
1009 
1010         if (plan == null) { // Get a server for the region at first
1011           try {
1012             plan = getRegionPlan(region, forceNewPlan);
1013           } catch (HBaseIOException e) {
1014             LOG.warn("Failed to get region plan", e);
1015           }
1016         }
1017 
1018         if (plan == null) {
1019           LOG.warn("Unable to determine a plan to assign " + region);
1020 
1021           // For meta region, we have to keep retrying until succeeding
1022           if (region.isMetaRegion()) {
1023             if (i == maximumAttempts) {
1024               i = 0; // re-set attempt count to 0 for at least 1 retry
1025 
1026               LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
1027                 " after maximumAttempts (" + this.maximumAttempts +
1028                 "). Reset attempts count and continue retrying.");
1029             }
1030             waitForRetryingMetaAssignment();
1031             continue;
1032           }
1033 
1034           regionStates.updateRegionState(region, State.FAILED_OPEN);
1035           return;
1036         }
1037         LOG.info("Assigning " + region.getRegionNameAsString() +
1038             " to " + plan.getDestination().toString());
1039         // Transition RegionState to PENDING_OPEN
1040        regionStates.updateRegionState(region,
1041           State.PENDING_OPEN, plan.getDestination());
1042 
1043         boolean needNewPlan = false;
1044         final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
1045             " to " + plan.getDestination();
1046         try {
1047           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1048           if (this.shouldAssignRegionsWithFavoredNodes) {
1049             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1050           }
1051           serverManager.sendRegionOpen(plan.getDestination(), region, favoredNodes);
1052           return; // we're done
1053         } catch (Throwable t) {
1054           if (t instanceof RemoteException) {
1055             t = ((RemoteException) t).unwrapRemoteException();
1056           }
1057           previousException = t;
1058 
1059           // Should we wait a little before retrying? If the server is starting, the answer is yes.
1060           boolean hold = (t instanceof ServerNotRunningYetException);
1061 
1062           // In case socket is timed out and the region server is still online,
1063           // the openRegion RPC could have been accepted by the server and
1064           // just the response didn't go through.  So we will retry to
1065           // open the region on the same server.
1066           boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
1067               && this.serverManager.isServerOnline(plan.getDestination()));
1068 
1069           if (hold) {
1070             LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
1071               "try=" + i + " of " + this.maximumAttempts, t);
1072 
1073             if (maxWaitTime < 0) {
1074               maxWaitTime = EnvironmentEdgeManager.currentTime()
1075                 + this.server.getConfiguration().getLong(
1076                   "hbase.regionserver.rpc.startup.waittime", 60000);
1077             }
1078             try {
1079               long now = EnvironmentEdgeManager.currentTime();
1080               if (now < maxWaitTime) {
1081                 if (LOG.isDebugEnabled()) {
1082                   LOG.debug("Server is not yet up; waiting up to "
1083                     + (maxWaitTime - now) + "ms", t);
1084                 }
1085                 Thread.sleep(100);
1086                 i--; // reset the try count
1087               } else {
1088                 LOG.debug("Server is not up for a while; try a new one", t);
1089                 needNewPlan = true;
1090               }
1091             } catch (InterruptedException ie) {
1092               LOG.warn("Failed to assign "
1093                   + region.getRegionNameAsString() + " since interrupted", ie);
1094               regionStates.updateRegionState(region, State.FAILED_OPEN);
1095               Thread.currentThread().interrupt();
1096               return;
1097             }
1098           } else if (retry) {
1099             i--; // we want to retry as many times as needed as long as the RS is not dead.
1100             if (LOG.isDebugEnabled()) {
1101               LOG.debug(assignMsg + ", trying to assign to the same region server due to ", t);
1102             }
1103           } else {
1104             needNewPlan = true;
1105             LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
1106                 " try=" + i + " of " + this.maximumAttempts, t);
1107           }
1108         }
1109 
1110         if (i == this.maximumAttempts) {
1111           // For meta region, we have to keep retrying until succeeding
1112           if (region.isMetaRegion()) {
1113             i = 0; // re-set attempt count to 0 for at least 1 retry
1114             LOG.warn(assignMsg +
1115                 ", assigning the hbase:meta region has reached maximumAttempts (" +
1116                 this.maximumAttempts + ").  Reset attempt counts and continue retrying.");
1117             waitForRetryingMetaAssignment();
1118           }
1119           else {
1120             // Don't reset the region state or get a new plan any more.
1121             // This is the last try.
1122             continue;
1123           }
1124         }
1125 
1126         // If the region opened on the destination of the present plan, reassigning to a new
1127         // RS may cause double assignments. In case of RegionAlreadyInTransitionException,
1128         // reassign to the same RS.
1129         if (needNewPlan) {
1130           // Force a new plan and reassign. Will return null if no servers.
1131           // The new plan could be the same as the existing plan since we don't
1132           // exclude the server of the original plan, which should not be
1133           // excluded since it could be the only server up now.
1134           RegionPlan newPlan = null;
1135           try {
1136             newPlan = getRegionPlan(region, true);
1137           } catch (HBaseIOException e) {
1138             LOG.warn("Failed to get region plan", e);
1139           }
1140           if (newPlan == null) {
1141             regionStates.updateRegionState(region, State.FAILED_OPEN);
1142             LOG.warn("Unable to find a viable location to assign region " +
1143                 region.getRegionNameAsString());
1144             return;
1145           }
1146 
1147           if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
1148             // Clean out the plan we failed to execute and any plan that doesn't look like
1149             // it'll succeed anyway; we need a new plan!
1150             // Transition back to OFFLINE
1151             regionStates.updateRegionState(region, State.OFFLINE);
1152             plan = newPlan;
1153           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
1154               previousException instanceof FailedServerException) {
1155             try {
1156               LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
1157                 " to the same failed server.");
1158               Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1159                 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
1160             } catch (InterruptedException ie) {
1161               LOG.warn("Failed to assign "
1162                   + region.getRegionNameAsString() + " since interrupted", ie);
1163               regionStates.updateRegionState(region, State.FAILED_OPEN);
1164               Thread.currentThread().interrupt();
1165               return;
1166             }
1167           }
1168         }
1169       }
1170       // Run out of attempts
1171       regionStates.updateRegionState(region, State.FAILED_OPEN);
1172     } finally {
1173       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
1174     }
1175   }
1176 
1177   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
1178     if (this.tableStateManager.isTableState(region.getTable(),
1179             TableState.State.DISABLED,
1180             TableState.State.DISABLING) || replicasToClose.contains(region)) {
1181       LOG.info("Table " + region.getTable() + " is disabled or disabling;"
1182         + " skipping assign of " + region.getRegionNameAsString());
1183       offlineDisabledRegion(region);
1184       return true;
1185     }
1186     return false;
1187   }
1188 
1189   /**
1190    * @param region the region to assign
1191    * @param forceNewPlan If true, then if an existing plan exists, a new plan
1192    * will be generated.
1193    * @return Plan for the passed <code>region</code> (if none currently exists, it creates one;
1194    * if there are no servers to assign to, it returns null).
1195    */
1196   private RegionPlan getRegionPlan(final HRegionInfo region,
1197       final boolean forceNewPlan) throws HBaseIOException {
1198     // Pickup existing plan or make a new one
1199     final String encodedName = region.getEncodedName();
1200     final List<ServerName> destServers =
1201       serverManager.createDestinationServersList();
1202 
1203     if (destServers.isEmpty()){
1204       LOG.warn("Can't move " + encodedName +
1205         ", there is no destination server available.");
1206       return null;
1207     }
1208 
1209     RegionPlan randomPlan = null;
1210     boolean newPlan = false;
1211     RegionPlan existingPlan;
1212 
1213     synchronized (this.regionPlans) {
1214       existingPlan = this.regionPlans.get(encodedName);
1215 
1216       if (existingPlan != null && existingPlan.getDestination() != null) {
1217         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
1218           + " destination server is " + existingPlan.getDestination() +
1219             " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
1220       }
1221 
1222       if (forceNewPlan
1223           || existingPlan == null
1224           || existingPlan.getDestination() == null
1225           || !destServers.contains(existingPlan.getDestination())) {
1226         newPlan = true;
1227         randomPlan = new RegionPlan(region, null,
1228             balancer.randomAssignment(region, destServers));
1229         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
1230           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
1231           regions.add(region);
1232           try {
1233             processFavoredNodes(regions);
1234           } catch (IOException ie) {
1235             LOG.warn("Ignoring exception in processFavoredNodes " + ie);
1236           }
1237         }
1238         this.regionPlans.put(encodedName, randomPlan);
1239       }
1240     }
1241 
1242     if (newPlan) {
1243       if (randomPlan.getDestination() == null) {
1244         LOG.warn("Can't find a destination for " + encodedName);
1245         return null;
1246       }
1247       if (LOG.isDebugEnabled()) {
1248         LOG.debug("No previous transition plan found (or ignoring " +
1249           "an existing plan) for " + region.getRegionNameAsString() +
1250           "; generated random plan=" + randomPlan + "; " + destServers.size() +
1251           " (online=" + serverManager.getOnlineServers().size() +
1252           ") available servers, forceNewPlan=" + forceNewPlan);
1253       }
1254       return randomPlan;
1255     }
1256     if (LOG.isDebugEnabled()) {
1257       LOG.debug("Using pre-existing plan for " +
1258         region.getRegionNameAsString() + "; plan=" + existingPlan);
1259     }
1260     return existingPlan;
1261   }
1262 
1263   /**
1264    * Wait for some time before retrying meta table region assignment
1265    */
1266   private void waitForRetryingMetaAssignment() {
1267     try {
1268       Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
1269     } catch (InterruptedException e) {
1270       LOG.error("Got exception while waiting for hbase:meta assignment", e);
1271       Thread.currentThread().interrupt();
1272     }
1273   }
1274 
1275   /**
1276    * Unassigns the specified region.
1277    * <p>
1278    * Updates the RegionState and sends the CLOSE RPC unless the region is being
1279    * split by the regionserver; in that case the unassign fails (silently) because we
1280    * presume the region being unassigned no longer exists (it has been split out
1281    * of existence). TODO: What to do if the split fails and is rolled back and the
1282    * parent is revivified?
1283    * <p>
1284    * If a RegionPlan is already set, it will remain.
1285    *
1286    * @param region the region to be unassigned
1287    */
1288   public void unassign(HRegionInfo region) {
1289     unassign(region, null);
1290   }
1291 
1292 
1293   /**
1294    * Unassigns the specified region.
1295    * <p>
1296    * Updates the RegionState and sends the CLOSE RPC unless the region is being
1297    * split by the regionserver; in that case the unassign fails (silently) because we
1298    * presume the region being unassigned no longer exists (it has been split out
1299    * of existence). TODO: What to do if the split fails and is rolled back and the
1300    * parent is revivified?
1301    * <p>
1302    * If a RegionPlan is already set, it will remain.
1303    *
1304    * @param region the region to be unassigned
1305    * @param dest the destination server of the region
1306    */
1307   public void unassign(HRegionInfo region, ServerName dest) {
1308     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
1309     LOG.debug("Starting unassign of " + region.getRegionNameAsString()
1310       + " (offlining), current state: " + regionStates.getRegionState(region));
1311 
1312     String encodedName = region.getEncodedName();
1313     // Grab the state of this region and synchronize on it
1314     // We need a lock here as we're going to do a put later and we don't want the
1315     // state to be created multiple times
1316     ReentrantLock lock = locker.acquireLock(encodedName);
1317     RegionState state = regionStates.getRegionTransitionState(encodedName);
1318     try {
1319       if (state == null || state.isFailedClose()) {
1320         if (state == null) {
1321           // Region is not in transition.
1322           // We can unassign it only if it's not SPLIT/MERGED.
1323           state = regionStates.getRegionState(encodedName);
1324           if (state != null && state.isUnassignable()) {
1325             LOG.info("Attempting to unassign " + state + ", ignored");
1326             // Offline region will be reassigned below
1327             return;
1328           }
1329           if (state == null || state.getServerName() == null) {
1330             // We don't know where the region is, offline it.
1331             // No need to send CLOSE RPC
1332             LOG.warn("Attempting to unassign a region not in RegionStates "
1333               + region.getRegionNameAsString() + ", offlined");
1334             regionOffline(region);
1335             return;
1336           }
1337         }
1338         state = regionStates.updateRegionState(
1339           region, State.PENDING_CLOSE);
1340       } else if (state.isFailedOpen()) {
1341         // The region is not open yet
1342         regionOffline(region);
1343         return;
1344       } else {
1345         LOG.debug("Attempting to unassign " +
1346           region.getRegionNameAsString() + " but it is " +
1347           "already in transition (" + state.getState() + ")");
1348         return;
1349       }
1350 
1351       unassign(region, state.getServerName(), dest);
1352     } finally {
1353       lock.unlock();
1354 
1355       // Region is expected to be reassigned afterwards
1356       if (!replicasToClose.contains(region)
1357           && regionStates.isRegionInState(region, State.OFFLINE)) {
1358         assign(region);
1359       }
1360     }
1361   }
1362 
1363   /**
1364    * Used by unit tests. Return the number of regions opened so far in the life
1365    * of the master. Increases by one every time the master opens a region.
1366    * @return the counter value of the number of regions opened so far
1367    */
1368   public int getNumRegionsOpened() {
1369     return numRegionsOpened.get();
1370   }
1371 
1372   /**
1373    * Waits until the specified region has completed assignment.
1374    * <p>
1375    * If the region is already assigned, returns immediately.  Otherwise, method
1376    * blocks until the region is assigned.
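        * <p>Illustrative usage only (a minimal sketch, assuming an AssignmentManager
        * instance <code>am</code> and an <code>HRegionInfo hri</code> obtained elsewhere):
        * <pre>
        *   boolean assigned = am.waitForAssignment(hri);
        *   if (!assigned) {
        *     // the master is stopping, or the region repeatedly failed to open
        *   }
        * </pre>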
1377    * @param regionInfo region to wait on assignment for
1378    * @return true if the region is assigned, false otherwise.
1379    * @throws InterruptedException
1380    */
1381   public boolean waitForAssignment(HRegionInfo regionInfo)
1382       throws InterruptedException {
1383     ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
1384     regionSet.add(regionInfo);
1385     return waitForAssignment(regionSet, true, Long.MAX_VALUE);
1386   }
1387 
1388   /**
1389    * Waits until the specified region has completed assignment, or the deadline is reached.
1390    */
1391   protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
1392       final boolean waitTillAllAssigned, final int reassigningRegions,
1393       final long minEndTime) throws InterruptedException {
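         // The deadline is the minimum end time plus a guesstimated per-region open time
         // for each region being reassigned (plus one extra region's worth of slack).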
1394     long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
1395     return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
1396   }
1397 
1398   /**
1399    * Waits until the specified region has completed assignment, or the deadline is reached.
1400    * @param regionSet set of regions to wait on; the set is modified and assigned regions are removed
1401    * @param waitTillAllAssigned true if we should wait all the regions to be assigned
1402    * @param deadline the timestamp after which the wait is aborted
1403    * @return true if all the regions are assigned, false otherwise.
1404    * @throws InterruptedException
1405    */
1406   protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
1407       final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
1408     // We're not synchronizing on regionsInTransition now because we don't use any iterator.
1409     while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
1410       int failedOpenCount = 0;
1411       Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
1412       while (regionInfoIterator.hasNext()) {
1413         HRegionInfo hri = regionInfoIterator.next();
1414         if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
1415             State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
1416           regionInfoIterator.remove();
1417         } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
1418           failedOpenCount++;
1419         }
1420       }
1421       if (!waitTillAllAssigned) {
1422         // No need to wait; let assignment go on asynchronously
1423         break;
1424       }
1425       if (!regionSet.isEmpty()) {
1426         if (failedOpenCount == regionSet.size()) {
1427           // all the regions we are waiting for had an error on open.
1428           break;
1429         }
1430         regionStates.waitForUpdate(100);
1431       }
1432     }
1433     return regionSet.isEmpty();
1434   }
1435 
1436   /**
1437    * Assigns the hbase:meta region or a replica.
1438    * <p>
1439    * Assumes that hbase:meta is currently closed and is not being actively served by
1440    * any RegionServer.
1441    * @param hri TODO
1442    */
1443   public void assignMeta(HRegionInfo hri) throws KeeperException {
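         // Mark the meta region OFFLINE first so the generic assign path below picks it up.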
1444     regionStates.updateRegionState(hri, State.OFFLINE);
1445     assign(hri);
1446   }
1447 
1448   /**
1449    * Assigns specified regions retaining assignments, if any.
1450    * <p>
1451    * This is a synchronous call and will return once every region has been
1452    * assigned.  If anything fails, an exception is thrown.
1453    * @throws InterruptedException
1454    * @throws IOException
1455    */
1456   public void assign(Map<HRegionInfo, ServerName> regions)
1457         throws IOException, InterruptedException {
1458     if (regions == null || regions.isEmpty()) {
1459       return;
1460     }
1461     List<ServerName> servers = serverManager.createDestinationServersList();
1462     if (servers == null || servers.isEmpty()) {
1463       throw new IOException("Found no destination server to assign region(s)");
1464     }
1465 
1466     // Reuse existing assignment info
1467     Map<ServerName, List<HRegionInfo>> bulkPlan =
1468       balancer.retainAssignment(regions, servers);
1469     if (bulkPlan == null) {
1470       throw new IOException("Unable to determine a plan to assign region(s)");
1471     }
1472 
1473     assign(regions.size(), servers.size(),
1474       "retainAssignment=true", bulkPlan);
1475   }
1476 
1477   /**
1478    * Assigns specified regions round robin, if any.
1479    * <p>
1480    * This is a synchronous call and will return once every region has been
1481    * assigned.  If anything fails, an exception is thrown.
1482    * @throws InterruptedException
1483    * @throws IOException
1484    */
1485   public void assign(List<HRegionInfo> regions)
1486         throws IOException, InterruptedException {
1487     if (regions == null || regions.isEmpty()) {
1488       return;
1489     }
1490 
1491     List<ServerName> servers = serverManager.createDestinationServersList();
1492     if (servers == null || servers.isEmpty()) {
1493       throw new IOException("Found no destination server to assign region(s)");
1494     }
1495 
1496     // Generate a round-robin bulk assignment plan
1497     Map<ServerName, List<HRegionInfo>> bulkPlan = balancer.roundRobinAssignment(regions, servers);
1498     if (bulkPlan == null) {
1499       throw new IOException("Unable to determine a plan to assign region(s)");
1500     }
1501 
1502     processFavoredNodes(regions);
1503     assign(regions.size(), servers.size(), "round-robin=true", bulkPlan);
1504   }
1505 
1506   private void assign(int regions, int totalServers,
1507       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
1508           throws InterruptedException, IOException {
1509 
1510     int servers = bulkPlan.size();
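         // Small plans (a single server, or below both bulk-assign thresholds) are assigned
         // directly; larger plans go through the GeneralBulkAssigner below.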
1511     if (servers == 1 || (regions < bulkAssignThresholdRegions
1512         && servers < bulkAssignThresholdServers)) {
1513 
1514       // Don't use bulk assignment.  This could be more efficient in a small
1515       // cluster, especially a mini cluster for testing, so that tests won't time out
1516       if (LOG.isTraceEnabled()) {
1517         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
1518           " region(s) to " + servers + " server(s)");
1519       }
1520 
1521       // invoke assignment (async)
1522       ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
1523       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
1524         if (!assign(plan.getKey(), plan.getValue()) && !server.isStopped()) {
1525           for (HRegionInfo region: plan.getValue()) {
1526             if (!regionStates.isRegionOnline(region)) {
1527               invokeAssign(region);
1528               if (!region.getTable().isSystemTable()) {
1529                 userRegionSet.add(region);
1530               }
1531             }
1532           }
1533         }
1534       }
1535 
1536       // wait for assignment completion
1537       if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
1538             System.currentTimeMillis())) {
1539         LOG.debug("some user regions are still in transition: " + userRegionSet);
1540       }
1541     } else {
1542       LOG.info("Bulk assigning " + regions + " region(s) across "
1543         + totalServers + " server(s), " + message);
1544 
1545       // Use fixed count thread pool assigning.
1546       BulkAssigner ba = new GeneralBulkAssigner(
1547         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
1548       ba.bulkAssign();
1549       LOG.info("Bulk assigning done");
1550     }
1551   }
1552 
1553   /**
1554    * Assigns all user regions, if any exist.  Used during cluster startup.
1555    * <p>
1556    * This is a synchronous call and will return once every region has been
1557    * assigned.  If anything fails, an exception is thrown and the cluster
1558    * should be shut down.
1559    * @throws InterruptedException
1560    * @throws IOException
1561    */
1562   private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
1563       throws IOException, InterruptedException {
1564     if (allRegions == null || allRegions.isEmpty()) return;
1565 
1566     // Determine what type of assignment to do on startup
1567     boolean retainAssignment = server.getConfiguration().
1568       getBoolean("hbase.master.startup.retainassign", true);
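         // Illustrative hbase-site.xml snippet (not part of this file) to turn off
         // retaining assignments across restarts:
         //   <property>
         //     <name>hbase.master.startup.retainassign</name>
         //     <value>false</value>
         //   </property>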
1569 
1570     Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
1571     if (retainAssignment) {
1572       assign(allRegions);
1573     } else {
1574       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
1575       assign(regions);
1576     }
1577 
1578     for (HRegionInfo hri : regionsFromMetaScan) {
1579       TableName tableName = hri.getTable();
1580       if (!tableStateManager.isTableState(tableName,
1581               TableState.State.ENABLED)) {
1582         setEnabledTable(tableName);
1583       }
1584     }
1585     // assign all the replicas that were not recorded in the meta
1586     assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server));
1587   }
1588 
1589   /**
1590    * Get a list of replica regions that are not recorded in meta yet.
1591    * We might not have recorded the locations
1592    * for the replicas since the replicas may not have been online yet, the master restarted
1593    * in the middle of assigning, ZK was erased, etc.
1594    * @param regionsRecordedInMeta the list of regions we know are recorded in meta,
1595    * either as a default or as the location of a replica
1596    * @param master
1597    * @return list of replica regions
1598    * @throws IOException
1599    */
1600   public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
1601       Set<HRegionInfo> regionsRecordedInMeta, MasterServices master) throws IOException {
1602     List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
1603     for (HRegionInfo hri : regionsRecordedInMeta) {
1604       TableName table = hri.getTable();
1605       HTableDescriptor htd = master.getTableDescriptors().get(table);
1606       // look at the HTD for the replica count. That's the source of truth
1607       int desiredRegionReplication = htd.getRegionReplication();
1608       for (int i = 0; i < desiredRegionReplication; i++) {
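             // Replica id 0 is the primary region itself; the contains() check below skips
             // any replica (including the primary) that is already recorded in meta.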
1609         HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
1610         if (regionsRecordedInMeta.contains(replica)) continue;
1611         regionsNotRecordedInMeta.add(replica);
1612       }
1613     }
1614     return regionsNotRecordedInMeta;
1615   }
1616 
1617   /**
1618    * Rebuild the list of user regions and assignment information.
1619    * Updates regionStates with findings as we go through the list of regions.
1620    * @return set of servers not online that hosted some regions according to a scan of hbase:meta
1621    * @throws IOException
1622    */
1623   Set<ServerName> rebuildUserRegions() throws
1624           IOException, KeeperException {
1625     Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
1626             TableState.State.DISABLED, TableState.State.ENABLING);
1627 
1628     Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
1629             TableState.State.DISABLED,
1630             TableState.State.DISABLING,
1631             TableState.State.ENABLING);
1632 
1633     // Region assignment from META
1634     List<Result> results = MetaTableAccessor.fullScanRegions(server.getConnection());
1635     // Get any new but slow-to-check-in region servers that joined the cluster
1636     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
1637     // Set of offline servers to be returned
1638     Set<ServerName> offlineServers = new HashSet<ServerName>();
1639     // Iterate regions in META
1640     for (Result result : results) {
1641       if (result == null && LOG.isDebugEnabled()){
1642         LOG.debug("null result from meta - ignoring but this is strange.");
1643         continue;
1644       }
1645       // Keep track of replicas to close. These were the replicas of the originally
1646       // unmerged regions. The master might have closed them before, but it might not
1647       // have, perhaps because it crashed.
1648       PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
1649       if (p.getFirst() != null && p.getSecond() != null) {
1650         int numReplicas = ((MasterServices)server).getTableDescriptors().get(p.getFirst().
1651             getTable()).getRegionReplication();
1652         for (HRegionInfo merge : p) {
1653           for (int i = 1; i < numReplicas; i++) {
1654             replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
1655           }
1656         }
1657       }
1658       RegionLocations rl =  MetaTableAccessor.getRegionLocations(result);
1659       if (rl == null) {
1660         continue;
1661       }
1662       HRegionLocation[] locations = rl.getRegionLocations();
1663       if (locations == null) {
1664         continue;
1665       }
1666       for (HRegionLocation hrl : locations) {
1667         if (hrl == null) continue;
1668         HRegionInfo regionInfo = hrl.getRegionInfo();
1669         if (regionInfo == null) continue;
1670         int replicaId = regionInfo.getReplicaId();
1671         State state = RegionStateStore.getRegionState(result, replicaId);
1672         // Keep track of replicas to close. These were the replicas of the split parents
1673         // from the previous life of the master. The master should have closed them before,
1674         // but it couldn't, perhaps because it crashed
1675         if (replicaId == 0 && state.equals(State.SPLIT)) {
1676           for (HRegionLocation h : locations) {
1677             replicasToClose.add(h.getRegionInfo());
1678           }
1679         }
1680         ServerName lastHost = hrl.getServerName();
1681         ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
1682         regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
1683         if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
1684           // Region is not open (either offline or in transition), skip
1685           continue;
1686         }
1687         TableName tableName = regionInfo.getTable();
1688         if (!onlineServers.contains(regionLocation)) {
1689           // Region is located on a server that isn't online
1690           offlineServers.add(regionLocation);
1691         } else if (!disabledOrEnablingTables.contains(tableName)) {
1692           // Region is being served and on an active server
1693           // add only if region not in disabled or enabling table
1694           regionStates.regionOnline(regionInfo, regionLocation);
1695           balancer.regionOnline(regionInfo, regionLocation);
1696         }
1697         // need to enable the table if not disabled or disabling or enabling
1698         // this will be used in rolling restarts
1699         if (!disabledOrDisablingOrEnabling.contains(tableName)
1700           && !getTableStateManager().isTableState(tableName,
1701                 TableState.State.ENABLED)) {
1702           setEnabledTable(tableName);
1703         }
1704       }
1705     }
1706     return offlineServers;
1707   }
1708 
1709   /**
1710    * Recover the tables that were not fully moved to DISABLED state. These
1711    * tables are in DISABLING state when the master restarted/switched.
1712    *
1713    * @throws KeeperException
1714    * @throws TableNotFoundException
1715    * @throws IOException
1716    */
1717   private void recoverTableInDisablingState()
1718           throws KeeperException, IOException {
1719     Set<TableName> disablingTables =
1720             tableStateManager.getTablesInStates(TableState.State.DISABLING);
1721     if (disablingTables.size() != 0) {
1722       for (TableName tableName : disablingTables) {
1723         // Recover by calling DisableTableHandler
1724         LOG.info("The table " + tableName
1725             + " is in DISABLING state.  Hence recovering by moving the table"
1726             + " to DISABLED state.");
1727         new DisableTableHandler(this.server, tableName,
1728             this, tableLockManager, true).prepare().process();
1729       }
1730     }
1731   }
1732 
1733   /**
1734    * Recover the tables that are not fully moved to ENABLED state. These tables
1735    * are in ENABLING state when the master restarted/switched
1736    *
1737    * @throws KeeperException
1738    * @throws org.apache.hadoop.hbase.TableNotFoundException
1739    * @throws IOException
1740    */
1741   private void recoverTableInEnablingState()
1742           throws KeeperException, IOException {
1743     Set<TableName> enablingTables = tableStateManager.
1744             getTablesInStates(TableState.State.ENABLING);
1745     if (enablingTables.size() != 0) {
1746       for (TableName tableName : enablingTables) {
1747         // Recover by calling EnableTableHandler
1748         LOG.info("The table " + tableName
1749             + " is in ENABLING state.  Hence recovering by moving the table"
1750             + " to ENABLED state.");
1751         // enableTable in sync way during master startup,
1752         // no need to invoke coprocessor
1753         EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
1754           this, tableLockManager, true);
1755         try {
1756           eth.prepare();
1757         } catch (TableNotFoundException e) {
1758           LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
1759           continue;
1760         }
1761         eth.process();
1762       }
1763     }
1764   }
1765 
1766   /**
1767    * Processes list of regions in transition at startup
1768    */
1769   void processRegionsInTransition(Collection<RegionState> regionsInTransition) {
1770     // We need to send the RPC call again for PENDING_OPEN/PENDING_CLOSE regions
1771     // in case the RPC call had not been sent out yet before the master was shut down,
1772     // since we update the state before we send the RPC call. We can't update
1773     // the state after the RPC call; otherwise, we wouldn't know what happened
1774     // to the region if the master died right after the RPC call went out.
1775     for (RegionState regionState: regionsInTransition) {
1776       LOG.info("Processing " + regionState);
1777       ServerName serverName = regionState.getServerName();
1778       // Server could be null in case of FAILED_OPEN when master cannot find a region plan. In that
1779       // case, try assigning it here.
1780       if (serverName != null && !serverManager.getOnlineServers().containsKey(serverName)) {
1781         LOG.info("Server " + serverName + " isn't online. SSH will handle this");
1782         continue; // SSH will handle it
1783       }
1784       HRegionInfo regionInfo = regionState.getRegion();
1785       RegionState.State state = regionState.getState();
1786       switch (state) {
1787       case CLOSED:
1788         invokeAssign(regionState.getRegion());
1789         break;
1790       case PENDING_OPEN:
1791         retrySendRegionOpen(regionState);
1792         break;
1793       case PENDING_CLOSE:
1794         retrySendRegionClose(regionState);
1795         break;
1796       case FAILED_CLOSE:
1797       case FAILED_OPEN:
1798         invokeUnAssign(regionInfo);
1799         break;
1800       default:
1801           // No process for other states
1802           break;
1803       }
1804     }
1805   }
1806 
1807   /**
1808    * At master failover, for pending_open region, make sure
1809    * sendRegionOpen RPC call is sent to the target regionserver
1810    */
1811   private void retrySendRegionOpen(final RegionState regionState) {
1812     this.executorService.submit(
1813       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
1814         @Override
1815         public void process() throws IOException {
1816           HRegionInfo hri = regionState.getRegion();
1817           ServerName serverName = regionState.getServerName();
1818           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
1819           try {
1820             for (int i = 1; i <= maximumAttempts; i++) {
1821               if (!serverManager.isServerOnline(serverName)
1822                   || server.isStopped() || server.isAborted()) {
1823                 return; // No need any more
1824               }
1825               try {
1826                 if (!regionState.equals(regionStates.getRegionState(hri))) {
1827                   return; // Region is not in the expected state any more
1828                 }
1829                 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1830                 if (shouldAssignRegionsWithFavoredNodes) {
1831                   favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
1832                 }
1833                 serverManager.sendRegionOpen(serverName, hri, favoredNodes);
1834                 return; // we're done
1835               } catch (Throwable t) {
1836                 if (t instanceof RemoteException) {
1837                   t = ((RemoteException) t).unwrapRemoteException();
1838                 }
1839                 if (t instanceof FailedServerException && i < maximumAttempts) {
1840                   // In case the server is in the failed server list, no point to
1841                   // retry too soon. Retry after the failed_server_expiry time
1842                   try {
1843                     Configuration conf = this.server.getConfiguration();
1844                     long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1845                       RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1846                     if (LOG.isDebugEnabled()) {
1847                       LOG.debug(serverName + " is on failed server list; waiting "
1848                         + sleepTime + "ms", t);
1849                     }
1850                     Thread.sleep(sleepTime);
1851                     continue;
1852                   } catch (InterruptedException ie) {
1853                     LOG.warn("Failed to assign "
1854                       + hri.getRegionNameAsString() + " since interrupted", ie);
1855                     regionStates.updateRegionState(hri, State.FAILED_OPEN);
1856                     Thread.currentThread().interrupt();
1857                     return;
1858                   }
1859                 }
1860                 if (serverManager.isServerOnline(serverName)
1861                     && t instanceof java.net.SocketTimeoutException) {
1862                   i--; // don't count this attempt; the server is online and the RPC only timed out
1863                 } else {
1864                   LOG.info("Got exception in retrying sendRegionOpen for "
1865                     + regionState + "; try=" + i + " of " + maximumAttempts, t);
1866                 }
1867                 Threads.sleep(100);
1868               }
1869             }
1870             // Run out of attempts
1871             regionStates.updateRegionState(hri, State.FAILED_OPEN);
1872           } finally {
1873             lock.unlock();
1874           }
1875         }
1876       });
1877   }
1878 
1879   /**
1880    * At master failover, for pending_close region, make sure
1881    * sendRegionClose RPC call is sent to the target regionserver
1882    */
1883   private void retrySendRegionClose(final RegionState regionState) {
1884     this.executorService.submit(
1885       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
1886         @Override
1887         public void process() throws IOException {
1888           HRegionInfo hri = regionState.getRegion();
1889           ServerName serverName = regionState.getServerName();
1890           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
1891           try {
1892             for (int i = 1; i <= maximumAttempts; i++) {
1893               if (!serverManager.isServerOnline(serverName)
1894                   || server.isStopped() || server.isAborted()) {
1895                 return; // No need any more
1896               }
1897               try {
1898                 if (!regionState.equals(regionStates.getRegionState(hri))) {
1899                   return; // Region is not in the expected state any more
1900                 }
1901                 serverManager.sendRegionClose(serverName, hri, null);
1902                 return; // Done.
1903               } catch (Throwable t) {
1904                 if (t instanceof RemoteException) {
1905                   t = ((RemoteException) t).unwrapRemoteException();
1906                 }
1907                 if (t instanceof FailedServerException && i < maximumAttempts) {
1908                   // In case the server is in the failed server list, no point to
1909                   // retry too soon. Retry after the failed_server_expiry time
1910                   try {
1911                     Configuration conf = this.server.getConfiguration();
1912                     long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1913                       RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1914                     if (LOG.isDebugEnabled()) {
1915                       LOG.debug(serverName + " is on failed server list; waiting "
1916                         + sleepTime + "ms", t);
1917                     }
1918                     Thread.sleep(sleepTime);
1919                     continue;
1920                   } catch (InterruptedException ie) {
1921                     LOG.warn("Failed to unassign "
1922                       + hri.getRegionNameAsString() + " since interrupted", ie);
1923                     regionStates.updateRegionState(hri, RegionState.State.FAILED_CLOSE);
1924                     Thread.currentThread().interrupt();
1925                     return;
1926                   }
1927                 }
1928                 if (serverManager.isServerOnline(serverName)
1929                     && t instanceof java.net.SocketTimeoutException) {
1930                   i--; // don't count this attempt; the server is online and the RPC only timed out
1931                 } else {
1932                   LOG.info("Got exception in retrying sendRegionClose for "
1933                     + regionState + "; try=" + i + " of " + maximumAttempts, t);
1934                 }
1935                 Threads.sleep(100);
1936               }
1937             }
1938             // Run out of attempts
1939             regionStates.updateRegionState(hri, State.FAILED_CLOSE);
1940           } finally {
1941             lock.unlock();
1942           }
1943         }
1944       });
1945   }
1946 
1947   /**
1948    * Set regions-in-transition metrics.
1949    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
1950    * This iterator is not fail-fast, which may lead to stale reads; but that's better than
1951    * creating a copy of the map for metrics computation, as this method is invoked
1952    * at a frequent interval.
1953    */
1954   public void updateRegionsInTransitionMetrics() {
1955     long currentTime = System.currentTimeMillis();
1956     int totalRITs = 0;
1957     int totalRITsOverThreshold = 0;
1958     long oldestRITTime = 0;
1959     int ritThreshold = this.server.getConfiguration().
1960       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
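         // Regions in transition longer than this threshold (in ms, default 60000) are
         // counted as over-threshold for the metrics below.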
1961     for (RegionState state: regionStates.getRegionsInTransition().values()) {
1962       totalRITs++;
1963       long ritTime = currentTime - state.getStamp();
1964       if (ritTime > ritThreshold) { // more than the threshold
1965         totalRITsOverThreshold++;
1966       }
1967       if (oldestRITTime < ritTime) {
1968         oldestRITTime = ritTime;
1969       }
1970     }
1971     if (this.metricsAssignmentManager != null) {
1972       this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
1973       this.metricsAssignmentManager.updateRITCount(totalRITs);
1974       this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
1975     }
1976   }
1977 
1978   /**
1979    * @param region Region whose plan we are to clear.
1980    */
1981   private void clearRegionPlan(final HRegionInfo region) {
1982     synchronized (this.regionPlans) {
1983       this.regionPlans.remove(region.getEncodedName());
1984     }
1985   }
1986 
1987   /**
1988    * Wait on region to clear regions-in-transition.
1989    * @param hri Region to wait on.
1990    * @throws IOException
1991    */
1992   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
1993       throws IOException, InterruptedException {
1994     waitOnRegionToClearRegionsInTransition(hri, -1L);
1995   }
1996 
1997   /**
1998    * Wait on region to clear regions-in-transition or time out
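        * <p>Illustrative usage (a minimal sketch, assuming an AssignmentManager
        * instance <code>am</code> and a region <code>hri</code>):
        * <pre>
        *   // wait at most 30 seconds for the region to leave regions-in-transition
        *   boolean cleared = am.waitOnRegionToClearRegionsInTransition(hri, 30000);
        * </pre>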
1999    * @param hri
2000    * @param timeOut Milliseconds to wait for current region to be out of transition state.
2001    * @return True when a region clears regions-in-transition before timeout otherwise false
2002    * @throws InterruptedException
2003    */
2004   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
2005       throws InterruptedException {
2006     if (!regionStates.isRegionInTransition(hri)) {
2007       return true;
2008     }
2009     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
2010         + timeOut;
2011     // There is already a timeout monitor on regions in transition, so we
2012     // should not need another one here.
2013     LOG.info("Waiting for " + hri.getEncodedName() +
2014         " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
2015     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
2016       regionStates.waitForUpdate(100);
2017       if (EnvironmentEdgeManager.currentTime() > end) {
2018         LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
2019         return false;
2020       }
2021     }
2022     if (this.server.isStopped()) {
2023       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
2024       return false;
2025     }
2026     return true;
2027   }
2028 
2029   void invokeAssign(HRegionInfo regionInfo) {
2030     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
2031   }
2032 
2033   void invokeUnAssign(HRegionInfo regionInfo) {
2034     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
2035   }
2036 
2037   public boolean isCarryingMeta(ServerName serverName) {
2038     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
2039   }
2040 
2041   public boolean isCarryingMetaReplica(ServerName serverName, int replicaId) {
2042     return isCarryingRegion(serverName,
2043         RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId));
2044   }
2045 
2046   public boolean isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) {
2047     return isCarryingRegion(serverName, metaHri);
2048   }
2049 
2050   /**
2051    * Check if the shutdown server carries the specific region.
2052    * @return whether the serverName currently hosts the region
2053    */
2054   private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
2055     RegionState regionState = regionStates.getRegionTransitionState(hri);
2056     ServerName transitionAddr = regionState != null? regionState.getServerName(): null;
2057     if (transitionAddr != null) {
2058       boolean matchTransitionAddr = transitionAddr.equals(serverName);
2059       LOG.debug("Checking region=" + hri.getRegionNameAsString()
2060         + ", transitioning on server=" + transitionAddr
2061         + ", server being checked: " + serverName
2062         + ", matches=" + matchTransitionAddr);
2063       return matchTransitionAddr;
2064     }
2065 
2066     ServerName assignedAddr = regionStates.getRegionServerOfRegion(hri);
2067     boolean matchAssignedAddr = serverName.equals(assignedAddr);
2068     LOG.debug("Based on AM, current region=" + hri.getRegionNameAsString()
2069       + " is on server=" + assignedAddr + ", server being checked: "
2070       + serverName + ", matches=" + matchAssignedAddr);
2071     return matchAssignedAddr;
2072   }
2073 
2074   /**
2075    * Clean out crashed server removing any assignments.
2076    * @param sn Server that went down.
2077    * @return list of regions in transition on this server
2078    */
2079   public List<HRegionInfo> cleanOutCrashedServerReferences(final ServerName sn) {
2080     // Clean out any existing assignment plans for this server
2081     synchronized (this.regionPlans) {
2082       for (Iterator <Map.Entry<String, RegionPlan>> i = this.regionPlans.entrySet().iterator();
2083           i.hasNext();) {
2084         Map.Entry<String, RegionPlan> e = i.next();
2085         ServerName otherSn = e.getValue().getDestination();
2086         // The name will be null if the region is planned for a random assign.
2087         if (otherSn != null && otherSn.equals(sn)) {
2088           // Use iterator's remove else we'll get CME
2089           i.remove();
2090         }
2091       }
2092     }
2093     List<HRegionInfo> rits = regionStates.serverOffline(sn);
2094     for (Iterator<HRegionInfo> it = rits.iterator(); it.hasNext(); ) {
2095       HRegionInfo hri = it.next();
2096       String encodedName = hri.getEncodedName();
2097 
2098       // We need a lock on the region as we could update it
2099       Lock lock = locker.acquireLock(encodedName);
2100       try {
2101         RegionState regionState = regionStates.getRegionTransitionState(encodedName);
2102         if (regionState == null
2103             || (regionState.getServerName() != null && !regionState.isOnServer(sn))
2104             || !RegionStates.isOneOfStates(regionState, State.PENDING_OPEN,
2105                 State.OPENING, State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) {
2106           LOG.info("Skip " + regionState + " since it is no longer pending_open/opening"
2107             + "/failed_open/failed_close/offline on the dead server: " + sn);
2108           it.remove();
2109         } else {
2110           if (tableStateManager.isTableState(hri.getTable(),
2111                   TableState.State.DISABLED, TableState.State.DISABLING)) {
2112             regionStates.regionOffline(hri);
2113             it.remove();
2114             continue;
2115           }
2116           // Mark the region offline and assign it again by SSH
2117           regionStates.updateRegionState(hri, State.OFFLINE);
2118         }
2119       } finally {
2120         lock.unlock();
2121       }
2122     }
2123     return rits;
2124   }
2125 
2126   /**
2127    * @param plan Plan to execute.
2128    */
2129   public void balance(final RegionPlan plan) {
2130 
2131     HRegionInfo hri = plan.getRegionInfo();
2132     TableName tableName = hri.getTable();
2133     if (tableStateManager.isTableState(tableName,
2134             TableState.State.DISABLED, TableState.State.DISABLING)) {
2135       LOG.info("Ignored moving region of disabling/disabled table "
2136         + tableName);
2137       return;
2138     }
2139 
2140     // Move the region only if it's assigned
2141     String encodedName = hri.getEncodedName();
2142     ReentrantLock lock = locker.acquireLock(encodedName);
2143     try {
2144       if (!regionStates.isRegionOnline(hri)) {
2145         RegionState state = regionStates.getRegionState(encodedName);
2146         LOG.info("Ignored moving region not assigned: " + hri + ", "
2147           + (state == null ? "not in region states" : state));
2148         return;
2149       }
2150       synchronized (this.regionPlans) {
2151         this.regionPlans.put(plan.getRegionName(), plan);
2152       }
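           // The recorded plan is picked up by getRegionPlan() when the region is
           // reassigned after the unassign below.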
2153       unassign(hri, plan.getDestination());
2154     } finally {
2155       lock.unlock();
2156     }
2157   }
2158 
2159   public void stop() {
2160     // Shutdown the threadpool executor service
2161     threadPoolExecutorService.shutdownNow();
2162     regionStateStore.stop();
2163   }
2164 
2165   protected void setEnabledTable(TableName tableName) {
2166     try {
2167       this.tableStateManager.setTableState(tableName,
2168               TableState.State.ENABLED);
2169     } catch (IOException e) {
2170       // here we can abort as it is the start up flow
2171       String errorMsg = "Unable to ensure that the table " + tableName
2172           + " will be" + " enabled because of a ZooKeeper issue";
2173       LOG.error(errorMsg);
2174       this.server.abort(errorMsg, e);
2175     }
2176   }
2177 
2178   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
2179       value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
2180       justification="Worth fixing but not the end of the world.")
2181   private String onRegionFailedOpen(final RegionState current,
2182       final HRegionInfo hri, final ServerName serverName) {
2183     // The region must be opening on this server.
2184     // If current state is failed_open on the same server,
2185     // it could be a reportRegionTransition RPC retry.
2186     if (current == null || !current.isOpeningOrFailedOpenOnServer(serverName)) {
2187       return hri.getShortNameToLog() + " is not opening on " + serverName;
2188     }
2189 
2190     // Just return in case of retrying
2191     if (current.isFailedOpen()) {
2192       return null;
2193     }
2194 
2195     String encodedName = hri.getEncodedName();
2196     // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION Worth fixing!!!
2197     AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
2198     if (failedOpenCount == null) {
2199       failedOpenCount = new AtomicInteger();
2200       // No need to use putIfAbsent, or extra synchronization since
2201       // this whole handleRegion block is locked on the encoded region
2202       // name, and failedOpenTracker is updated only in this block
2203       failedOpenTracker.put(encodedName, failedOpenCount);
2204     }
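         // Give up after maximumAttempts failed opens, except for hbase:meta,
         // which keeps being retried (with a warning) below.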
2205     if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion()) {
2206       regionStates.updateRegionState(hri, State.FAILED_OPEN);
2207       // remove the tracking info to save memory, also reset
2208       // the count for next open initiative
2209       failedOpenTracker.remove(encodedName);
2210     } else {
2211       if (hri.isMetaRegion() && failedOpenCount.get() >= maximumAttempts) {
2212         // Log a warning message if a meta region failedOpenCount exceeds maximumAttempts
2213         // so that we are aware of potential problem if it persists for a long time.
2214         LOG.warn("Failed to open the hbase:meta region " +
2215             hri.getRegionNameAsString() + " after " +
2216             failedOpenCount.get() + " retries. Continue retrying.");
2217       }
2218 
2219       // Handle this the same as if it were opened and then closed.
2220       RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
2221       if (regionState != null) {
2222         // When there are more than one region server a new RS is selected as the
2223         // destination and the same is updated in the region plan. (HBASE-5546)
2224         if (getTableStateManager().isTableState(hri.getTable(),
2225                 TableState.State.DISABLED, TableState.State.DISABLING) ||
2226                 replicasToClose.contains(hri)) {
2227           offlineDisabledRegion(hri);
2228           return null;
2229         }
2230         regionStates.updateRegionState(hri, RegionState.State.CLOSED);
2231         // This below has to do w/ online enable/disable of a table
2232         removeClosedRegion(hri);
2233         try {
2234           getRegionPlan(hri, true);
2235         } catch (HBaseIOException e) {
2236           LOG.warn("Failed to get region plan", e);
2237         }
2238         invokeAssign(hri);
2239       }
2240     }
2241     // Null means no error
2242     return null;
2243   }
2244 
2245   private String onRegionOpen(final RegionState current, final HRegionInfo hri,
2246       final ServerName serverName, final RegionStateTransition transition) {
2247     // The region must be opening on this server.
2248     // If current state is already opened on the same server,
2249     // it could be a reportRegionTransition RPC retry.
2250     if (current == null || !current.isOpeningOrOpenedOnServer(serverName)) {
2251       return hri.getShortNameToLog() + " is not opening on " + serverName;
2252     }
2253 
2254     // Just return in case of retrying
2255     if (current.isOpened()) {
2256       return null;
2257     }
2258 
2259     long openSeqNum = transition.hasOpenSeqNum()
2260       ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM;
2261     if (openSeqNum < 0) {
2262       return "Newly opened region has invalid open seq num " + openSeqNum;
2263     }
2264     regionOnline(hri, serverName, openSeqNum);
2265 
2266     // reset the count, if any
2267     failedOpenTracker.remove(hri.getEncodedName());
2268     if (getTableStateManager().isTableState(hri.getTable(),
2269             TableState.State.DISABLED, TableState.State.DISABLING)) {
2270       invokeUnAssign(hri);
2271     }
2272     return null;
2273   }
2274 
2275   private String onRegionClosed(final RegionState current,
2276       final HRegionInfo hri, final ServerName serverName) {
2277     // Region will be usually assigned right after closed. When a RPC retry comes
2278     // in, the region may already have moved away from closed state. However, on the
2279     // region server side, we don't care much about the response for this transition.
2280     // We only make sure master has got and processed this report, either
2281     // successfully or not. So this is fine, not a problem at all.
2282     if (current == null || !current.isClosingOrClosedOnServer(serverName)) {
2283       return hri.getShortNameToLog() + " is not closing on " + serverName;
2284     }
2285 
2286     // Just return in case of retrying
2287     if (current.isClosed()) {
2288       return null;
2289     }
2290 
2291     if (getTableStateManager().isTableState(hri.getTable(), TableState.State.DISABLED,
2292         TableState.State.DISABLING) || replicasToClose.contains(hri)) {
2293       offlineDisabledRegion(hri);
2294       return null;
2295     }
2296 
2297     regionStates.updateRegionState(hri, RegionState.State.CLOSED);
2298     sendRegionClosedNotification(hri);
2299     // This below has to do w/ online enable/disable of a table
2300     removeClosedRegion(hri);
2301     invokeAssign(hri);
2302     return null;
2303   }
2304 
2305   private String onRegionReadyToSplit(final RegionState current, final HRegionInfo hri,
2306       final ServerName serverName, final RegionStateTransition transition) {
2307     // The region must be opened on this server.
2308     // If current state is already splitting on the same server,
2309     // it could be a reportRegionTransition RPC retry.
2310     if (current == null || !current.isSplittingOrOpenedOnServer(serverName)) {
2311       return hri.getShortNameToLog() + " is not opening on " + serverName;
2312     }
2313 
2314     // Just return in case of retrying
2315     if (current.isSplitting()) {
2316       return null;
2317     }
2318 
2319     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2320     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2321     RegionState rs_a = regionStates.getRegionState(a);
2322     RegionState rs_b = regionStates.getRegionState(b);
2323     if (rs_a != null || rs_b != null) {
2324       return "Some daughter region already exists. "
2325         + "a=" + rs_a + ", b=" + rs_b;
2326     }
2327 
2328     // Server holding is not updated at this stage.
2329     // It is done after the PONR (point of no return).
2330     regionStates.updateRegionState(hri, State.SPLITTING);
2331     regionStates.createRegionState(
2332       a, State.SPLITTING_NEW, serverName, null);
2333     regionStates.createRegionState(
2334       b, State.SPLITTING_NEW, serverName, null);
2335     return null;
2336   }
2337 
2338   private String onRegionSplitPONR(final RegionState current, final HRegionInfo hri,
2339       final ServerName serverName, final RegionStateTransition transition) {
2340     // The region must be splitting on this server, and the daughters must be in
2341     // splitting_new state. To check RPC retry, we use server holding info.
2342     if (current == null || !current.isSplittingOnServer(serverName)) {
2343       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2344     }
2345 
2346     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2347     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2348     RegionState rs_a = regionStates.getRegionState(a);
2349     RegionState rs_b = regionStates.getRegionState(b);
2350 
2351     // Master could have restarted and lost the new region
2352     // states, if so, they must be lost together
2353     if (rs_a == null && rs_b == null) {
2354       rs_a = regionStates.createRegionState(
2355         a, State.SPLITTING_NEW, serverName, null);
2356       rs_b = regionStates.createRegionState(
2357         b, State.SPLITTING_NEW, serverName, null);
2358     }
2359 
2360     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2361         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2362       return "Some daughter is not known to be splitting on " + serverName
2363         + ", a=" + rs_a + ", b=" + rs_b;
2364     }
2365 
2366     // Just return in case of retrying
2367     if (!regionStates.isRegionOnServer(hri, serverName)) {
2368       return null;
2369     }
2370 
2371     try {
2372       regionStates.splitRegion(hri, a, b, serverName);
2373     } catch (IOException ioe) {
2374       LOG.info("Failed to record split region " + hri.getShortNameToLog(), ioe);
2375       return "Failed to record the splitting in meta";
2376     }
2377     return null;
2378   }
2379 
2380   private String onRegionSplit(final RegionState current, final HRegionInfo hri,
2381       final ServerName serverName, final RegionStateTransition transition) {
2382     // The region must be splitting on this server, and the daughters must be in
2383     // splitting_new state.
2384     // If current state is already split on the same server,
2385     // it could be a reportRegionTransition RPC retry.
2386     if (current == null || !current.isSplittingOrSplitOnServer(serverName)) {
2387       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2388     }
2389 
2390     // Just return in case of retrying
2391     if (current.isSplit()) {
2392       return null;
2393     }
2394 
2395     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2396     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2397     RegionState rs_a = regionStates.getRegionState(a);
2398     RegionState rs_b = regionStates.getRegionState(b);
2399     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2400         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2401       return "Some daughter is not known to be splitting on " + serverName
2402         + ", a=" + rs_a + ", b=" + rs_b;
2403     }
2404 
2405     if (TEST_SKIP_SPLIT_HANDLING) {
2406       return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
2407     }
2408     regionOffline(hri, State.SPLIT);
2409     regionOnline(a, serverName, 1);
2410     regionOnline(b, serverName, 1);
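         // The daughters come online with an initial open sequence number of 1.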
2411 
2412     // User could disable the table before master knows the new region.
2413     if (getTableStateManager().isTableState(hri.getTable(),
2414         TableState.State.DISABLED, TableState.State.DISABLING)) {
2415       invokeUnAssign(a);
2416       invokeUnAssign(b);
2417     } else {
2418       Callable<Object> splitReplicasCallable = new Callable<Object>() {
2419         @Override
2420         public Object call() {
2421           doSplittingOfReplicas(hri, a, b);
2422           return null;
2423         }
2424       };
2425       threadPoolExecutorService.submit(splitReplicasCallable);
2426     }
2427     return null;
2428   }
2429 
2430   private String onRegionSplitReverted(final RegionState current, final HRegionInfo hri,
2431       final ServerName serverName, final RegionStateTransition transition) {
2432     // The region must be splitting on this server, and the daughters must be in
2433     // splitting_new state.
2434     // If the region is in open state, it could be an RPC retry.
2435     if (current == null || !current.isSplittingOrOpenedOnServer(serverName)) {
2436       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2437     }
2438 
2439     // Just return in case of retrying
2440     if (current.isOpened()) {
2441       return null;
2442     }
2443 
2444     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2445     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2446     RegionState rs_a = regionStates.getRegionState(a);
2447     RegionState rs_b = regionStates.getRegionState(b);
2448     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2449         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2450       return "Some daughter is not known to be splitting on " + serverName
2451         + ", a=" + rs_a + ", b=" + rs_b;
2452     }
2453 
2454     regionOnline(hri, serverName);
2455     regionOffline(a);
2456     regionOffline(b);
2457     if (getTableStateManager().isTableState(hri.getTable(),
2458         TableState.State.DISABLED, TableState.State.DISABLING)) {
2459       invokeUnAssign(hri);
2460     }
2461     return null;
2462   }
2463 
2464   private String onRegionReadyToMerge(final RegionState current, final HRegionInfo hri,
2465       final ServerName serverName, final RegionStateTransition transition) {
2466     // The region must be new, and the daughters must be open on this server.
2467     // If the region is in merge_new state, it could be an RPC retry.
2468     if (current != null && !current.isMergingNewOnServer(serverName)) {
2469       return "Merging daughter region already exists, p=" + current;
2470     }
2471 
2472     // Just return in case of retrying
2473     if (current != null) {
2474       return null;
2475     }
2476 
2477     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2478     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2479     Set<String> encodedNames = new HashSet<String>(2);
2480     encodedNames.add(a.getEncodedName());
2481     encodedNames.add(b.getEncodedName());
2482     Map<String, Lock> locks = locker.acquireLocks(encodedNames);
2483     try {
2484       RegionState rs_a = regionStates.getRegionState(a);
2485       RegionState rs_b = regionStates.getRegionState(b);
2486       if (rs_a == null || !rs_a.isOpenedOnServer(serverName)
2487           || rs_b == null || !rs_b.isOpenedOnServer(serverName)) {
2488         return "Some daughter is not in a state to merge on " + serverName
2489           + ", a=" + rs_a + ", b=" + rs_b;
2490       }
2491 
2492       regionStates.updateRegionState(a, State.MERGING);
2493       regionStates.updateRegionState(b, State.MERGING);
2494       regionStates.createRegionState(
2495         hri, State.MERGING_NEW, serverName, null);
2496       return null;
2497     } finally {
2498       for (Lock lock: locks.values()) {
2499         lock.unlock();
2500       }
2501     }
2502   }
2503 
2504   private String onRegionMergePONR(final RegionState current, final HRegionInfo hri,
2505       final ServerName serverName, final RegionStateTransition transition) {
2506     // The region must be in merging_new state, and the daughters must be
2507     // merging. To check RPC retry, we use server holding info.
2508     if (current != null && !current.isMergingNewOnServer(serverName)) {
2509       return hri.getShortNameToLog() + " is not merging on " + serverName;
2510     }
2511 
2512     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2513     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2514     RegionState rs_a = regionStates.getRegionState(a);
2515     RegionState rs_b = regionStates.getRegionState(b);
2516     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2517         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2518       return "Some daughter is not known to be merging on " + serverName
2519         + ", a=" + rs_a + ", b=" + rs_b;
2520     }
2521 
2522     // Master could have restarted and lost the new region state
2523     if (current == null) {
2524       regionStates.createRegionState(
2525         hri, State.MERGING_NEW, serverName, null);
2526     }
2527 
2528     // Just return in case of retrying
2529     if (regionStates.isRegionOnServer(hri, serverName)) {
2530       return null;
2531     }
2532 
2533     try {
2534       regionStates.mergeRegions(hri, a, b, serverName);
2535     } catch (IOException ioe) {
2536       LOG.info("Failed to record merged region " + hri.getShortNameToLog());
2537       return "Failed to record the merging in meta";
2538     }
2539     return null;
2540   }
2541 
2542   private String onRegionMerged(final RegionState current, final HRegionInfo hri,
2543       final ServerName serverName, final RegionStateTransition transition) {
2544     // The region must be in merging_new state, and the daughters must be
2545     // merging on this server.
2546     // If current state is already opened on the same server,
2547     // it could be a reportRegionTransition RPC retry.
2548     if (current == null || !current.isMergingNewOrOpenedOnServer(serverName)) {
2549       return hri.getShortNameToLog() + " is not merging on " + serverName;
2550     }
2551 
2552     // Just return in case of retrying
2553     if (current.isOpened()) {
2554       return null;
2555     }
2556 
2557     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2558     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2559     RegionState rs_a = regionStates.getRegionState(a);
2560     RegionState rs_b = regionStates.getRegionState(b);
2561     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2562         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2563       return "Some daughter is not known to be merging on " + serverName
2564         + ", a=" + rs_a + ", b=" + rs_b;
2565     }
2566 
2567     regionOffline(a, State.MERGED);
2568     regionOffline(b, State.MERGED);
2569     regionOnline(hri, serverName, 1);
2570 
2571     // User could disable the table before master knows the new region.
2572     if (getTableStateManager().isTableState(hri.getTable(),
2573         TableState.State.DISABLED, TableState.State.DISABLING)) {
2574       invokeUnAssign(hri);
2575     } else {
2576       Callable<Object> mergeReplicasCallable = new Callable<Object>() {
2577         @Override
2578         public Object call() {
2579           doMergingOfReplicas(hri, a, b);
2580           return null;
2581         }
2582       };
2583       threadPoolExecutorService.submit(mergeReplicasCallable);
2584     }
2585     return null;
2586   }
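
       /*
        * A minimal sketch of the deferred-work pattern used above: the replica merge is handed
        * off to a thread pool so the transition report can return to the region server quickly.
        * Plain java.util.concurrent types; the pool and task names are illustrative only.
        *
        *   ExecutorService pool = Executors.newSingleThreadExecutor();
        *   Future<Object> pending = pool.submit(new Callable<Object>() {
        *     @Override
        *     public Object call() {
        *       // slow work (closing old replicas, assigning new ones) runs off the RPC path
        *       return null;
        *     }
        *   });
        */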
2587 
2588   private String onRegionMergeReverted(final RegionState current, final HRegionInfo hri,
2589       final ServerName serverName, final RegionStateTransition transition) {
2590     // The region must be in merging_new state, and the daughters must be
2591     // merging on this server.
2592     // If the region is in offline state, it could be an RPC retry.
2593     if (current == null || !current.isMergingNewOrOfflineOnServer(serverName)) {
2594       return hri.getShortNameToLog() + " is not merging on " + serverName;
2595     }
2596 
2597     // Just return in case of retrying
2598     if (current.isOffline()) {
2599       return null;
2600     }
2601 
2602     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2603     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2604     RegionState rs_a = regionStates.getRegionState(a);
2605     RegionState rs_b = regionStates.getRegionState(b);
2606     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2607         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2608       return "Some daughter is not known to be merging on " + serverName
2609         + ", a=" + rs_a + ", b=" + rs_b;
2610     }
2611 
2612     regionOnline(a, serverName);
2613     regionOnline(b, serverName);
2614     regionOffline(hri);
2615 
2616     if (getTableStateManager().isTableState(hri.getTable(),
2617         TableState.State.DISABLED, TableState.State.DISABLING)) {
2618       invokeUnAssign(a);
2619       invokeUnAssign(b);
2620     }
2621     return null;
2622   }
2623 
2624   private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
2625       final HRegionInfo hri_b) {
2626     // Close replicas for the original unmerged regions, then create/assign new replicas
2627     // for the merged parent.
2628     List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
2629     unmergedRegions.add(hri_a);
2630     unmergedRegions.add(hri_b);
2631     Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
2632     Collection<List<HRegionInfo>> c = map.values();
2633     for (List<HRegionInfo> l : c) {
2634       for (HRegionInfo h : l) {
2635         if (!RegionReplicaUtil.isDefaultReplica(h)) {
2636           LOG.debug("Unassigning un-merged replica " + h);
2637           unassign(h);
2638         }
2639       }
2640     }
2641     int numReplicas = 1;
2642     try {
2643       numReplicas = ((MasterServices)server).getTableDescriptors().get(mergedHri.getTable()).
2644           getRegionReplication();
2645     } catch (IOException e) {
2646       LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
2647           " due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
2648           "will not be done");
2649     }
2650     List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2651     for (int i = 1; i < numReplicas; i++) {
2652       regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
2653     }
2654     try {
2655       assign(regions);
2656     } catch (IOException ioe) {
2657       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
2658                 ioe.getMessage());
2659     } catch (InterruptedException ie) {
2660       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri+ " because of " +
2661                 ie.getMessage());
2662     }
2663   }
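
       /*
        * Worked example for the replica loop above, assuming a table created with
        * REGION_REPLICATION = 3: replica id 0 is the default replica (mergedHri itself) and is
        * already online at this point, so the loop only creates the secondary replicas with
        * ids 1 and 2.
        *
        *   int numReplicas = 3;
        *   List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
        *   for (int i = 1; i < numReplicas; i++) {
        *     regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));  // ids 1 and 2
        *   }
        */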
2664 
2665   private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
2666       final HRegionInfo hri_b) {
2667     // Create new replica regions for the daughters, and assign them to match the
2668     // current replica assignments. If replica 1 of the parent is assigned to RS1,
2669     // replica 1 of each daughter will be on the same server.
2670     int numReplicas = 1;
2671     try {
2672       numReplicas = ((MasterServices)server).getTableDescriptors().get(parentHri.getTable()).
2673           getRegionReplication();
2674     } catch (IOException e) {
2675       LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
2676           " due to " + e.getMessage() + ". The assignment of daughter replicas " +
2677           "replicas will not be done");
2678     }
2679     // unassign the old replicas
2680     List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
2681     parentRegion.add(parentHri);
2682     Map<ServerName, List<HRegionInfo>> currentAssign =
2683         regionStates.getRegionAssignments(parentRegion);
2684     Collection<List<HRegionInfo>> c = currentAssign.values();
2685     for (List<HRegionInfo> l : c) {
2686       for (HRegionInfo h : l) {
2687         if (!RegionReplicaUtil.isDefaultReplica(h)) {
2688           LOG.debug("Unassigning parent's replica " + h);
2689           unassign(h);
2690         }
2691       }
2692     }
2693     // assign daughter replicas
2694     Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
2695     for (int i = 1; i < numReplicas; i++) {
2696       prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
2697       prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
2698     }
2699     try {
2700       assign(map);
2701     } catch (IOException e) {
2702       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
2703     } catch (InterruptedException e) {
2704       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
2705     }
2706   }
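
       /*
        * Worked example of the co-location rule described at the top of this method (server
        * names rs1/rs2 are hypothetical): if replica 1 of the parent is currently on rs1 and
        * replica 2 on rs2, the assignment map built by prepareDaughterReplicaForAssignment is
        *
        *   { a_replica1 -> rs1, b_replica1 -> rs1,
        *     a_replica2 -> rs2, b_replica2 -> rs2 }
        *
        * so each daughter replica is opened on the same server as the corresponding parent
        * replica, preserving read locality after the split.
        */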
2707 
2708   private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
2709       int replicaId, Map<HRegionInfo, ServerName> map) {
2710     HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
2711     HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
2712         replicaId);
2713     LOG.debug("Created replica region for daughter " + daughterReplica);
2714     ServerName sn;
2715     if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
2716       map.put(daughterReplica, sn);
2717     } else {
2718       List<ServerName> servers = serverManager.getOnlineServersList();
2719       sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
2720       map.put(daughterReplica, sn);
2721     }
2722   }
2723 
2724   public Set<HRegionInfo> getReplicasToClose() {
2725     return replicasToClose;
2726   }
2727 
2728   /**
2729    * Marks a region as offline.  The new state is the specified one,
2730    * if not null; otherwise the new state is Offline.
2731    * The specified state can only be Split, Merged, Offline or null.
2732    */
2733   private void regionOffline(final HRegionInfo regionInfo, final State state) {
2734     regionStates.regionOffline(regionInfo, state);
2735     removeClosedRegion(regionInfo);
2736     // remove the region plan as well just in case.
2737     clearRegionPlan(regionInfo);
2738     balancer.regionOffline(regionInfo);
2739 
2740     // Tell our listeners that a region was closed
2741     sendRegionClosedNotification(regionInfo);
2742     // Also note that all the replicas of the primary should be closed
2743     if (state != null && (state.equals(State.SPLIT) || state.equals(State.MERGED))) {
2744       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
2745       c.add(regionInfo);
2746       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
2747       Collection<List<HRegionInfo>> allReplicas = map.values();
2748       for (List<HRegionInfo> list : allReplicas) {
2749         replicasToClose.addAll(list);
2750       }
2751     }
2761   }
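
       /*
        * A minimal sketch of the effect of the SPLIT/MERGED branch above; the region name is
        * illustrative only. Offlining a split parent queues every replica currently assigned
        * for it into replicasToClose, which can later be inspected via getReplicasToClose().
        *
        *   regionOffline(splitParent, State.SPLIT);
        *   Set<HRegionInfo> pendingClose = getReplicasToClose();  // includes the parent's replicas
        */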
2762 
2763   private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
2764       final ServerName serverName) {
2765     if (!this.listeners.isEmpty()) {
2766       for (AssignmentListener listener : this.listeners) {
2767         listener.regionOpened(regionInfo, serverName);
2768       }
2769     }
2770   }
2771 
2772   private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
2773     if (!this.listeners.isEmpty()) {
2774       for (AssignmentListener listener : this.listeners) {
2775         listener.regionClosed(regionInfo);
2776       }
2777     }
2778   }
2779 
2780   /**
2781    * Try to update some region states. If the state machine prevents
2782    * such update, an error message is returned to explain the reason.
2783    *
2784    * It's expected that each transition involves just one
2785    * region for opening/closing, or three regions for splitting/merging.
2786    * These regions should be on the server that requested the change.
2787    *
2788    * Region state machine. Only these transitions
2789    * are expected to be triggered by a region server.
2790    *
2791    * On the state transition:
2792    *  (1) Open/Close should be initiated by master
2793    *      (a) Master sets the region to pending_open/pending_close
2794    *        in memory and hbase:meta after sending the request
2795    *        to the region server
2796    *      (b) The region server reports back to the master
2797    *        after the open/close is done (either success or failure)
2798    *      (c) If the region server has a problem reporting the status
2799    *        to the master, it must be because the master is down or there is
2800    *        a temporary network issue. Otherwise, the region server should
2801    *        abort, since it must be a bug. If the master is not accessible,
2802    *        the region server should keep trying until it is
2803    *        stopped or the status is reported to the (new) master
2804    *      (d) If the region server dies in the middle of opening/closing
2805    *        a region, SSH picks it up and finishes it
2806    *      (e) If the master dies in the middle, the new master recovers
2807    *        the state from hbase:meta during initialization. The region server
2808    *        can then report any transition that has not been reported to
2809    *        the previous active master yet
2810    *  (2) Split/merge is initiated by region servers
2811    *      (a) To split a region, a region server sends a request
2812    *        to the master to try to set the region to splitting, together with
2813    *        the two daughters (to be created) to splitting_new. If approved
2814    *        by the master, the split can then move ahead
2815    *      (b) To merge two regions, a region server sends a request to
2816    *        the master to try to set the new merged region (to be created) to
2817    *        merging_new, together with the two regions (to be merged) to merging.
2818    *        If the master approves, the merge can then move ahead
2819    *      (c) Once the splitting/merging is done, the region server
2820    *        reports the status (success or failure) back to the master.
2821    *      (d) Other scenarios should be handled similarly to
2822    *        region open/close
2823    */
2824   protected String onRegionTransition(final ServerName serverName,
2825       final RegionStateTransition transition) {
2826     TransitionCode code = transition.getTransitionCode();
2827     HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
2828     Lock lock = locker.acquireLock(hri.getEncodedName());
2829     try {
2830       RegionState current = regionStates.getRegionState(hri);
2831       if (LOG.isDebugEnabled()) {
2832         LOG.debug("Got transition " + code + " for "
2833           + (current != null ? current.toString() : hri.getShortNameToLog())
2834           + " from " + serverName);
2835       }
2836       String errorMsg = null;
2837       switch (code) {
2838       case OPENED:
2839         errorMsg = onRegionOpen(current, hri, serverName, transition);
2840         break;
2841       case FAILED_OPEN:
2842         errorMsg = onRegionFailedOpen(current, hri, serverName);
2843         break;
2844       case CLOSED:
2845         errorMsg = onRegionClosed(current, hri, serverName);
2846         break;
2847       case READY_TO_SPLIT:
2848         try {
2849           regionStateListener.onRegionSplit(hri);
2850           errorMsg = onRegionReadyToSplit(current, hri, serverName, transition);
2851         } catch (IOException exp) {
2852           if (exp instanceof QuotaExceededException) {
2853             server.getRegionNormalizer().planSkipped(hri, PlanType.SPLIT);
2854           }
2855           errorMsg = StringUtils.stringifyException(exp);
2856         }
2857         break;
2858       case SPLIT_PONR:
2859         errorMsg = onRegionSplitPONR(current, hri, serverName, transition);
2860         break;
2861       case SPLIT:
2862         errorMsg = onRegionSplit(current, hri, serverName, transition);
2863         break;
2864       case SPLIT_REVERTED:
2865         errorMsg = onRegionSplitReverted(current, hri, serverName, transition);
2866         if (org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
2867           try {
2868             regionStateListener.onRegionSplitReverted(hri);
2869           } catch (IOException exp) {
2870             LOG.warn(StringUtils.stringifyException(exp));
2871           }
2872         }
2873         break;
2874       case READY_TO_MERGE:
2875         errorMsg = onRegionReadyToMerge(current, hri, serverName, transition);
2876         break;
2877       case MERGE_PONR:
2878         errorMsg = onRegionMergePONR(current, hri, serverName, transition);
2879         break;
2880       case MERGED:
2881         try {
2882           errorMsg = onRegionMerged(current, hri, serverName, transition);
2883           regionStateListener.onRegionMerged(hri);
2884         } catch (IOException exp) {
2885           errorMsg = StringUtils.stringifyException(exp);
2886         }
2887         break;
2888       case MERGE_REVERTED:
2889         errorMsg = onRegionMergeReverted(current, hri, serverName, transition);
2890         break;
2891 
2892       default:
2893         errorMsg = "Unexpected transition code " + code;
2894       }
2895       if (errorMsg != null) {
2896         LOG.info("Could not transition region from " + current + " on "
2897           + code + " by " + serverName + ": " + errorMsg);
2898       }
2899       return errorMsg;
2900     } finally {
2901       lock.unlock();
2902     }
2903   }
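
       /*
        * A minimal sketch of how a caller (for example the master RPC layer, on behalf of a
        * region server) might drive this method, assuming the protobuf builder generated for
        * RegionStateTransition; variable names are illustrative only. A non-null return value
        * means the transition was rejected and carries the reason.
        *
        *   RegionStateTransition transition = RegionStateTransition.newBuilder()
        *       .setTransitionCode(TransitionCode.OPENED)
        *       .addRegionInfo(HRegionInfo.convert(hri))
        *       .build();
        *   String error = onRegionTransition(serverName, transition);
        *   if (error != null) {
        *     // the reporting region server must not consider the transition accepted
        *   }
        */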
2904 
2905   /**
2906    * @return Instance of load balancer
2907    */
2908   public LoadBalancer getBalancer() {
2909     return this.balancer;
2910   }
2911 
2912   public Map<ServerName, List<HRegionInfo>>
2913     getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
2914     return getRegionStates().getRegionAssignments(infos);
2915   }
2916 
2917   void setRegionStateListener(RegionStateListener listener) {
2918     this.regionStateListener = listener;
2919   }
2920 }