
1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.NavigableMap;
31  import java.util.Random;
32  import java.util.Set;
33  import java.util.TreeMap;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.CopyOnWriteArrayList;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.locks.Lock;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.fs.FileSystem;
48  import org.apache.hadoop.fs.Path;
49  import org.apache.hadoop.hbase.CoordinatedStateException;
50  import org.apache.hadoop.hbase.HBaseIOException;
51  import org.apache.hadoop.hbase.HConstants;
52  import org.apache.hadoop.hbase.HRegionInfo;
53  import org.apache.hadoop.hbase.HRegionLocation;
54  import org.apache.hadoop.hbase.HTableDescriptor;
55  import org.apache.hadoop.hbase.MetaTableAccessor;
56  import org.apache.hadoop.hbase.NotServingRegionException;
57  import org.apache.hadoop.hbase.RegionLocations;
58  import org.apache.hadoop.hbase.ServerName;
59  import org.apache.hadoop.hbase.TableName;
60  import org.apache.hadoop.hbase.TableNotFoundException;
61  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
62  import org.apache.hadoop.hbase.client.Result;
63  import org.apache.hadoop.hbase.client.TableState;
64  import org.apache.hadoop.hbase.executor.EventHandler;
65  import org.apache.hadoop.hbase.executor.EventType;
66  import org.apache.hadoop.hbase.executor.ExecutorService;
67  import org.apache.hadoop.hbase.ipc.FailedServerException;
68  import org.apache.hadoop.hbase.ipc.RpcClient;
69  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
70  import org.apache.hadoop.hbase.master.RegionState.State;
71  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
72  import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
73  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
74  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
75  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
76  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
77  import org.apache.hadoop.hbase.quotas.RegionStateListener;
78  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
79  import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
80  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
81  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
82  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
83  import org.apache.hadoop.hbase.util.FSUtils;
84  import org.apache.hadoop.hbase.util.KeyLocker;
85  import org.apache.hadoop.hbase.util.Pair;
86  import org.apache.hadoop.hbase.util.PairOfSameType;
87  import org.apache.hadoop.hbase.util.Threads;
88  import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
89  import org.apache.hadoop.ipc.RemoteException;
90  import org.apache.hadoop.util.StringUtils;
91  import org.apache.zookeeper.KeeperException;
92  
93  import com.google.common.annotations.VisibleForTesting;
94  
95  /**
96   * Manages and performs region assignment.
97   * Related communications with regionserver are all done over RPC.
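     * <p>
     * A minimal usage sketch (illustrative only, assuming an initialized
     * instance {@code am} and an {@code HRegionInfo hri}):
     * <pre>
     *   am.assign(hri);    // pick a server via the balancer and send the OPEN RPC
     *   am.unassign(hri);  // send the CLOSE RPC to the hosting regionserver
     * </pre>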
98   */
99  @InterfaceAudience.Private
100 public class AssignmentManager {
101   private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
102 
103   protected final MasterServices server;
104 
105   private ServerManager serverManager;
106 
107   private boolean shouldAssignRegionsWithFavoredNodes;
108 
109   private LoadBalancer balancer;
110 
111   private final MetricsAssignmentManager metricsAssignmentManager;
112 
113   private final TableLockManager tableLockManager;
114 
115   private AtomicInteger numRegionsOpened = new AtomicInteger(0);
116 
117   final private KeyLocker<String> locker = new KeyLocker<String>();
118 
119   Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
120 
121   /**
122    * Map of regions to reopen after the schema of a table is changed. Key -
123    * encoded region name, value - HRegionInfo
124    */
125   private final Map <String, HRegionInfo> regionsToReopen;
126 
127   /*
128    * Maximum number of times we retry an assignment/unassignment.
129    * See below in {@link #assign()} and {@link #unassign()}.
130    */
131   private final int maximumAttempts;
132 
133   /**
134    * The time to sleep before retrying an hbase:meta assignment that failed
135    * because no region plan was available or the plan was bad.
136    */
137   private final long sleepTimeBeforeRetryingMetaAssignment;
138 
139   /** Plans for region movement. Key is the encoded version of a region name*/
140   // TODO: When do plans get cleaned out?  Ever? In server open and in server
141   // shutdown processing -- St.Ack
142   // All access to this Map must be synchronized.
143   final NavigableMap<String, RegionPlan> regionPlans =
144     new TreeMap<String, RegionPlan>();
145 
146   private final TableStateManager tableStateManager;
147 
148   private final ExecutorService executorService;
149 
150   // Thread pool executor service. TODO, consolidate with executorService?
151   private java.util.concurrent.ExecutorService threadPoolExecutorService;
152 
153   private final RegionStates regionStates;
154 
155   // The thresholds for using bulk assignment. Bulk assignment is used
156   // only if at least this many regions are being assigned to at least
157   // this many servers. When assigning fewer regions to fewer servers,
158   // bulk assigning may not be as efficient.
159   private final int bulkAssignThresholdRegions;
160   private final int bulkAssignThresholdServers;
161   private final int bulkPerRegionOpenTimeGuesstimate;
162 
163   // Should bulk assignment wait till all regions are assigned,
164   // or until it times out?  This is useful for measuring bulk assignment
165   // performance, but it is not needed in most use cases.
166   private final boolean bulkAssignWaitTillAllAssigned;
167 
168   /**
169    * Indicator that AssignmentManager has recovered the region states, so
170    * that ServerShutdownHandler can be fully enabled and re-assign regions
171    * of dead servers, and so that AssignmentManager has proper region states
172    * when re-assignment happens.
173    *
174    * Protected to ease testing.
175    */
176   protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
177 
178   /**
179    * A map to track the number of times a region fails to open in a row,
180    * so that we don't try to open a region forever if the failure is
181    * unrecoverable.  We don't put this information in region states
182    * because we don't expect this to happen frequently; we don't
183    * want to copy this information over during each state transition either.
184    */
185   private final ConcurrentHashMap<String, AtomicInteger>
186     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
187 
188   // In case not using ZK for region assignment, region states
189   // are persisted in meta with a state store
190   private final RegionStateStore regionStateStore;
191 
192   /**
193    * For testing only!  Set to true to skip handling of split.
194    */
195   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
196   public static boolean TEST_SKIP_SPLIT_HANDLING = false;
197 
198   /** Listeners that are called on assignment events. */
199   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
200 
201   private RegionStateListener regionStateListener;
202 
203   /**
204    * Constructs a new assignment manager.
205    *
206    * @param server instance of HMaster this AM is running inside
207    * @param serverManager serverManager for associated HMaster
208    * @param balancer implementation of {@link LoadBalancer}
209    * @param service Executor service
210    * @param metricsMaster metrics manager
211    * @param tableLockManager TableLock manager
212    * @throws IOException
213    */
214   public AssignmentManager(MasterServices server, ServerManager serverManager,
215       final LoadBalancer balancer,
216       final ExecutorService service, MetricsMaster metricsMaster,
217       final TableLockManager tableLockManager,
218       final TableStateManager tableStateManager)
219           throws IOException {
220     this.server = server;
221     this.serverManager = serverManager;
222     this.executorService = service;
223     this.regionStateStore = new RegionStateStore(server);
224     this.regionsToReopen = Collections.synchronizedMap
225                            (new HashMap<String, HRegionInfo> ());
226     Configuration conf = server.getConfiguration();
227     // Only read favored nodes if using the favored nodes load balancer.
228     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
229            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
230            FavoredNodeLoadBalancer.class);
231 
232     this.tableStateManager = tableStateManager;
233 
234     // This is the max attempts, not retries, so it should be at least 1.
235     this.maximumAttempts = Math.max(1,
236       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
237     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
238         "hbase.meta.assignment.retry.sleeptime", 1000L);
239     this.balancer = balancer;
240     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
241     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
242       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
243     this.regionStates = new RegionStates(
244       server, tableStateManager, serverManager, regionStateStore);
245 
246     this.bulkAssignWaitTillAllAssigned =
247       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
248     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
249     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
250     this.bulkPerRegionOpenTimeGuesstimate =
251       conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
252 
253     this.metricsAssignmentManager = new MetricsAssignmentManager();
254     this.tableLockManager = tableLockManager;
255   }
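      // Summary of the tunables read by the constructor above (defaults in
      // parentheses), listed here purely as an illustrative reference:
      //   hbase.assignment.maximum.attempts          (10)
      //   hbase.meta.assignment.retry.sleeptime      (1000 ms)
      //   hbase.assignment.threads.max               (30)
      //   hbase.bulk.assignment.waittillallassigned  (false)
      //   hbase.bulk.assignment.threshold.regions    (7)
      //   hbase.bulk.assignment.threshold.servers    (3)
      //   hbase.bulk.assignment.perregion.open.time  (10000)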
256 
257   /**
258    * Add the listener to the notification list.
259    * @param listener The AssignmentListener to register
260    */
261   public void registerListener(final AssignmentListener listener) {
262     this.listeners.add(listener);
263   }
264 
265   /**
266    * Remove the listener from the notification list.
267    * @param listener The AssignmentListener to unregister
268    */
269   public boolean unregisterListener(final AssignmentListener listener) {
270     return this.listeners.remove(listener);
271   }
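      // Illustrative listener usage (sketch only, assuming some component that
      // implements AssignmentListener and an AssignmentManager instance am):
      //   AssignmentListener listener = ...;
      //   am.registerListener(listener);   // start receiving assignment notifications
      //   ...
      //   am.unregisterListener(listener); // stop receiving notifications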
272 
273   /**
274    * @return Instance of TableStateManager.
275    */
276   public TableStateManager getTableStateManager() {
277     // These are 'expensive' to make, involving a trip to the zk ensemble, so allow
278     // sharing.
279     return this.tableStateManager;
280   }
281 
282   /**
283    * This SHOULD not be public. It is public now
284    * because of some unit tests.
285    *
286    * TODO: make it package private and keep RegionStates in the master package
287    */
288   public RegionStates getRegionStates() {
289     return regionStates;
290   }
291 
292   /**
293    * Used in some tests to mock up region state in meta
294    */
295   @VisibleForTesting
296   RegionStateStore getRegionStateStore() {
297     return regionStateStore;
298   }
299 
300   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
301     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
302   }
303 
304   /**
305    * Add a regionPlan for the specified region.
306    * @param encodedName the encoded name of the region
307    * @param plan the region plan to add
308    */
309   public void addPlan(String encodedName, RegionPlan plan) {
310     synchronized (regionPlans) {
311       regionPlans.put(encodedName, plan);
312     }
313   }
314 
315   /**
316    * Add a map of region plans.
317    */
318   public void addPlans(Map<String, RegionPlan> plans) {
319     synchronized (regionPlans) {
320       regionPlans.putAll(plans);
321     }
322   }
323 
324   /**
325    * Set the list of regions that will be reopened
326    * because of an update in table schema
327    *
328    * @param regions
329    *          list of regions that should be tracked for reopen
330    */
331   public void setRegionsToReopen(List <HRegionInfo> regions) {
332     for(HRegionInfo hri : regions) {
333       regionsToReopen.put(hri.getEncodedName(), hri);
334     }
335   }
336 
337   /**
338    * Used by the client to identify if all regions have received the schema updates.
339    *
340    * @param tableName the name of the table whose regions are being checked
341    * @return Pair indicating the status of the alter command
342    * @throws IOException
343    */
344   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
345       throws IOException {
346     List<HRegionInfo> hris;
347     if (TableName.META_TABLE_NAME.equals(tableName)) {
348       hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
349     } else {
350       hris = MetaTableAccessor.getTableRegions(server.getConnection(), tableName, true);
351     }
352 
353     Integer pending = 0;
354     for (HRegionInfo hri : hris) {
355       String name = hri.getEncodedName();
356       // No lock; concurrent access is ok: sequential consistency is respected.
357       if (regionsToReopen.containsKey(name)
358           || regionStates.isRegionInTransition(name)) {
359         pending++;
360       }
361     }
362     return new Pair<Integer, Integer>(pending, hris.size());
363   }
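      // Example interpretation of the returned pair (sketch only): the first value
      // is the number of regions still pending reopen or in transition, the second
      // is the total region count, so an alter can be considered done when the
      // first value reaches zero:
      //   Pair<Integer, Integer> status = am.getReopenStatus(tableName);
      //   boolean done = status.getFirst() == 0;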
364 
365   /**
366    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
367    * the failover cleanup before re-assigning regions of dead servers, so that
368    * AssignmentManager has proper region states when re-assignment happens.
369    */
370   public boolean isFailoverCleanupDone() {
371     return failoverCleanupDone.get();
372   }
373 
374   /**
375    * To avoid racing with AM, external entities may need to lock a region,
376    * for example, when SSH checks what regions to skip re-assigning.
377    */
378   public Lock acquireRegionLock(final String encodedName) {
379     return locker.acquireLock(encodedName);
380   }
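      // Typical usage (sketch): release the lock in a finally block.
      //   Lock lock = am.acquireRegionLock(encodedRegionName);
      //   try {
      //     // inspect or update state for this region
      //   } finally {
      //     lock.unlock();
      //   }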
381 
382   /**
383    * Now that failover cleanup is completed, notify the server manager to
384    * process queued-up dead servers, if any.
385    */
386   void failoverCleanupDone() {
387     failoverCleanupDone.set(true);
388     serverManager.processQueuedDeadServers();
389   }
390 
391   /**
392    * Called on startup.
393    * Figures out whether this is a fresh cluster start or we are joining an extant running cluster.
394    * @throws IOException
395    * @throws KeeperException
396    * @throws InterruptedException
397    * @throws CoordinatedStateException
398    */
399   void joinCluster()
400   throws IOException, KeeperException, InterruptedException, CoordinatedStateException {
401     long startTime = System.currentTimeMillis();
402     // Concurrency note: In the below the accesses on regionsInTransition are
403     // outside of a synchronization block where usually all accesses to RIT are
404     // synchronized.  The presumption is that in this case it is safe since this
405     // method is being run by a single thread on startup.
406 
407     // TODO: Regions that have a null location and are not in regionsInTransitions
408     // need to be handled.
409 
410     // Scan hbase:meta to build list of existing regions, servers, and assignment
411     // Returns servers that have not checked in (assumed dead) and to which some regions
412     // were assigned (according to the meta).
413     Set<ServerName> deadServers = rebuildUserRegions();
414 
415     // This method will assign all user regions if a clean server startup or
416     // it will reconstruct master state and cleanup any leftovers from previous master process.
417     boolean failover = processDeadServersAndRegionsInTransition(deadServers);
418 
419     recoverTableInDisablingState();
420     recoverTableInEnablingState();
421     LOG.info("Joined the cluster in " + (System.currentTimeMillis()
422       - startTime) + "ms, failover=" + failover);
423   }
424 
425   /**
426    * Processes all regions that are in transition and also
427    * processes the list of dead servers.
428    * Used by the master when joining a cluster.  If we figure this is a clean cluster
429    * startup, all user regions will be assigned.
430    * @param deadServers Set of servers that are offline (probably legitimately) and were carrying
431    * regions according to a scan of hbase:meta. Can be null.
432    * @throws IOException
433    * @throws InterruptedException
434    */
435   boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
436   throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
437     // TODO Needed? List<String> nodes = ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
438     boolean failover = !serverManager.getDeadServers().isEmpty();
439     if (failover) {
440       // This may not be a failover actually, especially if meta is on this master.
441       if (LOG.isDebugEnabled()) {
442         LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
443       }
444     } else {
445       // If any one region except meta is assigned, it's a failover.
446       Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
447       for (Map.Entry<HRegionInfo, ServerName> en:
448           regionStates.getRegionAssignments().entrySet()) {
449         HRegionInfo hri = en.getKey();
450         if (!hri.isMetaTable()
451             && onlineServers.contains(en.getValue())) {
452           LOG.debug("Found " + hri + " out on cluster");
453           failover = true;
454           break;
455         }
456       }
457       if (!failover) {
458         // If any region except meta is in transition on a live server, it's a failover.
459         Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
460         if (!regionsInTransition.isEmpty()) {
461           for (RegionState regionState: regionsInTransition.values()) {
462             ServerName serverName = regionState.getServerName();
463             if (!regionState.getRegion().isMetaRegion()
464                 && serverName != null && onlineServers.contains(serverName)) {
465               LOG.debug("Found " + regionState + " in RITs");
466               failover = true;
467               break;
468             }
469           }
470         }
471       }
472     }
473     if (!failover) {
474       // If we get here, we have a full cluster restart. It is a failover only
475       // if there are some WALs that are not split yet. Meta WALs, if any, should have
476       // been split already. We can walk through those queued dead servers;
477       // if they don't have any WALs, this restart should be considered a clean one.
478       Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
479       if (!queuedDeadServers.isEmpty()) {
480         Configuration conf = server.getConfiguration();
481         Path rootdir = FSUtils.getRootDir(conf);
482         FileSystem fs = rootdir.getFileSystem(conf);
483         for (ServerName serverName: queuedDeadServers) {
484           // In the case of a clean exit, the shutdown handler would have presplit any WALs and
485           // removed empty directories.
486           Path logDir = new Path(rootdir,
487               DefaultWALProvider.getWALDirectoryName(serverName.toString()));
488           Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
489           if (fs.exists(logDir) || fs.exists(splitDir)) {
490             LOG.debug("Found queued dead server " + serverName);
491             failover = true;
492             break;
493           }
494         }
495         if (!failover) {
496           // We figured that it's not a failover, so no need to
497           // work on these re-queued dead servers any more.
498           LOG.info("AM figured that it's not a failover and cleaned up "
499             + queuedDeadServers.size() + " queued dead servers");
500           serverManager.removeRequeuedDeadServers();
501         }
502       }
503     }
504 
505     Set<TableName> disabledOrDisablingOrEnabling = null;
506     Map<HRegionInfo, ServerName> allRegions = null;
507 
508     if (!failover) {
509       disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
510         TableState.State.DISABLED, TableState.State.DISABLING,
511         TableState.State.ENABLING);
512 
513       // Clean re/start, mark all user regions closed before reassignment
514       allRegions = regionStates.closeAllUserRegions(
515         disabledOrDisablingOrEnabling);
516     }
517 
518     // Now region states are restored
519     regionStateStore.start();
520 
521     if (failover) {
522       if (deadServers != null && !deadServers.isEmpty()) {
523         for (ServerName serverName: deadServers) {
524           if (!serverManager.isServerDead(serverName)) {
525             serverManager.expireServer(serverName); // Let SSH do region re-assign
526           }
527         }
528       }
529       processRegionsInTransition(regionStates.getRegionsInTransition().values());
530     }
531 
532     // Now we can safely claim failover cleanup completed and enable
533     // ServerShutdownHandler for further processing. The nodes (below)
534     // in transition, if any, are for regions not related to those
535     // dead servers at all, and can be done in parallel to SSH.
536     failoverCleanupDone();
537     if (!failover) {
538       // Fresh cluster startup.
539       LOG.info("Clean cluster startup. Assigning user regions");
540       assignAllUserRegions(allRegions);
541     }
542     // Unassign replicas of the split parents and the merged regions.
543     // The daughter replicas are opened in assignAllUserRegions if they were
544     // not already opened.
545     for (HRegionInfo h : replicasToClose) {
546       unassign(h);
547     }
548     replicasToClose.clear();
549     return failover;
550   }
551 
552   /**
553    * When a region is closed, it should be removed from the regionsToReopen
554    * @param hri HRegionInfo of the region which was closed
555    */
556   public void removeClosedRegion(HRegionInfo hri) {
557     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
558       LOG.debug("Removed region from reopening regions because it was closed");
559     }
560   }
561 
562   // TODO: processFavoredNodes might throw an exception, e.g., if the
563   // meta could not be contacted/updated. We need to decide how seriously to treat
564   // this problem. Should we fail the current assignment? We should be able
565   // to recover from this problem eventually (if the meta couldn't be updated
566   // things should work normally and eventually get fixed up).
567   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
568     if (!shouldAssignRegionsWithFavoredNodes) return;
569     // The AM gets the favored nodes info for each region and updates the meta
570     // table with that info
571     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
572         new HashMap<HRegionInfo, List<ServerName>>();
573     for (HRegionInfo region : regions) {
574       regionToFavoredNodes.put(region,
575           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
576     }
577     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
578       this.server.getConnection());
579   }
580 
581   /**
582    * Marks the region as online.  Removes it from regions in transition and
583    * updates the in-memory assignment information.
584    * <p>
585    * Used when a region has been successfully opened on a region server.
586    * @param regionInfo
587    * @param sn
588    */
589   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
590     regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
591   }
592 
593   void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
594     numRegionsOpened.incrementAndGet();
595     regionStates.regionOnline(regionInfo, sn, openSeqNum);
596 
597     // Remove plan if one.
598     clearRegionPlan(regionInfo);
599     balancer.regionOnline(regionInfo, sn);
600 
601     // Tell our listeners that a region was opened
602     sendRegionOpenedNotification(regionInfo, sn);
603   }
604 
605   /**
606    * Marks the region as offline.  Removes it from regions in transition and
607    * removes in-memory assignment information.
608    * <p>
609    * Used when a region has been closed and should remain closed.
610    * @param regionInfo
611    */
612   public void regionOffline(final HRegionInfo regionInfo) {
613     regionOffline(regionInfo, null);
614   }
615 
616   public void offlineDisabledRegion(HRegionInfo regionInfo) {
617     replicasToClose.remove(regionInfo);
618     regionOffline(regionInfo);
619   }
620 
621   // Assignment methods
622 
623   /**
624    * Assigns the specified region.
625    * <p>
626    * If a RegionPlan is available with a valid destination then it will be used
627    * to determine what server the region is assigned to.  If no RegionPlan is
628    * available, the region will be assigned to a random available server.
629    * <p>
630    * Updates the RegionState and sends the OPEN RPC.
631    * <p>
632    * This will only succeed if the region is in transition and in a CLOSED or
633    * OFFLINE state, or is not in transition at all; and, of course, the
634    * chosen server must be up and running (it may have just crashed!).
635    *
636    * @param region region to be assigned
637    */
638   public void assign(HRegionInfo region) {
639     assign(region, false);
640   }
641 
642   /**
643    * Use care with forceNewPlan. It could cause double assignment.
644    */
645   public void assign(HRegionInfo region, boolean forceNewPlan) {
646     if (isDisabledorDisablingRegionInRIT(region)) {
647       return;
648     }
649     String encodedName = region.getEncodedName();
650     Lock lock = locker.acquireLock(encodedName);
651     try {
652       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
653       if (state != null) {
654         if (regionStates.wasRegionOnDeadServer(encodedName)) {
655           LOG.info("Skip assigning " + region.getRegionNameAsString()
656             + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
657             + " is dead but not processed yet");
658           return;
659         }
660         assign(state, forceNewPlan);
661       }
662     } finally {
663       lock.unlock();
664     }
665   }
666 
667   /**
668    * Bulk assign regions to <code>destination</code>.
669    * @param destination the server to bulk assign the regions to
670    * @param regions Regions to assign.
671    * @return true if successful
672    */
673   boolean assign(final ServerName destination, final List<HRegionInfo> regions)
674     throws InterruptedException {
675     long startTime = EnvironmentEdgeManager.currentTime();
676     try {
677       int regionCount = regions.size();
678       if (regionCount == 0) {
679         return true;
680       }
681       LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
682       Set<String> encodedNames = new HashSet<String>(regionCount);
683       for (HRegionInfo region : regions) {
684         encodedNames.add(region.getEncodedName());
685       }
686 
687       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
688       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
689       try {
690         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regionCount);
691         List<RegionState> states = new ArrayList<RegionState>(regionCount);
692         for (HRegionInfo region : regions) {
693           String encodedName = region.getEncodedName();
694           if (!isDisabledorDisablingRegionInRIT(region)) {
695             RegionState state = forceRegionStateToOffline(region, false);
696             boolean onDeadServer = false;
697             if (state != null) {
698               if (regionStates.wasRegionOnDeadServer(encodedName)) {
699                 LOG.info("Skip assigning " + region.getRegionNameAsString()
700                   + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
701                   + " is dead but not processed yet");
702                 onDeadServer = true;
703               } else {
704                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
705                 plans.put(encodedName, plan);
706                 states.add(state);
707                 continue;
708               }
709             }
710             // Reassign if the region wasn't on a dead server
711             if (!onDeadServer) {
712               LOG.info("failed to force region state to offline, "
713                 + "will reassign later: " + region);
714               failedToOpenRegions.add(region); // assign individually later
715             }
716           }
717           // Release the lock, this region is excluded from bulk assign because
718           // we can't update its state, or set its znode to offline.
719           Lock lock = locks.remove(encodedName);
720           lock.unlock();
721         }
722 
723         if (server.isStopped()) {
724           return false;
725         }
726 
727         // Add region plans, so we can updateTimers when one region is opened so
728         // that unnecessary timeout on RIT is reduced.
729         this.addPlans(plans);
730 
731         List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos =
732           new ArrayList<Pair<HRegionInfo, List<ServerName>>>(states.size());
733         for (RegionState state: states) {
734           HRegionInfo region = state.getRegion();
735           regionStates.updateRegionState(
736             region, State.PENDING_OPEN, destination);
737           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
738           if (this.shouldAssignRegionsWithFavoredNodes) {
739             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
740           }
741           regionOpenInfos.add(new Pair<HRegionInfo, List<ServerName>>(
742             region, favoredNodes));
743         }
744 
745         // Move on to open regions.
746         try {
747           // Send OPEN RPC. If it fails on an IOE or RemoteException,
748           // regions will be assigned individually.
749           Configuration conf = server.getConfiguration();
750           long maxWaitTime = System.currentTimeMillis() +
751             conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000);
752           for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
753             try {
754               List<RegionOpeningState> regionOpeningStateList = serverManager
755                 .sendRegionOpen(destination, regionOpenInfos);
756               for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
757                 RegionOpeningState openingState = regionOpeningStateList.get(k);
758                 if (openingState != RegionOpeningState.OPENED) {
759                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
760                   LOG.info("Got opening state " + openingState
761                     + ", will reassign later: " + region);
762                   // Failed opening this region, reassign it later
763                   forceRegionStateToOffline(region, true);
764                   failedToOpenRegions.add(region);
765                 }
766               }
767               break;
768             } catch (IOException e) {
769               if (e instanceof RemoteException) {
770                 e = ((RemoteException)e).unwrapRemoteException();
771               }
772               if (e instanceof RegionServerStoppedException) {
773                 LOG.warn("The region server was shut down, ", e);
774                 // No need to retry, the region server is a goner.
775                 return false;
776               } else if (e instanceof ServerNotRunningYetException) {
777                 long now = System.currentTimeMillis();
778                 if (now < maxWaitTime) {
779                   if (LOG.isDebugEnabled()) {
780                     LOG.debug("Server is not yet up; waiting up to " +
781                       (maxWaitTime - now) + "ms", e);
782                   }
783                   Thread.sleep(100);
784                   i--; // reset the try count
785                   continue;
786                 }
787               } else if (e instanceof java.net.SocketTimeoutException
788                   && this.serverManager.isServerOnline(destination)) {
789                 // In case socket is timed out and the region server is still online,
790                 // the openRegion RPC could have been accepted by the server and
791                 // just the response didn't go through.  So we will retry to
792                 // open the region on the same server.
793                 if (LOG.isDebugEnabled()) {
794                   LOG.debug("Bulk assigner openRegion() to " + destination
795                     + " has timed out, but the regions might"
796                     + " already be opened on it.", e);
797                 }
798                 // wait and reset the re-try count, server might be just busy.
799                 Thread.sleep(100);
800                 i--;
801                 continue;
802               } else if (e instanceof FailedServerException && i < maximumAttempts) {
803                 // In case the server is in the failed server list, there is no point
804                 // retrying too soon. Retry after the failed_server_expiry time.
805                 long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
806                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
807                 if (LOG.isDebugEnabled()) {
808                   LOG.debug(destination + " is on failed server list; waiting "
809                     + sleepTime + "ms", e);
810                 }
811                 Thread.sleep(sleepTime);
812                 continue;
813               }
814               throw e;
815             }
816           }
817         } catch (IOException e) {
818           // Can be a socket timeout, EOF, NoRouteToHost, etc
819           LOG.info("Unable to communicate with " + destination
820             + " in order to assign regions, ", e);
821           for (RegionState state: states) {
822             HRegionInfo region = state.getRegion();
823             forceRegionStateToOffline(region, true);
824           }
825           return false;
826         }
827       } finally {
828         for (Lock lock : locks.values()) {
829           lock.unlock();
830         }
831       }
832 
833       if (!failedToOpenRegions.isEmpty()) {
834         for (HRegionInfo region : failedToOpenRegions) {
835           if (!regionStates.isRegionOnline(region)) {
836             invokeAssign(region);
837           }
838         }
839       }
840 
841       // wait for assignment completion
842       ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
843       for (HRegionInfo region: regions) {
844         if (!region.getTable().isSystemTable()) {
845           userRegionSet.add(region);
846         }
847       }
848       if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
849             System.currentTimeMillis())) {
850         LOG.debug("some user regions are still in transition: " + userRegionSet);
851       }
852       LOG.debug("Bulk assigning done for " + destination);
853       return true;
854     } finally {
855       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
856     }
857   }
858 
859   /**
860    * Send CLOSE RPC if the server is online; otherwise, offline the region.
861    *
862    * The RPC will be sent only to the region server found in the region state
863    * if it is passed in; otherwise, to the src server specified. If the region
864    * state is not specified, we don't update the region state at all; instead
865    * we just send the RPC call. This is useful for some cleanup without
866    * messing with the region states (see handleRegion, on the "region opened
867    * on an unexpected server" scenario, for an example).
868    */
869   private void unassign(final HRegionInfo region,
870       final ServerName server, final ServerName dest) {
871     for (int i = 1; i <= this.maximumAttempts; i++) {
872       if (this.server.isStopped() || this.server.isAborted()) {
873         LOG.debug("Server stopped/aborted; skipping unassign of " + region);
874         return;
875       }
876       if (!serverManager.isServerOnline(server)) {
877         LOG.debug("Offline " + region.getRegionNameAsString()
878           + ", no need to unassign since it's on a dead server: " + server);
879         regionStates.updateRegionState(region, State.OFFLINE);
880         return;
881       }
882       try {
883         // Send CLOSE RPC
884         if (serverManager.sendRegionClose(server, region, dest)) {
885           LOG.debug("Sent CLOSE to " + server + " for region " +
886             region.getRegionNameAsString());
887           return;
888         }
889         // This never happens. Currently regionserver close always returns true.
890         // TODO: this can now happen (0.96) if there is an exception in a coprocessor.
891         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
892           region.getRegionNameAsString());
893       } catch (Throwable t) {
894         long sleepTime = 0;
895         Configuration conf = this.server.getConfiguration();
896         if (t instanceof RemoteException) {
897           t = ((RemoteException)t).unwrapRemoteException();
898         }
899         if (t instanceof RegionServerAbortedException
900             || t instanceof RegionServerStoppedException
901             || t instanceof ServerNotRunningYetException) {
902           // RS is aborting; we cannot offline the region since the region may need to do WAL
903           // recovery. Until we see the RS expiration, we should retry.
904           sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
905             RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
906 
907         } else if (t instanceof NotServingRegionException) {
908           LOG.debug("Offline " + region.getRegionNameAsString()
909             + ", it is no longer on " + server, t);
910           regionStates.updateRegionState(region, State.OFFLINE);
911           return;
912         } else if (t instanceof FailedServerException && i < maximumAttempts) {
913           // In case the server is in the failed server list, there is no point
914           // retrying too soon. Retry after the failed_server_expiry time.
915           sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
916             RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
917           if (LOG.isDebugEnabled()) {
918             LOG.debug(server + " is on failed server list; waiting " + sleepTime + "ms", t);
919           }
920         }
921         try {
922           if (sleepTime > 0) {
923             Thread.sleep(sleepTime);
924           }
925         } catch (InterruptedException ie) {
926           LOG.warn("Interrupted unassign " + region.getRegionNameAsString(), ie);
927           Thread.currentThread().interrupt();
928           regionStates.updateRegionState(region, State.FAILED_CLOSE);
929           return;
930         }
931         LOG.info("Server " + server + " returned " + t + " for "
932           + region.getRegionNameAsString() + ", try=" + i
933           + " of " + this.maximumAttempts, t);
934       }
935     }
936     // Ran out of attempts.
937     regionStates.updateRegionState(region, State.FAILED_CLOSE);
938   }
939 
940   /**
941    * Set region to OFFLINE unless it is opening and forceNewPlan is false.
942    */
943   private RegionState forceRegionStateToOffline(
944       final HRegionInfo region, final boolean forceNewPlan) {
945     RegionState state = regionStates.getRegionState(region);
946     if (state == null) {
947       LOG.warn("Assigning but not in region states: " + region);
948       state = regionStates.createRegionState(region);
949     }
950 
951     if (forceNewPlan && LOG.isDebugEnabled()) {
952       LOG.debug("Force region state offline " + state);
953     }
954 
955     switch (state.getState()) {
956     case OPEN:
957     case OPENING:
958     case PENDING_OPEN:
959     case CLOSING:
960     case PENDING_CLOSE:
961       if (!forceNewPlan) {
962         LOG.debug("Skip assigning " +
963           region + ", it is already " + state);
964         return null;
965       }
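          // Intentional fall-through when forceNewPlan is set: treat these
          // in-transition states like FAILED_CLOSE/FAILED_OPEN and force a close below.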
966     case FAILED_CLOSE:
967     case FAILED_OPEN:
968       regionStates.updateRegionState(region, State.PENDING_CLOSE);
969       unassign(region, state.getServerName(), null);
970       state = regionStates.getRegionState(region);
971       if (!state.isOffline() && !state.isClosed()) {
972         // If the region isn't offline, we can't re-assign
973         // it now. It will be assigned automatically after
974         // the regionserver reports it's closed.
975         return null;
976       }
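          // Fall through: the region is now offline or closed, so it can be assigned below.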
977     case OFFLINE:
978     case CLOSED:
979       break;
980     default:
981       LOG.error("Trying to assign region " + region
982         + ", which is " + state);
983       return null;
984     }
985     return state;
986   }
987 
988   /**
989    * Caller must hold lock on the passed <code>state</code> object.
990    * @param state
991    * @param forceNewPlan
992    */
993   private void assign(RegionState state, boolean forceNewPlan) {
994     long startTime = EnvironmentEdgeManager.currentTime();
995     try {
996       Configuration conf = server.getConfiguration();
997       RegionPlan plan = null;
998       long maxWaitTime = -1;
999       HRegionInfo region = state.getRegion();
1000       Throwable previousException = null;
1001       for (int i = 1; i <= maximumAttempts; i++) {
1002         if (server.isStopped() || server.isAborted()) {
1003           LOG.info("Skip assigning " + region.getRegionNameAsString()
1004             + ", the server is stopped/aborted");
1005           return;
1006         }
1007 
1008         if (plan == null) { // Get a server for the region at first
1009           try {
1010             plan = getRegionPlan(region, forceNewPlan);
1011           } catch (HBaseIOException e) {
1012             LOG.warn("Failed to get region plan", e);
1013           }
1014         }
1015 
1016         if (plan == null) {
1017           LOG.warn("Unable to determine a plan to assign " + region);
1018 
1019           // For meta region, we have to keep retrying until succeeding
1020           if (region.isMetaRegion()) {
1021             if (i == maximumAttempts) {
1022               i = 0; // re-set attempt count to 0 for at least 1 retry
1023 
1024               LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
1025                 " after maximumAttempts (" + this.maximumAttempts +
1026                 "). Reset attempts count and continue retrying.");
1027             }
1028             waitForRetryingMetaAssignment();
1029             continue;
1030           }
1031 
1032           regionStates.updateRegionState(region, State.FAILED_OPEN);
1033           return;
1034         }
1035         LOG.info("Assigning " + region.getRegionNameAsString() +
1036             " to " + plan.getDestination().toString());
1037         // Transition RegionState to PENDING_OPEN
1038        regionStates.updateRegionState(region,
1039           State.PENDING_OPEN, plan.getDestination());
1040 
1041         boolean needNewPlan = false;
1042         final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
1043             " to " + plan.getDestination();
1044         try {
1045           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1046           if (this.shouldAssignRegionsWithFavoredNodes) {
1047             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1048           }
1049           serverManager.sendRegionOpen(plan.getDestination(), region, favoredNodes);
1050           return; // we're done
1051         } catch (Throwable t) {
1052           if (t instanceof RemoteException) {
1053             t = ((RemoteException) t).unwrapRemoteException();
1054           }
1055           previousException = t;
1056 
1057           // Should we wait a little before retrying? If the server is starting, the answer is yes.
1058           boolean hold = (t instanceof ServerNotRunningYetException);
1059 
1060           // In case socket is timed out and the region server is still online,
1061           // the openRegion RPC could have been accepted by the server and
1062           // just the response didn't go through.  So we will retry to
1063           // open the region on the same server.
1064           boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
1065               && this.serverManager.isServerOnline(plan.getDestination()));
1066 
1067           if (hold) {
1068             LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
1069               "try=" + i + " of " + this.maximumAttempts, t);
1070 
1071             if (maxWaitTime < 0) {
1072               maxWaitTime = EnvironmentEdgeManager.currentTime()
1073                 + this.server.getConfiguration().getLong(
1074                   "hbase.regionserver.rpc.startup.waittime", 60000);
1075             }
1076             try {
1077               long now = EnvironmentEdgeManager.currentTime();
1078               if (now < maxWaitTime) {
1079                 if (LOG.isDebugEnabled()) {
1080                   LOG.debug("Server is not yet up; waiting up to "
1081                     + (maxWaitTime - now) + "ms", t);
1082                 }
1083                 Thread.sleep(100);
1084                 i--; // reset the try count
1085               } else {
1086                 LOG.debug("Server is not up for a while; try a new one", t);
1087                 needNewPlan = true;
1088               }
1089             } catch (InterruptedException ie) {
1090               LOG.warn("Failed to assign "
1091                   + region.getRegionNameAsString() + " since interrupted", ie);
1092               regionStates.updateRegionState(region, State.FAILED_OPEN);
1093               Thread.currentThread().interrupt();
1094               return;
1095             }
1096           } else if (retry) {
1097             i--; // we want to retry as many times as needed as long as the RS is not dead.
1098             if (LOG.isDebugEnabled()) {
1099               LOG.debug(assignMsg + ", trying to assign to the same region server due to a socket timeout", t);
1100             }
1101           } else {
1102             needNewPlan = true;
1103             LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
1104                 " try=" + i + " of " + this.maximumAttempts, t);
1105           }
1106         }
1107 
1108         if (i == this.maximumAttempts) {
1109           // For meta region, we have to keep retrying until succeeding
1110           if (region.isMetaRegion()) {
1111             i = 0; // re-set attempt count to 0 for at least 1 retry
1112             LOG.warn(assignMsg +
1113                 ", assigning a hbase:meta region has reached maximumAttempts (" +
1114                 this.maximumAttempts + ").  Reset attempt counts and continue retrying.");
1115             waitForRetryingMetaAssignment();
1116           }
1117           else {
1118             // Don't reset the region state or get a new plan any more.
1119             // This is the last try.
1120             continue;
1121           }
1122         }
1123 
1124         // If the region opened on the destination of the present plan, reassigning to a new
1125         // RS may cause double assignments. In case of RegionAlreadyInTransitionException,
1126         // reassign to the same RS.
1127         if (needNewPlan) {
1128           // Force a new plan and reassign. Will return null if no servers.
1129           // The new plan could be the same as the existing plan since we don't
1130           // exclude the server of the original plan, which should not be
1131           // excluded since it could be the only server up now.
1132           RegionPlan newPlan = null;
1133           try {
1134             newPlan = getRegionPlan(region, true);
1135           } catch (HBaseIOException e) {
1136             LOG.warn("Failed to get region plan", e);
1137           }
1138           if (newPlan == null) {
1139             regionStates.updateRegionState(region, State.FAILED_OPEN);
1140             LOG.warn("Unable to find a viable location to assign region " +
1141                 region.getRegionNameAsString());
1142             return;
1143           }
1144 
1145           if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
1146             // Clean out the plan we failed to execute and one that doesn't look like it'll
1147             // succeed anyway; we need a new plan!
1148             // Transition back to OFFLINE
1149             regionStates.updateRegionState(region, State.OFFLINE);
1150             plan = newPlan;
1151           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
1152               previousException instanceof FailedServerException) {
1153             try {
1154               LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
1155                 " to the same failed server.");
1156               Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1157                 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
1158             } catch (InterruptedException ie) {
1159               LOG.warn("Failed to assign "
1160                   + region.getRegionNameAsString() + " since interrupted", ie);
1161               regionStates.updateRegionState(region, State.FAILED_OPEN);
1162               Thread.currentThread().interrupt();
1163               return;
1164             }
1165           }
1166         }
1167       }
1168       // Ran out of attempts.
1169       regionStates.updateRegionState(region, State.FAILED_OPEN);
1170     } finally {
1171       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
1172     }
1173   }
1174 
1175   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
1176     if (this.tableStateManager.isTableState(region.getTable(),
1177             TableState.State.DISABLED,
1178             TableState.State.DISABLING) || replicasToClose.contains(region)) {
1179       LOG.info("Table " + region.getTable() + " is disabled or disabling;"
1180         + " skipping assign of " + region.getRegionNameAsString());
1181       offlineDisabledRegion(region);
1182       return true;
1183     }
1184     return false;
1185   }
1186 
1187   /**
1188    * @param region the region to assign
1189    * @param forceNewPlan If true, a new plan will be generated even if an
1190    * existing plan is present.
1191    * @return Plan for the passed <code>region</code> (if none exists currently, one is created;
1192    * if there are no servers to assign to, null is returned).
1193    */
1194   private RegionPlan getRegionPlan(final HRegionInfo region,
1195       final boolean forceNewPlan) throws HBaseIOException {
1196     // Pickup existing plan or make a new one
1197     final String encodedName = region.getEncodedName();
1198     final List<ServerName> destServers =
1199       serverManager.createDestinationServersList();
1200 
1201     if (destServers.isEmpty()) {
1202       LOG.warn("Can't move " + encodedName +
1203         ", there is no destination server available.");
1204       return null;
1205     }
1206 
1207     RegionPlan randomPlan = null;
1208     boolean newPlan = false;
1209     RegionPlan existingPlan;
1210 
1211     synchronized (this.regionPlans) {
1212       existingPlan = this.regionPlans.get(encodedName);
1213 
1214       if (existingPlan != null && existingPlan.getDestination() != null) {
1215         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
1216           + " destination server is " + existingPlan.getDestination() +
1217             " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
1218       }
1219 
1220       if (forceNewPlan
1221           || existingPlan == null
1222           || existingPlan.getDestination() == null
1223           || !destServers.contains(existingPlan.getDestination())) {
1224         newPlan = true;
1225         randomPlan = new RegionPlan(region, null,
1226             balancer.randomAssignment(region, destServers));
1227         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
1228           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
1229           regions.add(region);
1230           try {
1231             processFavoredNodes(regions);
1232           } catch (IOException ie) {
1233             LOG.warn("Ignoring exception in processFavoredNodes " + ie);
1234           }
1235         }
1236         this.regionPlans.put(encodedName, randomPlan);
1237       }
1238     }
1239 
1240     if (newPlan) {
1241       if (randomPlan.getDestination() == null) {
1242         LOG.warn("Can't find a destination for " + encodedName);
1243         return null;
1244       }
1245       if (LOG.isDebugEnabled()) {
1246         LOG.debug("No previous transition plan found (or ignoring " +
1247           "an existing plan) for " + region.getRegionNameAsString() +
1248           "; generated random plan=" + randomPlan + "; " + destServers.size() +
1249           " (online=" + serverManager.getOnlineServers().size() +
1250           ") available servers, forceNewPlan=" + forceNewPlan);
1251       }
1252       return randomPlan;
1253     }
1254     if (LOG.isDebugEnabled()) {
1255       LOG.debug("Using pre-existing plan for " +
1256         region.getRegionNameAsString() + "; plan=" + existingPlan);
1257     }
1258     return existingPlan;
1259   }
1260 
1261   /**
1262    * Wait for some time before retrying meta table region assignment
1263    */
1264   private void waitForRetryingMetaAssignment() {
1265     try {
1266       Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
1267     } catch (InterruptedException e) {
1268       LOG.error("Got exception while waiting for hbase:meta assignment", e);
1269       Thread.currentThread().interrupt();
1270     }
1271   }
1272 
1273   /**
1274    * Unassigns the specified region.
1275    * <p>
1276    * Updates the RegionState and sends the CLOSE RPC unless the region is being
1277    * split by the regionserver; then the unassign fails (silently) because we
1278    * presume the region being unassigned no longer exists (it has been split out
1279    * of existence). TODO: What to do if split fails and is rolled back and
1280    * parent is revivified?
1281    * <p>
1282    * If a RegionPlan is already set, it will remain.
1283    *
1284    * @param region the region to be unassigned
1285    */
1286   public void unassign(HRegionInfo region) {
1287     unassign(region, null);
1288   }
1289 
1290 
1291   /**
1292    * Unassigns the specified region.
1293    * <p>
1294    * Updates the RegionState and sends the CLOSE RPC unless the region is being
1295    * split by the regionserver; then the unassign fails (silently) because we
1296    * presume the region being unassigned no longer exists (it has been split out
1297    * of existence). TODO: What to do if split fails and is rolled back and
1298    * parent is revivified?
1299    * <p>
1300    * If a RegionPlan is already set, it will remain.
1301    *
1302    * @param region the region to be unassigned
1303    * @param dest the destination server of the region
1304    */
1305   public void unassign(HRegionInfo region, ServerName dest) {
1306     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
1307     LOG.debug("Starting unassign of " + region.getRegionNameAsString()
1308       + " (offlining), current state: " + regionStates.getRegionState(region));
1309 
1310     String encodedName = region.getEncodedName();
1311     // Grab the state of this region and synchronize on it
1312     // We need a lock here as we're going to do a put later and we don't want
1313     // multiple state objects created concurrently
1314     ReentrantLock lock = locker.acquireLock(encodedName);
1315     RegionState state = regionStates.getRegionTransitionState(encodedName);
1316     try {
1317       if (state == null || state.isFailedClose()) {
1318         if (state == null) {
1319           // Region is not in transition.
1320           // We can unassign it only if it's not SPLIT/MERGED.
1321           state = regionStates.getRegionState(encodedName);
1322           if (state != null && state.isUnassignable()) {
1323             LOG.info("Attempting to unassign " + state + ", ignored");
1324             // Offline region will be reassigned below
1325             return;
1326           }
1327           if (state == null || state.getServerName() == null) {
1328             // We don't know where the region is, offline it.
1329             // No need to send CLOSE RPC
1330             LOG.warn("Attempting to unassign a region not in RegionStates "
1331               + region.getRegionNameAsString() + ", offlined");
1332             regionOffline(region);
1333             return;
1334           }
1335         }
1336         state = regionStates.updateRegionState(
1337           region, State.PENDING_CLOSE);
1338       } else if (state.isFailedOpen()) {
1339         // The region is not open yet
1340         regionOffline(region);
1341         return;
1342       } else {
1343         LOG.debug("Attempting to unassign " +
1344           region.getRegionNameAsString() + " but it is " +
1345           "already in transition (" + state.getState());
1346         return;
1347       }
1348 
1349       unassign(region, state.getServerName(), dest);
1350     } finally {
1351       lock.unlock();
1352 
1353       // Region is expected to be reassigned afterwards
1354       if (!replicasToClose.contains(region)
1355           && regionStates.isRegionInState(region, State.OFFLINE)) {
1356         assign(region);
1357       }
1358     }
1359   }
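  /*
   * A minimal usage sketch, assuming an AssignmentManager instance "am", an
   * HRegionInfo "hri" and a ServerName "destination" are in scope:
   *
   *   am.unassign(hri);               // close; an OFFLINE region is reassigned afterwards
   *   am.unassign(hri, destination);  // close and hint where the region should be reopened
   */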
1360 
1361   /**
1362    * Used by unit tests. Return the number of regions opened so far in the life
1363    * of the master. Increases by one every time the master opens a region
1364    * @return the counter value of the number of regions opened so far
1365    */
1366   public int getNumRegionsOpened() {
1367     return numRegionsOpened.get();
1368   }
1369 
1370   /**
1371    * Waits until the specified region has completed assignment.
1372    * <p>
1373    * If the region is already assigned, returns immediately.  Otherwise, this method
1374    * blocks until the region is assigned.
1375    * @param regionInfo region to wait on assignment for
1376    * @return true if the region is assigned, false otherwise.
1377    * @throws InterruptedException
1378    */
1379   public boolean waitForAssignment(HRegionInfo regionInfo)
1380       throws InterruptedException {
1381     ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
1382     regionSet.add(regionInfo);
1383     return waitForAssignment(regionSet, true, Long.MAX_VALUE);
1384   }
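  /*
   * A minimal usage sketch, assuming an AssignmentManager instance "am" and an
   * HRegionInfo "hri" are in scope:
   *
   *   am.assign(hri);                    // trigger assignment
   *   if (!am.waitForAssignment(hri)) {  // blocks until assigned
   *     // false here means the wait ended early, e.g. the master is stopping
   *     // or the region ended up in FAILED_OPEN
   *   }
   */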
1385 
1386   /**
1387    * Waits until the specified regions have completed assignment, or the deadline is reached.
1388    */
1389   protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
1390       final boolean waitTillAllAssigned, final int reassigningRegions,
1391       final long minEndTime) throws InterruptedException {
1392     long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
1393     return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
1394   }
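  /*
   * The deadline above is a simple linear estimate. For example, with a
   * hypothetical bulkPerRegionOpenTimeGuesstimate of 10000 ms and 3 reassigning
   * regions:
   *
   *   deadline = minEndTime + 10000 * (3 + 1);  // minEndTime plus 40 seconds
   */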
1395 
1396   /**
1397    * Waits until the specified regions have completed assignment, or the deadline is reached.
1398    * @param regionSet set of regions to wait on; the set is modified and assigned regions are removed
1399    * @param waitTillAllAssigned true if we should wait for all the regions to be assigned
1400    * @param deadline the timestamp after which the wait is aborted
1401    * @return true if all the regions are assigned, false otherwise.
1402    * @throws InterruptedException
1403    */
1404   protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
1405       final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
1406     // We're not synchronizing on regionsInTransition now because we don't use any iterator.
1407     while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
1408       int failedOpenCount = 0;
1409       Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
1410       while (regionInfoIterator.hasNext()) {
1411         HRegionInfo hri = regionInfoIterator.next();
1412         if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
1413             State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
1414           regionInfoIterator.remove();
1415         } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
1416           failedOpenCount++;
1417         }
1418       }
1419       if (!waitTillAllAssigned) {
1420         // No need to wait, let assignment go on asynchronously
1421         break;
1422       }
1423       if (!regionSet.isEmpty()) {
1424         if (failedOpenCount == regionSet.size()) {
1425           // all the regions we are waiting on had an error on open.
1426           break;
1427         }
1428         regionStates.waitForUpdate(100);
1429       }
1430     }
1431     return regionSet.isEmpty();
1432   }
1433 
1434   /**
1435    * Assigns the hbase:meta region or a replica.
1436    * <p>
1437    * Assumes that hbase:meta is currently closed and is not being actively served by
1438    * any RegionServer.
1439    * @param hri the hbase:meta region, or a replica of it, to assign
1440    */
1441   public void assignMeta(HRegionInfo hri) throws KeeperException {
1442     regionStates.updateRegionState(hri, State.OFFLINE);
1443     assign(hri);
1444   }
1445 
1446   /**
1447    * Assigns specified regions retaining assignments, if any.
1448    * <p>
1449    * This is a synchronous call and will return once every region has been
1450    * assigned.  If anything fails, an exception is thrown
1451    * @throws InterruptedException
1452    * @throws IOException
1453    */
1454   public void assign(Map<HRegionInfo, ServerName> regions)
1455         throws IOException, InterruptedException {
1456     if (regions == null || regions.isEmpty()) {
1457       return;
1458     }
1459     List<ServerName> servers = serverManager.createDestinationServersList();
1460     if (servers == null || servers.isEmpty()) {
1461       throw new IOException("Found no destination server to assign region(s)");
1462     }
1463 
1464     // Reuse existing assignment info
1465     Map<ServerName, List<HRegionInfo>> bulkPlan =
1466       balancer.retainAssignment(regions, servers);
1467     if (bulkPlan == null) {
1468       throw new IOException("Unable to determine a plan to assign region(s)");
1469     }
1470 
1471     assign(regions.size(), servers.size(),
1472       "retainAssignment=true", bulkPlan);
1473   }
1474 
1475   /**
1476    * Assigns the specified regions in round-robin fashion, if any exist.
1477    * <p>
1478    * This is a synchronous call and will return once every region has been
1479    * assigned.  If anything fails, an exception is thrown
1480    * @throws InterruptedException
1481    * @throws IOException
1482    */
1483   public void assign(List<HRegionInfo> regions)
1484         throws IOException, InterruptedException {
1485     if (regions == null || regions.isEmpty()) {
1486       return;
1487     }
1488 
1489     List<ServerName> servers = serverManager.createDestinationServersList();
1490     if (servers == null || servers.isEmpty()) {
1491       throw new IOException("Found no destination server to assign region(s)");
1492     }
1493 
1494     // Generate a round-robin bulk assignment plan
1495     Map<ServerName, List<HRegionInfo>> bulkPlan = balancer.roundRobinAssignment(regions, servers);
1496     if (bulkPlan == null) {
1497       throw new IOException("Unable to determine a plan to assign region(s)");
1498     }
1499 
1500     processFavoredNodes(regions);
1501     assign(regions.size(), servers.size(), "round-robin=true", bulkPlan);
1502   }
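  /*
   * A minimal usage sketch of the two bulk-assign entry points above, assuming an
   * AssignmentManager "am", a List<HRegionInfo> "newRegions" and a
   * Map<HRegionInfo, ServerName> "previousAssignments" are in scope:
   *
   *   am.assign(newRegions);            // round-robin across the destination servers
   *   am.assign(previousAssignments);   // try to retain the previous locations
   */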
1503 
1504   private void assign(int regions, int totalServers,
1505       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
1506           throws InterruptedException, IOException {
1507 
1508     int servers = bulkPlan.size();
1509     if (servers == 1 || (regions < bulkAssignThresholdRegions
1510         && servers < bulkAssignThresholdServers)) {
1511 
1512       // Don't use bulk assignment.  This could be more efficient in a small
1513       // cluster, especially a mini cluster for testing, so that tests won't time out
1514       if (LOG.isTraceEnabled()) {
1515         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
1516           " region(s) to " + servers + " server(s)");
1517       }
1518 
1519       // invoke assignment (async)
1520       ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
1521       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
1522         if (!assign(plan.getKey(), plan.getValue()) && !server.isStopped()) {
1523           for (HRegionInfo region: plan.getValue()) {
1524             if (!regionStates.isRegionOnline(region)) {
1525               invokeAssign(region);
1526               if (!region.getTable().isSystemTable()) {
1527                 userRegionSet.add(region);
1528               }
1529             }
1530           }
1531         }
1532       }
1533 
1534       // wait for assignment completion
1535       if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
1536             System.currentTimeMillis())) {
1537         LOG.debug("some user regions are still in transition: " + userRegionSet);
1538       }
1539     } else {
1540       LOG.info("Bulk assigning " + regions + " region(s) across "
1541         + totalServers + " server(s), " + message);
1542 
1543       // Use fixed count thread pool assigning.
1544       BulkAssigner ba = new GeneralBulkAssigner(
1545         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
1546       ba.bulkAssign();
1547       LOG.info("Bulk assigning done");
1548     }
1549   }
1550 
1551   /**
1552    * Assigns all user regions, if any exist.  Used during cluster startup.
1553    * <p>
1554    * This is a synchronous call and will return once every region has been
1555    * assigned.  If anything fails, an exception is thrown and the cluster
1556    * should be shut down.
1557    * @throws InterruptedException
1558    * @throws IOException
1559    */
1560   private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
1561       throws IOException, InterruptedException {
1562     if (allRegions == null || allRegions.isEmpty()) return;
1563 
1564     // Determine what type of assignment to do on startup
1565     boolean retainAssignment = server.getConfiguration().
1566       getBoolean("hbase.master.startup.retainassign", true);
1567 
1568     Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
1569     if (retainAssignment) {
1570       assign(allRegions);
1571     } else {
1572       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
1573       assign(regions);
1574     }
1575 
1576     for (HRegionInfo hri : regionsFromMetaScan) {
1577       TableName tableName = hri.getTable();
1578       if (!tableStateManager.isTableState(tableName,
1579               TableState.State.ENABLED)) {
1580         setEnabledTable(tableName);
1581       }
1582     }
1583     // assign all the replicas that were not recorded in the meta
1584     assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server));
1585   }
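  /*
   * The startup behavior above is controlled by the "hbase.master.startup.retainassign"
   * key (default true). A sketch of turning retained assignment off, e.g. for a test:
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setBoolean("hbase.master.startup.retainassign", false);  // use round-robin at startup
   */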
1586 
1587   /**
1588    * Get a list of replica regions that are not recorded in meta yet.
1589    * We might not have recorded the locations
1590    * for the replicas since the replicas may not have been online yet, the master restarted
1591    * in the middle of assigning, ZK was erased, etc.
1592    * @param regionsRecordedInMeta the set of regions we know are recorded in meta,
1593    * either as a default, or as the location of a replica
1594    * @param master the master services
1595    * @return list of replica regions
1596    * @throws IOException
1597    */
1598   public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
1599       Set<HRegionInfo> regionsRecordedInMeta, MasterServices master)throws IOException {
1600     List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
1601     for (HRegionInfo hri : regionsRecordedInMeta) {
1602       TableName table = hri.getTable();
1603       HTableDescriptor htd = master.getTableDescriptors().get(table);
1604       // look at the HTD for the replica count. That's the source of truth
1605       int desiredRegionReplication = htd.getRegionReplication();
1606       for (int i = 0; i < desiredRegionReplication; i++) {
1607         HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
1608         if (regionsRecordedInMeta.contains(replica)) continue;
1609         regionsNotRecordedInMeta.add(replica);
1610       }
1611     }
1612     return regionsNotRecordedInMeta;
1613   }
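  /*
   * A sketch of the replica expansion performed above: for a table whose
   * HTableDescriptor reports a region replication of 3, each region recorded in
   * meta is expanded to replica ids 0..2, and only the replicas missing from meta
   * are returned:
   *
   *   HRegionInfo replica1 = RegionReplicaUtil.getRegionInfoForReplica(hri, 1);
   *   HRegionInfo replica2 = RegionReplicaUtil.getRegionInfoForReplica(hri, 2);
   */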
1614 
1615   /**
1616    * Rebuild the list of user regions and assignment information.
1617    * Updates region states with findings as we go through the list of regions.
1618    * @return set of servers not online that hosted some regions according to a scan of hbase:meta
1619    * @throws IOException
1620    */
1621   Set<ServerName> rebuildUserRegions() throws
1622           IOException, KeeperException {
1623     Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
1624             TableState.State.DISABLED, TableState.State.ENABLING);
1625 
1626     Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
1627             TableState.State.DISABLED,
1628             TableState.State.DISABLING,
1629             TableState.State.ENABLING);
1630 
1631     // Region assignment from META
1632     List<Result> results = MetaTableAccessor.fullScanRegions(server.getConnection());
1633     // Get any new but slow-to-check-in region servers that joined the cluster
1634     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
1635     // Set of offline servers to be returned
1636     Set<ServerName> offlineServers = new HashSet<ServerName>();
1637     // Iterate regions in META
1638     for (Result result : results) {
1639       if (result == null && LOG.isDebugEnabled()){
1640         LOG.debug("null result from meta - ignoring but this is strange.");
1641         continue;
1642       }
1643       // Keep track of replicas to close. These were the replicas of the originally
1644       // unmerged regions. The master might have closed them before, but it might not
1645       // have, perhaps because it crashed.
1646       PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
1647       if (p.getFirst() != null && p.getSecond() != null) {
1648         int numReplicas = ((MasterServices)server).getTableDescriptors().get(p.getFirst().
1649             getTable()).getRegionReplication();
1650         for (HRegionInfo merge : p) {
1651           for (int i = 1; i < numReplicas; i++) {
1652             replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
1653           }
1654         }
1655       }
1656       RegionLocations rl =  MetaTableAccessor.getRegionLocations(result);
1657       if (rl == null) continue;
1658       HRegionLocation[] locations = rl.getRegionLocations();
1659       if (locations == null) continue;
1660       for (HRegionLocation hrl : locations) {
1661         if (hrl == null) continue;
1662         HRegionInfo regionInfo = hrl.getRegionInfo();
1663         if (regionInfo == null) continue;
1664         int replicaId = regionInfo.getReplicaId();
1665         State state = RegionStateStore.getRegionState(result, replicaId);
1666         // Keep track of replicas to close. These were the replicas of the split parents
1667         // from the previous life of the master. The master should have closed them before,
1668         // but it may not have been able to, perhaps because it crashed.
1669         if (replicaId == 0 && state.equals(State.SPLIT)) {
1670           for (HRegionLocation h : locations) {
1671             replicasToClose.add(h.getRegionInfo());
1672           }
1673         }
1674         ServerName lastHost = hrl.getServerName();
1675         ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
1676         regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
1677         if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
1678           // Region is not open (either offline or in transition), skip
1679           continue;
1680         }
1681         TableName tableName = regionInfo.getTable();
1682         if (!onlineServers.contains(regionLocation)) {
1683           // Region is located on a server that isn't online
1684           offlineServers.add(regionLocation);
1685         } else if (!disabledOrEnablingTables.contains(tableName)) {
1686           // Region is being served and on an active server
1687           // add only if region not in disabled or enabling table
1688           regionStates.regionOnline(regionInfo, regionLocation);
1689           balancer.regionOnline(regionInfo, regionLocation);
1690         }
1691         // need to enable the table if not disabled or disabling or enabling
1692         // this will be used in rolling restarts
1693         if (!disabledOrDisablingOrEnabling.contains(tableName)
1694           && !getTableStateManager().isTableState(tableName,
1695                 TableState.State.ENABLED)) {
1696           setEnabledTable(tableName);
1697         }
1698       }
1699     }
1700     return offlineServers;
1701   }
1702 
1703   /**
1704    * Recover the tables that were not fully moved to DISABLED state. These
1705    * tables were in DISABLING state when the master restarted/switched.
1706    *
1707    * @throws KeeperException
1708    * @throws TableNotFoundException
1709    * @throws IOException
1710    */
1711   private void recoverTableInDisablingState()
1712           throws KeeperException, IOException {
1713     Set<TableName> disablingTables =
1714             tableStateManager.getTablesInStates(TableState.State.DISABLING);
1715     if (disablingTables.size() != 0) {
1716       for (TableName tableName : disablingTables) {
1717         // Recover by calling DisableTableHandler
1718         LOG.info("The table " + tableName
1719             + " is in DISABLING state.  Hence recovering by moving the table"
1720             + " to DISABLED state.");
1721         new DisableTableHandler(this.server, tableName,
1722             this, tableLockManager, true).prepare().process();
1723       }
1724     }
1725   }
1726 
1727   /**
1728    * Recover the tables that are not fully moved to ENABLED state. These tables
1729    * were in ENABLING state when the master restarted/switched.
1730    *
1731    * @throws KeeperException
1732    * @throws org.apache.hadoop.hbase.TableNotFoundException
1733    * @throws IOException
1734    */
1735   private void recoverTableInEnablingState()
1736           throws KeeperException, IOException {
1737     Set<TableName> enablingTables = tableStateManager.
1738             getTablesInStates(TableState.State.ENABLING);
1739     if (enablingTables.size() != 0) {
1740       for (TableName tableName : enablingTables) {
1741         // Recover by calling EnableTableHandler
1742         LOG.info("The table " + tableName
1743             + " is in ENABLING state.  Hence recovering by moving the table"
1744             + " to ENABLED state.");
1745         // enableTable in sync way during master startup,
1746         // no need to invoke coprocessor
1747         EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
1748           this, tableLockManager, true);
1749         try {
1750           eth.prepare();
1751         } catch (TableNotFoundException e) {
1752           LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
1753           continue;
1754         }
1755         eth.process();
1756       }
1757     }
1758   }
1759 
1760   /**
1761    * Processes the list of regions in transition at startup.
1762    */
1763   void processRegionsInTransition(Collection<RegionState> regionsInTransition) {
1764     // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
1765     // in case the RPC call is not sent out yet before the master was shut down
1766     // since we update the state before we send the RPC call. We can't update
1767     // the state after the RPC call. Otherwise, we don't know what's happened
1768     // to the region if the master dies right after the RPC call is out.
1769     for (RegionState regionState: regionsInTransition) {
1770       LOG.info("Processing " + regionState);
1771       ServerName serverName = regionState.getServerName();
1772       // Server could be null in case of FAILED_OPEN when master cannot find a region plan. In that
1773       // case, try assigning it here.
1774       if (serverName != null && !serverManager.getOnlineServers().containsKey(serverName)) {
1775         LOG.info("Server " + serverName + " isn't online. SSH will handle this");
1776         continue; // SSH will handle it
1777       }
1778       HRegionInfo regionInfo = regionState.getRegion();
1779       RegionState.State state = regionState.getState();
1780       switch (state) {
1781       case CLOSED:
1782         invokeAssign(regionState.getRegion());
1783         break;
1784       case PENDING_OPEN:
1785         retrySendRegionOpen(regionState);
1786         break;
1787       case PENDING_CLOSE:
1788         retrySendRegionClose(regionState);
1789         break;
1790       case FAILED_CLOSE:
1791       case FAILED_OPEN:
1792         invokeUnAssign(regionInfo);
1793         break;
1794       default:
1795         // No process for other states
1796       }
1797     }
1798   }
1799 
1800   /**
1801    * At master failover, for a pending_open region, make sure
1802    * the sendRegionOpen RPC call is sent to the target regionserver.
1803    */
1804   private void retrySendRegionOpen(final RegionState regionState) {
1805     this.executorService.submit(
1806       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
1807         @Override
1808         public void process() throws IOException {
1809           HRegionInfo hri = regionState.getRegion();
1810           ServerName serverName = regionState.getServerName();
1811           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
1812           try {
1813             for (int i = 1; i <= maximumAttempts; i++) {
1814               if (!serverManager.isServerOnline(serverName)
1815                   || server.isStopped() || server.isAborted()) {
1816                 return; // No need any more
1817               }
1818               try {
1819                 if (!regionState.equals(regionStates.getRegionState(hri))) {
1820                   return; // Region is not in the expected state any more
1821                 }
1822                 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1823                 if (shouldAssignRegionsWithFavoredNodes) {
1824                   favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
1825                 }
1826                 serverManager.sendRegionOpen(serverName, hri, favoredNodes);
1827                 return; // we're done
1828               } catch (Throwable t) {
1829                 if (t instanceof RemoteException) {
1830                   t = ((RemoteException) t).unwrapRemoteException();
1831                 }
1832                 if (t instanceof FailedServerException && i < maximumAttempts) {
1833                   // In case the server is in the failed server list, there is no point
1834                   // in retrying too soon. Retry after the failed_server_expiry time
1835                   try {
1836                     Configuration conf = this.server.getConfiguration();
1837                     long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1838                       RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1839                     if (LOG.isDebugEnabled()) {
1840                       LOG.debug(serverName + " is on failed server list; waiting "
1841                         + sleepTime + "ms", t);
1842                     }
1843                     Thread.sleep(sleepTime);
1844                     continue;
1845                   } catch (InterruptedException ie) {
1846                     LOG.warn("Failed to assign "
1847                       + hri.getRegionNameAsString() + " since interrupted", ie);
1848                     regionStates.updateRegionState(hri, State.FAILED_OPEN);
1849                     Thread.currentThread().interrupt();
1850                     return;
1851                   }
1852                 }
1853                 if (serverManager.isServerOnline(serverName)
1854                     && t instanceof java.net.SocketTimeoutException) {
1855                   i--; // don't count this attempt against the retry limit
1856                 } else {
1857                   LOG.info("Got exception in retrying sendRegionOpen for "
1858                     + regionState + "; try=" + i + " of " + maximumAttempts, t);
1859                 }
1860                 Threads.sleep(100);
1861               }
1862             }
1863             // Run out of attempts
1864             regionStates.updateRegionState(hri, State.FAILED_OPEN);
1865           } finally {
1866             lock.unlock();
1867           }
1868         }
1869       });
1870   }
1871 
1872   /**
1873    * At master failover, for a pending_close region, make sure
1874    * the sendRegionClose RPC call is sent to the target regionserver.
1875    */
1876   private void retrySendRegionClose(final RegionState regionState) {
1877     this.executorService.submit(
1878       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
1879         @Override
1880         public void process() throws IOException {
1881           HRegionInfo hri = regionState.getRegion();
1882           ServerName serverName = regionState.getServerName();
1883           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
1884           try {
1885             for (int i = 1; i <= maximumAttempts; i++) {
1886               if (!serverManager.isServerOnline(serverName)
1887                   || server.isStopped() || server.isAborted()) {
1888                 return; // No need any more
1889               }
1890               try {
1891                 if (!regionState.equals(regionStates.getRegionState(hri))) {
1892                   return; // Region is not in the expected state any more
1893                 }
1894                 serverManager.sendRegionClose(serverName, hri, null);
1895                 return; // Done.
1896               } catch (Throwable t) {
1897                 if (t instanceof RemoteException) {
1898                   t = ((RemoteException) t).unwrapRemoteException();
1899                 }
1900                 if (t instanceof FailedServerException && i < maximumAttempts) {
1901                   // In case the server is in the failed server list, there is no point
1902                   // in retrying too soon. Retry after the failed_server_expiry time
1903                   try {
1904                     Configuration conf = this.server.getConfiguration();
1905                     long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1906                       RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1907                     if (LOG.isDebugEnabled()) {
1908                       LOG.debug(serverName + " is on failed server list; waiting "
1909                         + sleepTime + "ms", t);
1910                     }
1911                     Thread.sleep(sleepTime);
1912                     continue;
1913                   } catch (InterruptedException ie) {
1914                     LOG.warn("Failed to unassign "
1915                       + hri.getRegionNameAsString() + " since interrupted", ie);
1916                     regionStates.updateRegionState(hri, RegionState.State.FAILED_CLOSE);
1917                     Thread.currentThread().interrupt();
1918                     return;
1919                   }
1920                 }
1921                 if (serverManager.isServerOnline(serverName)
1922                     && t instanceof java.net.SocketTimeoutException) {
1923                   i--; // don't count this attempt against the retry limit
1924                 } else {
1925                   LOG.info("Got exception in retrying sendRegionClose for "
1926                     + regionState + "; try=" + i + " of " + maximumAttempts, t);
1927                 }
1928                 Threads.sleep(100);
1929               }
1930             }
1931             // Run out of attempts
1932             regionStates.updateRegionState(hri, State.FAILED_CLOSE);
1933           } finally {
1934             lock.unlock();
1935           }
1936         }
1937       });
1938   }
1939 
1940   /**
1941    * Sets regions-in-transition metrics.
1942    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
1943    * The iterator is not fail-fast, which may lead to stale reads; but that's better than
1944    * creating a copy of the map for metrics computation, as this method is invoked
1945    * at a frequent interval.
1946    */
1947   public void updateRegionsInTransitionMetrics() {
1948     long currentTime = System.currentTimeMillis();
1949     int totalRITs = 0;
1950     int totalRITsOverThreshold = 0;
1951     long oldestRITTime = 0;
1952     int ritThreshold = this.server.getConfiguration().
1953       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
1954     for (RegionState state: regionStates.getRegionsInTransition().values()) {
1955       totalRITs++;
1956       long ritTime = currentTime - state.getStamp();
1957       if (ritTime > ritThreshold) { // more than the threshold
1958         totalRITsOverThreshold++;
1959       }
1960       if (oldestRITTime < ritTime) {
1961         oldestRITTime = ritTime;
1962       }
1963     }
1964     if (this.metricsAssignmentManager != null) {
1965       this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
1966       this.metricsAssignmentManager.updateRITCount(totalRITs);
1967       this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
1968     }
1969   }
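  /*
   * The "stuck" threshold above is read from HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD
   * with a default of 60000 ms. A sketch of tightening it to 30 seconds in the
   * master configuration:
   *
   *   conf.setInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 30000);
   */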
1970 
1971   /**
1972    * @param region Region whose plan we are to clear.
1973    */
1974   private void clearRegionPlan(final HRegionInfo region) {
1975     synchronized (this.regionPlans) {
1976       this.regionPlans.remove(region.getEncodedName());
1977     }
1978   }
1979 
1980   /**
1981    * Wait on region to clear regions-in-transition.
1982    * @param hri Region to wait on.
1983    * @throws IOException
1984    */
1985   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
1986       throws IOException, InterruptedException {
1987     waitOnRegionToClearRegionsInTransition(hri, -1L);
1988   }
1989 
1990   /**
1991    * Wait on a region to clear regions-in-transition, or time out.
1992    * @param hri Region to wait on.
1993    * @param timeOut Milliseconds to wait for the current region to be out of transition state.
1994    * @return True when a region clears regions-in-transition before the timeout, otherwise false
1995    * @throws InterruptedException
1996    */
1997   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
1998       throws InterruptedException {
1999     if (!regionStates.isRegionInTransition(hri)) return true;
2000     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
2001         + timeOut;
2002     // There is already a timeout monitor on regions in transition so I
2003     // should not have to have one here too?
2004     LOG.info("Waiting for " + hri.getEncodedName() +
2005         " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
2006     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
2007       regionStates.waitForUpdate(100);
2008       if (EnvironmentEdgeManager.currentTime() > end) {
2009         LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
2010         return false;
2011       }
2012     }
2013     if (this.server.isStopped()) {
2014       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
2015       return false;
2016     }
2017     return true;
2018   }
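  /*
   * A minimal usage sketch, assuming an AssignmentManager "am" and an
   * HRegionInfo "hri" are in scope:
   *
   *   if (!am.waitOnRegionToClearRegionsInTransition(hri, 60000)) {
   *     // still in transition after 60 seconds, or the master is stopping
   *   }
   */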
2019 
2020   void invokeAssign(HRegionInfo regionInfo) {
2021     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
2022   }
2023 
2024   void invokeUnAssign(HRegionInfo regionInfo) {
2025     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
2026   }
2027 
2028   public boolean isCarryingMeta(ServerName serverName) {
2029     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
2030   }
2031 
2032   public boolean isCarryingMetaReplica(ServerName serverName, int replicaId) {
2033     return isCarryingRegion(serverName,
2034         RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId));
2035   }
2036 
2037   public boolean isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) {
2038     return isCarryingRegion(serverName, metaHri);
2039   }
2040 
2041   /**
2042    * Check whether the server being shut down carries the specified region.
2043    * @return whether the serverName currently hosts the region
2044    */
2045   private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
2046     RegionState regionState = regionStates.getRegionTransitionState(hri);
2047     ServerName transitionAddr = regionState != null? regionState.getServerName(): null;
2048     if (transitionAddr != null) {
2049       boolean matchTransitionAddr = transitionAddr.equals(serverName);
2050       LOG.debug("Checking region=" + hri.getRegionNameAsString()
2051         + ", transitioning on server=" + transitionAddr
2052         + " server being checked: " + serverName
2053         + ", matches=" + matchTransitionAddr);
2054       return matchTransitionAddr;
2055     }
2056 
2057     ServerName assignedAddr = regionStates.getRegionServerOfRegion(hri);
2058     boolean matchAssignedAddr = serverName.equals(assignedAddr);
2059     LOG.debug("based on AM, current region=" + hri.getRegionNameAsString()
2060       + " is on server=" + assignedAddr + ", server being checked: "
2061       + serverName);
2062     return matchAssignedAddr;
2063   }
2064 
2065   /**
2066    * Clean out crashed server removing any assignments.
2067    * @param sn Server that went down.
2068    * @return list of regions in transition on this server
2069    */
2070   public List<HRegionInfo> cleanOutCrashedServerReferences(final ServerName sn) {
2071     // Clean out any existing assignment plans for this server
2072     synchronized (this.regionPlans) {
2073       for (Iterator <Map.Entry<String, RegionPlan>> i = this.regionPlans.entrySet().iterator();
2074           i.hasNext();) {
2075         Map.Entry<String, RegionPlan> e = i.next();
2076         ServerName otherSn = e.getValue().getDestination();
2077         // The name will be null if the region is planned for a random assign.
2078         if (otherSn != null && otherSn.equals(sn)) {
2079           // Use iterator's remove else we'll get CME
2080           i.remove();
2081         }
2082       }
2083     }
2084     List<HRegionInfo> rits = regionStates.serverOffline(sn);
2085     for (Iterator<HRegionInfo> it = rits.iterator(); it.hasNext(); ) {
2086       HRegionInfo hri = it.next();
2087       String encodedName = hri.getEncodedName();
2088 
2089       // We need a lock on the region as we could update it
2090       Lock lock = locker.acquireLock(encodedName);
2091       try {
2092         RegionState regionState = regionStates.getRegionTransitionState(encodedName);
2093         if (regionState == null
2094             || (regionState.getServerName() != null && !regionState.isOnServer(sn))
2095             || !RegionStates.isOneOfStates(regionState, State.PENDING_OPEN,
2096                 State.OPENING, State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) {
2097           LOG.info("Skip " + regionState + " since it is not opening/failed_close"
2098             + " on the dead server any more: " + sn);
2099           it.remove();
2100         } else {
2101           if (tableStateManager.isTableState(hri.getTable(),
2102                   TableState.State.DISABLED, TableState.State.DISABLING)) {
2103             regionStates.regionOffline(hri);
2104             it.remove();
2105             continue;
2106           }
2107           // Mark the region offline and assign it again by SSH
2108           regionStates.updateRegionState(hri, State.OFFLINE);
2109         }
2110       } finally {
2111         lock.unlock();
2112       }
2113     }
2114     return rits;
2115   }
2116 
2117   /**
2118    * @param plan Plan to execute.
2119    */
2120   public void balance(final RegionPlan plan) {
2121 
2122     HRegionInfo hri = plan.getRegionInfo();
2123     TableName tableName = hri.getTable();
2124     if (tableStateManager.isTableState(tableName,
2125             TableState.State.DISABLED, TableState.State.DISABLING)) {
2126       LOG.info("Ignored moving region of disabling/disabled table "
2127         + tableName);
2128       return;
2129     }
2130 
2131     // Move the region only if it's assigned
2132     String encodedName = hri.getEncodedName();
2133     ReentrantLock lock = locker.acquireLock(encodedName);
2134     try {
2135       if (!regionStates.isRegionOnline(hri)) {
2136         RegionState state = regionStates.getRegionState(encodedName);
2137         LOG.info("Ignored moving region not assigned: " + hri + ", "
2138           + (state == null ? "not in region states" : state));
2139         return;
2140       }
2141       synchronized (this.regionPlans) {
2142         this.regionPlans.put(plan.getRegionName(), plan);
2143       }
2144       unassign(hri, plan.getDestination());
2145     } finally {
2146       lock.unlock();
2147     }
2148   }
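  /*
   * A minimal usage sketch of moving a region through balance(), assuming an
   * AssignmentManager "am", an HRegionInfo "hri", and ServerName values "src"
   * (current location) and "dst" (target) are in scope:
   *
   *   am.balance(new RegionPlan(hri, src, dst));  // unassigns hri and targets dst for reopen
   */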
2149 
2150   public void stop() {
2151     // Shutdown the threadpool executor service
2152     threadPoolExecutorService.shutdownNow();
2153     regionStateStore.stop();
2154   }
2155 
2156   protected void setEnabledTable(TableName tableName) {
2157     try {
2158       this.tableStateManager.setTableState(tableName,
2159               TableState.State.ENABLED);
2160     } catch (IOException e) {
2161       // here we can abort as it is the start up flow
2162       String errorMsg = "Unable to ensure that the table " + tableName
2163           + " will be" + " enabled because of a ZooKeeper issue";
2164       LOG.error(errorMsg);
2165       this.server.abort(errorMsg, e);
2166     }
2167   }
2168 
2169   private String onRegionFailedOpen(final RegionState current,
2170       final HRegionInfo hri, final ServerName serverName) {
2171     // The region must be opening on this server.
2172     // If current state is failed_open on the same server,
2173     // it could be a reportRegionTransition RPC retry.
2174     if (current == null || !current.isOpeningOrFailedOpenOnServer(serverName)) {
2175       return hri.getShortNameToLog() + " is not opening on " + serverName;
2176     }
2177 
2178     // Just return in case of retrying
2179     if (current.isFailedOpen()) {
2180       return null;
2181     }
2182 
2183     String encodedName = hri.getEncodedName();
2184     AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
2185     if (failedOpenCount == null) {
2186       failedOpenCount = new AtomicInteger();
2187       // No need to use putIfAbsent, or extra synchronization since
2188       // this whole handleRegion block is locked on the encoded region
2189       // name, and failedOpenTracker is updated only in this block
2190       failedOpenTracker.put(encodedName, failedOpenCount);
2191     }
2192     if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion()) {
2193       regionStates.updateRegionState(hri, State.FAILED_OPEN);
2194       // remove the tracking info to save memory, also reset
2195       // the count for next open initiative
2196       failedOpenTracker.remove(encodedName);
2197     } else {
2198       if (hri.isMetaRegion() && failedOpenCount.get() >= maximumAttempts) {
2199         // Log a warning message if a meta region failedOpenCount exceeds maximumAttempts
2200         // so that we are aware of a potential problem if it persists for a long time.
2201         LOG.warn("Failed to open the hbase:meta region " +
2202             hri.getRegionNameAsString() + " after " +
2203             failedOpenCount.get() + " retries. Continue retrying.");
2204       }
2205 
2206       // Handle this the same as if it were opened and then closed.
2207       RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
2208       if (regionState != null) {
2209         // When there are more than one region server a new RS is selected as the
2210         // destination and the same is updated in the region plan. (HBASE-5546)
2211         if (getTableStateManager().isTableState(hri.getTable(),
2212                 TableState.State.DISABLED, TableState.State.DISABLING) ||
2213                 replicasToClose.contains(hri)) {
2214           offlineDisabledRegion(hri);
2215           return null;
2216         }
2217         regionStates.updateRegionState(hri, RegionState.State.CLOSED);
2218         // This below has to do w/ online enable/disable of a table
2219         removeClosedRegion(hri);
2220         try {
2221           getRegionPlan(hri, true);
2222         } catch (HBaseIOException e) {
2223           LOG.warn("Failed to get region plan", e);
2224         }
2225         invokeAssign(hri);
2226       }
2227     }
2228     // Null means no error
2229     return null;
2230   }
2231 
2232   private String onRegionOpen(final RegionState current, final HRegionInfo hri,
2233       final ServerName serverName, final RegionStateTransition transition) {
2234     // The region must be opening on this server.
2235     // If current state is already opened on the same server,
2236     // it could be a reportRegionTransition RPC retry.
2237     if (current == null || !current.isOpeningOrOpenedOnServer(serverName)) {
2238       return hri.getShortNameToLog() + " is not opening on " + serverName;
2239     }
2240 
2241     // Just return in case of retrying
2242     if (current.isOpened()) {
2243       return null;
2244     }
2245 
2246     long openSeqNum = transition.hasOpenSeqNum()
2247       ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM;
2248     if (openSeqNum < 0) {
2249       return "Newly opened region has invalid open seq num " + openSeqNum;
2250     }
2251     regionOnline(hri, serverName, openSeqNum);
2252 
2253     // reset the count, if any
2254     failedOpenTracker.remove(hri.getEncodedName());
2255     if (getTableStateManager().isTableState(hri.getTable(),
2256             TableState.State.DISABLED, TableState.State.DISABLING)) {
2257       invokeUnAssign(hri);
2258     }
2259     return null;
2260   }
2261 
2262   private String onRegionClosed(final RegionState current,
2263       final HRegionInfo hri, final ServerName serverName) {
2264     // Region will usually be assigned right after it is closed. When an RPC retry comes
2265     // in, the region may already have moved away from the closed state. However, on the
2266     // region server side, we don't care much about the response for this transition.
2267     // We only make sure the master has got and processed this report, either
2268     // successfully or not. So this is fine, not a problem at all.
2269     if (current == null || !current.isClosingOrClosedOnServer(serverName)) {
2270       return hri.getShortNameToLog() + " is not closing on " + serverName;
2271     }
2272 
2273     // Just return in case of retrying
2274     if (current.isClosed()) {
2275       return null;
2276     }
2277 
2278     if (getTableStateManager().isTableState(hri.getTable(), TableState.State.DISABLED,
2279         TableState.State.DISABLING) || replicasToClose.contains(hri)) {
2280       offlineDisabledRegion(hri);
2281       return null;
2282     }
2283 
2284     regionStates.updateRegionState(hri, RegionState.State.CLOSED);
2285     sendRegionClosedNotification(hri);
2286     // This below has to do w/ online enable/disable of a table
2287     removeClosedRegion(hri);
2288     invokeAssign(hri);
2289     return null;
2290   }
2291 
2292   private String onRegionReadyToSplit(final RegionState current, final HRegionInfo hri,
2293       final ServerName serverName, final RegionStateTransition transition) {
2294     // The region must be opened on this server.
2295     // If current state is already splitting on the same server,
2296     // it could be a reportRegionTransition RPC retry.
2297     if (current == null || !current.isSplittingOrOpenedOnServer(serverName)) {
2298       return hri.getShortNameToLog() + " is not opening on " + serverName;
2299     }
2300 
2301     // Just return in case of retrying
2302     if (current.isSplitting()) {
2303       return null;
2304     }
2305 
2306     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2307     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2308     RegionState rs_a = regionStates.getRegionState(a);
2309     RegionState rs_b = regionStates.getRegionState(b);
2310     if (rs_a != null || rs_b != null) {
2311       return "Some daughter already exists. "
2312         + "a=" + rs_a + ", b=" + rs_b;
2313     }
2314 
2315     // The server holding the region is not updated at this stage.
2316     // It is done after the PONR (point of no return).
2317     regionStates.updateRegionState(hri, State.SPLITTING);
2318     regionStates.createRegionState(
2319       a, State.SPLITTING_NEW, serverName, null);
2320     regionStates.createRegionState(
2321       b, State.SPLITTING_NEW, serverName, null);
2322     return null;
2323   }
2324 
2325   private String onRegionSplitPONR(final RegionState current, final HRegionInfo hri,
2326       final ServerName serverName, final RegionStateTransition transition) {
2327     // The region must be splitting on this server, and the daughters must be in
2328     // splitting_new state. To check RPC retry, we use server holding info.
2329     if (current == null || !current.isSplittingOnServer(serverName)) {
2330       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2331     }
2332 
2333     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2334     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2335     RegionState rs_a = regionStates.getRegionState(a);
2336     RegionState rs_b = regionStates.getRegionState(b);
2337 
2338     // Master could have restarted and lost the new region
2339     // states; if so, they must have been lost together
2340     if (rs_a == null && rs_b == null) {
2341       rs_a = regionStates.createRegionState(
2342         a, State.SPLITTING_NEW, serverName, null);
2343       rs_b = regionStates.createRegionState(
2344         b, State.SPLITTING_NEW, serverName, null);
2345     }
2346 
2347     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2348         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2349       return "Some daughter is not known to be splitting on " + serverName
2350         + ", a=" + rs_a + ", b=" + rs_b;
2351     }
2352 
2353     // Just return in case of retrying
2354     if (!regionStates.isRegionOnServer(hri, serverName)) {
2355       return null;
2356     }
2357 
2358     try {
2359       regionStates.splitRegion(hri, a, b, serverName);
2360     } catch (IOException ioe) {
2361       LOG.info("Failed to record split region " + hri.getShortNameToLog());
2362       return "Failed to record the splitting in meta";
2363     }
2364     return null;
2365   }
2366 
2367   private String onRegionSplit(final RegionState current, final HRegionInfo hri,
2368       final ServerName serverName, final RegionStateTransition transition) {
2369     // The region must be splitting on this server, and the daughters must be in
2370     // splitting_new state.
2371     // If current state is already split on the same server,
2372     // it could be a reportRegionTransition RPC retry.
2373     if (current == null || !current.isSplittingOrSplitOnServer(serverName)) {
2374       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2375     }
2376 
2377     // Just return in case of retrying
2378     if (current.isSplit()) {
2379       return null;
2380     }
2381 
2382     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2383     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2384     RegionState rs_a = regionStates.getRegionState(a);
2385     RegionState rs_b = regionStates.getRegionState(b);
2386     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2387         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2388       return "Some daughter is not known to be splitting on " + serverName
2389         + ", a=" + rs_a + ", b=" + rs_b;
2390     }
2391 
2392     if (TEST_SKIP_SPLIT_HANDLING) {
2393       return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
2394     }
2395     regionOffline(hri, State.SPLIT);
2396     regionOnline(a, serverName, 1);
2397     regionOnline(b, serverName, 1);
2398 
2399     // User could disable the table before master knows the new region.
2400     if (getTableStateManager().isTableState(hri.getTable(),
2401         TableState.State.DISABLED, TableState.State.DISABLING)) {
2402       invokeUnAssign(a);
2403       invokeUnAssign(b);
2404     } else {
2405       Callable<Object> splitReplicasCallable = new Callable<Object>() {
2406         @Override
2407         public Object call() {
2408           doSplittingOfReplicas(hri, a, b);
2409           return null;
2410         }
2411       };
2412       threadPoolExecutorService.submit(splitReplicasCallable);
2413     }
2414     return null;
2415   }
2416 
2417   private String onRegionSplitReverted(final RegionState current, final HRegionInfo hri,
2418       final ServerName serverName, final RegionStateTransition transition) {
2419     // The region must be splitting on this server, and the daughters must be in
2420     // splitting_new state.
2421     // If the region is in open state, it could be an RPC retry.
2422     if (current == null || !current.isSplittingOrOpenedOnServer(serverName)) {
2423       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2424     }
2425 
2426     // Just return in case of retrying
2427     if (current.isOpened()) {
2428       return null;
2429     }
2430 
2431     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2432     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2433     RegionState rs_a = regionStates.getRegionState(a);
2434     RegionState rs_b = regionStates.getRegionState(b);
2435     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2436         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2437       return "Some daughter is not known to be splitting on " + serverName
2438         + ", a=" + rs_a + ", b=" + rs_b;
2439     }
2440 
2441     regionOnline(hri, serverName);
2442     regionOffline(a);
2443     regionOffline(b);
2444     if (getTableStateManager().isTableState(hri.getTable(),
2445         TableState.State.DISABLED, TableState.State.DISABLING)) {
2446       invokeUnAssign(hri);
2447     }
2448     return null;
2449   }
2450 
2451   private String onRegionReadyToMerge(final RegionState current, final HRegionInfo hri,
2452       final ServerName serverName, final RegionStateTransition transition) {
2453     // The region must be new, and the daughters must be open on this server.
2454     // If the region is in merge_new state, it could be an RPC retry.
2455     if (current != null && !current.isMergingNewOnServer(serverName)) {
2456       return "Merging daughter region already exists, p=" + current;
2457     }
2458 
2459     // Just return in case of retrying
2460     if (current != null) {
2461       return null;
2462     }
2463 
2464     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2465     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2466     Set<String> encodedNames = new HashSet<String>(2);
2467     encodedNames.add(a.getEncodedName());
2468     encodedNames.add(b.getEncodedName());
2469     Map<String, Lock> locks = locker.acquireLocks(encodedNames);
2470     try {
2471       RegionState rs_a = regionStates.getRegionState(a);
2472       RegionState rs_b = regionStates.getRegionState(b);
2473       if (rs_a == null || !rs_a.isOpenedOnServer(serverName)
2474           || rs_b == null || !rs_b.isOpenedOnServer(serverName)) {
2475         return "Some daughter is not in a state to merge on " + serverName
2476           + ", a=" + rs_a + ", b=" + rs_b;
2477       }
2478 
2479       regionStates.updateRegionState(a, State.MERGING);
2480       regionStates.updateRegionState(b, State.MERGING);
2481       regionStates.createRegionState(
2482         hri, State.MERGING_NEW, serverName, null);
2483       return null;
2484     } finally {
2485       for (Lock lock: locks.values()) {
2486         lock.unlock();
2487       }
2488     }
2489   }
2490 
2491   private String onRegionMergePONR(final RegionState current, final HRegionInfo hri,
2492       final ServerName serverName, final RegionStateTransition transition) {
2493     // The region must be in merging_new state, and the daughters must be
2494     // merging. To check RPC retry, we use server holding info.
2495     if (current != null && !current.isMergingNewOnServer(serverName)) {
2496       return hri.getShortNameToLog() + " is not merging on " + serverName;
2497     }
2498 
2499     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2500     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2501     RegionState rs_a = regionStates.getRegionState(a);
2502     RegionState rs_b = regionStates.getRegionState(b);
2503     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2504         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2505       return "Some daughter is not known to be merging on " + serverName
2506         + ", a=" + rs_a + ", b=" + rs_b;
2507     }
2508 
2509     // Master could have restarted and lost the new region state
2510     if (current == null) {
2511       regionStates.createRegionState(
2512         hri, State.MERGING_NEW, serverName, null);
2513     }
2514 
2515     // Just return in case of retrying
2516     if (regionStates.isRegionOnServer(hri, serverName)) {
2517       return null;
2518     }
2519 
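         // Record the merge (the point of no return); a failure to update
         // hbase:meta is reported back to the region server.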
2520     try {
2521       regionStates.mergeRegions(hri, a, b, serverName);
2522     } catch (IOException ioe) {
2523       LOG.info("Failed to record merged region " + hri.getShortNameToLog());
2524       return "Failed to record the merging in meta";
2525     }
2526     return null;
2527   }
2528 
2529   private String onRegionMerged(final RegionState current, final HRegionInfo hri,
2530       final ServerName serverName, final RegionStateTransition transition) {
2531     // The region must be in merging_new state, and the daughters must be
2532     // merging on this server.
2533     // If current state is already opened on the same server,
2534     // it could be a reportRegionTransition RPC retry.
2535     if (current == null || !current.isMergingNewOrOpenedOnServer(serverName)) {
2536       return hri.getShortNameToLog() + " is not merging on " + serverName;
2537     }
2538 
2539     // Just return in case of retrying
2540     if (current.isOpened()) {
2541       return null;
2542     }
2543 
2544     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2545     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2546     RegionState rs_a = regionStates.getRegionState(a);
2547     RegionState rs_b = regionStates.getRegionState(b);
2548     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2549         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2550       return "Some daughter is not known to be merging on " + serverName
2551         + ", a=" + rs_a + ", b=" + rs_b;
2552     }
2553 
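         // The merge is done on the region server: offline the merged daughters
         // and bring the new region online.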
2554     regionOffline(a, State.MERGED);
2555     regionOffline(b, State.MERGED);
2556     regionOnline(hri, serverName, 1);
2557 
2558     // User could disable the table before master knows the new region.
2559     if (getTableStateManager().isTableState(hri.getTable(),
2560         TableState.State.DISABLED, TableState.State.DISABLING)) {
2561       invokeUnAssign(hri);
2562     } else {
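           // Handle the replicas of the merged regions asynchronously so the
           // reporting region server is not blocked.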
2563       Callable<Object> mergeReplicasCallable = new Callable<Object>() {
2564         @Override
2565         public Object call() {
2566           doMergingOfReplicas(hri, a, b);
2567           return null;
2568         }
2569       };
2570       threadPoolExecutorService.submit(mergeReplicasCallable);
2571     }
2572     return null;
2573   }
2574 
2575   private String onRegionMergeReverted(final RegionState current, final HRegionInfo hri,
2576       final ServerName serverName, final RegionStateTransition transition) {
2577     // The region must be in merging_new state, and the daughters must be
2578     // merging on this server.
2579     // If the region is in offline state, it could be an RPC retry.
2580     if (current == null || !current.isMergingNewOrOfflineOnServer(serverName)) {
2581       return hri.getShortNameToLog() + " is not merging on " + serverName;
2582     }
2583 
2584     // Just return in case of retrying
2585     if (current.isOffline()) {
2586       return null;
2587     }
2588 
2589     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2590     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2591     RegionState rs_a = regionStates.getRegionState(a);
2592     RegionState rs_b = regionStates.getRegionState(b);
2593     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2594         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2595       return "Some daughter is not known to be merging on " + serverName
2596         + ", a=" + rs_a + ", b=" + rs_b;
2597     }
2598 
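         // Revert the merge: bring the two merging regions back online and
         // offline the uncompleted merged region.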
2599     regionOnline(a, serverName);
2600     regionOnline(b, serverName);
2601     regionOffline(hri);
2602 
2603     if (getTableStateManager().isTableState(hri.getTable(),
2604         TableState.State.DISABLED, TableState.State.DISABLING)) {
2605       invokeUnAssign(a);
2606       invokeUnAssign(b);
2607     }
2608     return null;
2609   }
2610 
2611   private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
2612       final HRegionInfo hri_b) {
2613     // Close replicas for the original unmerged regions. Create/assign new replicas
2614     // for the merged parent.
2615     List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
2616     unmergedRegions.add(hri_a);
2617     unmergedRegions.add(hri_b);
2618     Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
2619     Collection<List<HRegionInfo>> c = map.values();
2620     for (List<HRegionInfo> l : c) {
2621       for (HRegionInfo h : l) {
2622         if (!RegionReplicaUtil.isDefaultReplica(h)) {
2623           LOG.debug("Unassigning un-merged replica " + h);
2624           unassign(h);
2625         }
2626       }
2627     }
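         // Determine how many replicas the table is configured with; default to
         // one if the table descriptor cannot be read.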
2628     int numReplicas = 1;
2629     try {
2630       numReplicas = ((MasterServices)server).getTableDescriptors().get(mergedHri.getTable()).
2631           getRegionReplication();
2632     } catch (IOException e) {
2633       LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
2634           " due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
2635           "will not be done");
2636     }
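         // Create a region info for each non-default replica of the merged
         // region and assign them.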
2637     List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2638     for (int i = 1; i < numReplicas; i++) {
2639       regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
2640     }
2641     try {
2642       assign(regions);
2643     } catch (IOException ioe) {
2644       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
2645                 ioe.getMessage());
2646     } catch (InterruptedException ie) {
2647       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri+ " because of " +
2648                 ie.getMessage());
2649     }
2650   }
2651 
2652   private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
2653       final HRegionInfo hri_b) {
2654     // Create new replica regions for the daughters, and assign them to match the
2655     // current replica assignments: if replica 1 of the parent is assigned to RS1,
2656     // replica 1 of each daughter will be placed on the same server.
2657     int numReplicas = 1;
2658     try {
2659       numReplicas = ((MasterServices)server).getTableDescriptors().get(parentHri.getTable()).
2660           getRegionReplication();
2661     } catch (IOException e) {
2662       LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
2663           " due to " + e.getMessage() + ". The assignment of daughter replicas " +
2664           "replicas will not be done");
2665     }
2666     // unassign the old replicas
2667     List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
2668     parentRegion.add(parentHri);
2669     Map<ServerName, List<HRegionInfo>> currentAssign =
2670         regionStates.getRegionAssignments(parentRegion);
2671     Collection<List<HRegionInfo>> c = currentAssign.values();
2672     for (List<HRegionInfo> l : c) {
2673       for (HRegionInfo h : l) {
2674         if (!RegionReplicaUtil.isDefaultReplica(h)) {
2675           LOG.debug("Unassigning parent's replica " + h);
2676           unassign(h);
2677         }
2678       }
2679     }
2680     // assign daughter replicas
2681     Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
2682     for (int i = 1; i < numReplicas; i++) {
2683       prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
2684       prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
2685     }
2686     try {
2687       assign(map);
2688     } catch (IOException e) {
2689       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
2690     } catch (InterruptedException e) {
2691       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
2692     }
2693   }
2694 
2695   private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
2696       int replicaId, Map<HRegionInfo, ServerName> map) {
2697     HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
2698     HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
2699         replicaId);
2700     LOG.debug("Created replica region for daughter " + daughterReplica);
2701     ServerName sn;
2702     if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
2703       map.put(daughterReplica, sn);
2704     } else {
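           // The parent replica has no current assignment; fall back to a random
           // online server.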
2705       List<ServerName> servers = serverManager.getOnlineServersList();
2706       sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
2707       map.put(daughterReplica, sn);
2708     }
2709   }
2710 
2711   public Set<HRegionInfo> getReplicasToClose() {
2712     return replicasToClose;
2713   }
2714 
2715   /**
2716    * Marks a region as offline.  The new state is the specified one,
2717    * if not null; if the specified state is null, the new state is Offline.
2718    * The specified state can only be Split, Merged, Offline or null.
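        *
        * For example, {@code regionOffline(a, State.MERGED)} records that a region
        * merged into another one is gone and queues its replicas for closing.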
2719    */
2720   private void regionOffline(final HRegionInfo regionInfo, final State state) {
2721     regionStates.regionOffline(regionInfo, state);
2722     removeClosedRegion(regionInfo);
2723     // remove the region plan as well just in case.
2724     clearRegionPlan(regionInfo);
2725     balancer.regionOffline(regionInfo);
2726 
2727     // Tell our listeners that a region was closed
2728     sendRegionClosedNotification(regionInfo);
2729     // also note that all the replicas of the primary should be closed
2730     if (state != null && state.equals(State.SPLIT)) {
2731       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
2732       c.add(regionInfo);
2733       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
2734       Collection<List<HRegionInfo>> allReplicas = map.values();
2735       for (List<HRegionInfo> list : allReplicas) {
2736         replicasToClose.addAll(list);
2737       }
2738     }
2739     else if (state != null && state.equals(State.MERGED)) {
2740       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
2741       c.add(regionInfo);
2742       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
2743       Collection<List<HRegionInfo>> allReplicas = map.values();
2744       for (List<HRegionInfo> list : allReplicas) {
2745         replicasToClose.addAll(list);
2746       }
2747     }
2748   }
2749 
2750   private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
2751       final ServerName serverName) {
2752     if (!this.listeners.isEmpty()) {
2753       for (AssignmentListener listener : this.listeners) {
2754         listener.regionOpened(regionInfo, serverName);
2755       }
2756     }
2757   }
2758 
2759   private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
2760     if (!this.listeners.isEmpty()) {
2761       for (AssignmentListener listener : this.listeners) {
2762         listener.regionClosed(regionInfo);
2763       }
2764     }
2765   }
2766 
2767   /**
2768    * Try to update some region states. If the state machine prevents
2769    * such an update, an error message is returned to explain the reason.
2770    *
2771    * Each transition is expected to involve just one region for
2772    * opening/closing, or three regions for splitting/merging.
2773    * These regions should be on the server that requested the change.
2774    *
2775    * Region state machine. Only these transitions
2776    * are expected to be triggered by a region server.
2777    *
2778    * On the state transition:
2779    *  (1) Open/Close should be initiated by master
2780    *      (a) Master sets the region to pending_open/pending_close
2781    *        in memory and hbase:meta after sending the request
2782    *        to the region server
2783    *      (b) Region server reports back to the master
2784    *        after open/close is done (either success or failure)
2785    *      (c) If the region server has a problem reporting the status
2786    *        to the master, it must be because the master is down or there is
2787    *        a temporary network issue. Otherwise, the region server should
2788    *        abort, since it must be a bug. If the master is not accessible,
2789    *        the region server should keep trying until it is
2790    *        stopped or the status is reported to the (new) master
2791    *      (d) If region server dies in the middle of opening/closing
2792    *        a region, SSH picks it up and finishes it
2793    *      (e) If master dies in the middle, the new master recovers
2794    *        the state during initialization from hbase:meta. Region server
2795    *        can report any transition that has not been reported to
2796    *        the previous active master yet
2797    *  (2) Split/merge is initiated by region servers
2798    *      (a) To split a region, a region server sends a request to
2799    *        the master to try to set the region to splitting, together with
2800    *        the two daughters (to be created) to splitting_new. If approved
2801    *        by the master, the split can then move ahead
2802    *      (b) To merge two regions, a region server sends a request to
2803    *        the master to try to set the new merged region (to be created) to
2804    *        merging_new, together with the two regions (to be merged) to merging.
2805    *        If the master approves, the merge can then move ahead
2806    *      (c) Once the splitting/merging is done, the region server
2807    *        reports the status back to the master, either success or failure.
2808    *      (d) Other scenarios should be handled similarly to
2809    *        region open/close
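        *
        *  For example, a split reported by a region server carries three regions
        *  in the transition: the parent at index 0 and the two daughters to be
        *  created at indices 1 and 2. A non-null return value is the error message
        *  explaining why the transition was rejected.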
2810    */
2811   protected String onRegionTransition(final ServerName serverName,
2812       final RegionStateTransition transition) {
2813     TransitionCode code = transition.getTransitionCode();
2814     HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
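         // All transitions for a region are serialized on its encoded-name lock.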
2815     Lock lock = locker.acquireLock(hri.getEncodedName());
2816     try {
2817       RegionState current = regionStates.getRegionState(hri);
2818       if (LOG.isDebugEnabled()) {
2819         LOG.debug("Got transition " + code + " for "
2820           + (current != null ? current.toString() : hri.getShortNameToLog())
2821           + " from " + serverName);
2822       }
2823       String errorMsg = null;
2824       switch (code) {
2825       case OPENED:
2826         errorMsg = onRegionOpen(current, hri, serverName, transition);
2827         break;
2828       case FAILED_OPEN:
2829         errorMsg = onRegionFailedOpen(current, hri, serverName);
2830         break;
2831       case CLOSED:
2832         errorMsg = onRegionClosed(current, hri, serverName);
2833         break;
2834       case READY_TO_SPLIT:
2835         try {
2836           regionStateListener.onRegionSplit(hri);
2837           errorMsg = onRegionReadyToSplit(current, hri, serverName, transition);
2838         } catch (IOException exp) {
2839           errorMsg = StringUtils.stringifyException(exp);
2840         }
2841         break;
2842       case SPLIT_PONR:
2843         errorMsg = onRegionSplitPONR(current, hri, serverName, transition);
2844         break;
2845       case SPLIT:
2846         errorMsg = onRegionSplit(current, hri, serverName, transition);
2847         break;
2848       case SPLIT_REVERTED:
2849         errorMsg = onRegionSplitReverted(current, hri, serverName, transition);
2850         if (org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
2851           try {
2852             regionStateListener.onRegionSplitReverted(hri);
2853           } catch (IOException exp) {
2854             LOG.warn(StringUtils.stringifyException(exp));
2855           }
2856         }
2857         break;
2858       case READY_TO_MERGE:
2859         errorMsg = onRegionReadyToMerge(current, hri, serverName, transition);
2860         break;
2861       case MERGE_PONR:
2862         errorMsg = onRegionMergePONR(current, hri, serverName, transition);
2863         break;
2864       case MERGED:
2865         try {
2866           errorMsg = onRegionMerged(current, hri, serverName, transition);
2867           regionStateListener.onRegionMerged(hri);
2868         } catch (IOException exp) {
2869           errorMsg = StringUtils.stringifyException(exp);
2870         }
2871         break;
2872       case MERGE_REVERTED:
2873         errorMsg = onRegionMergeReverted(current, hri, serverName, transition);
2874         break;
2875 
2876       default:
2877         errorMsg = "Unexpected transition code " + code;
2878       }
2879       if (errorMsg != null) {
2880         LOG.info("Could not transition region from " + current + " on "
2881           + code + " by " + serverName + ": " + errorMsg);
2882       }
2883       return errorMsg;
2884     } finally {
2885       lock.unlock();
2886     }
2887   }
2888 
2889   /**
2890    * @return Instance of load balancer
2891    */
2892   public LoadBalancer getBalancer() {
2893     return this.balancer;
2894   }
2895 
2896   public Map<ServerName, List<HRegionInfo>>
2897     getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
2898     return getRegionStates().getRegionAssignments(infos);
2899   }
2900 
2901   void setRegionStateListener(RegionStateListener listener) {
2902     this.regionStateListener = listener;
2903   }
2904 }