
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.NavigableMap;
31  import java.util.Random;
32  import java.util.Set;
33  import java.util.TreeMap;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.CopyOnWriteArrayList;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.locks.Lock;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.fs.FileSystem;
48  import org.apache.hadoop.fs.Path;
49  import org.apache.hadoop.hbase.HBaseIOException;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.HRegionLocation;
53  import org.apache.hadoop.hbase.HTableDescriptor;
54  import org.apache.hadoop.hbase.MetaTableAccessor;
55  import org.apache.hadoop.hbase.NotServingRegionException;
56  import org.apache.hadoop.hbase.RegionLocations;
57  import org.apache.hadoop.hbase.Server;
58  import org.apache.hadoop.hbase.ServerName;
59  import org.apache.hadoop.hbase.TableName;
60  import org.apache.hadoop.hbase.TableNotFoundException;
61  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
62  import org.apache.hadoop.hbase.client.Result;
63  import org.apache.hadoop.hbase.client.TableState;
64  import org.apache.hadoop.hbase.executor.EventHandler;
65  import org.apache.hadoop.hbase.executor.EventType;
66  import org.apache.hadoop.hbase.executor.ExecutorService;
67  import org.apache.hadoop.hbase.ipc.FailedServerException;
68  import org.apache.hadoop.hbase.ipc.RpcClient;
69  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
70  import org.apache.hadoop.hbase.master.RegionState.State;
71  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
72  import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
73  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
74  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
75  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
76  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
77  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
78  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
79  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
80  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
81  import org.apache.hadoop.hbase.util.FSUtils;
82  import org.apache.hadoop.hbase.util.KeyLocker;
83  import org.apache.hadoop.hbase.util.Pair;
84  import org.apache.hadoop.hbase.util.PairOfSameType;
85  import org.apache.hadoop.hbase.util.Threads;
86  import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
87  import org.apache.hadoop.ipc.RemoteException;
88  import org.apache.zookeeper.KeeperException;
89  
90  import com.google.common.annotations.VisibleForTesting;
91  
92  /**
93   * Manages and performs region assignment.
94   * Related communications with regionservers are all done over RPC.
95   */
96  @InterfaceAudience.Private
97  public class AssignmentManager {
98    private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
99  
100   protected final Server server;
101 
102   private ServerManager serverManager;
103 
104   private boolean shouldAssignRegionsWithFavoredNodes;
105 
106   private LoadBalancer balancer;
107 
108   private final MetricsAssignmentManager metricsAssignmentManager;
109 
110   private final TableLockManager tableLockManager;
111 
112   private AtomicInteger numRegionsOpened = new AtomicInteger(0);
113 
114   final private KeyLocker<String> locker = new KeyLocker<String>();
115 
116   Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
117 
118   /**
119    * Map of regions to reopen after the schema of a table is changed. Key -
120    * encoded region name, value - HRegionInfo
121    */
122   private final Map <String, HRegionInfo> regionsToReopen;
123 
124   /*
125    * Maximum times we recurse an assignment/unassignment.
126    * See below in {@link #assign()} and {@link #unassign()}.
127    */
128   private final int maximumAttempts;
129 
130   /**
131   * The sleep time the assignment will wait before retrying a hbase:meta
132   * assignment that failed because no region plan was available or the plan was bad.
133    */
134   private final long sleepTimeBeforeRetryingMetaAssignment;
135 
136   /** Plans for region movement. Key is the encoded version of a region name*/
137   // TODO: When do plans get cleaned out?  Ever? In server open and in server
138   // shutdown processing -- St.Ack
139   // All access to this Map must be synchronized.
140   final NavigableMap<String, RegionPlan> regionPlans =
141     new TreeMap<String, RegionPlan>();
142 
143   private final TableStateManager tableStateManager;
144 
145   private final ExecutorService executorService;
146 
147   // Thread pool executor service. TODO, consolidate with executorService?
148   private java.util.concurrent.ExecutorService threadPoolExecutorService;
149 
150   private final RegionStates regionStates;
151 
152   // The threshold to use bulk assigning. Bulk assignment is used
153   // only if assigning at least this many regions to at least this
154   // many servers. If assigning fewer regions to fewer servers,
155   // bulk assigning may not be as efficient.
156   private final int bulkAssignThresholdRegions;
157   private final int bulkAssignThresholdServers;
158 
159   // Should bulk assignment wait till all regions are assigned,
160   // or until it times out?  This is useful for measuring bulk assignment
161   // performance, but not needed in most use cases.
162   private final boolean bulkAssignWaitTillAllAssigned;
163 
164   /**
165    * Indicator that AssignmentManager has recovered the region states so
166    * that ServerShutdownHandler can be fully enabled and re-assign regions
167    * of dead servers. This way, when re-assignment happens, AssignmentManager
168    * has proper region states.
169    *
170    * Protected to ease testing.
171    */
172   protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
173 
174   /**
175    * A map to track how many times in a row a region has failed to open,
176    * so that we don't try to open a region forever if the failure is
177    * unrecoverable.  We don't put this information in region states
178    * because we don't expect this to happen frequently; we don't
179    * want to copy this information over during each state transition either.
180    */
181   private final ConcurrentHashMap<String, AtomicInteger>
182     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
183 
184   // In case not using ZK for region assignment, region states
185   // are persisted in meta with a state store
186   private final RegionStateStore regionStateStore;
187 
188   /**
189    * For testing only!  Set to true to skip handling of split.
190    */
191   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
192   public static boolean TEST_SKIP_SPLIT_HANDLING = false;
193 
194   /** Listeners that are called on assignment events. */
195   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
196 
197   /**
198    * Constructs a new assignment manager.
199    *
200    * @param server instance of HMaster this AM is running inside
201    * @param serverManager serverManager for associated HMaster
202    * @param balancer implementation of {@link LoadBalancer}
203    * @param service Executor service
204    * @param metricsMaster metrics manager
205    * @param tableLockManager TableLock manager
206    * @throws IOException
207    */
208   public AssignmentManager(Server server, ServerManager serverManager,
209       final LoadBalancer balancer,
210       final ExecutorService service, MetricsMaster metricsMaster,
211       final TableLockManager tableLockManager,
212       final TableStateManager tableStateManager)
213           throws IOException {
214     this.server = server;
215     this.serverManager = serverManager;
216     this.executorService = service;
217     this.regionStateStore = new RegionStateStore(server);
218     this.regionsToReopen = Collections.synchronizedMap
219                            (new HashMap<String, HRegionInfo> ());
220     Configuration conf = server.getConfiguration();
221     // Only read favored nodes if using the favored nodes load balancer.
222     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
223            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
224            FavoredNodeLoadBalancer.class);
225 
226     this.tableStateManager = tableStateManager;
227 
228     // This is the max attempts, not retries, so it should be at least 1.
229     this.maximumAttempts = Math.max(1,
230       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
231     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
232         "hbase.meta.assignment.retry.sleeptime", 1000l);
233     this.balancer = balancer;
234     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
235     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
236       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
237     this.regionStates = new RegionStates(
238       server, tableStateManager, serverManager, regionStateStore);
239 
240     this.bulkAssignWaitTillAllAssigned =
241       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
242     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
243     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
244 
245     this.metricsAssignmentManager = new MetricsAssignmentManager();
246     this.tableLockManager = tableLockManager;
247   }
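  // Note: the assignment tunables read above (defaults in parentheses) can be
  // overridden in the site configuration: hbase.assignment.maximum.attempts (10),
  // hbase.meta.assignment.retry.sleeptime (1000 ms), hbase.assignment.threads.max (30),
  // hbase.bulk.assignment.waittillallassigned (false),
  // hbase.bulk.assignment.threshold.regions (7), hbase.bulk.assignment.threshold.servers (3).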
248 
249   /**
250    * Add the listener to the notification list.
251    * @param listener The AssignmentListener to register
252    */
253   public void registerListener(final AssignmentListener listener) {
254     this.listeners.add(listener);
255   }
256 
257   /**
258    * Remove the listener from the notification list.
259    * @param listener The AssignmentListener to unregister
260    */
261   public boolean unregisterListener(final AssignmentListener listener) {
262     return this.listeners.remove(listener);
263   }
264 
265   /**
266    * @return Instance of TableStateManager.
267    */
268   public TableStateManager getTableStateManager() {
269     // These are 'expensive' to make involving trip to zk ensemble so allow
270     // sharing.
271     return this.tableStateManager;
272   }
273 
274   /**
275    * This SHOULD not be public. It is public now
276    * because of some unit tests.
277    *
278    * TODO: make it package private and keep RegionStates in the master package
279    */
280   public RegionStates getRegionStates() {
281     return regionStates;
282   }
283 
284   /**
285    * Used in some tests to mock up region state in meta
286    */
287   @VisibleForTesting
288   RegionStateStore getRegionStateStore() {
289     return regionStateStore;
290   }
291 
292   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
293     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
294   }
295 
296   /**
297    * Add a regionPlan for the specified region.
298    * @param encodedName
299    * @param plan
300    */
301   public void addPlan(String encodedName, RegionPlan plan) {
302     synchronized (regionPlans) {
303       regionPlans.put(encodedName, plan);
304     }
305   }
306 
307   /**
308    * Add a map of region plans.
309    */
310   public void addPlans(Map<String, RegionPlan> plans) {
311     synchronized (regionPlans) {
312       regionPlans.putAll(plans);
313     }
314   }
315 
316   /**
317    * Set the list of regions that will be reopened
318    * because of an update in table schema
319    *
320    * @param regions
321    *          list of regions that should be tracked for reopen
322    */
323   public void setRegionsToReopen(List <HRegionInfo> regions) {
324     for(HRegionInfo hri : regions) {
325       regionsToReopen.put(hri.getEncodedName(), hri);
326     }
327   }
328 
329   /**
330    * Used by the client to identify if all regions have had the schema updates applied
331    *
332    * @param tableName
333    * @return Pair indicating the status of the alter command
334    * @throws IOException
335    */
336   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
337       throws IOException {
338     List<HRegionInfo> hris;
339     if (TableName.META_TABLE_NAME.equals(tableName)) {
340       hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
341     } else {
342       hris = MetaTableAccessor.getTableRegions(server.getConnection(), tableName, true);
343     }
344 
345     Integer pending = 0;
346     for (HRegionInfo hri : hris) {
347       String name = hri.getEncodedName();
348       // No lock; concurrent access is ok: sequential consistency respected.
349       if (regionsToReopen.containsKey(name)
350           || regionStates.isRegionInTransition(name)) {
351         pending++;
352       }
353     }
354     return new Pair<Integer, Integer>(pending, hris.size());
355   }
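  // Hypothetical caller sketch (not part of this class): the pair is interpreted as
  // (regions still pending reopen or in transition, total regions of the table), e.g.
  //   Pair<Integer, Integer> status = assignmentManager.getReopenStatus(tableName);
  //   boolean reopenDone = status.getFirst() == 0;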
356 
357   /**
358    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
359    * the failover cleanup before re-assigning regions of dead servers, so that
360    * when re-assignment happens, AssignmentManager has proper region states.
361    */
362   public boolean isFailoverCleanupDone() {
363     return failoverCleanupDone.get();
364   }
365 
366   /**
367    * To avoid racing with AM, external entities may need to lock a region,
368    * for example, when SSH checks what regions to skip re-assigning.
369    */
370   public Lock acquireRegionLock(final String encodedName) {
371     return locker.acquireLock(encodedName);
372   }
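  // Usage sketch, mirroring how this class itself uses the underlying KeyLocker:
  //   Lock lock = assignmentManager.acquireRegionLock(encodedName);
  //   try {
  //     // ... inspect or act on the region while holding the lock ...
  //   } finally {
  //     lock.unlock();
  //   }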
373 
374   /**
375    * Now, failover cleanup is completed. Notify server manager to
376    * process queued up dead servers, if any.
377    */
378   void failoverCleanupDone() {
379     failoverCleanupDone.set(true);
380     serverManager.processQueuedDeadServers();
381   }
382 
383   /**
384    * Called on startup.
385    * Figures out whether this is a fresh cluster start or we are joining an extant running cluster.
386    * @throws IOException
387    * @throws KeeperException
388    * @throws InterruptedException
389    */
390   void joinCluster() throws IOException,
391           KeeperException, InterruptedException {
392     long startTime = System.currentTimeMillis();
393     // Concurrency note: In the below the accesses on regionsInTransition are
394     // outside of a synchronization block where usually all accesses to RIT are
395     // synchronized.  The presumption is that in this case it is safe since this
396     // method is being played by a single thread on startup.
397 
398     // TODO: Regions that have a null location and are not in regionsInTransitions
399     // need to be handled.
400 
401     // Scan hbase:meta to build list of existing regions, servers, and assignment
402     // Returns servers who have not checked in (assumed dead) that some regions
403     // were assigned to (according to the meta)
404     Set<ServerName> deadServers = rebuildUserRegions();
405 
406     // This method will assign all user regions if a clean server startup or
407     // it will reconstruct master state and cleanup any leftovers from
408     // previous master process.
409     boolean failover = processDeadServersAndRegionsInTransition(deadServers);
410 
411     recoverTableInDisablingState();
412     recoverTableInEnablingState();
413     LOG.info("Joined the cluster in " + (System.currentTimeMillis()
414       - startTime) + "ms, failover=" + failover);
415   }
416 
417   /**
418    * Processes all regions that are in transition in zookeeper and also
419    * processes the list of dead servers by scanning the META.
420    * Used by the master when joining a cluster.  If we figure this is a clean cluster
421    * startup, will assign all user regions.
422    * @param deadServers
423    *          Map of dead servers and their regions. Can be null.
424    * @throws IOException
425    * @throws InterruptedException
426    */
427   boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
428           throws IOException, InterruptedException {
429     boolean failover = !serverManager.getDeadServers().isEmpty();
430     if (failover) {
431       // This may not be a failover actually, especially if meta is on this master.
432       if (LOG.isDebugEnabled()) {
433         LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
434       }
435     } else {
436       // If any one region except meta is assigned, it's a failover.
437       Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
438       for (Map.Entry<HRegionInfo, ServerName> en:
439           regionStates.getRegionAssignments().entrySet()) {
440         HRegionInfo hri = en.getKey();
441         if (!hri.isMetaTable()
442             && onlineServers.contains(en.getValue())) {
443           LOG.debug("Found " + hri + " out on cluster");
444           failover = true;
445           break;
446         }
447       }
448       if (!failover) {
449         // If any region except meta is in transition on a live server, it's a failover.
450         Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
451         if (!regionsInTransition.isEmpty()) {
452           for (RegionState regionState: regionsInTransition.values()) {
453             if (!regionState.getRegion().isMetaRegion()
454                 && onlineServers.contains(regionState.getServerName())) {
455               LOG.debug("Found " + regionState + " in RITs");
456               failover = true;
457               break;
458             }
459           }
460         }
461       }
462     }
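    // At this point, failover is true only if a dead server is known, or some user
    // region is already assigned or in transition on a live server.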
463     if (!failover) {
464       // If we get here, we have a full cluster restart. It is a failover only
465       // if there are some WALs that are not split yet. For meta WALs, they should have
466       // been split already, if any. We can walk through those queued dead servers;
467       // if they don't have any WALs, this restart should be considered a clean one.
468       Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
469       if (!queuedDeadServers.isEmpty()) {
470         Configuration conf = server.getConfiguration();
471         Path rootdir = FSUtils.getRootDir(conf);
472         FileSystem fs = rootdir.getFileSystem(conf);
473         for (ServerName serverName: queuedDeadServers) {
474           // In the case of a clean exit, the shutdown handler would have presplit any WALs and
475           // removed empty directories.
476           Path logDir = new Path(rootdir,
477               DefaultWALProvider.getWALDirectoryName(serverName.toString()));
478           Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
479           if (fs.exists(logDir) || fs.exists(splitDir)) {
480             LOG.debug("Found queued dead server " + serverName);
481             failover = true;
482             break;
483           }
484         }
485         if (!failover) {
486           // We figured that it's not a failover, so no need to
487           // work on these re-queued dead servers any more.
488           LOG.info("AM figured that it's not a failover and cleaned up "
489             + queuedDeadServers.size() + " queued dead servers");
490           serverManager.removeRequeuedDeadServers();
491         }
492       }
493     }
494 
495     Set<TableName> disabledOrDisablingOrEnabling = null;
496     Map<HRegionInfo, ServerName> allRegions = null;
497 
498     if (!failover) {
499       disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
500         TableState.State.DISABLED, TableState.State.DISABLING,
501         TableState.State.ENABLING);
502 
503       // Clean re/start, mark all user regions closed before reassignment
504       allRegions = regionStates.closeAllUserRegions(
505         disabledOrDisablingOrEnabling);
506     }
507 
508     // Now region states are restored
509     regionStateStore.start();
510 
511     if (failover) {
512       if (deadServers != null && !deadServers.isEmpty()) {
513         for (ServerName serverName: deadServers) {
514           if (!serverManager.isServerDead(serverName)) {
515             serverManager.expireServer(serverName); // Let SSH do region re-assign
516           }
517         }
518       }
519       processRegionsInTransition(regionStates.getRegionsInTransition().values());
520     }
521 
522     // Now we can safely claim failover cleanup completed and enable
523     // ServerShutdownHandler for further processing. The nodes (below)
524     // in transition, if any, are for regions not related to those
525     // dead servers at all, and can be done in parallel to SSH.
526     failoverCleanupDone();
527     if (!failover) {
528       // Fresh cluster startup.
529       LOG.info("Clean cluster startup. Assigning user regions");
530       assignAllUserRegions(allRegions);
531     }
532     // unassign replicas of the split parents and the merged regions
533     // the daughter replicas are opened in assignAllUserRegions if it was
534     // not already opened.
535     for (HRegionInfo h : replicasToClose) {
536       unassign(h);
537     }
538     replicasToClose.clear();
539     return failover;
540   }
541 
542   /**
543    * When a region is closed, it should be removed from the regionsToReopen
544    * @param hri HRegionInfo of the region which was closed
545    */
546   public void removeClosedRegion(HRegionInfo hri) {
547     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
548       LOG.debug("Removed region from reopening regions because it was closed");
549     }
550   }
551 
552   // TODO: processFavoredNodes might throw an exception, e.g., if the
553   // meta could not be contacted/updated. We need to decide how seriously to treat
554   // this problem. Should we fail the current assignment? We should be able
555   // to recover from this problem eventually (if the meta couldn't be updated,
556   // things should work normally and eventually get fixed up).
557   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
558     if (!shouldAssignRegionsWithFavoredNodes) return;
559     // The AM gets the favored nodes info for each region and updates the meta
560     // table with that info
561     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
562         new HashMap<HRegionInfo, List<ServerName>>();
563     for (HRegionInfo region : regions) {
564       regionToFavoredNodes.put(region,
565           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
566     }
567     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
568       this.server.getConnection());
569   }
570 
571   /**
572    * Marks the region as online.  Removes it from regions in transition and
573    * updates the in-memory assignment information.
574    * <p>
575    * Used when a region has been successfully opened on a region server.
576    * @param regionInfo
577    * @param sn
578    */
579   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
580     regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
581   }
582 
583   void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
584     numRegionsOpened.incrementAndGet();
585     regionStates.regionOnline(regionInfo, sn, openSeqNum);
586 
587     // Remove plan if one.
588     clearRegionPlan(regionInfo);
589     balancer.regionOnline(regionInfo, sn);
590 
591     // Tell our listeners that a region was opened
592     sendRegionOpenedNotification(regionInfo, sn);
593   }
594 
595   /**
596    * Marks the region as offline.  Removes it from regions in transition and
597    * removes in-memory assignment information.
598    * <p>
599    * Used when a region has been closed and should remain closed.
600    * @param regionInfo
601    */
602   public void regionOffline(final HRegionInfo regionInfo) {
603     regionOffline(regionInfo, null);
604   }
605 
606   public void offlineDisabledRegion(HRegionInfo regionInfo) {
607     replicasToClose.remove(regionInfo);
608     regionOffline(regionInfo);
609   }
610 
611   // Assignment methods
612 
613   /**
614    * Assigns the specified region.
615    * <p>
616    * If a RegionPlan is available with a valid destination then it will be used
617    * to determine what server the region is assigned to.  If no RegionPlan is
618    * available, region will be assigned to a random available server.
619    * <p>
620    * Updates the RegionState and sends the OPEN RPC.
621    * <p>
622    * This will only succeed if the region is in transition and in a CLOSED or
623    * OFFLINE state or not in transition, and of course, the
624    * chosen server is up and running (It may have just crashed!).
625    *
626    * @param region the region to be assigned
627    */
628   public void assign(HRegionInfo region) {
629     assign(region, false);
630   }
631 
632   /**
633    * Use care with forceNewPlan. It could cause double assignment.
634    */
635   public void assign(HRegionInfo region, boolean forceNewPlan) {
636     if (isDisabledorDisablingRegionInRIT(region)) {
637       return;
638     }
639     String encodedName = region.getEncodedName();
640     Lock lock = locker.acquireLock(encodedName);
641     try {
642       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
643       if (state != null) {
644         if (regionStates.wasRegionOnDeadServer(encodedName)) {
645           LOG.info("Skip assigning " + region.getRegionNameAsString()
646             + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
647             + " is dead but not processed yet");
648           return;
649         }
650         assign(state, forceNewPlan);
651       }
652     } finally {
653       lock.unlock();
654     }
655   }
656 
657   /**
658    * Bulk assign regions to <code>destination</code>.
659    * @param destination
660    * @param regions Regions to assign.
661    * @return true if successful
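   *         false if the master is stopping, the destination server has stopped,
   *         or the open RPC could not be sent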
662    */
663   boolean assign(final ServerName destination, final List<HRegionInfo> regions)
664     throws InterruptedException {
665     long startTime = EnvironmentEdgeManager.currentTime();
666     try {
667       int regionCount = regions.size();
668       if (regionCount == 0) {
669         return true;
670       }
671       LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
672       Set<String> encodedNames = new HashSet<String>(regionCount);
673       for (HRegionInfo region : regions) {
674         encodedNames.add(region.getEncodedName());
675       }
676 
677       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
678       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
679       try {
680         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regionCount);
681         List<RegionState> states = new ArrayList<RegionState>(regionCount);
682         for (HRegionInfo region : regions) {
683           String encodedName = region.getEncodedName();
684           if (!isDisabledorDisablingRegionInRIT(region)) {
685             RegionState state = forceRegionStateToOffline(region, false);
686             boolean onDeadServer = false;
687             if (state != null) {
688               if (regionStates.wasRegionOnDeadServer(encodedName)) {
689                 LOG.info("Skip assigning " + region.getRegionNameAsString()
690                   + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
691                   + " is dead but not processed yet");
692                 onDeadServer = true;
693               } else {
694                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
695                 plans.put(encodedName, plan);
696                 states.add(state);
697                 continue;
698               }
699             }
700             // Reassign if the region wasn't on a dead server
701             if (!onDeadServer) {
702               LOG.info("failed to force region state to offline, "
703                 + "will reassign later: " + region);
704               failedToOpenRegions.add(region); // assign individually later
705             }
706           }
707           // Release the lock, this region is excluded from bulk assign because
708           // we can't update its state, or set its znode to offline.
709           Lock lock = locks.remove(encodedName);
710           lock.unlock();
711         }
712 
713         if (server.isStopped()) {
714           return false;
715         }
716 
717         // Add region plans, so we can updateTimers when one region is opened so
718         // that unnecessary timeout on RIT is reduced.
719         this.addPlans(plans);
720 
721         List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos =
722           new ArrayList<Pair<HRegionInfo, List<ServerName>>>(states.size());
723         for (RegionState state: states) {
724           HRegionInfo region = state.getRegion();
725           regionStates.updateRegionState(
726             region, State.PENDING_OPEN, destination);
727           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
728           if (this.shouldAssignRegionsWithFavoredNodes) {
729             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
730           }
731           regionOpenInfos.add(new Pair<HRegionInfo, List<ServerName>>(
732             region, favoredNodes));
733         }
734 
735         // Move on to open regions.
736         try {
737           // Send OPEN RPC. If it fails on a IOE or RemoteException,
738           // regions will be assigned individually.
739           Configuration conf = server.getConfiguration();
740           long maxWaitTime = System.currentTimeMillis() +
741             conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000);
742           for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
743             try {
744               List<RegionOpeningState> regionOpeningStateList = serverManager
745                 .sendRegionOpen(destination, regionOpenInfos);
746               for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
747                 RegionOpeningState openingState = regionOpeningStateList.get(k);
748                 if (openingState != RegionOpeningState.OPENED) {
749                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
750                   LOG.info("Got opening state " + openingState
751                     + ", will reassign later: " + region);
752                   // Failed opening this region, reassign it later
753                   forceRegionStateToOffline(region, true);
754                   failedToOpenRegions.add(region);
755                 }
756               }
757               break;
758             } catch (IOException e) {
759               if (e instanceof RemoteException) {
760                 e = ((RemoteException)e).unwrapRemoteException();
761               }
762               if (e instanceof RegionServerStoppedException) {
763                 LOG.warn("The region server was shut down, ", e);
764                 // No need to retry, the region server is a goner.
765                 return false;
766               } else if (e instanceof ServerNotRunningYetException) {
767                 long now = System.currentTimeMillis();
768                 if (now < maxWaitTime) {
769                   if (LOG.isDebugEnabled()) {
770                     LOG.debug("Server is not yet up; waiting up to " +
771                       (maxWaitTime - now) + "ms", e);
772                   }
773                   Thread.sleep(100);
774                   i--; // reset the try count
775                   continue;
776                 }
777               } else if (e instanceof java.net.SocketTimeoutException
778                   && this.serverManager.isServerOnline(destination)) {
779                 // In case socket is timed out and the region server is still online,
780                 // the openRegion RPC could have been accepted by the server and
781                 // just the response didn't go through.  So we will retry to
782                 // open the region on the same server.
783                 if (LOG.isDebugEnabled()) {
784                   LOG.debug("Bulk assigner openRegion() to " + destination
785                     + " has timed out, but the regions might"
786                     + " already be opened on it.", e);
787                 }
788                 // wait and reset the re-try count, server might be just busy.
789                 Thread.sleep(100);
790                 i--;
791                 continue;
792               } else if (e instanceof FailedServerException && i < maximumAttempts) {
793                 // In case the server is in the failed server list, no point to
794                 // retry too soon. Retry after the failed_server_expiry time
795                 long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
796                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
797                 if (LOG.isDebugEnabled()) {
798                   LOG.debug(destination + " is on failed server list; waiting "
799                     + sleepTime + "ms", e);
800                 }
801                 Thread.sleep(sleepTime);
802                 continue;
803               }
804               throw e;
805             }
806           }
807         } catch (IOException e) {
808           // Can be a socket timeout, EOF, NoRouteToHost, etc
809           LOG.info("Unable to communicate with " + destination
810             + " in order to assign regions, ", e);
811           for (RegionState state: states) {
812             HRegionInfo region = state.getRegion();
813             forceRegionStateToOffline(region, true);
814           }
815           return false;
816         }
817       } finally {
818         for (Lock lock : locks.values()) {
819           lock.unlock();
820         }
821       }
822 
823       if (!failedToOpenRegions.isEmpty()) {
824         for (HRegionInfo region : failedToOpenRegions) {
825           if (!regionStates.isRegionOnline(region)) {
826             invokeAssign(region);
827           }
828         }
829       }
830       LOG.debug("Bulk assigning done for " + destination);
831       return true;
832     } finally {
833       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
834     }
835   }
836 
837   /**
838    * Send CLOSE RPC if the server is online, otherwise, offline the region.
839    *
840    * The RPC will be sent only to the region server found in the region state
841    * if it is passed in; otherwise, to the src server specified. If region
842    * state is not specified, we don't update region state at all; instead
843    * we just send the RPC call. This is useful for some cleanup without
844    * messing around with the region states (see handleRegion, for example,
845    * in the scenario where a region is opened on an unexpected server).
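   *
   * Retries the CLOSE up to {@link #maximumAttempts} times; if every attempt fails,
   * the region is moved to the FAILED_CLOSE state.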
846    */
847   private void unassign(final HRegionInfo region,
848       final ServerName server, final ServerName dest) {
849     for (int i = 1; i <= this.maximumAttempts; i++) {
850       if (this.server.isStopped() || this.server.isAborted()) {
851         LOG.debug("Server stopped/aborted; skipping unassign of " + region);
852         return;
853       }
854       if (!serverManager.isServerOnline(server)) {
855         LOG.debug("Offline " + region.getRegionNameAsString()
856           + ", no need to unassign since it's on a dead server: " + server);
857         regionStates.updateRegionState(region, State.OFFLINE);
858         return;
859       }
860       try {
861         // Send CLOSE RPC
862         if (serverManager.sendRegionClose(server, region, dest)) {
863           LOG.debug("Sent CLOSE to " + server + " for region " +
864             region.getRegionNameAsString());
865           return;
866         }
867         // This never happens. Currently regionserver close always returns true.
868         // TODO: this can now happen (0.96) if there is an exception in a coprocessor
869         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
870           region.getRegionNameAsString());
871       } catch (Throwable t) {
872         if (t instanceof RemoteException) {
873           t = ((RemoteException)t).unwrapRemoteException();
874         }
875         if (t instanceof NotServingRegionException
876             || t instanceof RegionServerStoppedException
877             || t instanceof ServerNotRunningYetException) {
878           LOG.debug("Offline " + region.getRegionNameAsString()
879             + ", it's not any more on " + server, t);
880           regionStates.updateRegionState(region, State.OFFLINE);
881           return;
882         } else if (t instanceof FailedServerException && i < maximumAttempts) {
883           // In case the server is in the failed server list, no point to
884           // retry too soon. Retry after the failed_server_expiry time
885           try {
886             Configuration conf = this.server.getConfiguration();
887             long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
888               RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
889             if (LOG.isDebugEnabled()) {
890               LOG.debug(server + " is on failed server list; waiting "
891                 + sleepTime + "ms", t);
892             }
893             Thread.sleep(sleepTime);
894           } catch (InterruptedException ie) {
895             LOG.warn("Failed to unassign "
896               + region.getRegionNameAsString() + " since interrupted", ie);
897             regionStates.updateRegionState(region, State.FAILED_CLOSE);
898             Thread.currentThread().interrupt();
899             return;
900           }
901         }
902 
903         LOG.info("Server " + server + " returned " + t + " for "
904           + region.getRegionNameAsString() + ", try=" + i
905           + " of " + this.maximumAttempts, t);
906       }
907     }
908     // Run out of attempts
909     regionStates.updateRegionState(region, State.FAILED_CLOSE);
910   }
911 
912   /**
913    * Set region to OFFLINE unless it is opening and forceNewPlan is false.
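   * Returns null if the region cannot be (re)assigned right now, for example when it
   * is already in transition and forceNewPlan is false, or when it does not end up
   * OFFLINE/CLOSED after the unassign attempt.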
914    */
915   private RegionState forceRegionStateToOffline(
916       final HRegionInfo region, final boolean forceNewPlan) {
917     RegionState state = regionStates.getRegionState(region);
918     if (state == null) {
919       LOG.warn("Assigning a region not in region states: " + region);
920       state = regionStates.createRegionState(region);
921     }
922 
923     if (forceNewPlan && LOG.isDebugEnabled()) {
924       LOG.debug("Force region state offline " + state);
925     }
926 
927     switch (state.getState()) {
928     case OPEN:
929     case OPENING:
930     case PENDING_OPEN:
931     case CLOSING:
932     case PENDING_CLOSE:
933       if (!forceNewPlan) {
934         LOG.debug("Skip assigning " +
935           region + ", it is already " + state);
936         return null;
937       }
938     case FAILED_CLOSE:
939     case FAILED_OPEN:
940       regionStates.updateRegionState(region, State.PENDING_CLOSE);
941       unassign(region, state.getServerName(), null);
942       state = regionStates.getRegionState(region);
943       if (!state.isOffline() && !state.isClosed()) {
944         // If the region isn't offline, we can't re-assign
945         // it now. It will be assigned automatically after
946         // the regionserver reports it's closed.
947         return null;
948       }
949     case OFFLINE:
950     case CLOSED:
951       break;
952     default:
953       LOG.error("Trying to assign region " + region
954         + ", which is " + state);
955       return null;
956     }
957     return state;
958   }
959 
960   /**
961    * Caller must hold lock on the passed <code>state</code> object.
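   * Retries up to the configured maximum attempts; assignment of a hbase:meta region
   * is retried indefinitely. If no plan can be found, or all attempts fail, the region
   * is set to FAILED_OPEN.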
962    * @param state
963    * @param forceNewPlan
964    */
965   private void assign(RegionState state, boolean forceNewPlan) {
966     long startTime = EnvironmentEdgeManager.currentTime();
967     try {
968       Configuration conf = server.getConfiguration();
969       RegionPlan plan = null;
970       long maxWaitTime = -1;
971       HRegionInfo region = state.getRegion();
972       Throwable previousException = null;
973       for (int i = 1; i <= maximumAttempts; i++) {
974         if (server.isStopped() || server.isAborted()) {
975           LOG.info("Skip assigning " + region.getRegionNameAsString()
976             + ", the server is stopped/aborted");
977           return;
978         }
979 
980         if (plan == null) { // Get a server for the region at first
981           try {
982             plan = getRegionPlan(region, forceNewPlan);
983           } catch (HBaseIOException e) {
984             LOG.warn("Failed to get region plan", e);
985           }
986         }
987 
988         if (plan == null) {
989           LOG.warn("Unable to determine a plan to assign " + region);
990 
991           // For meta region, we have to keep retrying until succeeding
992           if (region.isMetaRegion()) {
993             if (i == maximumAttempts) {
994               i = 0; // re-set attempt count to 0 for at least 1 retry
995 
996               LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
997                 " after maximumAttempts (" + this.maximumAttempts +
998                 "). Reset attempts count and continue retrying.");
999             }
1000             waitForRetryingMetaAssignment();
1001             continue;
1002           }
1003 
1004           regionStates.updateRegionState(region, State.FAILED_OPEN);
1005           return;
1006         }
1007         // In case of assignment from EnableTableHandler, the table state is ENABLING. Anyhow,
1008         // EnableTableHandler will set ENABLED after assigning all the table regions. If we
1009         // try to set it to ENABLED directly then the client API may think the table is enabled.
1010         // When all the regions are added directly into hbase:meta and we then call
1011         // assignRegion, we need to make the table ENABLED. Hence in such a case the table
1012         // will not be in ENABLING or ENABLED state.
1013         TableName tableName = region.getTable();
1014         if (!tableStateManager.isTableState(tableName,
1015           TableState.State.ENABLED, TableState.State.ENABLING)) {
1016           LOG.debug("Setting table " + tableName + " to ENABLED state.");
1017           setEnabledTable(tableName);
1018         }
1019         LOG.info("Assigning " + region.getRegionNameAsString() +
1020             " to " + plan.getDestination().toString());
1021         // Transition RegionState to PENDING_OPEN
1022        regionStates.updateRegionState(region,
1023           State.PENDING_OPEN, plan.getDestination());
1024 
1025         boolean needNewPlan = false;
1026         final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
1027             " to " + plan.getDestination();
1028         try {
1029           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1030           if (this.shouldAssignRegionsWithFavoredNodes) {
1031             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1032           }
1033           serverManager.sendRegionOpen(plan.getDestination(), region, favoredNodes);
1034           return; // we're done
1035         } catch (Throwable t) {
1036           if (t instanceof RemoteException) {
1037             t = ((RemoteException) t).unwrapRemoteException();
1038           }
1039           previousException = t;
1040 
1041           // Should we wait a little before retrying? If the server is starting, yes.
1042           boolean hold = (t instanceof ServerNotRunningYetException);
1043 
1044           // In case socket is timed out and the region server is still online,
1045           // the openRegion RPC could have been accepted by the server and
1046           // just the response didn't go through.  So we will retry to
1047           // open the region on the same server.
1048           boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
1049               && this.serverManager.isServerOnline(plan.getDestination()));
1050 
1051           if (hold) {
1052             LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
1053               "try=" + i + " of " + this.maximumAttempts, t);
1054 
1055             if (maxWaitTime < 0) {
1056               maxWaitTime = EnvironmentEdgeManager.currentTime()
1057                 + this.server.getConfiguration().getLong(
1058                   "hbase.regionserver.rpc.startup.waittime", 60000);
1059             }
1060             try {
1061               long now = EnvironmentEdgeManager.currentTime();
1062               if (now < maxWaitTime) {
1063                 if (LOG.isDebugEnabled()) {
1064                   LOG.debug("Server is not yet up; waiting up to "
1065                     + (maxWaitTime - now) + "ms", t);
1066                 }
1067                 Thread.sleep(100);
1068                 i--; // reset the try count
1069               } else {
1070                 LOG.debug("Server is not up for a while; try a new one", t);
1071                 needNewPlan = true;
1072               }
1073             } catch (InterruptedException ie) {
1074               LOG.warn("Failed to assign "
1075                   + region.getRegionNameAsString() + " since interrupted", ie);
1076               regionStates.updateRegionState(region, State.FAILED_OPEN);
1077               Thread.currentThread().interrupt();
1078               return;
1079             }
1080           } else if (retry) {
1081             i--; // we want to retry as many times as needed as long as the RS is not dead.
1082             if (LOG.isDebugEnabled()) {
1083               LOG.debug(assignMsg + ", trying to assign to the same region server due to ", t);
1084             }
1085           } else {
1086             needNewPlan = true;
1087             LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
1088                 " try=" + i + " of " + this.maximumAttempts, t);
1089           }
1090         }
1091 
1092         if (i == this.maximumAttempts) {
1093           // For meta region, we have to keep retrying until succeeding
1094           if (region.isMetaRegion()) {
1095             i = 0; // re-set attempt count to 0 for at least 1 retry
1096             LOG.warn(assignMsg +
1097                 ", trying to assign a hbase:meta region reached to maximumAttempts (" +
1098                 this.maximumAttempts + ").  Reset attempt counts and continue retrying.");
1099             waitForRetryingMetaAssignment();
1100           }
1101           else {
1102             // Don't reset the region state or get a new plan any more.
1103             // This is the last try.
1104             continue;
1105           }
1106         }
1107 
1108         // If the region opened on the destination of the present plan, reassigning to a new
1109         // RS may cause double assignments. In case of RegionAlreadyInTransitionException,
1110         // reassign to the same RS.
1111         if (needNewPlan) {
1112           // Force a new plan and reassign. Will return null if no servers.
1113           // The new plan could be the same as the existing plan since we don't
1114           // exclude the server of the original plan, which should not be
1115           // excluded since it could be the only server up now.
1116           RegionPlan newPlan = null;
1117           try {
1118             newPlan = getRegionPlan(region, true);
1119           } catch (HBaseIOException e) {
1120             LOG.warn("Failed to get region plan", e);
1121           }
1122           if (newPlan == null) {
1123             regionStates.updateRegionState(region, State.FAILED_OPEN);
1124             LOG.warn("Unable to find a viable location to assign region " +
1125                 region.getRegionNameAsString());
1126             return;
1127           }
1128 
1129           if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
1130             // Clean out the plan we failed to execute and one that doesn't look like it'll
1131             // succeed anyway; we need a new plan!
1132             // Transition back to OFFLINE
1133             regionStates.updateRegionState(region, State.OFFLINE);
1134             plan = newPlan;
1135           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
1136               previousException instanceof FailedServerException) {
1137             try {
1138               LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
1139                 " to the same failed server.");
1140               Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1141                 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
1142             } catch (InterruptedException ie) {
1143               LOG.warn("Failed to assign "
1144                   + region.getRegionNameAsString() + " since interrupted", ie);
1145               regionStates.updateRegionState(region, State.FAILED_OPEN);
1146               Thread.currentThread().interrupt();
1147               return;
1148             }
1149           }
1150         }
1151       }
1152       // Run out of attempts
1153       regionStates.updateRegionState(region, State.FAILED_OPEN);
1154     } finally {
1155       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
1156     }
1157   }
1158 
1159   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
1160     if (this.tableStateManager.isTableState(region.getTable(),
1161             TableState.State.DISABLED,
1162             TableState.State.DISABLING) || replicasToClose.contains(region)) {
1163       LOG.info("Table " + region.getTable() + " is disabled or disabling;"
1164         + " skipping assign of " + region.getRegionNameAsString());
1165       offlineDisabledRegion(region);
1166       return true;
1167     }
1168     return false;
1169   }
1170 
1171   /**
1172    * @param region the region to assign
1173    * @param forceNewPlan If true, then if an existing plan exists, a new plan
1174    * will be generated.
1175    * @return Plan for passed <code>region</code> (If none currently, it creates one or
1176    * if no servers to assign, it returns null).
1177    */
1178   private RegionPlan getRegionPlan(final HRegionInfo region,
1179       final boolean forceNewPlan) throws HBaseIOException {
1180     // Pickup existing plan or make a new one
1181     final String encodedName = region.getEncodedName();
1182     final List<ServerName> destServers =
1183       serverManager.createDestinationServersList();
1184 
1185     if (destServers.isEmpty()){
1186       LOG.warn("Can't move " + encodedName +
1187         ", there is no destination server available.");
1188       return null;
1189     }
1190 
1191     RegionPlan randomPlan = null;
1192     boolean newPlan = false;
1193     RegionPlan existingPlan;
1194 
1195     synchronized (this.regionPlans) {
1196       existingPlan = this.regionPlans.get(encodedName);
1197 
1198       if (existingPlan != null && existingPlan.getDestination() != null) {
1199         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
1200           + " destination server is " + existingPlan.getDestination() +
1201             " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
1202       }
1203 
1204       if (forceNewPlan
1205           || existingPlan == null
1206           || existingPlan.getDestination() == null
1207           || !destServers.contains(existingPlan.getDestination())) {
1208         newPlan = true;
1209         randomPlan = new RegionPlan(region, null,
1210             balancer.randomAssignment(region, destServers));
1211         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
1212           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
1213           regions.add(region);
1214           try {
1215             processFavoredNodes(regions);
1216           } catch (IOException ie) {
1217             LOG.warn("Ignoring exception in processFavoredNodes " + ie);
1218           }
1219         }
1220         this.regionPlans.put(encodedName, randomPlan);
1221       }
1222     }
1223 
1224     if (newPlan) {
1225       if (randomPlan.getDestination() == null) {
1226         LOG.warn("Can't find a destination for " + encodedName);
1227         return null;
1228       }
1229       if (LOG.isDebugEnabled()) {
1230         LOG.debug("No previous transition plan found (or ignoring " +
1231           "an existing plan) for " + region.getRegionNameAsString() +
1232           "; generated random plan=" + randomPlan + "; " + destServers.size() +
1233           " (online=" + serverManager.getOnlineServers().size() +
1234           ") available servers, forceNewPlan=" + forceNewPlan);
1235       }
1236       return randomPlan;
1237     }
1238     if (LOG.isDebugEnabled()) {
1239       LOG.debug("Using pre-existing plan for " +
1240         region.getRegionNameAsString() + "; plan=" + existingPlan);
1241     }
1242     return existingPlan;
1243   }
1244 
1245   /**
1246    * Wait for some time before retrying meta table region assignment
1247    */
1248   private void waitForRetryingMetaAssignment() {
1249     try {
1250       Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
1251     } catch (InterruptedException e) {
1252       LOG.error("Got exception while waiting for hbase:meta assignment");
1253       Thread.currentThread().interrupt();
1254     }
1255   }
1256 
1257   /**
1258    * Unassigns the specified region.
1259    * <p>
1260    * Updates the RegionState and sends the CLOSE RPC unless region is being
1261    * split by regionserver; then the unassign fails (silently) because we
1262    * presume the region being unassigned no longer exists (it's been split out
1263    * of existence). TODO: What to do if split fails and is rolled back and
1264    * parent is revivified?
1265    * <p>
1266    * If a RegionPlan is already set, it will remain.
1267    *
1268    * @param region the region to be unassigned
1269    */
1270   public void unassign(HRegionInfo region) {
1271     unassign(region, null);
1272   }
1273 
1274 
1275   /**
1276    * Unassigns the specified region.
1277    * <p>
1278    * Updates the RegionState and sends the CLOSE RPC, unless the region is being
1279    * split by the regionserver; in that case the unassign fails (silently) because we
1280    * presume the region being unassigned no longer exists (it has been split out
1281    * of existence). TODO: What to do if the split fails and is rolled back and the
1282    * parent is revivified?
1283    * <p>
1284    * If a RegionPlan is already set, it will remain.
1285    *
1286    * @param region the region to be unassigned
1287    * @param dest the destination server of the region
1288    */
1289   public void unassign(HRegionInfo region, ServerName dest) {
1290     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
1291     LOG.debug("Starting unassign of " + region.getRegionNameAsString()
1292       + " (offlining), current state: " + regionStates.getRegionState(region));
1293 
1294     String encodedName = region.getEncodedName();
1295     // Grab the state of this region and synchronize on it
1296     // We need a lock here as we're going to do a put later and we don't want
1297     // multiple region states to be created concurrently
1298     ReentrantLock lock = locker.acquireLock(encodedName);
1299     RegionState state = regionStates.getRegionTransitionState(encodedName);
1300     try {
1301       if (state == null || state.isFailedClose()) {
1302         if (state == null) {
1303           // Region is not in transition.
1304           // We can unassign it only if it's not SPLIT/MERGED.
1305           state = regionStates.getRegionState(encodedName);
1306           if (state != null && state.isUnassignable()) {
1307             LOG.info("Attempting to unassign " + state + ", ignored");
1308             // Offline region will be reassigned below
1309             return;
1310           }
1311           if (state == null || state.getServerName() == null) {
1312             // We don't know where the region is, offline it.
1313             // No need to send CLOSE RPC
1314             LOG.warn("Attempting to unassign a region not in RegionStates: "
1315               + region.getRegionNameAsString() + ", offlined");
1316             regionOffline(region);
1317             return;
1318           }
1319         }
1320         state = regionStates.updateRegionState(
1321           region, State.PENDING_CLOSE);
1322       } else if (state.isFailedOpen()) {
1323         // The region is not open yet
1324         regionOffline(region);
1325         return;
1326       } else {
1327         LOG.debug("Attempting to unassign " +
1328           region.getRegionNameAsString() + " but it is " +
1329           "already in transition (" + state.getState() + ")");
1330         return;
1331       }
1332 
1333       unassign(region, state.getServerName(), dest);
1334     } finally {
1335       lock.unlock();
1336 
1337       // Region is expected to be reassigned afterwards
1338       if (!replicasToClose.contains(region)
1339           && regionStates.isRegionInState(region, State.OFFLINE)) {
1340         assign(region);
1341       }
1342     }
1343   }
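  // Unassign flow, in brief: a region that is not in transition (or is stuck in
  // FAILED_CLOSE) is moved to PENDING_CLOSE and the CLOSE RPC is sent via
  // unassign(region, server, dest); a region with no known location is simply offlined,
  // as is a FAILED_OPEN region; a region already in any other transition state is left
  // alone. The finally block then re-queues an assign for regions that ended up OFFLINE
  // and are not replicas scheduled for closing.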
1344 
1345   /**
1346    * Used by unit tests. Return the number of regions opened so far in the life
1347    * of the master. Increases by one every time the master opens a region.
1348    * @return the counter value of the number of regions opened so far
1349    */
1350   public int getNumRegionsOpened() {
1351     return numRegionsOpened.get();
1352   }
1353 
1354   /**
1355    * Waits until the specified region has completed assignment.
1356    * <p>
1357    * If the region is already assigned, returns immediately.  Otherwise, blocks until
1358    * the region is assigned, returning false if the open fails or the server is stopped.
1359    * @param regionInfo region to wait on assignment for
1360    * @throws InterruptedException
1361    */
1362   public boolean waitForAssignment(HRegionInfo regionInfo)
1363       throws InterruptedException {
1364     while (!regionStates.isRegionOnline(regionInfo)) {
1365       if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN)
1366           || this.server.isStopped()) {
1367         return false;
1368       }
1369 
1370       // We should receive a notification, but it's
1371       //  better to have a timeout to recheck the condition here:
1372       //  it lowers the impact of a race condition if any
1373       regionStates.waitForUpdate(100);
1374     }
1375     return true;
1376   }
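  // Hypothetical caller sketch (the names 'am' and 'hri' are illustrative, not part of
  // this class):
  //
  //   am.assign(hri);                      // queue the region for assignment
  //   if (!am.waitForAssignment(hri)) {
  //     // the open failed (FAILED_OPEN) or the master is stopping
  //   }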
1377 
1378   /**
1379    * Assigns the hbase:meta region.
1380    * <p>
1381    * Assumes that hbase:meta is currently closed and is not being actively served by
1382    * any RegionServer.
1383    */
1384   public void assignMeta() throws KeeperException {
1385     regionStates.updateRegionState(HRegionInfo.FIRST_META_REGIONINFO, State.OFFLINE);
1386     assign(HRegionInfo.FIRST_META_REGIONINFO);
1387   }
1388 
1389   /**
1390    * Assigns specified regions retaining assignments, if any.
1391    * <p>
1392    * This is a synchronous call and will return once every region has been
1393    * assigned.  If anything fails, an exception is thrown
1394    * @throws InterruptedException
1395    * @throws IOException
1396    */
1397   public void assign(Map<HRegionInfo, ServerName> regions)
1398         throws IOException, InterruptedException {
1399     if (regions == null || regions.isEmpty()) {
1400       return;
1401     }
1402     List<ServerName> servers = serverManager.createDestinationServersList();
1403     if (servers == null || servers.isEmpty()) {
1404       throw new IOException("Found no destination server to assign region(s)");
1405     }
1406 
1407     // Reuse existing assignment info
1408     Map<ServerName, List<HRegionInfo>> bulkPlan =
1409       balancer.retainAssignment(regions, servers);
1410     if (bulkPlan == null) {
1411       throw new IOException("Unable to determine a plan to assign region(s)");
1412     }
1413 
1414     assign(regions.size(), servers.size(),
1415       "retainAssignment=true", bulkPlan);
1416   }
1417 
1418   /**
1419    * Assigns specified regions round robin, if any.
1420    * <p>
1421    * This is a synchronous call and will return once every region has been
1422    * assigned.  If anything fails, an exception is thrown
1423    * @throws InterruptedException
1424    * @throws IOException
1425    */
1426   public void assign(List<HRegionInfo> regions)
1427         throws IOException, InterruptedException {
1428     if (regions == null || regions.isEmpty()) {
1429       return;
1430     }
1431 
1432     List<ServerName> servers = serverManager.createDestinationServersList();
1433     if (servers == null || servers.isEmpty()) {
1434       throw new IOException("Found no destination server to assign region(s)");
1435     }
1436 
1437     // Generate a round-robin bulk assignment plan
1438     Map<ServerName, List<HRegionInfo>> bulkPlan
1439       = balancer.roundRobinAssignment(regions, servers);
1440     if (bulkPlan == null) {
1441       throw new IOException("Unable to determine a plan to assign region(s)");
1442     }
1443 
1444     processFavoredNodes(regions);
1445     assign(regions.size(), servers.size(),
1446       "round-robin=true", bulkPlan);
1447   }
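  // Both bulk entry points above feed the private assign(int, int, String, Map) below
  // with a plan shaped as Map<ServerName, List<HRegionInfo>>: the retaining variant asks
  // the balancer to keep regions where they were (balancer.retainAssignment), while the
  // round-robin variant spreads them evenly (balancer.roundRobinAssignment).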
1448 
1449   private void assign(int regions, int totalServers,
1450       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
1451           throws InterruptedException, IOException {
1452 
1453     int servers = bulkPlan.size();
1454     if (servers == 1 || (regions < bulkAssignThresholdRegions
1455         && servers < bulkAssignThresholdServers)) {
1456 
1457       // Don't use bulk assignment.  This can be more efficient in a small
1458       // cluster, especially a mini cluster used for testing, so that tests won't time out
1459       if (LOG.isTraceEnabled()) {
1460         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
1461           " region(s) to " + servers + " server(s)");
1462       }
1463       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
1464         if (!assign(plan.getKey(), plan.getValue()) && !server.isStopped()) {
1465           for (HRegionInfo region: plan.getValue()) {
1466             if (!regionStates.isRegionOnline(region)) {
1467               invokeAssign(region);
1468             }
1469           }
1470         }
1471       }
1472     } else {
1473       LOG.info("Bulk assigning " + regions + " region(s) across "
1474         + totalServers + " server(s), " + message);
1475 
1476       // Use fixed count thread pool assigning.
1477       BulkAssigner ba = new GeneralBulkAssigner(
1478         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
1479       ba.bulkAssign();
1480       LOG.info("Bulk assigning done");
1481     }
1482   }
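  // The branch above skips the GeneralBulkAssigner when the plan targets a single server,
  // or when the region count is below bulkAssignThresholdRegions and the server count is
  // below bulkAssignThresholdServers; in that case each server's list is assigned
  // directly, and any region that did not come online is retried via invokeAssign.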
1483 
1484   /**
1485    * Assigns all user regions, if any exist.  Used during cluster startup.
1486    * <p>
1487    * This is a synchronous call and will return once every region has been
1488    * assigned.  If anything fails, an exception is thrown and the cluster
1489    * should be shutdown.
1490    * @throws InterruptedException
1491    * @throws IOException
1492    */
1493   private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
1494       throws IOException, InterruptedException {
1495     if (allRegions == null || allRegions.isEmpty()) return;
1496 
1497     // Determine what type of assignment to do on startup
1498     boolean retainAssignment = server.getConfiguration().
1499       getBoolean("hbase.master.startup.retainassign", true);
1500 
1501     Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
1502     if (retainAssignment) {
1503       assign(allRegions);
1504     } else {
1505       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
1506       assign(regions);
1507     }
1508 
1509     for (HRegionInfo hri : regionsFromMetaScan) {
1510       TableName tableName = hri.getTable();
1511       if (!tableStateManager.isTableState(tableName,
1512               TableState.State.ENABLED)) {
1513         setEnabledTable(tableName);
1514       }
1515     }
1516     // assign all the replicas that were not recorded in the meta
1517     assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server));
1518   }
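  // Startup assignment mode is driven by configuration; a minimal hbase-site.xml sketch
  // (the value shown is an example, the default is true):
  //
  //   <property>
  //     <name>hbase.master.startup.retainassign</name>
  //     <value>false</value>  <!-- use round-robin instead of retained assignment -->
  //   </property>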
1519 
1520   /**
1521    * Get a list of replica regions that are not recorded in meta yet.
1522    * We might not have recorded the locations of the replicas because the replicas
1523    * may not have been online yet, the master restarted in the middle of assigning,
1524    * ZK data was erased, etc.
1525    * @param regionsRecordedInMeta the set of regions we know are recorded in meta,
1526    * either as a default replica or as the location of a secondary replica
1527    * @param master the master services, used to look up the table descriptors
1528    * @return list of replica regions
1529    * @throws IOException
1530    */
1531   public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
1532       Set<HRegionInfo> regionsRecordedInMeta, MasterServices master) throws IOException {
1533     List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
1534     for (HRegionInfo hri : regionsRecordedInMeta) {
1535       TableName table = hri.getTable();
1536       HTableDescriptor htd = master.getTableDescriptors().get(table);
1537       // look at the HTD for the replica count. That's the source of truth
1538       int desiredRegionReplication = htd.getRegionReplication();
1539       for (int i = 0; i < desiredRegionReplication; i++) {
1540         HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
1541         if (regionsRecordedInMeta.contains(replica)) continue;
1542         regionsNotRecordedInMeta.add(replica);
1543       }
1544     }
1545     return regionsNotRecordedInMeta;
1546   }
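  // Note on the loop above: getRegionReplication() from the table descriptor is the
  // source of truth, replica ids 0..(replication - 1) are generated with
  // RegionReplicaUtil.getRegionInfoForReplica, and anything already present in
  // regionsRecordedInMeta (including the default replica itself) is skipped.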
1547 
1548   /**
1549    * Rebuild the list of user regions and assignment information.
1550    * <p>
1551    * Returns the set of servers that hosted some regions according to meta but
1552    * were not found to be online.
1553    * @return set of servers not online that hosted some regions per meta
1554    * @throws IOException
1555    */
1556   Set<ServerName> rebuildUserRegions() throws
1557           IOException, KeeperException {
1558     Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
1559             TableState.State.DISABLED, TableState.State.ENABLING);
1560 
1561     Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
1562             TableState.State.DISABLED,
1563             TableState.State.DISABLING,
1564             TableState.State.ENABLING);
1565 
1566     // Region assignment from META
1567     List<Result> results = MetaTableAccessor.fullScanOfMeta(server.getConnection());
1568     // Get any new but slow-to-check-in region servers that joined the cluster
1569     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
1570     // Set of offline servers to be returned
1571     Set<ServerName> offlineServers = new HashSet<ServerName>();
1572     // Iterate regions in META
1573     for (Result result : results) {
1574       if (result == null) {
1575         LOG.debug("null result from meta - ignoring but this is strange.");
1576         continue;
1577       }
1578       // Keep track of replicas to close. These were the replicas of the originally
1579       // unmerged regions. The master might have closed them already, but it might not
1580       // have, perhaps because it crashed.
1581       PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
1582       if (p.getFirst() != null && p.getSecond() != null) {
1583         int numReplicas = ((MasterServices)server).getTableDescriptors().get(p.getFirst().
1584             getTable()).getRegionReplication();
1585         for (HRegionInfo merge : p) {
1586           for (int i = 1; i < numReplicas; i++) {
1587             replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
1588           }
1589         }
1590       }
1591       RegionLocations rl =  MetaTableAccessor.getRegionLocations(result);
1592       if (rl == null) continue;
1593       HRegionLocation[] locations = rl.getRegionLocations();
1594       if (locations == null) continue;
1595       for (HRegionLocation hrl : locations) {
1596         HRegionInfo regionInfo = hrl.getRegionInfo();
1597         if (regionInfo == null) continue;
1598         int replicaId = regionInfo.getReplicaId();
1599         State state = RegionStateStore.getRegionState(result, replicaId);
1600         // Keep track of replicas to close. These were the replicas of the split parents
1601         // from the previous life of the master. The master should have closed them already,
1602         // but it may not have, perhaps because it crashed.
1603         if (replicaId == 0 && state.equals(State.SPLIT)) {
1604           for (HRegionLocation h : locations) {
1605             replicasToClose.add(h.getRegionInfo());
1606           }
1607         }
1608         ServerName lastHost = hrl.getServerName();
1609         ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
1610         regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
1611         if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
1612           // Region is not open (either offline or in transition), skip
1613           continue;
1614         }
1615         TableName tableName = regionInfo.getTable();
1616         if (!onlineServers.contains(regionLocation)) {
1617           // Region is located on a server that isn't online
1618           offlineServers.add(regionLocation);
1619         } else if (!disabledOrEnablingTables.contains(tableName)) {
1620           // Region is being served and on an active server
1621           // add only if region not in disabled or enabling table
1622           regionStates.regionOnline(regionInfo, regionLocation);
1623           balancer.regionOnline(regionInfo, regionLocation);
1624         }
1625         // need to enable the table if not disabled or disabling or enabling
1626         // this will be used in rolling restarts
1627         if (!disabledOrDisablingOrEnabling.contains(tableName)
1628           && !getTableStateManager().isTableState(tableName,
1629                 TableState.State.ENABLED)) {
1630           setEnabledTable(tableName);
1631         }
1632       }
1633     }
1634     return offlineServers;
1635   }
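  // rebuildUserRegions restores three things from the full meta scan: the per-replica
  // region states (createRegionState), the replicasToClose set for split and merged
  // parents left over from a previous master, and the returned set of servers that meta
  // says host regions but are no longer online (handed to server shutdown handling).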
1636 
1637   /**
1638    * Recover the tables that were not fully moved to DISABLED state. These
1639    * tables were in DISABLING state when the master restarted/switched over.
1640    *
1641    * @throws KeeperException
1642    * @throws TableNotFoundException
1643    * @throws IOException
1644    */
1645   private void recoverTableInDisablingState()
1646           throws KeeperException, IOException {
1647     Set<TableName> disablingTables =
1648             tableStateManager.getTablesInStates(TableState.State.DISABLING);
1649     if (disablingTables.size() != 0) {
1650       for (TableName tableName : disablingTables) {
1651         // Recover by calling DisableTableHandler
1652         LOG.info("The table " + tableName
1653             + " is in DISABLING state.  Hence recovering by moving the table"
1654             + " to DISABLED state.");
1655         new DisableTableHandler(this.server, tableName,
1656             this, tableLockManager, true).prepare().process();
1657       }
1658     }
1659   }
1660 
1661   /**
1662    * Recover the tables that were not fully moved to ENABLED state. These tables
1663    * were in ENABLING state when the master restarted/switched over.
1664    *
1665    * @throws KeeperException
1666    * @throws org.apache.hadoop.hbase.TableNotFoundException
1667    * @throws IOException
1668    */
1669   private void recoverTableInEnablingState()
1670           throws KeeperException, IOException {
1671     Set<TableName> enablingTables = tableStateManager.
1672             getTablesInStates(TableState.State.ENABLING);
1673     if (enablingTables.size() != 0) {
1674       for (TableName tableName : enablingTables) {
1675         // Recover by calling EnableTableHandler
1676         LOG.info("The table " + tableName
1677             + " is in ENABLING state.  Hence recovering by moving the table"
1678             + " to ENABLED state.");
1679         // enableTable in sync way during master startup,
1680         // no need to invoke coprocessor
1681         EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
1682           this, tableLockManager, true);
1683         try {
1684           eth.prepare();
1685         } catch (TableNotFoundException e) {
1686           LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
1687           continue;
1688         }
1689         eth.process();
1690       }
1691     }
1692   }
1693 
1694   /**
1695    * Processes list of regions in transition at startup
1696    */
1697   void processRegionsInTransition(Collection<RegionState> regionStates) {
1698     // We need to send the RPC call again for PENDING_OPEN/PENDING_CLOSE regions
1699     // in case the RPC call was not sent out before the master was shut down,
1700     // since we update the state before we send the RPC call. We can't update
1701     // the state after the RPC call; otherwise, we wouldn't know what happened
1702     // to the region if the master died right after the RPC call went out.
1703     for (RegionState regionState: regionStates) {
1704       if (!serverManager.isServerOnline(regionState.getServerName())) {
1705         continue; // SSH will handle it
1706       }
1707       RegionState.State state = regionState.getState();
1708       LOG.info("Processing " + regionState);
1709       switch (state) {
1710       case CLOSED:
1711         invokeAssign(regionState.getRegion());
1712         break;
1713       case PENDING_OPEN:
1714         retrySendRegionOpen(regionState);
1715         break;
1716       case PENDING_CLOSE:
1717         retrySendRegionClose(regionState);
1718         break;
1719       default:
1720         // No process for other states
1721       }
1722     }
1723   }
1724 
1725   /**
1726    * At master failover, for pending_open region, make sure
1727    * sendRegionOpen RPC call is sent to the target regionserver
1728    */
1729   private void retrySendRegionOpen(final RegionState regionState) {
1730     this.executorService.submit(
1731       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
1732         @Override
1733         public void process() throws IOException {
1734           HRegionInfo hri = regionState.getRegion();
1735           ServerName serverName = regionState.getServerName();
1736           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
1737           try {
1738             for (int i = 1; i <= maximumAttempts; i++) {
1739               if (!serverManager.isServerOnline(serverName)
1740                   || server.isStopped() || server.isAborted()) {
1741                 return; // No longer needed
1742               }
1743               try {
1744                 if (!regionState.equals(regionStates.getRegionState(hri))) {
1745                   return; // Region is not in the expected state any more
1746                 }
1747                 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1748                 if (shouldAssignRegionsWithFavoredNodes) {
1749                   favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
1750                 }
1751                 serverManager.sendRegionOpen(serverName, hri, favoredNodes);
1752                 return; // we're done
1753               } catch (Throwable t) {
1754                 if (t instanceof RemoteException) {
1755                   t = ((RemoteException) t).unwrapRemoteException();
1756                 }
1757                 if (t instanceof FailedServerException && i < maximumAttempts) {
1758                   // If the server is in the failed server list, there is no point in
1759                   // retrying too soon. Retry after the failed_server_expiry time
1760                   try {
1761                     Configuration conf = this.server.getConfiguration();
1762                     long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1763                       RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1764                     if (LOG.isDebugEnabled()) {
1765                       LOG.debug(serverName + " is on failed server list; waiting "
1766                         + sleepTime + "ms", t);
1767                     }
1768                     Thread.sleep(sleepTime);
1769                     continue;
1770                   } catch (InterruptedException ie) {
1771                     LOG.warn("Failed to assign "
1772                       + hri.getRegionNameAsString() + " since interrupted", ie);
1773                     regionStates.updateRegionState(hri, State.FAILED_OPEN);
1774                     Thread.currentThread().interrupt();
1775                     return;
1776                   }
1777                 }
1778                 if (serverManager.isServerOnline(serverName)
1779                     && t instanceof java.net.SocketTimeoutException) {
1780                   i--; // don't count this attempt against the retry limit
1781                 } else {
1782                   LOG.info("Got exception in retrying sendRegionOpen for "
1783                     + regionState + "; try=" + i + " of " + maximumAttempts, t);
1784                 }
1785                 Threads.sleep(100);
1786               }
1787             }
1788             // Run out of attempts
1789             regionStates.updateRegionState(hri, State.FAILED_OPEN);
1790           } finally {
1791             lock.unlock();
1792           }
1793         }
1794       });
1795   }
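  // Retry policy used above (mirrored in retrySendRegionClose below): when the target is
  // on the failed-server list, the task sleeps for the configured
  // RpcClient.FAILED_SERVER_EXPIRY_KEY interval before the next attempt; a
  // SocketTimeoutException against a still-online server does not consume an attempt
  // (i is decremented); and exhausting maximumAttempts marks the region FAILED_OPEN.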
1796 
1797   /**
1798    * At master failover, for pending_close region, make sure
1799    * sendRegionClose RPC call is sent to the target regionserver
1800    */
1801   private void retrySendRegionClose(final RegionState regionState) {
1802     this.executorService.submit(
1803       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
1804         @Override
1805         public void process() throws IOException {
1806           HRegionInfo hri = regionState.getRegion();
1807           ServerName serverName = regionState.getServerName();
1808           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
1809           try {
1810             for (int i = 1; i <= maximumAttempts; i++) {
1811               if (!serverManager.isServerOnline(serverName)
1812                   || server.isStopped() || server.isAborted()) {
1813                 return; // No longer needed
1814               }
1815               try {
1816                 if (!regionState.equals(regionStates.getRegionState(hri))) {
1817                   return; // Region is not in the expected state any more
1818                 }
1819                 serverManager.sendRegionClose(serverName, hri, null);
1820                 return; // Done.
1821               } catch (Throwable t) {
1822                 if (t instanceof RemoteException) {
1823                   t = ((RemoteException) t).unwrapRemoteException();
1824                 }
1825                 if (t instanceof FailedServerException && i < maximumAttempts) {
1826                   // If the server is in the failed server list, there is no point in
1827                   // retrying too soon. Retry after the failed_server_expiry time
1828                   try {
1829                     Configuration conf = this.server.getConfiguration();
1830                     long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1831                       RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1832                     if (LOG.isDebugEnabled()) {
1833                       LOG.debug(serverName + " is on failed server list; waiting "
1834                         + sleepTime + "ms", t);
1835                     }
1836                     Thread.sleep(sleepTime);
1837                     continue;
1838                   } catch (InterruptedException ie) {
1839                     LOG.warn("Failed to unassign "
1840                       + hri.getRegionNameAsString() + " since interrupted", ie);
1841                     regionStates.updateRegionState(hri, RegionState.State.FAILED_CLOSE);
1842                     Thread.currentThread().interrupt();
1843                     return;
1844                   }
1845                 }
1846                 if (serverManager.isServerOnline(serverName)
1847                     && t instanceof java.net.SocketTimeoutException) {
1848                   i--; // don't count this attempt against the retry limit
1849                 } else {
1850                   LOG.info("Got exception in retrying sendRegionClose for "
1851                     + regionState + "; try=" + i + " of " + maximumAttempts, t);
1852                 }
1853                 Threads.sleep(100);
1854               }
1855             }
1856             // Run out of attempts
1857             regionStates.updateRegionState(hri, State.FAILED_CLOSE);
1858           } finally {
1859             lock.unlock();
1860           }
1861         }
1862       });
1863   }
1864 
1865   /**
1866    * Set Regions in transitions metrics.
1867    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
1868    * This iterator is not fail-fast, which may lead to stale reads; but that's better than
1869    * creating a copy of the map for metrics computation, as this method will be invoked
1870    * on a frequent interval.
1871    */
1872   public void updateRegionsInTransitionMetrics() {
1873     long currentTime = System.currentTimeMillis();
1874     int totalRITs = 0;
1875     int totalRITsOverThreshold = 0;
1876     long oldestRITTime = 0;
1877     int ritThreshold = this.server.getConfiguration().
1878       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
1879     for (RegionState state: regionStates.getRegionsInTransition().values()) {
1880       totalRITs++;
1881       long ritTime = currentTime - state.getStamp();
1882       if (ritTime > ritThreshold) { // more than the threshold
1883         totalRITsOverThreshold++;
1884       }
1885       if (oldestRITTime < ritTime) {
1886         oldestRITTime = ritTime;
1887       }
1888     }
1889     if (this.metricsAssignmentManager != null) {
1890       this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
1891       this.metricsAssignmentManager.updateRITCount(totalRITs);
1892       this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
1893     }
1894   }
1895 
1896   /**
1897    * @param region Region whose plan we are to clear.
1898    */
1899   private void clearRegionPlan(final HRegionInfo region) {
1900     synchronized (this.regionPlans) {
1901       this.regionPlans.remove(region.getEncodedName());
1902     }
1903   }
1904 
1905   /**
1906    * Wait on region to clear regions-in-transition.
1907    * @param hri Region to wait on.
1908    * @throws IOException
1909    */
1910   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
1911       throws IOException, InterruptedException {
1912     waitOnRegionToClearRegionsInTransition(hri, -1L);
1913   }
1914 
1915   /**
1916    * Wait on region to clear regions-in-transition or time out
1917    * @param hri
1918    * @param timeOut Milliseconds to wait for current region to be out of transition state.
1919    * @return True when a region clears regions-in-transition before timeout otherwise false
1920    * @throws InterruptedException
1921    */
1922   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
1923       throws InterruptedException {
1924     if (!regionStates.isRegionInTransition(hri)) return true;
1925     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
1926         + timeOut;
1927     // There is already a timeout monitor on regions in transition so I
1928     // should not have to have one here too?
1929     LOG.info("Waiting for " + hri.getEncodedName() +
1930         " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
1931     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
1932       regionStates.waitForUpdate(100);
1933       if (EnvironmentEdgeManager.currentTime() > end) {
1934         LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
1935         return false;
1936       }
1937     }
1938     if (this.server.isStopped()) {
1939       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
1940       return false;
1941     }
1942     return true;
1943   }
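  // Hypothetical caller sketch (the names 'am' and 'hri' are illustrative):
  //
  //   if (!am.waitOnRegionToClearRegionsInTransition(hri, 60000)) {
  //     // still in transition after 60s, or the master is stopping
  //   }
  //
  // Passing a timeOut <= 0 waits indefinitely, until the region clears or the master stops.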
1944 
1945   void invokeAssign(HRegionInfo regionInfo) {
1946     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
1947   }
1948 
1949   void invokeUnAssign(HRegionInfo regionInfo) {
1950     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
1951   }
1952 
1953   public boolean isCarryingMeta(ServerName serverName) {
1954     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
1955   }
1956 
1957   /**
1958    * Check if the shutdown server carries the specific region.
1959    * @return whether the serverName currently hosts the region
1960    */
1961   private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
1962     RegionState regionState = regionStates.getRegionTransitionState(hri);
1963     ServerName transitionAddr = regionState != null? regionState.getServerName(): null;
1964     if (transitionAddr != null) {
1965       boolean matchTransitionAddr = transitionAddr.equals(serverName);
1966       LOG.debug("Checking region=" + hri.getRegionNameAsString()
1967         + ", transitioning on server=" + transitionAddr
1968         + ", server being checked: " + serverName
1969         + ", matches=" + matchTransitionAddr);
1970       return matchTransitionAddr;
1971     }
1972 
1973     ServerName assignedAddr = regionStates.getRegionServerOfRegion(hri);
1974     boolean matchAssignedAddr = serverName.equals(assignedAddr);
1975     LOG.debug("Based on AM, current region=" + hri.getRegionNameAsString()
1976       + " is on server=" + assignedAddr + ", server being checked: "
1977       + serverName + ", matches=" + matchAssignedAddr);
1978     return matchAssignedAddr;
1979   }
1980 
1981   /**
1982    * Process shutdown server removing any assignments.
1983    * @param sn Server that went down.
1984    * @return list of regions in transition on this server
1985    */
1986   public List<HRegionInfo> processServerShutdown(final ServerName sn) {
1987     // Clean out any existing assignment plans for this server
1988     synchronized (this.regionPlans) {
1989       for (Iterator <Map.Entry<String, RegionPlan>> i =
1990           this.regionPlans.entrySet().iterator(); i.hasNext();) {
1991         Map.Entry<String, RegionPlan> e = i.next();
1992         ServerName otherSn = e.getValue().getDestination();
1993         // The name will be null if the region is planned for a random assign.
1994         if (otherSn != null && otherSn.equals(sn)) {
1995           // Use iterator's remove else we'll get CME
1996           i.remove();
1997         }
1998       }
1999     }
2000     List<HRegionInfo> rits = regionStates.serverOffline(sn);
2001     for (Iterator<HRegionInfo> it = rits.iterator(); it.hasNext(); ) {
2002       HRegionInfo hri = it.next();
2003       String encodedName = hri.getEncodedName();
2004 
2005       // We need a lock on the region as we could update it
2006       Lock lock = locker.acquireLock(encodedName);
2007       try {
2008         RegionState regionState =
2009           regionStates.getRegionTransitionState(encodedName);
2010         if (regionState == null
2011             || (regionState.getServerName() != null && !regionState.isOnServer(sn))
2012             || !RegionStates.isOneOfStates(regionState, State.PENDING_OPEN,
2013                 State.OPENING, State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) {
2014           LOG.info("Skip " + regionState + " since it is not opening/failed_close"
2015             + " on the dead server any more: " + sn);
2016           it.remove();
2017         } else {
2018           if (tableStateManager.isTableState(hri.getTable(),
2019                   TableState.State.DISABLED, TableState.State.DISABLING)) {
2020             regionStates.regionOffline(hri);
2021             it.remove();
2022             continue;
2023           }
2024           // Mark the region offline and assign it again by SSH
2025           regionStates.updateRegionState(hri, State.OFFLINE);
2026         }
2027       } finally {
2028         lock.unlock();
2029       }
2030     }
2031     return rits;
2032   }
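  // processServerShutdown does two things for a dead server: it drops any cached
  // RegionPlan whose destination was that server, and for each region the server had in
  // transition it either offlines the region (disabled/disabling tables), drops it from
  // the returned list when it is no longer relevant, or marks it OFFLINE so the server
  // shutdown handler can reassign it.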
2033 
2034   /**
2035    * @param plan Plan to execute.
2036    */
2037   public void balance(final RegionPlan plan) {
2038     HRegionInfo hri = plan.getRegionInfo();
2039     TableName tableName = hri.getTable();
2040     if (tableStateManager.isTableState(tableName,
2041             TableState.State.DISABLED, TableState.State.DISABLING)) {
2042       LOG.info("Ignored moving region of disabling/disabled table "
2043         + tableName);
2044       return;
2045     }
2046 
2047     // Move the region only if it's assigned
2048     String encodedName = hri.getEncodedName();
2049     ReentrantLock lock = locker.acquireLock(encodedName);
2050     try {
2051       if (!regionStates.isRegionOnline(hri)) {
2052         RegionState state = regionStates.getRegionState(encodedName);
2053         LOG.info("Ignored moving region not assigned: " + hri + ", "
2054           + (state == null ? "not in region states" : state));
2055         return;
2056       }
2057       synchronized (this.regionPlans) {
2058         this.regionPlans.put(plan.getRegionName(), plan);
2059       }
2060       unassign(hri, plan.getDestination());
2061     } finally {
2062       lock.unlock();
2063     }
2064   }
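  // Hypothetical move sketch (the names are illustrative): a caller that wants to move an
  // online region builds a plan and hands it to balance(), e.g.
  //
  //   am.balance(new RegionPlan(hri, currentServer, destinationServer));
  //
  // balance() caches the plan and unassigns the region; the subsequent assignment can
  // then pick the cached plan up via getRegionPlan.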
2065 
2066   public void stop() {
2067     // Shutdown the threadpool executor service
2068     threadPoolExecutorService.shutdownNow();
2069     regionStateStore.stop();
2070   }
2071 
2072   protected void setEnabledTable(TableName tableName) {
2073     try {
2074       this.tableStateManager.setTableState(tableName,
2075               TableState.State.ENABLED);
2076     } catch (IOException e) {
2077       // here we can abort as it is the start up flow
2078       String errorMsg = "Unable to ensure that the table " + tableName
2079           + " will be enabled because of a ZooKeeper issue";
2080       LOG.error(errorMsg, e);
2081       this.server.abort(errorMsg, e);
2082     }
2083   }
2084 
2085   private String onRegionFailedOpen(final RegionState current,
2086       final HRegionInfo hri, final ServerName serverName) {
2087     // The region must be opening on this server.
2088     // If current state is failed_open on the same server,
2089     // it could be a reportRegionTransition RPC retry.
2090     if (current == null || !current.isOpeningOrFailedOpenOnServer(serverName)) {
2091       return hri.getShortNameToLog() + " is not opening on " + serverName;
2092     }
2093 
2094     // Just return in case of retrying
2095     if (current.isFailedOpen()) {
2096       return null;
2097     }
2098 
2099     String encodedName = hri.getEncodedName();
2100     AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
2101     if (failedOpenCount == null) {
2102       failedOpenCount = new AtomicInteger();
2103       // No need to use putIfAbsent, or extra synchronization since
2104       // this whole handleRegion block is locked on the encoded region
2105       // name, and failedOpenTracker is updated only in this block
2106       failedOpenTracker.put(encodedName, failedOpenCount);
2107     }
2108     if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion()) {
2109       regionStates.updateRegionState(hri, State.FAILED_OPEN);
2110       // remove the tracking info to save memory, also reset
2111       // the count for next open initiative
2112       failedOpenTracker.remove(encodedName);
2113     } else {
2114       if (hri.isMetaRegion() && failedOpenCount.get() >= maximumAttempts) {
2115         // Log a warning message if a meta region failedOpenCount exceeds maximumAttempts
2116         // so that we are aware of potential problem if it persists for a long time.
2117         LOG.warn("Failed to open the hbase:meta region " +
2118             hri.getRegionNameAsString() + " after " +
2119             failedOpenCount.get() + " retries. Continue retrying.");
2120       }
2121 
2122       // Handle this the same as if it were opened and then closed.
2123       RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
2124       if (regionState != null) {
2125         // When there is more than one region server, a new RS is selected as the
2126         // destination and the region plan is updated accordingly. (HBASE-5546)
2127         if (getTableStateManager().isTableState(hri.getTable(),
2128                 TableState.State.DISABLED, TableState.State.DISABLING) ||
2129                 replicasToClose.contains(hri)) {
2130           offlineDisabledRegion(hri);
2131           return null;
2132         }
2133         regionStates.updateRegionState(hri, RegionState.State.CLOSED);
2134         // This below has to do w/ online enable/disable of a table
2135         removeClosedRegion(hri);
2136         try {
2137           getRegionPlan(hri, true);
2138         } catch (HBaseIOException e) {
2139           LOG.warn("Failed to get region plan", e);
2140         }
2141         invokeAssign(hri);
2142       }
2143     }
2144     // Null means no error
2145     return null;
2146   }
2147 
2148   private String onRegionOpen(final RegionState current, final HRegionInfo hri,
2149       final ServerName serverName, final RegionStateTransition transition) {
2150     // The region must be opening on this server.
2151     // If current state is already opened on the same server,
2152     // it could be a reportRegionTransition RPC retry.
2153     if (current == null || !current.isOpeningOrOpenedOnServer(serverName)) {
2154       return hri.getShortNameToLog() + " is not opening on " + serverName;
2155     }
2156 
2157     // Just return in case of retrying
2158     if (current.isOpened()) {
2159       return null;
2160     }
2161 
2162     long openSeqNum = transition.hasOpenSeqNum()
2163       ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM;
2164     if (openSeqNum < 0) {
2165       return "Newly opened region has invalid open seq num " + openSeqNum;
2166     }
2167     regionOnline(hri, serverName, openSeqNum);
2168 
2169     // reset the count, if any
2170     failedOpenTracker.remove(hri.getEncodedName());
2171     if (getTableStateManager().isTableState(hri.getTable(),
2172             TableState.State.DISABLED, TableState.State.DISABLING)) {
2173       invokeUnAssign(hri);
2174     }
2175     return null;
2176   }
2177 
2178   private String onRegionClosed(final RegionState current,
2179       final HRegionInfo hri, final ServerName serverName) {
2180     // The region will usually be assigned right after it is closed. When an RPC retry
2181     // comes in, the region may already have moved away from the closed state. However,
2182     // on the region server side, we don't care much about the response for this
2183     // transition. We only make sure the master has got and processed this report,
2184     // either successfully or not. So this is fine, not a problem at all.
2185     if (current == null || !current.isClosingOrClosedOnServer(serverName)) {
2186       return hri.getShortNameToLog() + " is not closing on " + serverName;
2187     }
2188 
2189     // Just return in case of retrying
2190     if (current.isClosed()) {
2191       return null;
2192     }
2193 
2194     if (getTableStateManager().isTableState(hri.getTable(), TableState.State.DISABLED,
2195         TableState.State.DISABLING) || replicasToClose.contains(hri)) {
2196       offlineDisabledRegion(hri);
2197       return null;
2198     }
2199 
2200     regionStates.updateRegionState(hri, RegionState.State.CLOSED);
2201     sendRegionClosedNotification(hri);
2202     // This below has to do w/ online enable/disable of a table
2203     removeClosedRegion(hri);
2204     invokeAssign(hri);
2205     return null;
2206   }
2207 
2208   private String onRegionReadyToSplit(final RegionState current, final HRegionInfo hri,
2209       final ServerName serverName, final RegionStateTransition transition) {
2210     // The region must be opened on this server.
2211     // If current state is already splitting on the same server,
2212     // it could be a reportRegionTransition RPC retry.
2213     if (current == null || !current.isSplittingOrOpenedOnServer(serverName)) {
2214       return hri.getShortNameToLog() + " is not opening on " + serverName;
2215     }
2216 
2217     // Just return in case of retrying
2218     if (current.isSplitting()) {
2219       return null;
2220     }
2221 
2222     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2223     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2224     RegionState rs_a = regionStates.getRegionState(a);
2225     RegionState rs_b = regionStates.getRegionState(b);
2226     if (rs_a != null || rs_b != null) {
2227       return "Some daughter region already exists, "
2228         + "a=" + rs_a + ", b=" + rs_b;
2229     }
2230 
2231     // Server holding is not updated at this stage.
2232     // It is done after PONR.
2233     regionStates.updateRegionState(hri, State.SPLITTING);
2234     regionStates.createRegionState(
2235       a, State.SPLITTING_NEW, serverName, null);
2236     regionStates.createRegionState(
2237       b, State.SPLITTING_NEW, serverName, null);
2238     return null;
2239   }
2240 
2241   private String onRegionSplitPONR(final RegionState current, final HRegionInfo hri,
2242       final ServerName serverName, final RegionStateTransition transition) {
2243     // The region must be splitting on this server, and the daughters must be in
2244     // splitting_new state. To check RPC retry, we use server holding info.
2245     if (current == null || !current.isSplittingOnServer(serverName)) {
2246       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2247     }
2248 
2249     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2250     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2251     RegionState rs_a = regionStates.getRegionState(a);
2252     RegionState rs_b = regionStates.getRegionState(b);
2253 
2254     // Master could have restarted and lost the new region
2255     // states, if so, they must be lost together
2256     if (rs_a == null && rs_b == null) {
2257       rs_a = regionStates.createRegionState(
2258         a, State.SPLITTING_NEW, serverName, null);
2259       rs_b = regionStates.createRegionState(
2260         b, State.SPLITTING_NEW, serverName, null);
2261     }
2262 
2263     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2264         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2265       return "Some daughter is not known to be splitting on " + serverName
2266         + ", a=" + rs_a + ", b=" + rs_b;
2267     }
2268 
2269     // Just return in case of retrying
2270     if (!regionStates.isRegionOnServer(hri, serverName)) {
2271       return null;
2272     }
2273 
2274     try {
2275       regionStates.splitRegion(hri, a, b, serverName);
2276     } catch (IOException ioe) {
2277       LOG.info("Failed to record split region " + hri.getShortNameToLog());
2278       return "Failed to record the splitting in meta";
2279     }
2280     return null;
2281   }
2282 
2283   private String onRegionSplit(final RegionState current, final HRegionInfo hri,
2284       final ServerName serverName, final RegionStateTransition transition) {
2285     // The region must be splitting on this server, and the daughters must be in
2286     // splitting_new state.
2287     // If current state is already split on the same server,
2288     // it could be a reportRegionTransition RPC retry.
2289     if (current == null || !current.isSplittingOrSplitOnServer(serverName)) {
2290       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2291     }
2292 
2293     // Just return in case of retrying
2294     if (current.isSplit()) {
2295       return null;
2296     }
2297 
2298     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2299     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2300     RegionState rs_a = regionStates.getRegionState(a);
2301     RegionState rs_b = regionStates.getRegionState(b);
2302     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2303         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2304       return "Some daughter is not known to be splitting on " + serverName
2305         + ", a=" + rs_a + ", b=" + rs_b;
2306     }
2307 
2308     if (TEST_SKIP_SPLIT_HANDLING) {
2309       return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
2310     }
2311     regionOffline(hri, State.SPLIT);
2312     regionOnline(a, serverName, 1);
2313     regionOnline(b, serverName, 1);
2314 
2315     // User could disable the table before master knows the new region.
2316     if (getTableStateManager().isTableState(hri.getTable(),
2317         TableState.State.DISABLED, TableState.State.DISABLING)) {
2318       invokeUnAssign(a);
2319       invokeUnAssign(b);
2320     } else {
2321       Callable<Object> splitReplicasCallable = new Callable<Object>() {
2322         @Override
2323         public Object call() {
2324           doSplittingOfReplicas(hri, a, b);
2325           return null;
2326         }
2327       };
2328       threadPoolExecutorService.submit(splitReplicasCallable);
2329     }
2330     return null;
2331   }
2332 
2333   private String onRegionSplitReverted(final RegionState current, final HRegionInfo hri,
2334       final ServerName serverName, final RegionStateTransition transition) {
2335     // The region must be splitting on this server, and the daughters must be in
2336     // splitting_new state.
2337     // If the region is in open state, it could be an RPC retry.
2338     if (current == null || !current.isSplittingOrOpenedOnServer(serverName)) {
2339       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2340     }
2341 
2342     // Just return in case of retrying
2343     if (current.isOpened()) {
2344       return null;
2345     }
2346 
2347     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2348     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2349     RegionState rs_a = regionStates.getRegionState(a);
2350     RegionState rs_b = regionStates.getRegionState(b);
2351     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2352         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2353       return "Some daughter is not known to be splitting on " + serverName
2354         + ", a=" + rs_a + ", b=" + rs_b;
2355     }
2356 
2357     regionOnline(hri, serverName);
2358     regionOffline(a);
2359     regionOffline(b);
2360     if (getTableStateManager().isTableState(hri.getTable(),
2361         TableState.State.DISABLED, TableState.State.DISABLING)) {
2362       invokeUnAssign(hri);
2363     }
2364     return null;
2365   }
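  // Split handling summary: onRegionReadyToSplit creates the SPLITTING_NEW daughters and
  // marks the parent SPLITTING; onRegionSplitPONR records the split in meta (the point of
  // no return); onRegionSplit offlines the parent as SPLIT, onlines both daughters and,
  // for a table that is not disabled/disabling, schedules doSplittingOfReplicas;
  // onRegionSplitReverted puts the parent back online and offlines the daughters.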
2366 
2367   private String onRegionReadyToMerge(final RegionState current, final HRegionInfo hri,
2368       final ServerName serverName, final RegionStateTransition transition) {
2369     // The region must be new, and the daughters must be open on this server.
2370     // If the region is in merge_new state, it could be an RPC retry.
2371     if (current != null && !current.isMergingNewOnServer(serverName)) {
2372       return "Merged region already exists in an unexpected state, p=" + current;
2373     }
2374 
2375     // Just return in case of retrying
2376     if (current != null) {
2377       return null;
2378     }
2379 
2380     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2381     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2382     Set<String> encodedNames = new HashSet<String>(2);
2383     encodedNames.add(a.getEncodedName());
2384     encodedNames.add(b.getEncodedName());
2385     Map<String, Lock> locks = locker.acquireLocks(encodedNames);
2386     try {
2387       RegionState rs_a = regionStates.getRegionState(a);
2388       RegionState rs_b = regionStates.getRegionState(b);
2389       if (rs_a == null || !rs_a.isOpenedOnServer(serverName)
2390           || rs_b == null || !rs_b.isOpenedOnServer(serverName)) {
2391         return "Some daughter is not in a state to merge on " + serverName
2392           + ", a=" + rs_a + ", b=" + rs_b;
2393       }
2394 
2395       regionStates.updateRegionState(a, State.MERGING);
2396       regionStates.updateRegionState(b, State.MERGING);
2397       regionStates.createRegionState(
2398         hri, State.MERGING_NEW, serverName, null);
2399       return null;
2400     } finally {
2401       for (Lock lock: locks.values()) {
2402         lock.unlock();
2403       }
2404     }
2405   }
2406 
2407   private String onRegionMergePONR(final RegionState current, final HRegionInfo hri,
2408       final ServerName serverName, final RegionStateTransition transition) {
2409     // The region must be in merging_new state, and the daughters must be
2410     // merging. To check RPC retry, we use server holding info.
2411     if (current != null && !current.isMergingNewOnServer(serverName)) {
2412       return hri.getShortNameToLog() + " is not merging on " + serverName;
2413     }
2414 
2415     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2416     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2417     RegionState rs_a = regionStates.getRegionState(a);
2418     RegionState rs_b = regionStates.getRegionState(b);
2419     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2420         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2421       return "Some daughter is not known to be merging on " + serverName
2422         + ", a=" + rs_a + ", b=" + rs_b;
2423     }
2424 
2425     // Master could have restarted and lost the new region state
2426     if (current == null) {
2427       regionStates.createRegionState(
2428         hri, State.MERGING_NEW, serverName, null);
2429     }
2430 
2431     // Just return in case of retrying
2432     if (regionStates.isRegionOnServer(hri, serverName)) {
2433       return null;
2434     }
2435 
2436     try {
2437       regionStates.mergeRegions(hri, a, b, serverName);
2438     } catch (IOException ioe) {
2439       LOG.info("Failed to record merged region " + hri.getShortNameToLog());
2440       return "Failed to record the merging in meta";
2441     }
2442     return null;
2443   }
2444 
2445   private String onRegionMerged(final RegionState current, final HRegionInfo hri,
2446       final ServerName serverName, final RegionStateTransition transition) {
2447     // The region must be in merging_new state, and the daughters must be
2448     // merging on this server.
2449     // If current state is already opened on the same server,
2450     // it could be a reportRegionTransition RPC retry.
2451     if (current == null || !current.isMergingNewOrOpenedOnServer(serverName)) {
2452       return hri.getShortNameToLog() + " is not merging on " + serverName;
2453     }
2454 
2455     // Just return in case of retrying
2456     if (current.isOpened()) {
2457       return null;
2458     }
2459 
2460     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2461     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2462     RegionState rs_a = regionStates.getRegionState(a);
2463     RegionState rs_b = regionStates.getRegionState(b);
2464     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2465         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2466       return "Some daughter is not known to be merging on " + serverName
2467         + ", a=" + rs_a + ", b=" + rs_b;
2468     }
2469 
2470     regionOffline(a, State.MERGED);
2471     regionOffline(b, State.MERGED);
2472     regionOnline(hri, serverName, 1);
2473 
2474     // User could disable the table before master knows the new region.
2475     if (getTableStateManager().isTableState(hri.getTable(),
2476         TableState.State.DISABLED, TableState.State.DISABLING)) {
2477       invokeUnAssign(hri);
2478     } else {
2479       Callable<Object> mergeReplicasCallable = new Callable<Object>() {
2480         @Override
2481         public Object call() {
2482           doMergingOfReplicas(hri, a, b);
2483           return null;
2484         }
2485       };
2486       threadPoolExecutorService.submit(mergeReplicasCallable);
2487     }
2488     return null;
2489   }
2490 
2491   private String onRegionMergeReverted(final RegionState current, final HRegionInfo hri,
2492       final ServerName serverName, final RegionStateTransition transition) {
2493     // The region must be in merging_new state, and the daughters must be
2494     // merging on this server.
2495     // If the region is in offline state, it could be an RPC retry.
2496     if (current == null || !current.isMergingNewOrOfflineOnServer(serverName)) {
2497       return hri.getShortNameToLog() + " is not merging on " + serverName;
2498     }
2499 
2500     // Just return in case of retrying
2501     if (current.isOffline()) {
2502       return null;
2503     }
2504 
2505     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2506     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2507     RegionState rs_a = regionStates.getRegionState(a);
2508     RegionState rs_b = regionStates.getRegionState(b);
2509     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2510         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2511       return "Some daughter is not known to be merging on " + serverName
2512         + ", a=" + rs_a + ", b=" + rs_b;
2513     }
2514 
2515     regionOnline(a, serverName);
2516     regionOnline(b, serverName);
2517     regionOffline(hri);
2518 
2519     if (getTableStateManager().isTableState(hri.getTable(),
2520         TableState.State.DISABLED, TableState.State.DISABLING)) {
2521       invokeUnAssign(a);
2522       invokeUnAssign(b);
2523     }
2524     return null;
2525   }
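  // Merge handling mirrors the split handlers above: onRegionReadyToMerge marks both
  // merging regions MERGING and creates the MERGING_NEW merged region; onRegionMergePONR
  // records the merge in meta; onRegionMerged offlines the merging regions as MERGED,
  // onlines the merged region and, for a table that is not disabled/disabling, schedules
  // doMergingOfReplicas; onRegionMergeReverted puts the merging regions back online and
  // offlines the merged region.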
2526 
2527   private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
2528       final HRegionInfo hri_b) {
2529     // Close replicas for the original unmerged regions. Create/assign new replicas
2530     // for the merged parent.
2531     List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
2532     unmergedRegions.add(hri_a);
2533     unmergedRegions.add(hri_b);
2534     Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
2535     Collection<List<HRegionInfo>> c = map.values();
2536     for (List<HRegionInfo> l : c) {
2537       for (HRegionInfo h : l) {
2538         if (!RegionReplicaUtil.isDefaultReplica(h)) {
2539           LOG.debug("Unassigning un-merged replica " + h);
2540           unassign(h);
2541         }
2542       }
2543     }
2544     int numReplicas = 1;
2545     try {
2546       numReplicas = ((MasterServices)server).getTableDescriptors().get(mergedHri.getTable()).
2547           getRegionReplication();
2548     } catch (IOException e) {
2549       LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
2550           " due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
2551           "will not be done");
2552     }
2553     List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2554     for (int i = 1; i < numReplicas; i++) {
2555       regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
2556     }
2557     try {
2558       assign(regions);
2559     } catch (IOException ioe) {
2560       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
2561                 ioe.getMessage());
2562     } catch (InterruptedException ie) {
2563       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri+ " because of " +
2564                 ie.getMessage());
2565     }
2566   }
2567 
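       /**
        * Unassigns the replicas of the split parent and creates/assigns replica
        * regions for both daughters, placing daughter replica N on the server that
        * hosts parent replica N whenever that location is known.
        */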
2568   private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
2569       final HRegionInfo hri_b) {
2570     // Create replica regions for the daughters and assign them to match the
2571     // current replica assignments: if replica 1 of the parent is on RS1,
2572     // replica 1 of each daughter will be placed on the same server.
2573     int numReplicas = 1;
2574     try {
2575       numReplicas = ((MasterServices)server).getTableDescriptors().get(parentHri.getTable()).
2576           getRegionReplication();
2577     } catch (IOException e) {
2578       LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
2579           " due to " + e.getMessage() + ". The assignment of daughter replicas " +
2580           "will not be done");
2581     }
2582     // unassign the old replicas
2583     List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
2584     parentRegion.add(parentHri);
2585     Map<ServerName, List<HRegionInfo>> currentAssign =
2586         regionStates.getRegionAssignments(parentRegion);
2587     Collection<List<HRegionInfo>> c = currentAssign.values();
2588     for (List<HRegionInfo> l : c) {
2589       for (HRegionInfo h : l) {
2590         if (!RegionReplicaUtil.isDefaultReplica(h)) {
2591           LOG.debug("Unassigning parent's replica " + h);
2592           unassign(h);
2593         }
2594       }
2595     }
2596     // assign daughter replicas
2597     Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
2598     for (int i = 1; i < numReplicas; i++) {
2599       prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
2600       prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
2601     }
2602     try {
2603       assign(map);
2604     } catch (IOException e) {
2605       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
2606     } catch (InterruptedException e) {
2607       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
2608     }
2609   }
2610 
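       /**
        * Derives the HRegionInfo for the given daughter replica and records a target
        * server for it in the passed map: the server currently hosting the matching
        * parent replica if known, otherwise a randomly chosen online server.
        */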
2611   private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
2612       int replicaId, Map<HRegionInfo, ServerName> map) {
2613     HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
2614     HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
2615         replicaId);
2616     LOG.debug("Created replica region for daughter " + daughterReplica);
2617     ServerName sn;
2618     if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
2619       map.put(daughterReplica, sn);
2620     } else {
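           // No known location for the parent replica; pick a random online server.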
2621       List<ServerName> servers = serverManager.getOnlineServersList();
2622       sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
2623       map.put(daughterReplica, sn);
2624     }
2625   }
2626 
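       /**
        * @return the replica regions that still need to be closed because their
        *   primary region was split or merged away
        */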
2627   public Set<HRegionInfo> getReplicasToClose() {
2628     return replicasToClose;
2629   }
2630 
2631   /**
2632    * Marks a region offline. The new state is the specified one if it is not
2633    * null; otherwise the new state is Offline.
2634    * The specified state can only be Split/Merged/Offline/null.
2635    */
2636   private void regionOffline(final HRegionInfo regionInfo, final State state) {
2637     regionStates.regionOffline(regionInfo, state);
2638     removeClosedRegion(regionInfo);
2639     // remove the region plan as well just in case.
2640     clearRegionPlan(regionInfo);
2641     balancer.regionOffline(regionInfo);
2642 
2643     // Tell our listeners that a region was closed
2644     sendRegionClosedNotification(regionInfo);
2645     // also note that all the replicas of the primary should be closed
2646     if (state != null && (state.equals(State.SPLIT) || state.equals(State.MERGED))) {
2647       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
2648       c.add(regionInfo);
2649       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
2650       Collection<List<HRegionInfo>> allReplicas = map.values();
2651       for (List<HRegionInfo> list : allReplicas) {
2652         replicasToClose.addAll(list);
2653       }
2654     }
2664   }
2665 
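       /**
        * Notifies the registered {@link AssignmentListener}s that a region was
        * opened on the given server.
        */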
2666   private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
2667       final ServerName serverName) {
2668     if (!this.listeners.isEmpty()) {
2669       for (AssignmentListener listener : this.listeners) {
2670         listener.regionOpened(regionInfo, serverName);
2671       }
2672     }
2673   }
2674 
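       /**
        * Notifies the registered {@link AssignmentListener}s that a region was closed.
        */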
2675   private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
2676     if (!this.listeners.isEmpty()) {
2677       for (AssignmentListener listener : this.listeners) {
2678         listener.regionClosed(regionInfo);
2679       }
2680     }
2681   }
2682 
2683   /**
2684    * Try to update some region states. If the state machine prevents
2685    * such an update, an error message is returned to explain the reason.
2686    *
2687    * It's expected that each transition carries just one region for
2688    * opening/closing, and three regions for splitting/merging.
2689    * These regions should be on the server that requested the change.
2690    *
2691    * Region state machine. Only these transitions
2692    * are expected to be triggered by a region server.
2693    *
2694    * On the state transition:
2695    *  (1) Open/Close should be initiated by master
2696    *      (a) Master sets the region to pending_open/pending_close
2697    *        in memory and hbase:meta after sending the request
2698    *        to the region server
2699    *      (b) Region server reports back to the master
2700    *        after open/close is done (either success/failure)
2701    *      (c) If the region server has trouble reporting the status to the
2702    *        master, it must be because the master is down or there is a
2703    *        temporary network issue. Otherwise, the region server should
2704    *        abort, since it must be a bug. If the master is not accessible,
2705    *        the region server should keep trying until it is stopped or
2706    *        the status is reported to the (new) master
2707    *      (d) If region server dies in the middle of opening/closing
2708    *        a region, SSH picks it up and finishes it
2709    *      (e) If master dies in the middle, the new master recovers
2710    *        the state during initialization from hbase:meta. Region server
2711    *        can report any transition that has not been reported to
2712    *        the previous active master yet
2713    *  (2) Split/merge is initiated by region servers
2714    *      (a) To split a region, a region server sends a request to the
2715    *        master to try to set the region to splitting, together with
2716    *        two daughters (to be created) to splitting_new. If approved
2717    *        by the master, the split can then move ahead
2718    *      (b) To merge two regions, a region server sends a request to the
2719    *        master to try to set the new merged region (to be created) to
2720    *        merging_new, together with the two regions (to be merged) to merging.
2721    *        If the master approves, the merge can then move ahead
2722    *      (c) Once the splitting/merging is done, the region server
2723    *        reports the status (success or failure) back to the master
2724    *      (d) Other scenarios should be handled similarly to
2725    *        region open/close
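        *
        * Illustrative sketch only (the builder calls follow the generated protobuf
        * API, and hri, serverName and openSeqNum are assumed local variables, not
        * fields of this class): the kind of message this method consumes for an
        * OPENED report could be built and handled roughly like this:
        * <pre>
        * RegionStateTransition transition = RegionStateTransition.newBuilder()
        *     .setTransitionCode(TransitionCode.OPENED)
        *     .addRegionInfo(HRegionInfo.convert(hri))
        *     .setOpenSeqNum(openSeqNum)
        *     .build();
        * String error = onRegionTransition(serverName, transition);
        * // null means the transition was accepted; otherwise the returned
        * // message explains why the state machine rejected it
        * </pre>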
2726    */
2727   protected String onRegionTransition(final ServerName serverName,
2728       final RegionStateTransition transition) {
2729     TransitionCode code = transition.getTransitionCode();
2730     HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
2731     Lock lock = locker.acquireLock(hri.getEncodedName());
2732     try {
2733       RegionState current = regionStates.getRegionState(hri);
2734       if (LOG.isDebugEnabled()) {
2735         LOG.debug("Got transition " + code + " for "
2736           + (current != null ? current.toString() : hri.getShortNameToLog())
2737           + " from " + serverName);
2738       }
2739       String errorMsg = null;
2740       switch (code) {
2741       case OPENED:
2742         errorMsg = onRegionOpen(current, hri, serverName, transition);
2743         break;
2744       case FAILED_OPEN:
2745         errorMsg = onRegionFailedOpen(current, hri, serverName);
2746         break;
2747       case CLOSED:
2748         errorMsg = onRegionClosed(current, hri, serverName);
2749         break;
2750       case READY_TO_SPLIT:
2751         errorMsg = onRegionReadyToSplit(current, hri, serverName, transition);
2752         break;
2753       case SPLIT_PONR:
2754         errorMsg = onRegionSplitPONR(current, hri, serverName, transition);
2755         break;
2756       case SPLIT:
2757         errorMsg = onRegionSplit(current, hri, serverName, transition);
2758         break;
2759       case SPLIT_REVERTED:
2760         errorMsg = onRegionSplitReverted(current, hri, serverName, transition);
2761         break;
2762       case READY_TO_MERGE:
2763         errorMsg = onRegionReadyToMerge(current, hri, serverName, transition);
2764         break;
2765       case MERGE_PONR:
2766         errorMsg = onRegionMergePONR(current, hri, serverName, transition);
2767         break;
2768       case MERGED:
2769         errorMsg = onRegionMerged(current, hri, serverName, transition);
2770         break;
2771       case MERGE_REVERTED:
2772         errorMsg = onRegionMergeReverted(current, hri, serverName, transition);
2773         break;
2774 
2775       default:
2776         errorMsg = "Unexpected transition code " + code;
2777       }
2778       if (errorMsg != null) {
2779         LOG.info("Could not transition region from " + current + " on "
2780           + code + " by " + serverName + ": " + errorMsg);
2781       }
2782       return errorMsg;
2783     } finally {
2784       lock.unlock();
2785     }
2786   }
2787 
2788   /**
2789    * @return Instance of load balancer
2790    */
2791   public LoadBalancer getBalancer() {
2792     return this.balancer;
2793   }
2794 
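       /**
        * @return a snapshot of the current region assignments (server to hosted
        *   regions) for the given regions
        */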
2795   public Map<ServerName, List<HRegionInfo>>
2796     getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
2797     return getRegionStates().getRegionAssignments(infos);
2798   }
2799 }