1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.NavigableMap;
31  import java.util.Random;
32  import java.util.Set;
33  import java.util.TreeMap;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.CopyOnWriteArrayList;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.locks.Lock;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.fs.FileSystem;
48  import org.apache.hadoop.fs.Path;
49  import org.apache.hadoop.hbase.HBaseIOException;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.HRegionLocation;
53  import org.apache.hadoop.hbase.HTableDescriptor;
54  import org.apache.hadoop.hbase.MetaTableAccessor;
55  import org.apache.hadoop.hbase.NotServingRegionException;
56  import org.apache.hadoop.hbase.RegionLocations;
57  import org.apache.hadoop.hbase.Server;
58  import org.apache.hadoop.hbase.ServerName;
59  import org.apache.hadoop.hbase.TableName;
60  import org.apache.hadoop.hbase.TableNotFoundException;
61  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
62  import org.apache.hadoop.hbase.client.Result;
63  import org.apache.hadoop.hbase.client.TableState;
64  import org.apache.hadoop.hbase.executor.EventHandler;
65  import org.apache.hadoop.hbase.executor.EventType;
66  import org.apache.hadoop.hbase.executor.ExecutorService;
67  import org.apache.hadoop.hbase.ipc.FailedServerException;
68  import org.apache.hadoop.hbase.ipc.RpcClient;
69  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
70  import org.apache.hadoop.hbase.master.RegionState.State;
71  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
72  import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
73  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
74  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
75  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
76  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
77  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
78  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
79  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
80  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
81  import org.apache.hadoop.hbase.util.FSUtils;
82  import org.apache.hadoop.hbase.util.KeyLocker;
83  import org.apache.hadoop.hbase.util.Pair;
84  import org.apache.hadoop.hbase.util.PairOfSameType;
85  import org.apache.hadoop.hbase.util.Threads;
86  import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
87  import org.apache.hadoop.ipc.RemoteException;
88  import org.apache.zookeeper.KeeperException;
89  
90  import com.google.common.annotations.VisibleForTesting;
91  
92  /**
93   * Manages and performs region assignment.
94   * Related communications with regionserver are all done over RPC.
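 *
 * <p>A rough usage sketch (illustrative only; the manager is created and driven by
 * HMaster internals rather than by client code, and {@code hri} stands for some
 * {@link HRegionInfo} of interest):
 * <pre>
 *   AssignmentManager am = new AssignmentManager(server, serverManager, balancer,
 *       executorService, metricsMaster, tableLockManager, tableStateManager);
 *   am.joinCluster();   // rebuild region states on master startup
 *   am.assign(hri);     // assign a single region
 *   am.unassign(hri);   // send CLOSE and offline the region
 * </pre>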
95   */
96  @InterfaceAudience.Private
97  public class AssignmentManager {
98    private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
99  
100   protected final Server server;
101 
102   private ServerManager serverManager;
103 
104   private boolean shouldAssignRegionsWithFavoredNodes;
105 
106   private LoadBalancer balancer;
107 
108   private final MetricsAssignmentManager metricsAssignmentManager;
109 
110   private final TableLockManager tableLockManager;
111 
112   private AtomicInteger numRegionsOpened = new AtomicInteger(0);
113 
114   final private KeyLocker<String> locker = new KeyLocker<String>();
115 
116   Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
117 
118   /**
119    * Map of regions to reopen after the schema of a table is changed. Key -
120    * encoded region name, value - HRegionInfo
121    */
122   private final Map <String, HRegionInfo> regionsToReopen;
123 
124   /*
125    * Maximum times we recurse an assignment/unassignment.
126    * See below in {@link #assign()} and {@link #unassign()}.
127    */
128   private final int maximumAttempts;
129 
130   /**
131    * The sleep time the assignment manager waits before retrying an hbase:meta
132    * assignment that failed because no region plan was available or the plan was bad.
133    */
134   private final long sleepTimeBeforeRetryingMetaAssignment;
135 
136   /** Plans for region movement. Key is the encoded version of a region name*/
137   // TODO: When do plans get cleaned out?  Ever? In server open and in server
138   // shutdown processing -- St.Ack
139   // All access to this Map must be synchronized.
140   final NavigableMap<String, RegionPlan> regionPlans =
141     new TreeMap<String, RegionPlan>();
142 
143   private final TableStateManager tableStateManager;
144 
145   private final ExecutorService executorService;
146 
147   // Thread pool executor service. TODO, consolidate with executorService?
148   private java.util.concurrent.ExecutorService threadPoolExecutorService;
149 
150   private final RegionStates regionStates;
151 
152   // The threshold to use bulk assigning. Using bulk assignment
153   // only if assigning at least this many regions to at least this
154   // many servers. If assigning fewer regions to fewer servers,
155   // bulk assigning may be not as efficient.
156   private final int bulkAssignThresholdRegions;
157   private final int bulkAssignThresholdServers;
158 
159   // Should bulk assignment wait till all regions are assigned,
160   // or it is timed out?  This is useful to measure bulk assignment
161   // performance, but not needed in most use cases.
162   private final boolean bulkAssignWaitTillAllAssigned;
163 
164   /**
165    * Indicator that AssignmentManager has recovered the region states so
166    * that ServerShutdownHandler can be fully enabled and re-assign regions
167    * of dead servers, ensuring that when re-assignment happens, AssignmentManager
168    * has proper region states.
169    *
170    * Protected to ease testing.
171    */
172   protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
173 
174   /**
175    * A map to track how many times in a row a region has failed to open,
176    * so that we don't try to open a region forever if the failure is
177    * unrecoverable.  We don't put this information in region states
178    * because we don't expect this to happen frequently; we don't
179    * want to copy this information over during each state transition either.
180    */
181   private final ConcurrentHashMap<String, AtomicInteger>
182     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
183 
184   // In case not using ZK for region assignment, region states
185   // are persisted in meta with a state store
186   private final RegionStateStore regionStateStore;
187 
188   /**
189    * For testing only!  Set to true to skip handling of split.
190    */
191   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
192   public static boolean TEST_SKIP_SPLIT_HANDLING = false;
193 
194   /** Listeners that are called on assignment events. */
195   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
196 
197   /**
198    * Constructs a new assignment manager.
199    *
200    * @param server instance of HMaster this AM is running inside
201    * @param serverManager serverManager for associated HMaster
202    * @param balancer implementation of {@link LoadBalancer}
203    * @param service Executor service
204    * @param metricsMaster metrics manager
205    * @param tableLockManager TableLock manager
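   * @param tableStateManager TableState manager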
206    * @throws IOException
207    */
208   public AssignmentManager(Server server, ServerManager serverManager,
209       final LoadBalancer balancer,
210       final ExecutorService service, MetricsMaster metricsMaster,
211       final TableLockManager tableLockManager,
212       final TableStateManager tableStateManager)
213           throws IOException {
214     this.server = server;
215     this.serverManager = serverManager;
216     this.executorService = service;
217     this.regionStateStore = new RegionStateStore(server);
218     this.regionsToReopen = Collections.synchronizedMap
219                            (new HashMap<String, HRegionInfo> ());
220     Configuration conf = server.getConfiguration();
221     // Only read favored nodes if using the favored nodes load balancer.
222     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
223            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
224            FavoredNodeLoadBalancer.class);
225 
226     this.tableStateManager = tableStateManager;
227 
228     // This is the max attempts, not retries, so it should be at least 1.
229     this.maximumAttempts = Math.max(1,
230       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
231     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
232         "hbase.meta.assignment.retry.sleeptime", 1000l);
233     this.balancer = balancer;
234     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
235     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
236       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
237     this.regionStates = new RegionStates(
238       server, tableStateManager, serverManager, regionStateStore);
239 
240     this.bulkAssignWaitTillAllAssigned =
241       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
242     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
243     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
244 
245     this.metricsAssignmentManager = new MetricsAssignmentManager();
246     this.tableLockManager = tableLockManager;
247   }
248 
249   /**
250    * Add the listener to the notification list.
251    * @param listener The AssignmentListener to register
252    */
253   public void registerListener(final AssignmentListener listener) {
254     this.listeners.add(listener);
255   }
256 
257   /**
258    * Remove the listener from the notification list.
259    * @param listener The AssignmentListener to unregister
260    */
261   public boolean unregisterListener(final AssignmentListener listener) {
262     return this.listeners.remove(listener);
263   }
264 
265   /**
266    * @return Instance of TableStateManager.
267    */
268   public TableStateManager getTableStateManager() {
269     // These are 'expensive' to make involving trip to zk ensemble so allow
270     // sharing.
271     return this.tableStateManager;
272   }
273 
274   /**
275    * This SHOULD not be public. It is public now
276    * because of some unit tests.
277    *
278    * TODO: make it package private and keep RegionStates in the master package
279    */
280   public RegionStates getRegionStates() {
281     return regionStates;
282   }
283 
284   /**
285    * Used in some tests to mock up region state in meta
286    */
287   @VisibleForTesting
288   RegionStateStore getRegionStateStore() {
289     return regionStateStore;
290   }
291 
292   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
293     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
294   }
295 
296   /**
297    * Add a regionPlan for the specified region.
298    * @param encodedName
299    * @param plan
300    */
301   public void addPlan(String encodedName, RegionPlan plan) {
302     synchronized (regionPlans) {
303       regionPlans.put(encodedName, plan);
304     }
305   }
306 
307   /**
308    * Add a map of region plans.
309    */
310   public void addPlans(Map<String, RegionPlan> plans) {
311     synchronized (regionPlans) {
312       regionPlans.putAll(plans);
313     }
314   }
315 
316   /**
317    * Set the list of regions that will be reopened
318    * because of an update in table schema
319    *
320    * @param regions
321    *          list of regions that should be tracked for reopen
322    */
323   public void setRegionsToReopen(List <HRegionInfo> regions) {
324     for(HRegionInfo hri : regions) {
325       regionsToReopen.put(hri.getEncodedName(), hri);
326     }
327   }
328 
329   /**
330    * Used by the client to check whether all regions have had the schema updates applied.
331    *
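   * <p>Illustrative usage (assuming {@code am} is this AssignmentManager):
   * <pre>
   *   Pair&lt;Integer, Integer&gt; status = am.getReopenStatus(tableName);
   *   int pending = status.getFirst();   // regions still to be reopened or in transition
   *   int total = status.getSecond();    // total number of regions of the table
   * </pre>
   *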
332    * @param tableName
333    * @return Pair whose first element is the number of regions still pending reopen (or in transition) and whose second is the total number of regions of the table
334    * @throws IOException
335    */
336   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
337       throws IOException {
338     List<HRegionInfo> hris;
339     if (TableName.META_TABLE_NAME.equals(tableName)) {
340       hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
341     } else {
342       hris = MetaTableAccessor.getTableRegions(server.getConnection(), tableName, true);
343     }
344 
345     Integer pending = 0;
346     for (HRegionInfo hri : hris) {
347       String name = hri.getEncodedName();
348       // No lock; concurrent access is ok: sequential consistency respected.
349       if (regionsToReopen.containsKey(name)
350           || regionStates.isRegionInTransition(name)) {
351         pending++;
352       }
353     }
354     return new Pair<Integer, Integer>(pending, hris.size());
355   }
356 
357   /**
358    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
359    * the failover cleanup before re-assigning regions of dead servers, so that
360    * when re-assignment happens, AssignmentManager has proper region states.
361    */
362   public boolean isFailoverCleanupDone() {
363     return failoverCleanupDone.get();
364   }
365 
366   /**
367    * To avoid racing with AM, external entities may need to lock a region,
368    * for example, when SSH checks what regions to skip re-assigning.
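   *
   * <p>A minimal sketch of the expected pattern ({@code am} is this AssignmentManager,
   * {@code hri} an {@link HRegionInfo}; the caller must release the lock):
   * <pre>
   *   Lock lock = am.acquireRegionLock(hri.getEncodedName());
   *   try {
   *     // inspect or update the region's state while holding the lock
   *   } finally {
   *     lock.unlock();
   *   }
   * </pre>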
369    */
370   public Lock acquireRegionLock(final String encodedName) {
371     return locker.acquireLock(encodedName);
372   }
373 
374   /**
375    * Called once failover cleanup is completed. Notifies the server manager to
376    * process queued-up dead servers, if any.
377    */
378   void failoverCleanupDone() {
379     failoverCleanupDone.set(true);
380     serverManager.processQueuedDeadServers();
381   }
382 
383   /**
384    * Called on startup.
385    * Figures out whether this is a fresh cluster start or we are joining an extant running cluster.
386    * @throws IOException
387    * @throws KeeperException
388    * @throws InterruptedException
389    */
390   void joinCluster() throws IOException,
391           KeeperException, InterruptedException {
392     long startTime = System.currentTimeMillis();
393     // Concurrency note: In the below the accesses on regionsInTransition are
394     // outside of a synchronization block where usually all accesses to RIT are
395     // synchronized.  The presumption is that in this case it is safe since this
396     // method is being played by a single thread on startup.
397 
398     // TODO: Regions that have a null location and are not in regionsInTransitions
399     // need to be handled.
400 
401     // Scan hbase:meta to build list of existing regions, servers, and assignment
402     // Returns servers that have not checked in (assumed dead) and to which some
403     // regions were assigned (according to hbase:meta)
404     Set<ServerName> deadServers = rebuildUserRegions();
405 
406     // This method will assign all user regions if a clean server startup or
407     // it will reconstruct master state and cleanup any leftovers from
408     // previous master process.
409     boolean failover = processDeadServersAndRegionsInTransition(deadServers);
410 
411     recoverTableInDisablingState();
412     recoverTableInEnablingState();
413     LOG.info("Joined the cluster in " + (System.currentTimeMillis()
414       - startTime) + "ms, failover=" + failover);
415   }
416 
417   /**
418    * Processes all regions that are in transition and also processes the list
419    * of dead servers by scanning hbase:meta.
420    * Used by a master joining a cluster.  If we figure this is a clean cluster
421    * startup, will assign all user regions.
422    * @param deadServers
423    *          Set of servers known to be dead. Can be null.
424    * @throws IOException
425    * @throws InterruptedException
426    */
427   boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
428           throws IOException, InterruptedException {
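    // The (re)start is treated as a failover if any of the following holds:
    // (1) there are known dead servers, (2) a user region is assigned to or in
    // transition on a live server, or (3) a queued dead server still has WAL files
    // to split. Otherwise it is a clean startup and all user regions are assigned fresh.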
429     boolean failover = !serverManager.getDeadServers().isEmpty();
430     if (failover) {
431       // This may not be a failover actually, especially if meta is on this master.
432       if (LOG.isDebugEnabled()) {
433         LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
434       }
435     } else {
436       // If any one region except meta is assigned, it's a failover.
437       Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
438       for (Map.Entry<HRegionInfo, ServerName> en:
439           regionStates.getRegionAssignments().entrySet()) {
440         HRegionInfo hri = en.getKey();
441         if (!hri.isMetaTable()
442             && onlineServers.contains(en.getValue())) {
443           LOG.debug("Found " + hri + " out on cluster");
444           failover = true;
445           break;
446         }
447       }
448       if (!failover) {
449         // If any region except meta is in transition on a live server, it's a failover.
450         Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
451         if (!regionsInTransition.isEmpty()) {
452           for (RegionState regionState: regionsInTransition.values()) {
453             ServerName serverName = regionState.getServerName();
454             if (!regionState.getRegion().isMetaRegion()
455                 && serverName != null && onlineServers.contains(serverName)) {
456               LOG.debug("Found " + regionState + " in RITs");
457               failover = true;
458               break;
459             }
460           }
461         }
462       }
463     }
464     if (!failover) {
465       // If we get here, we have a full cluster restart. It is a failover only
466     // if some WALs are not split yet. Meta WALs, if any, should have
467     // been split already. We can walk through those queued dead servers;
468     // if they don't have any WALs, this restart should be considered a clean one
469       Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
470       if (!queuedDeadServers.isEmpty()) {
471         Configuration conf = server.getConfiguration();
472         Path rootdir = FSUtils.getRootDir(conf);
473         FileSystem fs = rootdir.getFileSystem(conf);
474         for (ServerName serverName: queuedDeadServers) {
475           // In the case of a clean exit, the shutdown handler would have presplit any WALs and
476           // removed empty directories.
477           Path logDir = new Path(rootdir,
478               DefaultWALProvider.getWALDirectoryName(serverName.toString()));
479           Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
480           if (fs.exists(logDir) || fs.exists(splitDir)) {
481             LOG.debug("Found queued dead server " + serverName);
482             failover = true;
483             break;
484           }
485         }
486         if (!failover) {
487           // We figured that it's not a failover, so no need to
488           // work on these re-queued dead servers any more.
489           LOG.info("AM figured that it's not a failover and cleaned up "
490             + queuedDeadServers.size() + " queued dead servers");
491           serverManager.removeRequeuedDeadServers();
492         }
493       }
494     }
495 
496     Set<TableName> disabledOrDisablingOrEnabling = null;
497     Map<HRegionInfo, ServerName> allRegions = null;
498 
499     if (!failover) {
500       disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
501         TableState.State.DISABLED, TableState.State.DISABLING,
502         TableState.State.ENABLING);
503 
504       // Clean re/start, mark all user regions closed before reassignment
505       allRegions = regionStates.closeAllUserRegions(
506         disabledOrDisablingOrEnabling);
507     }
508 
509     // Now region states are restored
510     regionStateStore.start();
511 
512     if (failover) {
513       if (deadServers != null && !deadServers.isEmpty()) {
514         for (ServerName serverName: deadServers) {
515           if (!serverManager.isServerDead(serverName)) {
516             serverManager.expireServer(serverName); // Let SSH do region re-assign
517           }
518         }
519       }
520       processRegionsInTransition(regionStates.getRegionsInTransition().values());
521     }
522 
523     // Now we can safely claim failover cleanup completed and enable
524     // ServerShutdownHandler for further processing. The nodes (below)
525     // in transition, if any, are for regions not related to those
526     // dead servers at all, and can be done in parallel to SSH.
527     failoverCleanupDone();
528     if (!failover) {
529       // Fresh cluster startup.
530       LOG.info("Clean cluster startup. Assigning user regions");
531       assignAllUserRegions(allRegions);
532     }
533     // unassign replicas of the split parents and the merged regions
534     // the daughter replicas are opened in assignAllUserRegions if it was
535     // not already opened.
536     for (HRegionInfo h : replicasToClose) {
537       unassign(h);
538     }
539     replicasToClose.clear();
540     return failover;
541   }
542 
543   /**
544    * When a region is closed, it should be removed from the regionsToReopen
545    * @param hri HRegionInfo of the region which was closed
546    */
547   public void removeClosedRegion(HRegionInfo hri) {
548     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
549       LOG.debug("Removed region from reopening regions because it was closed");
550     }
551   }
552 
553   // TODO: processFavoredNodes might throw an exception, e.g., if the
554   // meta could not be contacted/updated. We need to decide how seriously to treat
555   // this problem. Should we fail the current assignment? We should be able
556   // to recover from this problem eventually (if the meta couldn't be updated
557   // things should work normally and eventually get fixed up).
558   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
559     if (!shouldAssignRegionsWithFavoredNodes) return;
560     // The AM gets the favored nodes info for each region and updates the meta
561     // table with that info
562     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
563         new HashMap<HRegionInfo, List<ServerName>>();
564     for (HRegionInfo region : regions) {
565       regionToFavoredNodes.put(region,
566           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
567     }
568     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
569       this.server.getConnection());
570   }
571 
572   /**
573    * Marks the region as online.  Removes it from regions in transition and
574    * updates the in-memory assignment information.
575    * <p>
576    * Used when a region has been successfully opened on a region server.
577    * @param regionInfo
578    * @param sn
579    */
580   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
581     regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
582   }
583 
584   void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
585     numRegionsOpened.incrementAndGet();
586     regionStates.regionOnline(regionInfo, sn, openSeqNum);
587 
588     // Remove plan if one.
589     clearRegionPlan(regionInfo);
590     balancer.regionOnline(regionInfo, sn);
591 
592     // Tell our listeners that a region was opened
593     sendRegionOpenedNotification(regionInfo, sn);
594   }
595 
596   /**
597    * Marks the region as offline.  Removes it from regions in transition and
598    * removes in-memory assignment information.
599    * <p>
600    * Used when a region has been closed and should remain closed.
601    * @param regionInfo
602    */
603   public void regionOffline(final HRegionInfo regionInfo) {
604     regionOffline(regionInfo, null);
605   }
606 
607   public void offlineDisabledRegion(HRegionInfo regionInfo) {
608     replicasToClose.remove(regionInfo);
609     regionOffline(regionInfo);
610   }
611 
612   // Assignment methods
613 
614   /**
615    * Assigns the specified region.
616    * <p>
617    * If a RegionPlan is available with a valid destination then it will be used
618    * to determine what server region is assigned to.  If no RegionPlan is
619    * available, region will be assigned to a random available server.
620    * <p>
621    * Updates the RegionState and sends the OPEN RPC.
622    * <p>
623    * This will only succeed if the region is in transition and in a CLOSED or
624    * OFFLINE state or not in transition, and of course, the
625    * chosen server is up and running (It may have just crashed!).
626    *
627    * @param region region to be assigned
628    */
629   public void assign(HRegionInfo region) {
630     assign(region, false);
631   }
632 
633   /**
634    * Use care with forceNewPlan. It could cause double assignment.
635    */
636   public void assign(HRegionInfo region, boolean forceNewPlan) {
637     if (isDisabledorDisablingRegionInRIT(region)) {
638       return;
639     }
640     String encodedName = region.getEncodedName();
641     Lock lock = locker.acquireLock(encodedName);
642     try {
643       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
644       if (state != null) {
645         if (regionStates.wasRegionOnDeadServer(encodedName)) {
646           LOG.info("Skip assigning " + region.getRegionNameAsString()
647             + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
648             + " is dead but not processed yet");
649           return;
650         }
651         assign(state, forceNewPlan);
652       }
653     } finally {
654       lock.unlock();
655     }
656   }
657 
658   /**
659    * Bulk assign regions to <code>destination</code>.
660    * @param destination
661    * @param regions Regions to assign.
662    * @return true if successful
663    */
664   boolean assign(final ServerName destination, final List<HRegionInfo> regions)
665     throws InterruptedException {
666     long startTime = EnvironmentEdgeManager.currentTime();
667     try {
668       int regionCount = regions.size();
669       if (regionCount == 0) {
670         return true;
671       }
672       LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
673       Set<String> encodedNames = new HashSet<String>(regionCount);
674       for (HRegionInfo region : regions) {
675         encodedNames.add(region.getEncodedName());
676       }
677 
678       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
679       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
680       try {
681         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regionCount);
682         List<RegionState> states = new ArrayList<RegionState>(regionCount);
683         for (HRegionInfo region : regions) {
684           String encodedName = region.getEncodedName();
685           if (!isDisabledorDisablingRegionInRIT(region)) {
686             RegionState state = forceRegionStateToOffline(region, false);
687             boolean onDeadServer = false;
688             if (state != null) {
689               if (regionStates.wasRegionOnDeadServer(encodedName)) {
690                 LOG.info("Skip assigning " + region.getRegionNameAsString()
691                   + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
692                   + " is dead but not processed yet");
693                 onDeadServer = true;
694               } else {
695                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
696                 plans.put(encodedName, plan);
697                 states.add(state);
698                 continue;
699               }
700             }
701             // Reassign if the region wasn't on a dead server
702             if (!onDeadServer) {
703               LOG.info("failed to force region state to offline, "
704                 + "will reassign later: " + region);
705               failedToOpenRegions.add(region); // assign individually later
706             }
707           }
708           // Release the lock, this region is excluded from bulk assign because
709           // we can't update its state, or set its znode to offline.
710           Lock lock = locks.remove(encodedName);
711           lock.unlock();
712         }
713 
714         if (server.isStopped()) {
715           return false;
716         }
717 
718         // Add region plans, so we can updateTimers when one region is opened so
719         // that unnecessary timeout on RIT is reduced.
720         this.addPlans(plans);
721 
722         List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos =
723           new ArrayList<Pair<HRegionInfo, List<ServerName>>>(states.size());
724         for (RegionState state: states) {
725           HRegionInfo region = state.getRegion();
726           regionStates.updateRegionState(
727             region, State.PENDING_OPEN, destination);
728           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
729           if (this.shouldAssignRegionsWithFavoredNodes) {
730             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
731           }
732           regionOpenInfos.add(new Pair<HRegionInfo, List<ServerName>>(
733             region, favoredNodes));
734         }
735 
736         // Move on to open regions.
737         try {
738           // Send OPEN RPC. If it fails on an IOE or RemoteException,
739           // regions will be assigned individually.
740           Configuration conf = server.getConfiguration();
741           long maxWaitTime = System.currentTimeMillis() +
742             conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000);
743           for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
744             try {
745               List<RegionOpeningState> regionOpeningStateList = serverManager
746                 .sendRegionOpen(destination, regionOpenInfos);
747               for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
748                 RegionOpeningState openingState = regionOpeningStateList.get(k);
749                 if (openingState != RegionOpeningState.OPENED) {
750                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
751                   LOG.info("Got opening state " + openingState
752                     + ", will reassign later: " + region);
753                   // Failed opening this region, reassign it later
754                   forceRegionStateToOffline(region, true);
755                   failedToOpenRegions.add(region);
756                 }
757               }
758               break;
759             } catch (IOException e) {
760               if (e instanceof RemoteException) {
761                 e = ((RemoteException)e).unwrapRemoteException();
762               }
763               if (e instanceof RegionServerStoppedException) {
764                 LOG.warn("The region server was shut down, ", e);
765                 // No need to retry, the region server is a goner.
766                 return false;
767               } else if (e instanceof ServerNotRunningYetException) {
768                 long now = System.currentTimeMillis();
769                 if (now < maxWaitTime) {
770                   if (LOG.isDebugEnabled()) {
771                     LOG.debug("Server is not yet up; waiting up to " +
772                       (maxWaitTime - now) + "ms", e);
773                   }
774                   Thread.sleep(100);
775                   i--; // reset the try count
776                   continue;
777                 }
778               } else if (e instanceof java.net.SocketTimeoutException
779                   && this.serverManager.isServerOnline(destination)) {
780                 // In case socket is timed out and the region server is still online,
781                 // the openRegion RPC could have been accepted by the server and
782                 // just the response didn't go through.  So we will retry to
783                 // open the region on the same server.
784                 if (LOG.isDebugEnabled()) {
785                   LOG.debug("Bulk assigner openRegion() to " + destination
786                     + " has timed out, but the regions might"
787                     + " already be opened on it.", e);
788                 }
789                 // wait and reset the re-try count, server might be just busy.
790                 Thread.sleep(100);
791                 i--;
792                 continue;
793               } else if (e instanceof FailedServerException && i < maximumAttempts) {
794                 // In case the server is in the failed server list, no point to
795                 // retry too soon. Retry after the failed_server_expiry time
796                 long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
797                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
798                 if (LOG.isDebugEnabled()) {
799                   LOG.debug(destination + " is on failed server list; waiting "
800                     + sleepTime + "ms", e);
801                 }
802                 Thread.sleep(sleepTime);
803                 continue;
804               }
805               throw e;
806             }
807           }
808         } catch (IOException e) {
809           // Can be a socket timeout, EOF, NoRouteToHost, etc
810           LOG.info("Unable to communicate with " + destination
811             + " in order to assign regions, ", e);
812           for (RegionState state: states) {
813             HRegionInfo region = state.getRegion();
814             forceRegionStateToOffline(region, true);
815           }
816           return false;
817         }
818       } finally {
819         for (Lock lock : locks.values()) {
820           lock.unlock();
821         }
822       }
823 
824       if (!failedToOpenRegions.isEmpty()) {
825         for (HRegionInfo region : failedToOpenRegions) {
826           if (!regionStates.isRegionOnline(region)) {
827             invokeAssign(region);
828           }
829         }
830       }
831       LOG.debug("Bulk assigning done for " + destination);
832       return true;
833     } finally {
834       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
835     }
836   }
837 
838   /**
839    * Send CLOSE RPC if the server is online, otherwise, offline the region.
840    *
841    * The RPC will be sent only to the region server found in the region state
842    * if it is passed in; otherwise, to the src server specified. If region
843    * state is not specified, we don't update region state at all; instead
844    * we just send the RPC call. This is useful for some cleanup without
845    * messing with the region states (see handleRegion and the case where a region
846    * is opened on an unexpected server, for an example).
847    */
848   private void unassign(final HRegionInfo region,
849       final ServerName server, final ServerName dest) {
850     for (int i = 1; i <= this.maximumAttempts; i++) {
851       if (this.server.isStopped() || this.server.isAborted()) {
852         LOG.debug("Server stopped/aborted; skipping unassign of " + region);
853         return;
854       }
855       if (!serverManager.isServerOnline(server)) {
856         LOG.debug("Offline " + region.getRegionNameAsString()
857           + ", no need to unassign since it's on a dead server: " + server);
858         regionStates.updateRegionState(region, State.OFFLINE);
859         return;
860       }
861       try {
862         // Send CLOSE RPC
863         if (serverManager.sendRegionClose(server, region, dest)) {
864           LOG.debug("Sent CLOSE to " + server + " for region " +
865             region.getRegionNameAsString());
866           return;
867         }
868         // This rarely happens. Historically, regionserver close always returned true.
869         // TODO: this can now happen (0.96) if there is an exception in a coprocessor
870         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
871           region.getRegionNameAsString());
872       } catch (Throwable t) {
873         if (t instanceof RemoteException) {
874           t = ((RemoteException)t).unwrapRemoteException();
875         }
876         if (t instanceof NotServingRegionException
877             || t instanceof RegionServerStoppedException
878             || t instanceof ServerNotRunningYetException) {
879           LOG.debug("Offline " + region.getRegionNameAsString()
880             + ", it's not any more on " + server, t);
881           regionStates.updateRegionState(region, State.OFFLINE);
882           return;
883         } else if (t instanceof FailedServerException && i < maximumAttempts) {
884           // In case the server is in the failed server list, no point to
885           // retry too soon. Retry after the failed_server_expiry time
886           try {
887             Configuration conf = this.server.getConfiguration();
888             long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
889               RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
890             if (LOG.isDebugEnabled()) {
891               LOG.debug(server + " is on failed server list; waiting "
892                 + sleepTime + "ms", t);
893             }
894             Thread.sleep(sleepTime);
895           } catch (InterruptedException ie) {
896             LOG.warn("Failed to unassign "
897               + region.getRegionNameAsString() + " since interrupted", ie);
898             regionStates.updateRegionState(region, State.FAILED_CLOSE);
899             Thread.currentThread().interrupt();
900             return;
901           }
902         }
903 
904         LOG.info("Server " + server + " returned " + t + " for "
905           + region.getRegionNameAsString() + ", try=" + i
906           + " of " + this.maximumAttempts, t);
907       }
908     }
909     // Run out of attempts
910     regionStates.updateRegionState(region, State.FAILED_CLOSE);
911   }
912 
913   /**
914    * Set region to OFFLINE unless it is opening and forceNewPlan is false.
915    */
916   private RegionState forceRegionStateToOffline(
917       final HRegionInfo region, final boolean forceNewPlan) {
918     RegionState state = regionStates.getRegionState(region);
919     if (state == null) {
920       LOG.warn("Assigning a region not in region states: " + region);
921       state = regionStates.createRegionState(region);
922     }
923 
924     if (forceNewPlan && LOG.isDebugEnabled()) {
925       LOG.debug("Force region state offline " + state);
926     }
927 
928     switch (state.getState()) {
929     case OPEN:
930     case OPENING:
931     case PENDING_OPEN:
932     case CLOSING:
933     case PENDING_CLOSE:
934       if (!forceNewPlan) {
935         LOG.debug("Skip assigning " +
936           region + ", it is already " + state);
937         return null;
938       }
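      // Intentional fall-through: with forceNewPlan, an in-flight region is treated
      // like a failed open/close and is unassigned below before being re-planned.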
939     case FAILED_CLOSE:
940     case FAILED_OPEN:
941       regionStates.updateRegionState(region, State.PENDING_CLOSE);
942       unassign(region, state.getServerName(), null);
943       state = regionStates.getRegionState(region);
944       if (!state.isOffline() && !state.isClosed()) {
945         // If the region isn't offline, we can't re-assign
946         // it now. It will be assigned automatically after
947         // the regionserver reports it's closed.
948         return null;
949       }
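      // Intentional fall-through: the region is now offline or closed, so it can be assigned.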
950     case OFFLINE:
951     case CLOSED:
952       break;
953     default:
954       LOG.error("Trying to assign region " + region
955         + ", which is " + state);
956       return null;
957     }
958     return state;
959   }
960 
961   /**
962    * Caller must hold lock on the passed <code>state</code> object.
963    * @param state
964    * @param forceNewPlan
965    */
966   private void assign(RegionState state, boolean forceNewPlan) {
967     long startTime = EnvironmentEdgeManager.currentTime();
968     try {
969       Configuration conf = server.getConfiguration();
970       RegionPlan plan = null;
971       long maxWaitTime = -1;
972       HRegionInfo region = state.getRegion();
973       Throwable previousException = null;
974       for (int i = 1; i <= maximumAttempts; i++) {
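        // Each attempt: obtain or reuse a region plan, send the OPEN RPC, and on failure
        // decide whether to retry on the same server, pick a new plan, or give up.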
975         if (server.isStopped() || server.isAborted()) {
976           LOG.info("Skip assigning " + region.getRegionNameAsString()
977             + ", the server is stopped/aborted");
978           return;
979         }
980 
981         if (plan == null) { // Get a server for the region at first
982           try {
983             plan = getRegionPlan(region, forceNewPlan);
984           } catch (HBaseIOException e) {
985             LOG.warn("Failed to get region plan", e);
986           }
987         }
988 
989         if (plan == null) {
990           LOG.warn("Unable to determine a plan to assign " + region);
991 
992           // For meta region, we have to keep retrying until succeeding
993           if (region.isMetaRegion()) {
994             if (i == maximumAttempts) {
995               i = 0; // re-set attempt count to 0 for at least 1 retry
996 
997               LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
998                 " after maximumAttempts (" + this.maximumAttempts +
999                 "). Reset attempts count and continue retrying.");
1000             }
1001             waitForRetryingMetaAssignment();
1002             continue;
1003           }
1004 
1005           regionStates.updateRegionState(region, State.FAILED_OPEN);
1006           return;
1007         }
1008        // In case of assignment from EnableTableHandler, the table state is ENABLING. Anyhow,
1009        // EnableTableHandler will set ENABLED after assigning all the table regions. If we
1010        // try to set it to ENABLED directly then the client API may think the table is enabled.
1011        // When all the regions are added directly into hbase:meta and we call
1012        // assignRegion, we need to make the table ENABLED. Hence in such a case the table
1013         // will not be in ENABLING or ENABLED state.
1014         TableName tableName = region.getTable();
1015         if (!tableStateManager.isTableState(tableName,
1016           TableState.State.ENABLED, TableState.State.ENABLING)) {
1017           LOG.debug("Setting table " + tableName + " to ENABLED state.");
1018           setEnabledTable(tableName);
1019         }
1020         LOG.info("Assigning " + region.getRegionNameAsString() +
1021             " to " + plan.getDestination().toString());
1022         // Transition RegionState to PENDING_OPEN
1023        regionStates.updateRegionState(region,
1024           State.PENDING_OPEN, plan.getDestination());
1025 
1026         boolean needNewPlan = false;
1027         final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
1028             " to " + plan.getDestination();
1029         try {
1030           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1031           if (this.shouldAssignRegionsWithFavoredNodes) {
1032             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1033           }
1034           serverManager.sendRegionOpen(plan.getDestination(), region, favoredNodes);
1035           return; // we're done
1036         } catch (Throwable t) {
1037           if (t instanceof RemoteException) {
1038             t = ((RemoteException) t).unwrapRemoteException();
1039           }
1040           previousException = t;
1041 
1042          // Should we wait a little before retrying? If the server is starting, the answer is yes.
1043           boolean hold = (t instanceof ServerNotRunningYetException);
1044 
1045           // In case socket is timed out and the region server is still online,
1046           // the openRegion RPC could have been accepted by the server and
1047           // just the response didn't go through.  So we will retry to
1048           // open the region on the same server.
1049           boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
1050               && this.serverManager.isServerOnline(plan.getDestination()));
1051 
1052           if (hold) {
1053             LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
1054               "try=" + i + " of " + this.maximumAttempts, t);
1055 
1056             if (maxWaitTime < 0) {
1057               maxWaitTime = EnvironmentEdgeManager.currentTime()
1058                 + this.server.getConfiguration().getLong(
1059                   "hbase.regionserver.rpc.startup.waittime", 60000);
1060             }
1061             try {
1062               long now = EnvironmentEdgeManager.currentTime();
1063               if (now < maxWaitTime) {
1064                 if (LOG.isDebugEnabled()) {
1065                   LOG.debug("Server is not yet up; waiting up to "
1066                     + (maxWaitTime - now) + "ms", t);
1067                 }
1068                 Thread.sleep(100);
1069                 i--; // reset the try count
1070               } else {
1071                 LOG.debug("Server is not up for a while; try a new one", t);
1072                 needNewPlan = true;
1073               }
1074             } catch (InterruptedException ie) {
1075               LOG.warn("Failed to assign "
1076                   + region.getRegionNameAsString() + " since interrupted", ie);
1077               regionStates.updateRegionState(region, State.FAILED_OPEN);
1078               Thread.currentThread().interrupt();
1079               return;
1080             }
1081           } else if (retry) {
1082             i--; // we want to retry as many times as needed as long as the RS is not dead.
1083             if (LOG.isDebugEnabled()) {
1084              LOG.debug(assignMsg + ", trying to assign to the same region server due to ", t);
1085             }
1086           } else {
1087             needNewPlan = true;
1088             LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
1089                 " try=" + i + " of " + this.maximumAttempts, t);
1090           }
1091         }
1092 
1093         if (i == this.maximumAttempts) {
1094           // For meta region, we have to keep retrying until succeeding
1095           if (region.isMetaRegion()) {
1096             i = 0; // re-set attempt count to 0 for at least 1 retry
1097             LOG.warn(assignMsg +
1098                ", trying to assign a hbase:meta region; reached maximumAttempts (" +
1099                 this.maximumAttempts + ").  Reset attempt counts and continue retrying.");
1100             waitForRetryingMetaAssignment();
1101           }
1102           else {
1103             // Don't reset the region state or get a new plan any more.
1104             // This is the last try.
1105             continue;
1106           }
1107         }
1108 
1109         // If region opened on destination of present plan, reassigning to new
1110         // RS may cause double assignments. In case of RegionAlreadyInTransitionException
1111         // reassigning to same RS.
1112         if (needNewPlan) {
1113           // Force a new plan and reassign. Will return null if no servers.
1114           // The new plan could be the same as the existing plan since we don't
1115           // exclude the server of the original plan, which should not be
1116           // excluded since it could be the only server up now.
1117           RegionPlan newPlan = null;
1118           try {
1119             newPlan = getRegionPlan(region, true);
1120           } catch (HBaseIOException e) {
1121             LOG.warn("Failed to get region plan", e);
1122           }
1123           if (newPlan == null) {
1124             regionStates.updateRegionState(region, State.FAILED_OPEN);
1125             LOG.warn("Unable to find a viable location to assign region " +
1126                 region.getRegionNameAsString());
1127             return;
1128           }
1129 
1130           if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
1131            // Clean out the plan we failed to execute and one that doesn't look like it'll
1132            // succeed anyway; we need a new plan!
1133             // Transition back to OFFLINE
1134             regionStates.updateRegionState(region, State.OFFLINE);
1135             plan = newPlan;
1136           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
1137               previousException instanceof FailedServerException) {
1138             try {
1139               LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
1140                 " to the same failed server.");
1141               Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1142                 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
1143             } catch (InterruptedException ie) {
1144               LOG.warn("Failed to assign "
1145                   + region.getRegionNameAsString() + " since interrupted", ie);
1146               regionStates.updateRegionState(region, State.FAILED_OPEN);
1147               Thread.currentThread().interrupt();
1148               return;
1149             }
1150           }
1151         }
1152       }
1153       // Run out of attempts
1154       regionStates.updateRegionState(region, State.FAILED_OPEN);
1155     } finally {
1156       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
1157     }
1158   }
1159 
1160   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
1161     if (this.tableStateManager.isTableState(region.getTable(),
1162             TableState.State.DISABLED,
1163             TableState.State.DISABLING) || replicasToClose.contains(region)) {
1164       LOG.info("Table " + region.getTable() + " is disabled or disabling;"
1165         + " skipping assign of " + region.getRegionNameAsString());
1166       offlineDisabledRegion(region);
1167       return true;
1168     }
1169     return false;
1170   }
1171 
1172   /**
1173    * @param region the region to assign
1174    * @param forceNewPlan If true, then if an existing plan exists, a new plan
1175    * will be generated.
1176    * @return Plan for passed <code>region</code> (If none currently, it creates one or
1177    * if no servers to assign, it returns null).
1178    */
1179   private RegionPlan getRegionPlan(final HRegionInfo region,
1180       final boolean forceNewPlan) throws HBaseIOException {
1181     // Pickup existing plan or make a new one
1182     final String encodedName = region.getEncodedName();
1183     final List<ServerName> destServers =
1184       serverManager.createDestinationServersList();
1185 
1186     if (destServers.isEmpty()){
1187       LOG.warn("Can't move " + encodedName +
1188         ", there is no destination server available.");
1189       return null;
1190     }
1191 
1192     RegionPlan randomPlan = null;
1193     boolean newPlan = false;
1194     RegionPlan existingPlan;
1195 
1196     synchronized (this.regionPlans) {
1197       existingPlan = this.regionPlans.get(encodedName);
1198 
1199       if (existingPlan != null && existingPlan.getDestination() != null) {
1200         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
1201           + " destination server is " + existingPlan.getDestination() +
1202             " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
1203       }
1204 
1205       if (forceNewPlan
1206           || existingPlan == null
1207           || existingPlan.getDestination() == null
1208           || !destServers.contains(existingPlan.getDestination())) {
1209         newPlan = true;
1210         randomPlan = new RegionPlan(region, null,
1211             balancer.randomAssignment(region, destServers));
1212         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
1213           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
1214           regions.add(region);
1215           try {
1216             processFavoredNodes(regions);
1217           } catch (IOException ie) {
1218             LOG.warn("Ignoring exception in processFavoredNodes " + ie);
1219           }
1220         }
1221         this.regionPlans.put(encodedName, randomPlan);
1222       }
1223     }
1224 
1225     if (newPlan) {
1226       if (randomPlan.getDestination() == null) {
1227         LOG.warn("Can't find a destination for " + encodedName);
1228         return null;
1229       }
1230       if (LOG.isDebugEnabled()) {
1231         LOG.debug("No previous transition plan found (or ignoring " +
1232           "an existing plan) for " + region.getRegionNameAsString() +
1233           "; generated random plan=" + randomPlan + "; " + destServers.size() +
1234           " (online=" + serverManager.getOnlineServers().size() +
1235           ") available servers, forceNewPlan=" + forceNewPlan);
1236       }
1237       return randomPlan;
1238     }
1239     if (LOG.isDebugEnabled()) {
1240       LOG.debug("Using pre-existing plan for " +
1241         region.getRegionNameAsString() + "; plan=" + existingPlan);
1242     }
1243     return existingPlan;
1244   }
1245 
1246   /**
1247    * Wait for some time before retrying meta table region assignment
1248    */
1249   private void waitForRetryingMetaAssignment() {
1250     try {
1251       Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
1252     } catch (InterruptedException e) {
1253       LOG.error("Got exception while waiting for hbase:meta assignment");
1254       Thread.currentThread().interrupt();
1255     }
1256   }
1257 
1258   /**
1259    * Unassigns the specified region.
1260    * <p>
1261    * Updates the RegionState and sends the CLOSE RPC unless region is being
1262    * split by regionserver; then the unassign fails (silently) because we
1263    * presume the region being unassigned no longer exists (it has been split out
1264    * of existence). TODO: What to do if split fails and is rolled back and
1265    * parent is revivified?
1266    * <p>
1267    * If a RegionPlan is already set, it will remain.
1268    *
1269    * @param region region to be unassigned
1270    */
1271   public void unassign(HRegionInfo region) {
1272     unassign(region, null);
1273   }
1274 
1275 
1276   /**
1277    * Unassigns the specified region.
1278    * <p>
1279    * Updates the RegionState and sends the CLOSE RPC unless the region is being
1280    * split by the regionserver; in that case the unassign fails (silently) because
1281    * we presume the region being unassigned no longer exists (it has been split
1282    * out of existence). TODO: What to do if the split fails and is rolled back and
1283    * the parent is revivified?
1284    * <p>
1285    * If a RegionPlan is already set, it will remain.
1286    *
1287    * @param region region to be unassigned
1288    * @param dest the destination server of the region
1289    */
1290   public void unassign(HRegionInfo region, ServerName dest) {
1291     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
1292     LOG.debug("Starting unassign of " + region.getRegionNameAsString()
1293       + " (offlining), current state: " + regionStates.getRegionState(region));
1294 
1295     String encodedName = region.getEncodedName();
1296     // Grab the state of this region and synchronize on it
1297     // We need a lock here as we're going to do a put later and we don't want
1298     // multiple states to be created concurrently.
1299     ReentrantLock lock = locker.acquireLock(encodedName);
1300     RegionState state = regionStates.getRegionTransitionState(encodedName);
1301     try {
1302       if (state == null || state.isFailedClose()) {
1303         if (state == null) {
1304           // Region is not in transition.
1305           // We can unassign it only if it's not SPLIT/MERGED.
1306           state = regionStates.getRegionState(encodedName);
1307           if (state != null && state.isUnassignable()) {
1308             LOG.info("Attempting to unassign " + state + ", ignored");
1309             // Offline region will be reassigned below
1310             return;
1311           }
1312           if (state == null || state.getServerName() == null) {
1313             // We don't know where the region is, offline it.
1314             // No need to send CLOSE RPC
1315             LOG.warn("Attempting to unassign a region not in RegionStates "
1316               + region.getRegionNameAsString() + ", offlined");
1317             regionOffline(region);
1318             return;
1319           }
1320         }
1321         state = regionStates.updateRegionState(
1322           region, State.PENDING_CLOSE);
1323       } else if (state.isFailedOpen()) {
1324         // The region is not open yet
1325         regionOffline(region);
1326         return;
1327       } else {
1328         LOG.debug("Attempting to unassign " +
1329           region.getRegionNameAsString() + " but it is " +
1330           "already in transition (" + state.getState() + ")");
1331         return;
1332       }
1333 
1334       unassign(region, state.getServerName(), dest);
1335     } finally {
1336       lock.unlock();
1337 
1338       // Region is expected to be reassigned afterwards
1339       if (!replicasToClose.contains(region)
1340           && regionStates.isRegionInState(region, State.OFFLINE)) {
1341         assign(region);
1342       }
1343     }
1344   }
1345 
1346   /**
1347    * Used by unit tests. Returns the number of regions opened so far in the life
1348    * of the master. Increases by one every time the master opens a region.
1349    * @return the counter value of the number of regions opened so far
1350    */
1351   public int getNumRegionsOpened() {
1352     return numRegionsOpened.get();
1353   }
1354 
1355   /**
1356    * Waits until the specified region has completed assignment.
1357    * <p>
1358    * If the region is already assigned, returns immediately.  Otherwise, method
1359    * blocks until the region is assigned; returns false if the region goes to FAILED_OPEN or the server is stopped.
1360    * @param regionInfo region to wait on assignment for
1361    * @throws InterruptedException
1362    */
1363   public boolean waitForAssignment(HRegionInfo regionInfo)
1364       throws InterruptedException {
1365     while (!regionStates.isRegionOnline(regionInfo)) {
1366       if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN)
1367           || this.server.isStopped()) {
1368         return false;
1369       }
1370 
1371       // We should receive a notification, but it's
1372       //  better to have a timeout to recheck the condition here:
1373       //  it lowers the impact of a race condition if any
1374       regionStates.waitForUpdate(100);
1375     }
1376     return true;
1377   }
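
       // Illustrative usage sketch (the names "am" and "hri" are assumed here, not
       // part of this class): a caller that needs a region online before proceeding
       // might do something like:
       //
       //   if (!am.waitForAssignment(hri)) {
       //     // the open failed permanently (FAILED_OPEN) or the server is stopping
       //     throw new IOException("Region " + hri.getRegionNameAsString()
       //         + " was not assigned");
       //   }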
1378 
1379   /**
1380    * Assigns the hbase:meta region.
1381    * <p>
1382    * Assumes that hbase:meta is currently closed and is not being actively served by
1383    * any RegionServer.
1384    */
1385   public void assignMeta() throws KeeperException {
1386     regionStates.updateRegionState(HRegionInfo.FIRST_META_REGIONINFO, State.OFFLINE);
1387     assign(HRegionInfo.FIRST_META_REGIONINFO);
1388   }
1389 
1390   /**
1391    * Assigns specified regions retaining assignments, if any.
1392    * <p>
1393    * This is a synchronous call and will return once every region has been
1394    * assigned.  If anything fails, an exception is thrown
1395    * @throws InterruptedException
1396    * @throws IOException
1397    */
1398   public void assign(Map<HRegionInfo, ServerName> regions)
1399         throws IOException, InterruptedException {
1400     if (regions == null || regions.isEmpty()) {
1401       return;
1402     }
1403     List<ServerName> servers = serverManager.createDestinationServersList();
1404     if (servers == null || servers.isEmpty()) {
1405       throw new IOException("Found no destination server to assign region(s)");
1406     }
1407 
1408     // Reuse existing assignment info
1409     Map<ServerName, List<HRegionInfo>> bulkPlan =
1410       balancer.retainAssignment(regions, servers);
1411     if (bulkPlan == null) {
1412       throw new IOException("Unable to determine a plan to assign region(s)");
1413     }
1414 
1415     assign(regions.size(), servers.size(),
1416       "retainAssignment=true", bulkPlan);
1417   }
1418 
1419   /**
1420    * Assigns specified regions round robin, if any.
1421    * <p>
1422    * This is a synchronous call and will return once every region has been
1423    * assigned.  If anything fails, an exception is thrown
1424    * @throws InterruptedException
1425    * @throws IOException
1426    */
1427   public void assign(List<HRegionInfo> regions)
1428         throws IOException, InterruptedException {
1429     if (regions == null || regions.isEmpty()) {
1430       return;
1431     }
1432 
1433     List<ServerName> servers = serverManager.createDestinationServersList();
1434     if (servers == null || servers.isEmpty()) {
1435       throw new IOException("Found no destination server to assign region(s)");
1436     }
1437 
1438     // Generate a round-robin bulk assignment plan
1439     Map<ServerName, List<HRegionInfo>> bulkPlan
1440       = balancer.roundRobinAssignment(regions, servers);
1441     if (bulkPlan == null) {
1442       throw new IOException("Unable to determine a plan to assign region(s)");
1443     }
1444 
1445     processFavoredNodes(regions);
1446     assign(regions.size(), servers.size(),
1447       "round-robin=true", bulkPlan);
1448   }
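
       // The two bulk entry points above differ only in how the plan is computed:
       // assign(Map<HRegionInfo, ServerName>) asks the balancer to retain previous
       // locations (balancer.retainAssignment), while assign(List<HRegionInfo>)
       // spreads the regions round-robin (balancer.roundRobinAssignment). A rough
       // usage sketch, assuming a populated "regionsToLastServer" map:
       //
       //   am.assign(regionsToLastServer);                 // put regions back where they were
       //   am.assign(new ArrayList<HRegionInfo>(
       //       regionsToLastServer.keySet()));             // or spread them evenly instead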
1449 
1450   private void assign(int regions, int totalServers,
1451       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
1452           throws InterruptedException, IOException {
1453 
1454     int servers = bulkPlan.size();
1455     if (servers == 1 || (regions < bulkAssignThresholdRegions
1456         && servers < bulkAssignThresholdServers)) {
1457 
1458       // Don't use bulk assignment.  This could be more efficient in a small
1459       // cluster, especially a mini cluster for testing, so that tests won't time out.
1460       if (LOG.isTraceEnabled()) {
1461         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
1462           " region(s) to " + servers + " server(s)");
1463       }
1464       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
1465         if (!assign(plan.getKey(), plan.getValue()) && !server.isStopped()) {
1466           for (HRegionInfo region: plan.getValue()) {
1467             if (!regionStates.isRegionOnline(region)) {
1468               invokeAssign(region);
1469             }
1470           }
1471         }
1472       }
1473     } else {
1474       LOG.info("Bulk assigning " + regions + " region(s) across "
1475         + totalServers + " server(s), " + message);
1476 
1477       // Use fixed count thread pool assigning.
1478       BulkAssigner ba = new GeneralBulkAssigner(
1479         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
1480       ba.bulkAssign();
1481       LOG.info("Bulk assigning done");
1482     }
1483   }
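
       // Whether the bulk path is taken depends on the size of the plan: with a
       // single target server, or with fewer regions/servers than the configured
       // thresholds (the bulkAssignThresholdRegions/bulkAssignThresholdServers
       // fields of this class), regions are submitted one server at a time;
       // otherwise a GeneralBulkAssigner fans the work out over a thread pool and
       // optionally waits until every region is assigned.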
1484 
1485   /**
1486    * Assigns all user regions, if any exist.  Used during cluster startup.
1487    * <p>
1488    * This is a synchronous call and will return once every region has been
1489    * assigned.  If anything fails, an exception is thrown and the cluster
1490    * should be shut down.
1491    * @throws InterruptedException
1492    * @throws IOException
1493    */
1494   private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
1495       throws IOException, InterruptedException {
1496     if (allRegions == null || allRegions.isEmpty()) return;
1497 
1498     // Determine what type of assignment to do on startup
1499     boolean retainAssignment = server.getConfiguration().
1500       getBoolean("hbase.master.startup.retainassign", true);
1501 
1502     Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
1503     if (retainAssignment) {
1504       assign(allRegions);
1505     } else {
1506       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
1507       assign(regions);
1508     }
1509 
1510     for (HRegionInfo hri : regionsFromMetaScan) {
1511       TableName tableName = hri.getTable();
1512       if (!tableStateManager.isTableState(tableName,
1513               TableState.State.ENABLED)) {
1514         setEnabledTable(tableName);
1515       }
1516     }
1517     // assign all the replicas that were not recorded in the meta
1518     assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server));
1519   }
1520 
1521   /**
1522    * Get a list of replica regions that are not recorded in meta yet. We might
1523    * not have recorded the locations for the replicas since the replicas may not
1524    * have been online yet, the master restarted in the middle of assigning, ZK
1525    * was erased, etc.
1526    * @param regionsRecordedInMeta the list of regions we know are recorded in meta,
1527    * either as a default or as the location of a replica
1528    * @param master
1529    * @return list of replica regions
1530    * @throws IOException
1531    */
1532   public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
1533       Set<HRegionInfo> regionsRecordedInMeta, MasterServices master)throws IOException {
1534     List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
1535     for (HRegionInfo hri : regionsRecordedInMeta) {
1536       TableName table = hri.getTable();
1537       HTableDescriptor htd = master.getTableDescriptors().get(table);
1538       // look at the HTD for the replica count. That's the source of truth
1539       int desiredRegionReplication = htd.getRegionReplication();
1540       for (int i = 0; i < desiredRegionReplication; i++) {
1541         HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
1542         if (regionsRecordedInMeta.contains(replica)) continue;
1543         regionsNotRecordedInMeta.add(replica);
1544       }
1545     }
1546     return regionsNotRecordedInMeta;
1547   }
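
       // Example of the replica expansion above (an illustrative sketch): for a
       // table created with region replication 3, each region hri recorded in meta
       // yields RegionReplicaUtil.getRegionInfoForReplica(hri, 0..2); any of those
       // replica infos not already present in regionsRecordedInMeta (typically the
       // non-default replicas 1 and 2) is returned so it can be assigned.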
1548 
1549   /**
1550    * Rebuild the list of user regions and assignment information.
1551    * <p>
1552    * Returns a set of servers that hosted some regions according to meta but
1553    * were not found to be online.
1554    * @return set of servers not online that hosted some regions per meta
1555    * @throws IOException
1556    */
1557   Set<ServerName> rebuildUserRegions() throws
1558           IOException, KeeperException {
1559     Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
1560             TableState.State.DISABLED, TableState.State.ENABLING);
1561 
1562     Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
1563             TableState.State.DISABLED,
1564             TableState.State.DISABLING,
1565             TableState.State.ENABLING);
1566 
1567     // Region assignment from META
1568     List<Result> results = MetaTableAccessor.fullScanOfMeta(server.getConnection());
1569     // Get any new but slow-to-check-in region servers that joined the cluster
1570     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
1571     // Set of offline servers to be returned
1572     Set<ServerName> offlineServers = new HashSet<ServerName>();
1573     // Iterate regions in META
1574     for (Result result : results) {
1575       if (result == null) {
1576         LOG.debug("null result from meta - ignoring but this is strange.");
1577         continue;
1578       }
1579       // Keep track of replicas to close. These were the replicas of the originally
1580       // unmerged regions. The master should have closed them before, but it might
1581       // not have, maybe because it crashed.
1582       PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
1583       if (p.getFirst() != null && p.getSecond() != null) {
1584         int numReplicas = ((MasterServices)server).getTableDescriptors().get(p.getFirst().
1585             getTable()).getRegionReplication();
1586         for (HRegionInfo merge : p) {
1587           for (int i = 1; i < numReplicas; i++) {
1588             replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
1589           }
1590         }
1591       }
1592       RegionLocations rl =  MetaTableAccessor.getRegionLocations(result);
1593       if (rl == null) continue;
1594       HRegionLocation[] locations = rl.getRegionLocations();
1595       if (locations == null) continue;
1596       for (HRegionLocation hrl : locations) {
1597         HRegionInfo regionInfo = hrl.getRegionInfo();
1598         if (regionInfo == null) continue;
1599         int replicaId = regionInfo.getReplicaId();
1600         State state = RegionStateStore.getRegionState(result, replicaId);
1601         // Keep track of replicas to close. These were the replicas of the split parents
1602         // from the previous life of the master. The master should have closed them
1603         // before, but it couldn't, maybe because it crashed.
1604         if (replicaId == 0 && state.equals(State.SPLIT)) {
1605           for (HRegionLocation h : locations) {
1606             replicasToClose.add(h.getRegionInfo());
1607           }
1608         }
1609         ServerName lastHost = hrl.getServerName();
1610         ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
1611         regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
1612         if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
1613           // Region is not open (either offline or in transition), skip
1614           continue;
1615         }
1616         TableName tableName = regionInfo.getTable();
1617         if (!onlineServers.contains(regionLocation)) {
1618           // Region is located on a server that isn't online
1619           offlineServers.add(regionLocation);
1620         } else if (!disabledOrEnablingTables.contains(tableName)) {
1621           // Region is being served and on an active server
1622           // add only if region not in disabled or enabling table
1623           regionStates.regionOnline(regionInfo, regionLocation);
1624           balancer.regionOnline(regionInfo, regionLocation);
1625         }
1626         // need to enable the table if not disabled or disabling or enabling
1627         // this will be used in rolling restarts
1628         if (!disabledOrDisablingOrEnabling.contains(tableName)
1629           && !getTableStateManager().isTableState(tableName,
1630                 TableState.State.ENABLED)) {
1631           setEnabledTable(tableName);
1632         }
1633       }
1634     }
1635     return offlineServers;
1636   }
1637 
1638   /**
1639    * Recover the tables that were not fully moved to DISABLED state. These
1640    * tables were in DISABLING state when the master restarted/switched.
1641    *
1642    * @throws KeeperException
1643    * @throws TableNotFoundException
1644    * @throws IOException
1645    */
1646   private void recoverTableInDisablingState()
1647           throws KeeperException, IOException {
1648     Set<TableName> disablingTables =
1649             tableStateManager.getTablesInStates(TableState.State.DISABLING);
1650     if (disablingTables.size() != 0) {
1651       for (TableName tableName : disablingTables) {
1652         // Recover by calling DisableTableHandler
1653         LOG.info("The table " + tableName
1654             + " is in DISABLING state.  Hence recovering by moving the table"
1655             + " to DISABLED state.");
1656         new DisableTableHandler(this.server, tableName,
1657             this, tableLockManager, true).prepare().process();
1658       }
1659     }
1660   }
1661 
1662   /**
1663    * Recover the tables that were not fully moved to ENABLED state. These tables
1664    * were in ENABLING state when the master restarted/switched.
1665    *
1666    * @throws KeeperException
1667    * @throws org.apache.hadoop.hbase.TableNotFoundException
1668    * @throws IOException
1669    */
1670   private void recoverTableInEnablingState()
1671           throws KeeperException, IOException {
1672     Set<TableName> enablingTables = tableStateManager.
1673             getTablesInStates(TableState.State.ENABLING);
1674     if (enablingTables.size() != 0) {
1675       for (TableName tableName : enablingTables) {
1676         // Recover by calling EnableTableHandler
1677         LOG.info("The table " + tableName
1678             + " is in ENABLING state.  Hence recovering by moving the table"
1679             + " to ENABLED state.");
1680         // enableTable in sync way during master startup,
1681         // no need to invoke coprocessor
1682         EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
1683           this, tableLockManager, true);
1684         try {
1685           eth.prepare();
1686         } catch (TableNotFoundException e) {
1687           LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
1688           continue;
1689         }
1690         eth.process();
1691       }
1692     }
1693   }
1694 
1695   /**
1696    * Processes list of regions in transition at startup
1697    */
1698   void processRegionsInTransition(Collection<RegionState> regionsInTransition) {
1699     // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
1700     // in case the RPC call is not sent out yet before the master was shut down
1701     // since we update the state before we send the RPC call. We can't update
1702     // the state after the RPC call. Otherwise, we don't know what's happened
1703     // to the region if the master dies right after the RPC call is out.
1704     for (RegionState regionState: regionsInTransition) {
1705       LOG.info("Processing " + regionState);
1706       ServerName serverName = regionState.getServerName();
1707       // Server could be null in case of FAILED_OPEN when master cannot find a region plan. In that
1708       // case, try assigning it here.
1709       if (serverName != null && !serverManager.getOnlineServers().containsKey(serverName)) {
1710         LOG.info("Server " + serverName + " isn't online. SSH will handle this");
1711         continue; // SSH will handle it
1712       }
1713       HRegionInfo regionInfo = regionState.getRegion();
1714       RegionState.State state = regionState.getState();
1715       switch (state) {
1716       case CLOSED:
1717         invokeAssign(regionState.getRegion());
1718         break;
1719       case PENDING_OPEN:
1720         retrySendRegionOpen(regionState);
1721         break;
1722       case PENDING_CLOSE:
1723         retrySendRegionClose(regionState);
1724         break;
1725       case FAILED_CLOSE:
1726       case FAILED_OPEN:
1727         invokeUnAssign(regionInfo);
1728         break;
1729       default:
1730         // No process for other states
1731       }
1732     }
1733   }
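
       // Summary of the recovery actions taken above, per region-in-transition state:
       //   CLOSED                    -> invokeAssign(region)
       //   PENDING_OPEN              -> retrySendRegionOpen(regionState)
       //   PENDING_CLOSE             -> retrySendRegionClose(regionState)
       //   FAILED_OPEN, FAILED_CLOSE -> invokeUnAssign(regionInfo)
       //   any other state           -> left alone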
1734 
1735   /**
1736    * At master failover, for a PENDING_OPEN region, make sure the
1737    * sendRegionOpen RPC call is sent to the target regionserver.
1738    */
1739   private void retrySendRegionOpen(final RegionState regionState) {
1740     this.executorService.submit(
1741       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
1742         @Override
1743         public void process() throws IOException {
1744           HRegionInfo hri = regionState.getRegion();
1745           ServerName serverName = regionState.getServerName();
1746           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
1747           try {
1748             for (int i = 1; i <= maximumAttempts; i++) {
1749               if (!serverManager.isServerOnline(serverName)
1750                   || server.isStopped() || server.isAborted()) {
1751                 return; // No need any more
1752               }
1753               try {
1754                 if (!regionState.equals(regionStates.getRegionState(hri))) {
1755                   return; // Region is not in the expected state any more
1756                 }
1757                 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1758                 if (shouldAssignRegionsWithFavoredNodes) {
1759                   favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
1760                 }
1761                 serverManager.sendRegionOpen(serverName, hri, favoredNodes);
1762                 return; // we're done
1763               } catch (Throwable t) {
1764                 if (t instanceof RemoteException) {
1765                   t = ((RemoteException) t).unwrapRemoteException();
1766                 }
1767                 if (t instanceof FailedServerException && i < maximumAttempts) {
1768                   // If the server is in the failed server list, there is no point
1769                   // retrying too soon. Retry after the failed_server_expiry time.
1770                   try {
1771                     Configuration conf = this.server.getConfiguration();
1772                     long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1773                       RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1774                     if (LOG.isDebugEnabled()) {
1775                       LOG.debug(serverName + " is on failed server list; waiting "
1776                         + sleepTime + "ms", t);
1777                     }
1778                     Thread.sleep(sleepTime);
1779                     continue;
1780                   } catch (InterruptedException ie) {
1781                     LOG.warn("Failed to assign "
1782                       + hri.getRegionNameAsString() + " since interrupted", ie);
1783                     regionStates.updateRegionState(hri, State.FAILED_OPEN);
1784                     Thread.currentThread().interrupt();
1785                     return;
1786                   }
1787                 }
1788                 if (serverManager.isServerOnline(serverName)
1789                     && t instanceof java.net.SocketTimeoutException) {
1790                   i--; // don't count this attempt against the retry limit
1791                 } else {
1792                   LOG.info("Got exception in retrying sendRegionOpen for "
1793                     + regionState + "; try=" + i + " of " + maximumAttempts, t);
1794                 }
1795                 Threads.sleep(100);
1796               }
1797             }
1798             // Run out of attempts
1799             regionStates.updateRegionState(hri, State.FAILED_OPEN);
1800           } finally {
1801             lock.unlock();
1802           }
1803         }
1804       });
1805   }
1806 
1807   /**
1808    * At master failover, for a PENDING_CLOSE region, make sure the
1809    * sendRegionClose RPC call is sent to the target regionserver.
1810    */
1811   private void retrySendRegionClose(final RegionState regionState) {
1812     this.executorService.submit(
1813       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
1814         @Override
1815         public void process() throws IOException {
1816           HRegionInfo hri = regionState.getRegion();
1817           ServerName serverName = regionState.getServerName();
1818           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
1819           try {
1820             for (int i = 1; i <= maximumAttempts; i++) {
1821               if (!serverManager.isServerOnline(serverName)
1822                   || server.isStopped() || server.isAborted()) {
1823                 return; // No need any more
1824               }
1825               try {
1826                 if (!regionState.equals(regionStates.getRegionState(hri))) {
1827                   return; // Region is not in the expected state any more
1828                 }
1829                 serverManager.sendRegionClose(serverName, hri, null);
1830                 return; // Done.
1831               } catch (Throwable t) {
1832                 if (t instanceof RemoteException) {
1833                   t = ((RemoteException) t).unwrapRemoteException();
1834                 }
1835                 if (t instanceof FailedServerException && i < maximumAttempts) {
1836                   // If the server is in the failed server list, there is no point
1837                   // retrying too soon. Retry after the failed_server_expiry time.
1838                   try {
1839                     Configuration conf = this.server.getConfiguration();
1840                     long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1841                       RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1842                     if (LOG.isDebugEnabled()) {
1843                       LOG.debug(serverName + " is on failed server list; waiting "
1844                         + sleepTime + "ms", t);
1845                     }
1846                     Thread.sleep(sleepTime);
1847                     continue;
1848                   } catch (InterruptedException ie) {
1849                     LOG.warn("Failed to unassign "
1850                       + hri.getRegionNameAsString() + " since interrupted", ie);
1851                     regionStates.updateRegionState(hri, RegionState.State.FAILED_CLOSE);
1852                     Thread.currentThread().interrupt();
1853                     return;
1854                   }
1855                 }
1856                 if (serverManager.isServerOnline(serverName)
1857                     && t instanceof java.net.SocketTimeoutException) {
1858                   i--; // don't count this attempt against the retry limit
1859                 } else {
1860                   LOG.info("Got exception in retrying sendRegionClose for "
1861                     + regionState + "; try=" + i + " of " + maximumAttempts, t);
1862                 }
1863                 Threads.sleep(100);
1864               }
1865             }
1866             // Run out of attempts
1867             regionStates.updateRegionState(hri, State.FAILED_CLOSE);
1868           } finally {
1869             lock.unlock();
1870           }
1871         }
1872       });
1873   }
1874 
1875   /**
1876    * Set regions-in-transition metrics.
1877    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
1878    * This iterator is not fail-fast, which may lead to stale reads; but that's better than
1879    * creating a copy of the map for metrics computation, as this method will be invoked
1880    * at a frequent interval.
1881    */
1882   public void updateRegionsInTransitionMetrics() {
1883     long currentTime = System.currentTimeMillis();
1884     int totalRITs = 0;
1885     int totalRITsOverThreshold = 0;
1886     long oldestRITTime = 0;
1887     int ritThreshold = this.server.getConfiguration().
1888       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
1889     for (RegionState state: regionStates.getRegionsInTransition().values()) {
1890       totalRITs++;
1891       long ritTime = currentTime - state.getStamp();
1892       if (ritTime > ritThreshold) { // more than the threshold
1893         totalRITsOverThreshold++;
1894       }
1895       if (oldestRITTime < ritTime) {
1896         oldestRITTime = ritTime;
1897       }
1898     }
1899     if (this.metricsAssignmentManager != null) {
1900       this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
1901       this.metricsAssignmentManager.updateRITCount(totalRITs);
1902       this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
1903     }
1904   }
1905 
1906   /**
1907    * @param region Region whose plan we are to clear.
1908    */
1909   private void clearRegionPlan(final HRegionInfo region) {
1910     synchronized (this.regionPlans) {
1911       this.regionPlans.remove(region.getEncodedName());
1912     }
1913   }
1914 
1915   /**
1916    * Wait on region to clear regions-in-transition.
1917    * @param hri Region to wait on.
1918    * @throws IOException
1919    */
1920   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
1921       throws IOException, InterruptedException {
1922     waitOnRegionToClearRegionsInTransition(hri, -1L);
1923   }
1924 
1925   /**
1926    * Wait on region to clear regions-in-transition or time out
1927    * @param hri Region to wait on.
1928    * @param timeOut Milliseconds to wait for current region to be out of transition state.
1929    * @return True when a region clears regions-in-transition before timeout otherwise false
1930    * @throws InterruptedException
1931    */
1932   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
1933       throws InterruptedException {
1934     if (!regionStates.isRegionInTransition(hri)) return true;
1935     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
1936         + timeOut;
1937     // There is already a timeout monitor on regions in transition so I
1938     // should not have to have one here too?
1939     LOG.info("Waiting for " + hri.getEncodedName() +
1940         " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
1941     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
1942       regionStates.waitForUpdate(100);
1943       if (EnvironmentEdgeManager.currentTime() > end) {
1944         LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
1945         return false;
1946       }
1947     }
1948     if (this.server.isStopped()) {
1949       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
1950       return false;
1951     }
1952     return true;
1953   }
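
       // Illustrative usage sketch (the names "am" and "hri" are assumed here, not
       // part of this class): a caller that wants to bound how long it waits for a
       // region to settle might do:
       //
       //   if (!am.waitOnRegionToClearRegionsInTransition(hri, 60000)) {
       //     LOG.warn(hri.getEncodedName() + " is still in transition after 60 seconds");
       //   }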
1954 
1955   void invokeAssign(HRegionInfo regionInfo) {
1956     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
1957   }
1958 
1959   void invokeUnAssign(HRegionInfo regionInfo) {
1960     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
1961   }
1962 
1963   public boolean isCarryingMeta(ServerName serverName) {
1964     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
1965   }
1966 
1967   /**
1968    * Check if the shutdown server carries the specific region.
1969    * @return whether the serverName currently hosts the region
1970    */
1971   private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
1972     RegionState regionState = regionStates.getRegionTransitionState(hri);
1973     ServerName transitionAddr = regionState != null? regionState.getServerName(): null;
1974     if (transitionAddr != null) {
1975       boolean matchTransitionAddr = transitionAddr.equals(serverName);
1976       LOG.debug("Checking region=" + hri.getRegionNameAsString()
1977         + ", transitioning on server=" + transitionAddr
1978         + " server being checked: " + serverName
1979         + ", matches=" + matchTransitionAddr);
1980       return matchTransitionAddr;
1981     }
1982 
1983     ServerName assignedAddr = regionStates.getRegionServerOfRegion(hri);
1984     boolean matchAssignedAddr = serverName.equals(assignedAddr);
1985     LOG.debug("based on AM, current region=" + hri.getRegionNameAsString()
1986       + " is on server=" + assignedAddr + ", server being checked: "
1987       + serverName);
1988     return matchAssignedAddr;
1989   }
1990 
1991   /**
1992    * Process shutdown server removing any assignments.
1993    * @param sn Server that went down.
1994    * @return list of regions in transition on this server
1995    */
1996   public List<HRegionInfo> processServerShutdown(final ServerName sn) {
1997     // Clean out any existing assignment plans for this server
1998     synchronized (this.regionPlans) {
1999       for (Iterator <Map.Entry<String, RegionPlan>> i =
2000           this.regionPlans.entrySet().iterator(); i.hasNext();) {
2001         Map.Entry<String, RegionPlan> e = i.next();
2002         ServerName otherSn = e.getValue().getDestination();
2003         // The name will be null if the region is planned for a random assign.
2004         if (otherSn != null && otherSn.equals(sn)) {
2005           // Use iterator's remove else we'll get CME
2006           i.remove();
2007         }
2008       }
2009     }
2010     List<HRegionInfo> rits = regionStates.serverOffline(sn);
2011     for (Iterator<HRegionInfo> it = rits.iterator(); it.hasNext(); ) {
2012       HRegionInfo hri = it.next();
2013       String encodedName = hri.getEncodedName();
2014 
2015       // We need a lock on the region as we could update it
2016       Lock lock = locker.acquireLock(encodedName);
2017       try {
2018         RegionState regionState =
2019           regionStates.getRegionTransitionState(encodedName);
2020         if (regionState == null
2021             || (regionState.getServerName() != null && !regionState.isOnServer(sn))
2022             || !RegionStates.isOneOfStates(regionState, State.PENDING_OPEN,
2023                 State.OPENING, State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) {
2024           LOG.info("Skip " + regionState + " since it is not opening/failed_close"
2025             + " on the dead server any more: " + sn);
2026           it.remove();
2027         } else {
2028           if (tableStateManager.isTableState(hri.getTable(),
2029                   TableState.State.DISABLED, TableState.State.DISABLING)) {
2030             regionStates.regionOffline(hri);
2031             it.remove();
2032             continue;
2033           }
2034           // Mark the region offline and assign it again by SSH
2035           regionStates.updateRegionState(hri, State.OFFLINE);
2036         }
2037       } finally {
2038         lock.unlock();
2039       }
2040     }
2041     return rits;
2042   }
2043 
2044   /**
2045    * @param plan Plan to execute.
2046    */
2047   public void balance(final RegionPlan plan) {
2048     HRegionInfo hri = plan.getRegionInfo();
2049     TableName tableName = hri.getTable();
2050     if (tableStateManager.isTableState(tableName,
2051             TableState.State.DISABLED, TableState.State.DISABLING)) {
2052       LOG.info("Ignored moving region of disabling/disabled table "
2053         + tableName);
2054       return;
2055     }
2056 
2057     // Move the region only if it's assigned
2058     String encodedName = hri.getEncodedName();
2059     ReentrantLock lock = locker.acquireLock(encodedName);
2060     try {
2061       if (!regionStates.isRegionOnline(hri)) {
2062         RegionState state = regionStates.getRegionState(encodedName);
2063         LOG.info("Ignored moving region not assigned: " + hri + ", "
2064           + (state == null ? "not in region states" : state));
2065         return;
2066       }
2067       synchronized (this.regionPlans) {
2068         this.regionPlans.put(plan.getRegionName(), plan);
2069       }
2070       unassign(hri, plan.getDestination());
2071     } finally {
2072       lock.unlock();
2073     }
2074   }
2075 
2076   public void stop() {
2077     // Shutdown the threadpool executor service
2078     threadPoolExecutorService.shutdownNow();
2079     regionStateStore.stop();
2080   }
2081 
2082   protected void setEnabledTable(TableName tableName) {
2083     try {
2084       this.tableStateManager.setTableState(tableName,
2085               TableState.State.ENABLED);
2086     } catch (IOException e) {
2087       // here we can abort as it is the start up flow
2088       String errorMsg = "Unable to ensure that the table " + tableName
2089           + " will be enabled because of a ZooKeeper issue";
2090       LOG.error(errorMsg);
2091       this.server.abort(errorMsg, e);
2092     }
2093   }
2094 
2095   private String onRegionFailedOpen(final RegionState current,
2096       final HRegionInfo hri, final ServerName serverName) {
2097     // The region must be opening on this server.
2098     // If current state is failed_open on the same server,
2099     // it could be a reportRegionTransition RPC retry.
2100     if (current == null || !current.isOpeningOrFailedOpenOnServer(serverName)) {
2101       return hri.getShortNameToLog() + " is not opening on " + serverName;
2102     }
2103 
2104     // Just return in case of retrying
2105     if (current.isFailedOpen()) {
2106       return null;
2107     }
2108 
2109     String encodedName = hri.getEncodedName();
2110     AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
2111     if (failedOpenCount == null) {
2112       failedOpenCount = new AtomicInteger();
2113       // No need to use putIfAbsent, or extra synchronization since
2114       // this whole handleRegion block is locked on the encoded region
2115       // name, and failedOpenTracker is updated only in this block
2116       failedOpenTracker.put(encodedName, failedOpenCount);
2117     }
2118     if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion()) {
2119       regionStates.updateRegionState(hri, State.FAILED_OPEN);
2120       // remove the tracking info to save memory, also reset
2121       // the count for next open initiative
2122       failedOpenTracker.remove(encodedName);
2123     } else {
2124       if (hri.isMetaRegion() && failedOpenCount.get() >= maximumAttempts) {
2125         // Log a warning message if a meta region failedOpenCount exceeds maximumAttempts
2126         // so that we are aware of potential problem if it persists for a long time.
2127         LOG.warn("Failed to open the hbase:meta region " +
2128             hri.getRegionNameAsString() + " after " +
2129             failedOpenCount.get() + " retries. Continue retrying.");
2130       }
2131 
2132       // Handle this the same as if it were opened and then closed.
2133       RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
2134       if (regionState != null) {
2135         // When there is more than one region server, a new RS is selected as the
2136         // destination and the same is updated in the region plan. (HBASE-5546)
2137         if (getTableStateManager().isTableState(hri.getTable(),
2138                 TableState.State.DISABLED, TableState.State.DISABLING) ||
2139                 replicasToClose.contains(hri)) {
2140           offlineDisabledRegion(hri);
2141           return null;
2142         }
2143         regionStates.updateRegionState(hri, RegionState.State.CLOSED);
2144         // This below has to do w/ online enable/disable of a table
2145         removeClosedRegion(hri);
2146         try {
2147           getRegionPlan(hri, true);
2148         } catch (HBaseIOException e) {
2149           LOG.warn("Failed to get region plan", e);
2150         }
2151         invokeAssign(hri);
2152       }
2153     }
2154     // Null means no error
2155     return null;
2156   }
2157 
2158   private String onRegionOpen(final RegionState current, final HRegionInfo hri,
2159       final ServerName serverName, final RegionStateTransition transition) {
2160     // The region must be opening on this server.
2161     // If current state is already opened on the same server,
2162     // it could be a reportRegionTransition RPC retry.
2163     if (current == null || !current.isOpeningOrOpenedOnServer(serverName)) {
2164       return hri.getShortNameToLog() + " is not opening on " + serverName;
2165     }
2166 
2167     // Just return in case of retrying
2168     if (current.isOpened()) {
2169       return null;
2170     }
2171 
2172     long openSeqNum = transition.hasOpenSeqNum()
2173       ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM;
2174     if (openSeqNum < 0) {
2175       return "Newly opened region has invalid open seq num " + openSeqNum;
2176     }
2177     regionOnline(hri, serverName, openSeqNum);
2178 
2179     // reset the count, if any
2180     failedOpenTracker.remove(hri.getEncodedName());
2181     if (getTableStateManager().isTableState(hri.getTable(),
2182             TableState.State.DISABLED, TableState.State.DISABLING)) {
2183       invokeUnAssign(hri);
2184     }
2185     return null;
2186   }
2187 
2188   private String onRegionClosed(final RegionState current,
2189       final HRegionInfo hri, final ServerName serverName) {
2190     // Region will be usually assigned right after closed. When a RPC retry comes
2191     // in, the region may already have moved away from closed state. However, on the
2192     // region server side, we don't care much about the response for this transition.
2193     // We only make sure master has got and processed this report, either
2194     // successfully or not. So this is fine, not a problem at all.
2195     if (current == null || !current.isClosingOrClosedOnServer(serverName)) {
2196       return hri.getShortNameToLog() + " is not closing on " + serverName;
2197     }
2198 
2199     // Just return in case of retrying
2200     if (current.isClosed()) {
2201       return null;
2202     }
2203 
2204     if (getTableStateManager().isTableState(hri.getTable(), TableState.State.DISABLED,
2205         TableState.State.DISABLING) || replicasToClose.contains(hri)) {
2206       offlineDisabledRegion(hri);
2207       return null;
2208     }
2209 
2210     regionStates.updateRegionState(hri, RegionState.State.CLOSED);
2211     sendRegionClosedNotification(hri);
2212     // This below has to do w/ online enable/disable of a table
2213     removeClosedRegion(hri);
2214     invokeAssign(hri);
2215     return null;
2216   }
2217 
2218   private String onRegionReadyToSplit(final RegionState current, final HRegionInfo hri,
2219       final ServerName serverName, final RegionStateTransition transition) {
2220     // The region must be opened on this server.
2221     // If current state is already splitting on the same server,
2222     // it could be a reportRegionTransition RPC retry.
2223     if (current == null || !current.isSplittingOrOpenedOnServer(serverName)) {
2224       return hri.getShortNameToLog() + " is not opening on " + serverName;
2225     }
2226 
2227     // Just return in case of retrying
2228     if (current.isSplitting()) {
2229       return null;
2230     }
2231 
2232     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2233     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2234     RegionState rs_a = regionStates.getRegionState(a);
2235     RegionState rs_b = regionStates.getRegionState(b);
2236     if (rs_a != null || rs_b != null) {
2237       return "Some daughter already exists. "
2238         + "a=" + rs_a + ", b=" + rs_b;
2239     }
2240 
2241     // Server holding is not updated at this stage.
2242     // It is done after the PONR (point of no return).
2243     regionStates.updateRegionState(hri, State.SPLITTING);
2244     regionStates.createRegionState(
2245       a, State.SPLITTING_NEW, serverName, null);
2246     regionStates.createRegionState(
2247       b, State.SPLITTING_NEW, serverName, null);
2248     return null;
2249   }
2250 
2251   private String onRegionSplitPONR(final RegionState current, final HRegionInfo hri,
2252       final ServerName serverName, final RegionStateTransition transition) {
2253     // The region must be splitting on this server, and the daughters must be in
2254     // splitting_new state. To check RPC retry, we use server holding info.
2255     if (current == null || !current.isSplittingOnServer(serverName)) {
2256       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2257     }
2258 
2259     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2260     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2261     RegionState rs_a = regionStates.getRegionState(a);
2262     RegionState rs_b = regionStates.getRegionState(b);
2263 
2264     // The master could have restarted and lost the new region
2265     // states; if so, they must have been lost together.
2266     if (rs_a == null && rs_b == null) {
2267       rs_a = regionStates.createRegionState(
2268         a, State.SPLITTING_NEW, serverName, null);
2269       rs_b = regionStates.createRegionState(
2270         b, State.SPLITTING_NEW, serverName, null);
2271     }
2272 
2273     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2274         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2275       return "Some daughter is not known to be splitting on " + serverName
2276         + ", a=" + rs_a + ", b=" + rs_b;
2277     }
2278 
2279     // Just return in case of retrying
2280     if (!regionStates.isRegionOnServer(hri, serverName)) {
2281       return null;
2282     }
2283 
2284     try {
2285       regionStates.splitRegion(hri, a, b, serverName);
2286     } catch (IOException ioe) {
2287       LOG.info("Failed to record split region " + hri.getShortNameToLog());
2288       return "Failed to record the splitting in meta";
2289     }
2290     return null;
2291   }
2292 
2293   private String onRegionSplit(final RegionState current, final HRegionInfo hri,
2294       final ServerName serverName, final RegionStateTransition transition) {
2295     // The region must be splitting on this server, and the daughters must be in
2296     // splitting_new state.
2297     // If current state is already split on the same server,
2298     // it could be a reportRegionTransition RPC retry.
2299     if (current == null || !current.isSplittingOrSplitOnServer(serverName)) {
2300       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2301     }
2302 
2303     // Just return in case of retrying
2304     if (current.isSplit()) {
2305       return null;
2306     }
2307 
2308     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2309     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2310     RegionState rs_a = regionStates.getRegionState(a);
2311     RegionState rs_b = regionStates.getRegionState(b);
2312     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2313         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2314       return "Some daughter is not known to be splitting on " + serverName
2315         + ", a=" + rs_a + ", b=" + rs_b;
2316     }
2317 
2318     if (TEST_SKIP_SPLIT_HANDLING) {
2319       return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
2320     }
2321     regionOffline(hri, State.SPLIT);
2322     regionOnline(a, serverName, 1);
2323     regionOnline(b, serverName, 1);
2324 
2325     // User could disable the table before master knows the new region.
2326     if (getTableStateManager().isTableState(hri.getTable(),
2327         TableState.State.DISABLED, TableState.State.DISABLING)) {
2328       invokeUnAssign(a);
2329       invokeUnAssign(b);
2330     } else {
2331       Callable<Object> splitReplicasCallable = new Callable<Object>() {
2332         @Override
2333         public Object call() {
2334           doSplittingOfReplicas(hri, a, b);
2335           return null;
2336         }
2337       };
2338       threadPoolExecutorService.submit(splitReplicasCallable);
2339     }
2340     return null;
2341   }
2342 
2343   private String onRegionSplitReverted(final RegionState current, final HRegionInfo hri,
2344       final ServerName serverName, final RegionStateTransition transition) {
2345     // The region must be splitting on this server, and the daughters must be in
2346     // splitting_new state.
2347     // If the region is in open state, it could be an RPC retry.
2348     if (current == null || !current.isSplittingOrOpenedOnServer(serverName)) {
2349       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2350     }
2351 
2352     // Just return in case of retrying
2353     if (current.isOpened()) {
2354       return null;
2355     }
2356 
2357     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2358     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2359     RegionState rs_a = regionStates.getRegionState(a);
2360     RegionState rs_b = regionStates.getRegionState(b);
2361     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2362         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2363       return "Some daughter is not known to be splitting on " + serverName
2364         + ", a=" + rs_a + ", b=" + rs_b;
2365     }
2366 
2367     regionOnline(hri, serverName);
2368     regionOffline(a);
2369     regionOffline(b);
2370     if (getTableStateManager().isTableState(hri.getTable(),
2371         TableState.State.DISABLED, TableState.State.DISABLING)) {
2372       invokeUnAssign(hri);
2373     }
2374     return null;
2375   }
2376 
2377   private String onRegionReadyToMerge(final RegionState current, final HRegionInfo hri,
2378       final ServerName serverName, final RegionStateTransition transition) {
2379     // The region must be new, and the daughters must be open on this server.
2380     // If the region is in merging_new state, it could be an RPC retry.
2381     if (current != null && !current.isMergingNewOnServer(serverName)) {
2382       return "Merging daughter region already exists, p=" + current;
2383     }
2384 
2385     // Just return in case of retrying
2386     if (current != null) {
2387       return null;
2388     }
2389 
2390     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2391     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2392     Set<String> encodedNames = new HashSet<String>(2);
2393     encodedNames.add(a.getEncodedName());
2394     encodedNames.add(b.getEncodedName());
2395     Map<String, Lock> locks = locker.acquireLocks(encodedNames);
2396     try {
2397       RegionState rs_a = regionStates.getRegionState(a);
2398       RegionState rs_b = regionStates.getRegionState(b);
2399       if (rs_a == null || !rs_a.isOpenedOnServer(serverName)
2400           || rs_b == null || !rs_b.isOpenedOnServer(serverName)) {
2401         return "Some daughter is not in a state to merge on " + serverName
2402           + ", a=" + rs_a + ", b=" + rs_b;
2403       }
2404 
2405       regionStates.updateRegionState(a, State.MERGING);
2406       regionStates.updateRegionState(b, State.MERGING);
2407       regionStates.createRegionState(
2408         hri, State.MERGING_NEW, serverName, null);
2409       return null;
2410     } finally {
2411       for (Lock lock: locks.values()) {
2412         lock.unlock();
2413       }
2414     }
2415   }
2416 
2417   private String onRegionMergePONR(final RegionState current, final HRegionInfo hri,
2418       final ServerName serverName, final RegionStateTransition transition) {
2419     // The region must be in merging_new state, and the daughters must be
2420     // merging. To check RPC retry, we use server holding info.
2421     if (current != null && !current.isMergingNewOnServer(serverName)) {
2422       return hri.getShortNameToLog() + " is not merging on " + serverName;
2423     }
2424 
2425     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2426     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2427     RegionState rs_a = regionStates.getRegionState(a);
2428     RegionState rs_b = regionStates.getRegionState(b);
2429     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2430         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2431       return "Some daughter is not known to be merging on " + serverName
2432         + ", a=" + rs_a + ", b=" + rs_b;
2433     }
2434 
2435     // Master could have restarted and lost the new region state
2436     if (current == null) {
2437       regionStates.createRegionState(
2438         hri, State.MERGING_NEW, serverName, null);
2439     }
2440 
2441     // Just return in case of retrying
2442     if (regionStates.isRegionOnServer(hri, serverName)) {
2443       return null;
2444     }
2445 
2446     try {
2447       regionStates.mergeRegions(hri, a, b, serverName);
2448     } catch (IOException ioe) {
2449       LOG.info("Failed to record merged region " + hri.getShortNameToLog());
2450       return "Failed to record the merging in meta";
2451     }
2452     return null;
2453   }
2454 
2455   private String onRegionMerged(final RegionState current, final HRegionInfo hri,
2456       final ServerName serverName, final RegionStateTransition transition) {
2457     // The region must be in merging_new state, and the daughters must be
2458     // merging on this server.
2459     // If current state is already opened on the same server,
2460     // it could be a reportRegionTransition RPC retry.
2461     if (current == null || !current.isMergingNewOrOpenedOnServer(serverName)) {
2462       return hri.getShortNameToLog() + " is not merging on " + serverName;
2463     }
2464 
2465     // Just return in case of retrying
2466     if (current.isOpened()) {
2467       return null;
2468     }
2469 
2470     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2471     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2472     RegionState rs_a = regionStates.getRegionState(a);
2473     RegionState rs_b = regionStates.getRegionState(b);
2474     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2475         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2476       return "Some daughter is not known to be merging on " + serverName
2477         + ", a=" + rs_a + ", b=" + rs_b;
2478     }
2479 
2480     regionOffline(a, State.MERGED);
2481     regionOffline(b, State.MERGED);
2482     regionOnline(hri, serverName, 1);
2483 
2484     // User could disable the table before master knows the new region.
2485     if (getTableStateManager().isTableState(hri.getTable(),
2486         TableState.State.DISABLED, TableState.State.DISABLING)) {
2487       invokeUnAssign(hri);
2488     } else {
2489       Callable<Object> mergeReplicasCallable = new Callable<Object>() {
2490         @Override
2491         public Object call() {
2492           doMergingOfReplicas(hri, a, b);
2493           return null;
2494         }
2495       };
2496       threadPoolExecutorService.submit(mergeReplicasCallable);
2497     }
2498     return null;
2499   }
2500 
2501   private String onRegionMergeReverted(final RegionState current, final HRegionInfo hri,
2502       final ServerName serverName, final RegionStateTransition transition) {
2503     // The region must be in merging_new state, and the daughters must be
2504     // merging on this server.
2505     // If the region is in offline state, it could be an RPC retry.
2506     if (current == null || !current.isMergingNewOrOfflineOnServer(serverName)) {
2507       return hri.getShortNameToLog() + " is not merging on " + serverName;
2508     }
2509 
2510     // Just return in case of retrying
2511     if (current.isOffline()) {
2512       return null;
2513     }
2514 
2515     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2516     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2517     RegionState rs_a = regionStates.getRegionState(a);
2518     RegionState rs_b = regionStates.getRegionState(b);
2519     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2520         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2521       return "Some daughter is not known to be merging on " + serverName
2522         + ", a=" + rs_a + ", b=" + rs_b;
2523     }
2524 
2525     regionOnline(a, serverName);
2526     regionOnline(b, serverName);
2527     regionOffline(hri);
2528 
2529     if (getTableStateManager().isTableState(hri.getTable(),
2530         TableState.State.DISABLED, TableState.State.DISABLING)) {
2531       invokeUnAssign(a);
2532       invokeUnAssign(b);
2533     }
2534     return null;
2535   }
2536 
2537   private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
2538       final HRegionInfo hri_b) {
2539     // Close replicas for the original unmerged regions. Create/assign new replicas
2540     // for the merged parent.
2541     List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
2542     unmergedRegions.add(hri_a);
2543     unmergedRegions.add(hri_b);
2544     Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
2545     Collection<List<HRegionInfo>> c = map.values();
2546     for (List<HRegionInfo> l : c) {
2547       for (HRegionInfo h : l) {
2548         if (!RegionReplicaUtil.isDefaultReplica(h)) {
2549           LOG.debug("Unassigning un-merged replica " + h);
2550           unassign(h);
2551         }
2552       }
2553     }
2554     int numReplicas = 1;
2555     try {
2556       numReplicas = ((MasterServices)server).getTableDescriptors().get(mergedHri.getTable()).
2557           getRegionReplication();
2558     } catch (IOException e) {
2559       LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
2560           " due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
2561           "will not be done");
2562     }
2563     List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2564     for (int i = 1; i < numReplicas; i++) {
2565       regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
2566     }
2567     try {
2568       assign(regions);
2569     } catch (IOException ioe) {
2570       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
2571                 ioe.getMessage());
2572     } catch (InterruptedException ie) {
2573       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
2574                 ie.getMessage());
2575     }
2576   }
2577 
2578   private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
2579       final HRegionInfo hri_b) {
2580     // Create new regions for the replicas, and assign them to match the
2581     // current replica assignments. If replica 1 of the parent is assigned to RS1,
2582     // replica 1 of each daughter will be placed on the same server.
2583     int numReplicas = 1;
2584     try {
2585       numReplicas = ((MasterServices)server).getTableDescriptors().get(parentHri.getTable()).
2586           getRegionReplication();
2587     } catch (IOException e) {
2588       LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
2589           " due to " + e.getMessage() + ". The assignment of daughter replicas " +
2590           "will not be done");
2591     }
2592     // unassign the old replicas
2593     List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
2594     parentRegion.add(parentHri);
2595     Map<ServerName, List<HRegionInfo>> currentAssign =
2596         regionStates.getRegionAssignments(parentRegion);
2597     Collection<List<HRegionInfo>> c = currentAssign.values();
2598     for (List<HRegionInfo> l : c) {
2599       for (HRegionInfo h : l) {
2600         if (!RegionReplicaUtil.isDefaultReplica(h)) {
2601           LOG.debug("Unassigning parent's replica " + h);
2602           unassign(h);
2603         }
2604       }
2605     }
2606     // assign daughter replicas
2607     Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
2608     for (int i = 1; i < numReplicas; i++) {
2609       prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
2610       prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
2611     }
2612     try {
2613       assign(map);
2614     } catch (IOException e) {
2615       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
2616     } catch (InterruptedException e) {
2617       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
2618     }
2619   }
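
  // Illustrative sketch, not part of the original class: doMergingOfReplicas and
  // doSplittingOfReplicas above both walk the current assignments and unassign every
  // non-default replica. A hypothetical shared helper like the one below could replace
  // the duplicated loops; it relies only on methods already used in those two methods.
  private void unassignNonDefaultReplicas(final Collection<HRegionInfo> defaultReplicas) {
    Map<ServerName, List<HRegionInfo>> assignments =
        regionStates.getRegionAssignments(defaultReplicas);
    for (List<HRegionInfo> regionsOnServer : assignments.values()) {
      for (HRegionInfo region : regionsOnServer) {
        // Skip the primary (default) replica; only secondary replicas are closed here.
        if (!RegionReplicaUtil.isDefaultReplica(region)) {
          LOG.debug("Unassigning replica " + region);
          unassign(region);
        }
      }
    }
  }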
2620 
2621   private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
2622       int replicaId, Map<HRegionInfo, ServerName> map) {
2623     HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
2624     HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
2625         replicaId);
2626     LOG.debug("Created replica region for daughter " + daughterReplica);
2627     ServerName sn;
2628     if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
2629       map.put(daughterReplica, sn);
2630     } else {
2631       List<ServerName> servers = serverManager.getOnlineServersList();
2632       sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
2633       map.put(daughterReplica, sn);
2634     }
2635   }
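
  // Illustrative sketch, not part of the original class: when the parent replica has no
  // known location, prepareDaughterReplicaForAssignment above falls back to a server
  // picked with a new Random seeded from the wall clock. Assuming a Java 7+ runtime, a
  // hypothetical variant could reuse the JDK's ThreadLocalRandom (fully qualified below
  // to avoid a new import) instead of allocating a Random per call. Like the code above,
  // it assumes at least one server is online.
  private ServerName randomOnlineServer() {
    List<ServerName> servers = serverManager.getOnlineServersList();
    return servers.get(
        java.util.concurrent.ThreadLocalRandom.current().nextInt(servers.size()));
  }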
2636 
2637   public Set<HRegionInfo> getReplicasToClose() {
2638     return replicasToClose;
2639   }
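
  // Illustrative sketch, not part of the original class: a hypothetical caller that is
  // about to re-assign a batch of regions (for example after a server crash) could use
  // getReplicasToClose() to skip replicas whose parent was split or merged, since those
  // replicas are meant to be closed rather than reopened.
  private List<HRegionInfo> filterOutReplicasToClose(final List<HRegionInfo> candidates) {
    List<HRegionInfo> toAssign = new ArrayList<HRegionInfo>(candidates.size());
    for (HRegionInfo region : candidates) {
      if (!getReplicasToClose().contains(region)) {
        toAssign.add(region);
      }
    }
    return toAssign;
  }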
2640 
2641   /**
2642    * Marks a region offline.  The new state is the specified one,
2643    * if not null; otherwise the new state is Offline.
2644    * The specified state can only be Split, Merged, Offline or null.
2645    */
2646   private void regionOffline(final HRegionInfo regionInfo, final State state) {
2647     regionStates.regionOffline(regionInfo, state);
2648     removeClosedRegion(regionInfo);
2649     // remove the region plan as well just in case.
2650     clearRegionPlan(regionInfo);
2651     balancer.regionOffline(regionInfo);
2652 
2653     // Tell our listeners that a region was closed
2654     sendRegionClosedNotification(regionInfo);
2655     // also note that all the replicas of the primary should be closed
2656     if (state != null && (state.equals(State.SPLIT) || state.equals(State.MERGED))) {
2657       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
2658       c.add(regionInfo);
2659       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
2660       Collection<List<HRegionInfo>> allReplicas = map.values();
2661       for (List<HRegionInfo> list : allReplicas) {
2662         replicasToClose.addAll(list);
2663       }
2664     }
2674   }
2675 
2676   private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
2677       final ServerName serverName) {
2678     if (!this.listeners.isEmpty()) {
2679       for (AssignmentListener listener : this.listeners) {
2680         listener.regionOpened(regionInfo, serverName);
2681       }
2682     }
2683   }
2684 
2685   private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
2686     if (!this.listeners.isEmpty()) {
2687       for (AssignmentListener listener : this.listeners) {
2688         listener.regionClosed(regionInfo);
2689       }
2690     }
2691   }
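
  // Illustrative sketch, not part of the original class and assuming AssignmentListener
  // declares only the two callbacks invoked above: a minimal listener that logs region
  // open/close events. Once registered with this manager's listener list it would
  // receive the notifications sent by the two methods above.
  static class LoggingAssignmentListener implements AssignmentListener {
    private static final Log LISTENER_LOG =
        LogFactory.getLog(LoggingAssignmentListener.class);

    @Override
    public void regionOpened(final HRegionInfo regionInfo, final ServerName serverName) {
      LISTENER_LOG.info("Region " + regionInfo.getShortNameToLog()
          + " opened on " + serverName);
    }

    @Override
    public void regionClosed(final HRegionInfo regionInfo) {
      LISTENER_LOG.info("Region " + regionInfo.getShortNameToLog() + " closed");
    }
  }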
2692 
2693   /**
2694    * Try to update some region states. If the state machine prevents
2695    * such update, an error message is returned to explain the reason.
2696    *
2697    * It's expected that each transition carries just one region
2698    * for opening/closing, and 3 regions for splitting/merging.
2699    * These regions should be on the server that requested the change.
2700    *
2701    * Region state machine: only the transitions below
2702    * are expected to be triggered by a region server.
2703    *
2704    * On the state transition:
2705    *  (1) Open/Close should be initiated by master
2706    *      (a) Master sets the region to pending_open/pending_close
2707    *        in memory and hbase:meta after sending the request
2708    *        to the region server
2709    *      (b) The region server reports back to the master
2710    *        after open/close is done (either success or failure)
2711    *      (c) If the region server has a problem reporting the status
2712    *        to the master, it must be because the master is down or there is
2713    *        a temporary network issue. Otherwise, the region server should
2714    *        abort since it must be a bug. If the master is not accessible,
2715    *        the region server should keep trying until it is
2716    *        stopped or the status is reported to the (new) master
2717    *      (d) If a region server dies in the middle of opening/closing
2718    *        a region, SSH picks it up and finishes it
2719    *      (e) If the master dies in the middle, the new master recovers
2720    *        the state during initialization from hbase:meta. The region server
2721    *        can report any transition that has not been reported to
2722    *        the previous active master yet
2723    *  (2) Split/merge is initiated by region servers
2724    *      (a) To split a region, a region server sends a request
2725    *        to the master to try to set the region to splitting, together with
2726    *        two daughters (to be created) to splitting_new. If approved
2727    *        by the master, the splitting can then move ahead
2728    *      (b) To merge two regions, a region server sends a request to
2729    *        the master to try to set the new merged region (to be created) to
2730    *        merging_new, together with two regions (to be merged) to merging.
2731    *        If the master approves, the merge can then move ahead
2732    *      (c) Once the splitting/merging is done, the region server
2733    *        reports the status (success or failure) back to the master.
2734    *      (d) Other scenarios should be handled similarly to region
2735    *        open/close (a sketch of such a transition report follows this method)
2736    */
2737   protected String onRegionTransition(final ServerName serverName,
2738       final RegionStateTransition transition) {
2739     TransitionCode code = transition.getTransitionCode();
2740     HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
2741     Lock lock = locker.acquireLock(hri.getEncodedName());
2742     try {
2743       RegionState current = regionStates.getRegionState(hri);
2744       if (LOG.isDebugEnabled()) {
2745         LOG.debug("Got transition " + code + " for "
2746           + (current != null ? current.toString() : hri.getShortNameToLog())
2747           + " from " + serverName);
2748       }
2749       String errorMsg = null;
2750       switch (code) {
2751       case OPENED:
2752         errorMsg = onRegionOpen(current, hri, serverName, transition);
2753         break;
2754       case FAILED_OPEN:
2755         errorMsg = onRegionFailedOpen(current, hri, serverName);
2756         break;
2757       case CLOSED:
2758         errorMsg = onRegionClosed(current, hri, serverName);
2759         break;
2760       case READY_TO_SPLIT:
2761         errorMsg = onRegionReadyToSplit(current, hri, serverName, transition);
2762         break;
2763       case SPLIT_PONR:
2764         errorMsg = onRegionSplitPONR(current, hri, serverName, transition);
2765         break;
2766       case SPLIT:
2767         errorMsg = onRegionSplit(current, hri, serverName, transition);
2768         break;
2769       case SPLIT_REVERTED:
2770         errorMsg = onRegionSplitReverted(current, hri, serverName, transition);
2771         break;
2772       case READY_TO_MERGE:
2773         errorMsg = onRegionReadyToMerge(current, hri, serverName, transition);
2774         break;
2775       case MERGE_PONR:
2776         errorMsg = onRegionMergePONR(current, hri, serverName, transition);
2777         break;
2778       case MERGED:
2779         errorMsg = onRegionMerged(current, hri, serverName, transition);
2780         break;
2781       case MERGE_REVERTED:
2782         errorMsg = onRegionMergeReverted(current, hri, serverName, transition);
2783         break;
2784 
2785       default:
2786         errorMsg = "Unexpected transition code " + code;
2787       }
2788       if (errorMsg != null) {
2789         LOG.info("Could not transition region from " + current + " on "
2790           + code + " by " + serverName + ": " + errorMsg);
2791       }
2792       return errorMsg;
2793     } finally {
2794       lock.unlock();
2795     }
2796   }
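
  // Illustrative sketch, not part of the original class: a report for the OPENED
  // transition handled above carries exactly one region. Assuming the standard
  // protobuf-generated builder methods on RegionStateTransition (newBuilder,
  // setTransitionCode, addRegionInfo), such a report could be built roughly as below;
  // a non-null return value from onRegionTransition means the master rejected it.
  private String reportRegionOpened(final ServerName serverName, final HRegionInfo hri) {
    RegionStateTransition transition = RegionStateTransition.newBuilder()
        .setTransitionCode(TransitionCode.OPENED)
        .addRegionInfo(HRegionInfo.convert(hri))
        .build();
    return onRegionTransition(serverName, transition);
  }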
2797 
2798   /**
2799    * @return Instance of load balancer
2800    */
2801   public LoadBalancer getBalancer() {
2802     return this.balancer;
2803   }
2804 
2805   public Map<ServerName, List<HRegionInfo>>
2806     getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
2807     return getRegionStates().getRegionAssignments(infos);
2808   }
2809 }