1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.NavigableMap;
31  import java.util.Random;
32  import java.util.Set;
33  import java.util.TreeMap;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.CopyOnWriteArrayList;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicBoolean;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.concurrent.locks.Lock;
41  import java.util.concurrent.locks.ReentrantLock;
42  
43  import org.apache.commons.logging.Log;
44  import org.apache.commons.logging.LogFactory;
45  import org.apache.hadoop.classification.InterfaceAudience;
46  import org.apache.hadoop.conf.Configuration;
47  import org.apache.hadoop.fs.FileSystem;
48  import org.apache.hadoop.fs.Path;
49  import org.apache.hadoop.hbase.HBaseIOException;
50  import org.apache.hadoop.hbase.HConstants;
51  import org.apache.hadoop.hbase.HRegionInfo;
52  import org.apache.hadoop.hbase.HRegionLocation;
53  import org.apache.hadoop.hbase.HTableDescriptor;
54  import org.apache.hadoop.hbase.MetaTableAccessor;
55  import org.apache.hadoop.hbase.NotServingRegionException;
56  import org.apache.hadoop.hbase.RegionLocations;
57  import org.apache.hadoop.hbase.Server;
58  import org.apache.hadoop.hbase.ServerName;
59  import org.apache.hadoop.hbase.TableName;
60  import org.apache.hadoop.hbase.TableNotFoundException;
61  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
62  import org.apache.hadoop.hbase.client.Result;
63  import org.apache.hadoop.hbase.client.TableState;
64  import org.apache.hadoop.hbase.executor.EventHandler;
65  import org.apache.hadoop.hbase.executor.EventType;
66  import org.apache.hadoop.hbase.executor.ExecutorService;
67  import org.apache.hadoop.hbase.ipc.RpcClient;
68  import org.apache.hadoop.hbase.ipc.RpcClient.FailedServerException;
69  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
70  import org.apache.hadoop.hbase.master.RegionState.State;
71  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
72  import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
73  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
74  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
75  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
76  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
77  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
78  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
79  import org.apache.hadoop.hbase.regionserver.wal.HLog;
80  import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
81  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
82  import org.apache.hadoop.hbase.util.FSUtils;
83  import org.apache.hadoop.hbase.util.KeyLocker;
84  import org.apache.hadoop.hbase.util.Pair;
85  import org.apache.hadoop.hbase.util.PairOfSameType;
86  import org.apache.hadoop.hbase.util.Threads;
87  import org.apache.hadoop.ipc.RemoteException;
88  import org.apache.zookeeper.KeeperException;
89  
90  import com.google.common.annotations.VisibleForTesting;
91  
92  /**
93   * Manages and performs region assignment.
94   * Related communications with the regionservers are all done over RPC.
95   */
96  @InterfaceAudience.Private
97  public class AssignmentManager {
98    private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
99  
100   protected final Server server;
101 
102   private ServerManager serverManager;
103 
104   private boolean shouldAssignRegionsWithFavoredNodes;
105 
106   private LoadBalancer balancer;
107 
108   private final MetricsAssignmentManager metricsAssignmentManager;
109 
110   private final TableLockManager tableLockManager;
111 
112   private AtomicInteger numRegionsOpened = new AtomicInteger(0);
113 
114   final private KeyLocker<String> locker = new KeyLocker<String>();
115 
116   Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
117 
118   /**
119    * Map of regions to reopen after the schema of a table is changed. Key -
120    * encoded region name, value - HRegionInfo
121    */
122   private final Map <String, HRegionInfo> regionsToReopen;
123 
124   /*
125    * Maximum number of attempts for an assignment/unassignment.
126    * See below in {@link #assign()} and {@link #unassign()}.
127    */
128   private final int maximumAttempts;
129 
130   /**
131    * The sleep time (in milliseconds) to wait before retrying an hbase:meta assignment
132    * that failed because no region plan was available.
133    */
134   private final long sleepTimeBeforeRetryingMetaAssignment;
135 
136   /** Plans for region movement. Key is the encoded version of a region name*/
137   // TODO: When do plans get cleaned out?  Ever? In server open and in server
138   // shutdown processing -- St.Ack
139   // All access to this Map must be synchronized.
140   final NavigableMap<String, RegionPlan> regionPlans =
141     new TreeMap<String, RegionPlan>();
142 
143   private final TableStateManager tableStateManager;
144 
145   private final ExecutorService executorService;
146 
147   // Thread pool executor service. TODO, consolidate with executorService?
148   private java.util.concurrent.ExecutorService threadPoolExecutorService;
149 
150   private final RegionStates regionStates;
151 
152   // The threshold to use bulk assigning. Using bulk assignment
153   // only if assigning at least this many regions to at least this
154   // many servers. If assigning fewer regions to fewer servers,
155   // bulk assigning may not be as efficient.
156   private final int bulkAssignThresholdRegions;
157   private final int bulkAssignThresholdServers;
158 
159   // Should bulk assignment wait till all regions are assigned,
160   // or until it times out?  This is useful to measure bulk assignment
161   // performance, but not needed in most use cases.
162   private final boolean bulkAssignWaitTillAllAssigned;
163 
164   /**
165    * Indicator that AssignmentManager has recovered the region states so
166    * that ServerShutdownHandler can be fully enabled and re-assign regions
167    * of dead servers. So that when re-assignment happens, AssignmentManager
168    * has proper region states.
169    *
170    * Protected to ease testing.
171    */
172   protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
173 
174   /**
175    * A map to track the number of consecutive times a region has failed to open,
176    * so that we don't try to open a region forever if the failure is
177    * unrecoverable.  We don't put this information in region states
178    * because we don't expect this to happen frequently; we don't
179    * want to copy this information over during each state transition either.
180    */
181   private final ConcurrentHashMap<String, AtomicInteger>
182     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
183 
184   // When not using ZK for region assignment, region states
185   // are persisted in meta via a state store
186   private final RegionStateStore regionStateStore;
187 
188   /**
189    * For testing only!  Set to true to skip handling of split.
190    */
191   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
192   public static boolean TEST_SKIP_SPLIT_HANDLING = false;
193 
194   /** Listeners that are called on assignment events. */
195   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
196 
197   /**
198    * Constructs a new assignment manager.
199    *
200    * @param server instance of HMaster this AM is running inside
201    * @param serverManager serverManager for associated HMaster
202    * @param balancer implementation of {@link LoadBalancer}
203    * @param service Executor service
204    * @param metricsMaster metrics manager
205    * @param tableLockManager TableLock manager
206    * @throws IOException
207    */
208   public AssignmentManager(Server server, ServerManager serverManager,
209       final LoadBalancer balancer,
210       final ExecutorService service, MetricsMaster metricsMaster,
211       final TableLockManager tableLockManager,
212       final TableStateManager tableStateManager)
213           throws IOException {
214     this.server = server;
215     this.serverManager = serverManager;
216     this.executorService = service;
217     this.regionStateStore = new RegionStateStore(server);
218     this.regionsToReopen = Collections.synchronizedMap
219                            (new HashMap<String, HRegionInfo> ());
220     Configuration conf = server.getConfiguration();
221     // Only read favored nodes if using the favored nodes load balancer.
222     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
223            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
224            FavoredNodeLoadBalancer.class);
225 
226     this.tableStateManager = tableStateManager;
227 
228     // This is the max attempts, not retries, so it should be at least 1.
229     this.maximumAttempts = Math.max(1,
230       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
231     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
232         "hbase.meta.assignment.retry.sleeptime", 1000l);
233     this.balancer = balancer;
234     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
235     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
236       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
237     this.regionStates = new RegionStates(
238       server, tableStateManager, serverManager, regionStateStore);
239 
240     this.bulkAssignWaitTillAllAssigned =
241       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
242     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
243     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
244 
245     this.metricsAssignmentManager = new MetricsAssignmentManager();
246     this.tableLockManager = tableLockManager;
247   }
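
      // A quick reference, derived from the constructor above: these knobs can be
      // overridden in hbase-site.xml; the values shown are the defaults read here.
      //
      //   hbase.assignment.maximum.attempts          = 10
      //   hbase.meta.assignment.retry.sleeptime      = 1000 (ms)
      //   hbase.assignment.threads.max               = 30
      //   hbase.bulk.assignment.waittillallassigned  = false
      //   hbase.bulk.assignment.threshold.regions    = 7
      //   hbase.bulk.assignment.threshold.servers    = 3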
248 
249   /**
250    * Add the listener to the notification list.
251    * @param listener The AssignmentListener to register
252    */
253   public void registerListener(final AssignmentListener listener) {
254     this.listeners.add(listener);
255   }
256 
257   /**
258    * Remove the listener from the notification list.
259    * @param listener The AssignmentListener to unregister
260    */
261   public boolean unregisterListener(final AssignmentListener listener) {
262     return this.listeners.remove(listener);
263   }
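
      // Illustrative listener wiring (a sketch; "am" is this AssignmentManager and
      // "listener" is some AssignmentListener implementation held by the caller):
      //
      //   am.registerListener(listener);     // start receiving assignment events
      //   ...
      //   am.unregisterListener(listener);   // stop receiving them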
264 
265   /**
266    * @return Instance of TableStateManager.
267    */
268   public TableStateManager getTableStateManager() {
269     // These are 'expensive' to create, involving a trip to the zk ensemble, so
270     // allow sharing.
271     return this.tableStateManager;
272   }
273 
274   /**
275    * This SHOULD not be public. It is public now
276    * because of some unit tests.
277    *
278    * TODO: make it package private and keep RegionStates in the master package
279    */
280   public RegionStates getRegionStates() {
281     return regionStates;
282   }
283 
284   /**
285    * Used in some tests to mock up region state in meta
286    */
287   @VisibleForTesting
288   RegionStateStore getRegionStateStore() {
289     return regionStateStore;
290   }
291 
292   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
293     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
294   }
295 
296   /**
297    * Add a regionPlan for the specified region.
298    * @param encodedName
299    * @param plan
300    */
301   public void addPlan(String encodedName, RegionPlan plan) {
302     synchronized (regionPlans) {
303       regionPlans.put(encodedName, plan);
304     }
305   }
306 
307   /**
308    * Add a map of region plans.
309    */
310   public void addPlans(Map<String, RegionPlan> plans) {
311     synchronized (regionPlans) {
312       regionPlans.putAll(plans);
313     }
314   }
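
      // Illustrative sketch (hypothetical caller): a plan added here is consulted by
      // getRegionPlan() on the next assignment of the same region, so a targeted
      // placement can be expressed as "add a plan, then (re)assign":
      //
      //   am.addPlan(hri.getEncodedName(), new RegionPlan(hri, srcServer, destServer));
      //   am.assign(hri);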
315 
316   /**
317    * Set the list of regions that will be reopened
318    * because of an update in table schema
319    *
320    * @param regions
321    *          list of regions that should be tracked for reopen
322    */
323   public void setRegionsToReopen(List <HRegionInfo> regions) {
324     for(HRegionInfo hri : regions) {
325       regionsToReopen.put(hri.getEncodedName(), hri);
326     }
327   }
328 
329   /**
330    * Used by the client to check whether all regions of a table have the schema updates
331    *
332    * @param tableName
333    * @return Pair indicating the status of the alter command
334    * @throws IOException
335    */
336   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
337       throws IOException {
338     List <HRegionInfo> hris = MetaTableAccessor.getTableRegions(
339       this.server.getZooKeeper(), this.server.getShortCircuitConnection(),
340       tableName, true);
341     Integer pending = 0;
342     for (HRegionInfo hri : hris) {
343       String name = hri.getEncodedName();
344       // no lock concurrent access ok: sequential consistency respected.
345       if (regionsToReopen.containsKey(name)
346           || regionStates.isRegionInTransition(name)) {
347         pending++;
348       }
349     }
350     return new Pair<Integer, Integer>(pending, hris.size());
351   }
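
      // Illustrative reading of the Pair returned above (hypothetical caller):
      //
      //   Pair<Integer, Integer> status = am.getReopenStatus(tableName);
      //   int pending = status.getFirst();   // regions still to reopen or in transition
      //   int total   = status.getSecond();  // total regions of the table
      //   boolean alterDone = (pending == 0);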
352 
353   /**
354    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
355    * the failover cleanup before re-assigning regions of dead servers. So that
356    * when re-assignment happens, AssignmentManager has proper region states.
357    */
358   public boolean isFailoverCleanupDone() {
359     return failoverCleanupDone.get();
360   }
361 
362   /**
363    * To avoid racing with AM, external entities may need to lock a region,
364    * for example, when SSH checks what regions to skip re-assigning.
365    */
366   public Lock acquireRegionLock(final String encodedName) {
367     return locker.acquireLock(encodedName);
368   }
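
      // Illustrative usage, mirroring the lock/try/finally pattern assign() and
      // unassign() use internally (hypothetical caller):
      //
      //   Lock lock = am.acquireRegionLock(hri.getEncodedName());
      //   try {
      //     // inspect or act on this region without racing the AssignmentManager
      //   } finally {
      //     lock.unlock();
      //   }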
369 
370   /**
371    * Now, failover cleanup is completed. Notify server manager to
372    * process queued-up dead servers, if any.
373    */
374   void failoverCleanupDone() {
375     failoverCleanupDone.set(true);
376     serverManager.processQueuedDeadServers();
377   }
378 
379   /**
380    * Called on startup.
381    * Figures out whether this is a fresh cluster start or we are joining an extant running cluster.
382    * @throws IOException
383    * @throws KeeperException
384    * @throws InterruptedException
385    */
386   void joinCluster() throws IOException,
387           KeeperException, InterruptedException {
388     long startTime = System.currentTimeMillis();
389     // Concurrency note: In the below the accesses on regionsInTransition are
390     // outside of a synchronization block where usually all accesses to RIT are
391     // synchronized.  The presumption is that in this case it is safe since this
392   // method is run by a single thread on startup.
393 
394     // TODO: Regions that have a null location and are not in regionsInTransitions
395     // need to be handled.
396 
397     // Scan hbase:meta to build a list of existing regions, servers, and assignments.
398     // Returns servers that have not checked in (assumed dead) but to which some
399     // regions were assigned (according to meta).
400     Set<ServerName> deadServers = rebuildUserRegions();
401 
402     // This method will assign all user regions if this is a clean server startup;
403     // otherwise it will reconstruct master state and clean up any leftovers from
404     // the previous master process.
405     boolean failover = processDeadServersAndRegionsInTransition(deadServers);
406 
407     recoverTableInDisablingState();
408     recoverTableInEnablingState();
409     LOG.info("Joined the cluster in " + (System.currentTimeMillis()
410       - startTime) + "ms, failover=" + failover);
411   }
412 
413   /**
414    * Processes all regions that are in transition and also
415    * processes the list of dead servers by scanning hbase:meta.
416    * Used by the master when joining a cluster.  If we figure this is a clean cluster
417    * startup, will assign all user regions.
418    * @param deadServers
419    *          Set of dead servers that had regions assigned to them. Can be null.
420    * @throws IOException
421    * @throws InterruptedException
422    */
423   boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
424           throws IOException, InterruptedException {
425     boolean failover = !serverManager.getDeadServers().isEmpty();
426     if (failover) {
427       // This may not be a failover actually, especially if meta is on this master.
428       if (LOG.isDebugEnabled()) {
429         LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
430       }
431     } else {
432       // If any one region except meta is assigned, it's a failover.
433       Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
434       for (Map.Entry<HRegionInfo, ServerName> en:
435           regionStates.getRegionAssignments().entrySet()) {
436         HRegionInfo hri = en.getKey();
437         if (!hri.isMetaTable()
438             && onlineServers.contains(en.getValue())) {
439           LOG.debug("Found " + hri + " out on cluster");
440           failover = true;
441           break;
442         }
443       }
444       if (!failover) {
445         // If any region except meta is in transition on a live server, it's a failover.
446         Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
447         if (!regionsInTransition.isEmpty()) {
448           for (RegionState regionState: regionsInTransition.values()) {
449             if (!regionState.getRegion().isMetaRegion()
450                 && onlineServers.contains(regionState.getServerName())) {
451               LOG.debug("Found " + regionState + " in RITs");
452               failover = true;
453               break;
454             }
455           }
456         }
457       }
458     }
459     if (!failover) {
460       // If we get here, we have a full cluster restart. It is a failover only
461       // if some HLogs are not split yet. Meta HLogs, if any, should have been
462       // split already. We can walk through the queued dead servers; if they
463       // don't have any HLogs, this restart should be considered a clean one.
464       Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
465       if (!queuedDeadServers.isEmpty()) {
466         Configuration conf = server.getConfiguration();
467         Path rootdir = FSUtils.getRootDir(conf);
468         FileSystem fs = rootdir.getFileSystem(conf);
469         for (ServerName serverName: queuedDeadServers) {
470           Path logDir = new Path(rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
471           Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
472           if (fs.exists(logDir) || fs.exists(splitDir)) {
473             LOG.debug("Found queued dead server " + serverName);
474             failover = true;
475             break;
476           }
477         }
478         if (!failover) {
479           // We figured that it's not a failover, so no need to
480           // work on these re-queued dead servers any more.
481           LOG.info("AM figured that it's not a failover and cleaned up "
482             + queuedDeadServers.size() + " queued dead servers");
483           serverManager.removeRequeuedDeadServers();
484         }
485       }
486     }
487 
488     Set<TableName> disabledOrDisablingOrEnabling = null;
489     Map<HRegionInfo, ServerName> allRegions = null;
490 
491     if (!failover) {
492       disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
493         TableState.State.DISABLED, TableState.State.DISABLING,
494         TableState.State.ENABLING);
495 
496       // Clean re/start, mark all user regions closed before reassignment
497       allRegions = regionStates.closeAllUserRegions(
498         disabledOrDisablingOrEnabling);
499     }
500 
501     // Now region states are restored
502     regionStateStore.start();
503 
504     if (failover) {
505       if (deadServers != null && !deadServers.isEmpty()) {
506         for (ServerName serverName: deadServers) {
507           if (!serverManager.isServerDead(serverName)) {
508             serverManager.expireServer(serverName); // Let SSH do region re-assign
509           }
510         }
511       }
512       processRegionsInTransition(regionStates.getRegionsInTransition().values());
513     }
514 
515     // Now we can safely claim failover cleanup completed and enable
516     // ServerShutdownHandler for further processing. The regions in transition
517     // (below), if any, are not related to those
518     // dead servers at all, and can be done in parallel to SSH.
519     failoverCleanupDone();
520     if (!failover) {
521       // Fresh cluster startup.
522       LOG.info("Clean cluster startup. Assigning user regions");
523       assignAllUserRegions(allRegions);
524     }
525     // Unassign replicas of the split parents and the merged regions.
526     // The daughter replicas are opened in assignAllUserRegions if they were
527     // not already opened.
528     for (HRegionInfo h : replicasToClose) {
529       unassign(h);
530     }
531     replicasToClose.clear();
532     return failover;
533   }
534 
535   /**
536    * When a region is closed, it should be removed from the regionsToReopen
537    * @param hri HRegionInfo of the region which was closed
538    */
539   public void removeClosedRegion(HRegionInfo hri) {
540     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
541       LOG.debug("Removed region from reopening regions because it was closed");
542     }
543   }
544 
545   // TODO: processFavoredNodes might throw an exception, e.g., if the
546   // meta could not be contacted/updated. We need to see how seriously to treat
547   // this problem. Should we fail the current assignment? We should be able
548   // to recover from this problem eventually (if the meta couldn't be updated
549   // things should work normally and eventually get fixed up).
550   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
551     if (!shouldAssignRegionsWithFavoredNodes) return;
552     // The AM gets the favored nodes info for each region and updates the meta
553     // table with that info
554     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
555         new HashMap<HRegionInfo, List<ServerName>>();
556     for (HRegionInfo region : regions) {
557       regionToFavoredNodes.put(region,
558           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
559     }
560     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
561       this.server.getShortCircuitConnection());
562   }
563 
564   /**
565    * Marks the region as online.  Removes it from regions in transition and
566    * updates the in-memory assignment information.
567    * <p>
568    * Used when a region has been successfully opened on a region server.
569    * @param regionInfo
570    * @param sn
571    */
572   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
573     regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
574   }
575 
576   void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
577     numRegionsOpened.incrementAndGet();
578     regionStates.regionOnline(regionInfo, sn, openSeqNum);
579 
580     // Remove plan if one.
581     clearRegionPlan(regionInfo);
582     balancer.regionOnline(regionInfo, sn);
583 
584     // Tell our listeners that a region was opened
585     sendRegionOpenedNotification(regionInfo, sn);
586   }
587 
588   /**
589    * Marks the region as offline.  Removes it from regions in transition and
590    * removes in-memory assignment information.
591    * <p>
592    * Used when a region has been closed and should remain closed.
593    * @param regionInfo
594    */
595   public void regionOffline(final HRegionInfo regionInfo) {
596     regionOffline(regionInfo, null);
597   }
598 
599   public void offlineDisabledRegion(HRegionInfo regionInfo) {
600     replicasToClose.remove(regionInfo);
601     regionOffline(regionInfo);
602   }
603 
604   // Assignment methods
605 
606   /**
607    * Assigns the specified region.
608    * <p>
609    * If a RegionPlan is available with a valid destination then it will be used
610    * to determine what server the region is assigned to.  If no RegionPlan is
611    * available, region will be assigned to a random available server.
612    * <p>
613    * Updates the RegionState and sends the OPEN RPC.
614    * <p>
615    * This will only succeed if the region is either in transition and in a CLOSED
616    * or OFFLINE state, or not in transition at all, and of course, the
617    * chosen server is up and running (it may have just crashed!).
618    *
619    * @param region region to be assigned
620    */
621   public void assign(HRegionInfo region) {
622     assign(region, false);
623   }
624 
625   /**
626    * Use care with forceNewPlan. It could cause double assignment.
627    */
628   public void assign(HRegionInfo region, boolean forceNewPlan) {
629     if (isDisabledorDisablingRegionInRIT(region)) {
630       return;
631     }
632     String encodedName = region.getEncodedName();
633     Lock lock = locker.acquireLock(encodedName);
634     try {
635       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
636       if (state != null) {
637         if (regionStates.wasRegionOnDeadServer(encodedName)) {
638           LOG.info("Skip assigning " + region.getRegionNameAsString()
639             + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
640             + " is dead but not processed yet");
641           return;
642         }
643         assign(state, forceNewPlan);
644       }
645     } finally {
646       lock.unlock();
647     }
648   }
649 
650   /**
651    * Bulk assign regions to <code>destination</code>.
652    * @param destination
653    * @param regions Regions to assign.
654    * @return true if successful
655    */
656   boolean assign(final ServerName destination, final List<HRegionInfo> regions)
657     throws InterruptedException {
658     long startTime = EnvironmentEdgeManager.currentTime();
659     try {
660       int regionCount = regions.size();
661       if (regionCount == 0) {
662         return true;
663       }
664       LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
665       Set<String> encodedNames = new HashSet<String>(regionCount);
666       for (HRegionInfo region : regions) {
667         encodedNames.add(region.getEncodedName());
668       }
669 
670       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
671       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
672       try {
673         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regionCount);
674         List<RegionState> states = new ArrayList<RegionState>(regionCount);
675         for (HRegionInfo region : regions) {
676           String encodedName = region.getEncodedName();
677           if (!isDisabledorDisablingRegionInRIT(region)) {
678             RegionState state = forceRegionStateToOffline(region, false);
679             boolean onDeadServer = false;
680             if (state != null) {
681               if (regionStates.wasRegionOnDeadServer(encodedName)) {
682                 LOG.info("Skip assigning " + region.getRegionNameAsString()
683                   + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
684                   + " is dead but not processed yet");
685                 onDeadServer = true;
686               } else {
687                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
688                 plans.put(encodedName, plan);
689                 states.add(state);
690                 continue;
691               }
692             }
693             // Reassign if the region wasn't on a dead server
694             if (!onDeadServer) {
695               LOG.info("failed to force region state to offline, "
696                 + "will reassign later: " + region);
697               failedToOpenRegions.add(region); // assign individually later
698             }
699           }
700           // Release the lock, this region is excluded from bulk assign because
701           // we can't update its state, or set its znode to offline.
702           Lock lock = locks.remove(encodedName);
703           lock.unlock();
704         }
705 
706         if (server.isStopped()) {
707           return false;
708         }
709 
710         // Add region plans, so we can updateTimers when one region is opened so
711         // that unnecessary timeout on RIT is reduced.
712         this.addPlans(plans);
713 
714         List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos =
715           new ArrayList<Pair<HRegionInfo, List<ServerName>>>(states.size());
716         for (RegionState state: states) {
717           HRegionInfo region = state.getRegion();
718           regionStates.updateRegionState(
719             region, State.PENDING_OPEN, destination);
720           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
721           if (this.shouldAssignRegionsWithFavoredNodes) {
722             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
723           }
724           regionOpenInfos.add(new Pair<HRegionInfo, List<ServerName>>(
725             region, favoredNodes));
726         }
727 
728         // Move on to open regions.
729         try {
730           // Send OPEN RPC. If it fails on an IOE or RemoteException,
731           // regions will be assigned individually.
732           Configuration conf = server.getConfiguration();
733           long maxWaitTime = System.currentTimeMillis() +
734             conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000);
735           for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
736             try {
737               List<RegionOpeningState> regionOpeningStateList = serverManager
738                 .sendRegionOpen(destination, regionOpenInfos);
739               for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
740                 RegionOpeningState openingState = regionOpeningStateList.get(k);
741                 if (openingState != RegionOpeningState.OPENED) {
742                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
743                   LOG.info("Got opening state " + openingState
744                     + ", will reassign later: " + region);
745                   // Failed opening this region, reassign it later
746                   forceRegionStateToOffline(region, true);
747                   failedToOpenRegions.add(region);
748                 }
749               }
750               break;
751             } catch (IOException e) {
752               if (e instanceof RemoteException) {
753                 e = ((RemoteException)e).unwrapRemoteException();
754               }
755               if (e instanceof RegionServerStoppedException) {
756                 LOG.warn("The region server was shut down, ", e);
757                 // No need to retry, the region server is a goner.
758                 return false;
759               } else if (e instanceof ServerNotRunningYetException) {
760                 long now = System.currentTimeMillis();
761                 if (now < maxWaitTime) {
762                   if (LOG.isDebugEnabled()) {
763                     LOG.debug("Server is not yet up; waiting up to " +
764                       (maxWaitTime - now) + "ms", e);
765                   }
766                   Thread.sleep(100);
767                   i--; // reset the try count
768                   continue;
769                 }
770               } else if (e instanceof java.net.SocketTimeoutException
771                   && this.serverManager.isServerOnline(destination)) {
772                 // In case socket is timed out and the region server is still online,
773                 // the openRegion RPC could have been accepted by the server and
774                 // just the response didn't go through.  So we will retry to
775                 // open the region on the same server.
776                 if (LOG.isDebugEnabled()) {
777                   LOG.debug("Bulk assigner openRegion() to " + destination
778                     + " has timed out, but the regions might"
779                     + " already be opened on it.", e);
780                 }
781                 // wait and reset the re-try count, server might be just busy.
782                 Thread.sleep(100);
783                 i--;
784                 continue;
785               } else if (e instanceof FailedServerException && i < maximumAttempts) {
786                 // In case the server is in the failed server list, no point to
787                 // retry too soon. Retry after the failed_server_expiry time
788                 long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
789                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
790                 if (LOG.isDebugEnabled()) {
791                   LOG.debug(destination + " is on failed server list; waiting "
792                     + sleepTime + "ms", e);
793                 }
794                 Thread.sleep(sleepTime);
795                 continue;
796               }
797               throw e;
798             }
799           }
800         } catch (IOException e) {
801           // Can be a socket timeout, EOF, NoRouteToHost, etc
802           LOG.info("Unable to communicate with " + destination
803             + " in order to assign regions, ", e);
804           for (RegionState state: states) {
805             HRegionInfo region = state.getRegion();
806             forceRegionStateToOffline(region, true);
807           }
808           return false;
809         }
810       } finally {
811         for (Lock lock : locks.values()) {
812           lock.unlock();
813         }
814       }
815 
816       if (!failedToOpenRegions.isEmpty()) {
817         for (HRegionInfo region : failedToOpenRegions) {
818           if (!regionStates.isRegionOnline(region)) {
819             invokeAssign(region);
820           }
821         }
822       }
823       LOG.debug("Bulk assigning done for " + destination);
824       return true;
825     } finally {
826       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
827     }
828   }
829 
830   /**
831    * Send CLOSE RPC if the server is online, otherwise, offline the region.
832    *
833    * The RPC will be sent only to the region server found in the region state
834    * if it is passed in; otherwise, to the src server specified. If region
835    * state is not specified, we don't update region state at all; instead
836    * we just send the RPC call. This is useful for some cleanup without
837    * messing around with the region states (see handleRegion and the
838    * region-opened-on-an-unexpected-server scenario, for an example).
839    */
840   private void unassign(final HRegionInfo region,
841       final ServerName server, final ServerName dest) {
842     for (int i = 1; i <= this.maximumAttempts; i++) {
843       if (this.server.isStopped() || this.server.isAborted()) {
844         LOG.debug("Server stopped/aborted; skipping unassign of " + region);
845         return;
846       }
847       if (!serverManager.isServerOnline(server)) {
848         LOG.debug("Offline " + region.getRegionNameAsString()
849           + ", no need to unassign since it's on a dead server: " + server);
850         regionStates.updateRegionState(region, State.OFFLINE);
851         return;
852       }
853       try {
854         // Send CLOSE RPC
855         if (serverManager.sendRegionClose(server, region, dest)) {
856           LOG.debug("Sent CLOSE to " + server + " for region " +
857             region.getRegionNameAsString());
858           return;
859         }
860         // This never happens. Currently regionserver close always returns true.
861         // TODO: this can now happen (0.96) if there is an exception in a coprocessor
862         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
863           region.getRegionNameAsString());
864       } catch (Throwable t) {
865         if (t instanceof RemoteException) {
866           t = ((RemoteException)t).unwrapRemoteException();
867         }
868         if (t instanceof NotServingRegionException
869             || t instanceof RegionServerStoppedException
870             || t instanceof ServerNotRunningYetException) {
871           LOG.debug("Offline " + region.getRegionNameAsString()
872             + ", it's not any more on " + server, t);
873           regionStates.updateRegionState(region, State.OFFLINE);
874           return;
875         } else if (t instanceof FailedServerException && i < maximumAttempts) {
876           // In case the server is in the failed server list, no point to
877           // retry too soon. Retry after the failed_server_expiry time
878           try {
879             Configuration conf = this.server.getConfiguration();
880             long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
881               RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
882             if (LOG.isDebugEnabled()) {
883               LOG.debug(server + " is on failed server list; waiting "
884                 + sleepTime + "ms", t);
885             }
886             Thread.sleep(sleepTime);
887           } catch (InterruptedException ie) {
888             LOG.warn("Failed to unassign "
889               + region.getRegionNameAsString() + " since interrupted", ie);
890             regionStates.updateRegionState(region, State.FAILED_CLOSE);
891             Thread.currentThread().interrupt();
892             return;
893           }
894         }
895 
896         LOG.info("Server " + server + " returned " + t + " for "
897           + region.getRegionNameAsString() + ", try=" + i
898           + " of " + this.maximumAttempts, t);
899       }
900     }
901     // Run out of attempts
902     regionStates.updateRegionState(region, State.FAILED_CLOSE);
903   }
904 
905   /**
906    * Set region to OFFLINE unless it is opening and forceNewPlan is false.
907    */
908   private RegionState forceRegionStateToOffline(
909       final HRegionInfo region, final boolean forceNewPlan) {
910     RegionState state = regionStates.getRegionState(region);
911     if (state == null) {
912       LOG.warn("Assigning a region not in region states: " + region);
913       state = regionStates.createRegionState(region);
914     }
915 
916     if (forceNewPlan && LOG.isDebugEnabled()) {
917       LOG.debug("Force region state offline " + state);
918     }
919 
920     switch (state.getState()) {
921     case OPEN:
922     case OPENING:
923     case PENDING_OPEN:
924     case CLOSING:
925     case PENDING_CLOSE:
926       if (!forceNewPlan) {
927         LOG.debug("Skip assigning " +
928           region + ", it is already " + state);
929         return null;
930       }
931     case FAILED_CLOSE:
932     case FAILED_OPEN:
933       regionStates.updateRegionState(region, State.PENDING_CLOSE);
934       unassign(region, state.getServerName(), null);
935       state = regionStates.getRegionState(region);
936       if (!state.isOffline() && !state.isClosed()) {
937         // If the region isn't offline, we can't re-assign
938         // it now. It will be assigned automatically after
939         // the regionserver reports it's closed.
940         return null;
941       }
942     case OFFLINE:
943     case CLOSED:
944       break;
945     default:
946       LOG.error("Trying to assign region " + region
947         + ", which is " + state);
948       return null;
949     }
950     return state;
951   }
952 
953   /**
954    * Caller must hold lock on the passed <code>state</code> object.
955    * @param state
956    * @param forceNewPlan
957    */
958   private void assign(RegionState state, boolean forceNewPlan) {
959     long startTime = EnvironmentEdgeManager.currentTime();
960     try {
961       Configuration conf = server.getConfiguration();
962       RegionPlan plan = null;
963       long maxWaitTime = -1;
964       HRegionInfo region = state.getRegion();
965       Throwable previousException = null;
966       for (int i = 1; i <= maximumAttempts; i++) {
967         if (server.isStopped() || server.isAborted()) {
968           LOG.info("Skip assigning " + region.getRegionNameAsString()
969             + ", the server is stopped/aborted");
970           return;
971         }
972         if (plan == null) { // Get a server for the region at first
973           try {
974             plan = getRegionPlan(region, forceNewPlan);
975           } catch (HBaseIOException e) {
976             LOG.warn("Failed to get region plan", e);
977           }
978         }
979         if (plan == null) {
980           LOG.warn("Unable to determine a plan to assign " + region);
981           if (region.isMetaRegion()) {
982             try {
983               Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
984               if (i == maximumAttempts) i = 1;
985               continue;
986             } catch (InterruptedException e) {
987               LOG.error("Got exception while waiting for hbase:meta assignment");
988               Thread.currentThread().interrupt();
989             }
990           }
991           regionStates.updateRegionState(region, State.FAILED_OPEN);
992           return;
993         }
994         // In case of assignment from EnableTableHandler, table state is ENABLING. Anyhow,
995         // EnableTableHandler will set ENABLED after assigning all the table regions. If we
996         // try to set ENABLED directly then the client API may think the table is enabled.
997         // In a case where all the regions are added directly into hbase:meta and we call
998         // assignRegion, we need to make the table ENABLED. Hence in such a case the table
999         // will not be in ENABLING or ENABLED state.
1000         TableName tableName = region.getTable();
1001         if (!tableStateManager.isTableState(tableName,
1002           TableState.State.ENABLED, TableState.State.ENABLING)) {
1003           LOG.debug("Setting table " + tableName + " to ENABLED state.");
1004           setEnabledTable(tableName);
1005         }
1006         LOG.info("Assigning " + region.getRegionNameAsString() +
1007             " to " + plan.getDestination().toString());
1008         // Transition RegionState to PENDING_OPEN
1009         regionStates.updateRegionState(region,
1010           State.PENDING_OPEN, plan.getDestination());
1011 
1012         boolean needNewPlan = false;
1013         final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
1014             " to " + plan.getDestination();
1015         try {
1016           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1017           if (this.shouldAssignRegionsWithFavoredNodes) {
1018             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1019           }
1020           serverManager.sendRegionOpen(plan.getDestination(), region, favoredNodes);
1021           return; // we're done
1022         } catch (Throwable t) {
1023           if (t instanceof RemoteException) {
1024             t = ((RemoteException) t).unwrapRemoteException();
1025           }
1026           previousException = t;
1027 
1028           // Should we wait a little before retrying? If the server is starting, then yes.
1029           boolean hold = (t instanceof ServerNotRunningYetException);
1030 
1031           // In case socket is timed out and the region server is still online,
1032           // the openRegion RPC could have been accepted by the server and
1033           // just the response didn't go through.  So we will retry to
1034           // open the region on the same server.
1035           boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
1036               && this.serverManager.isServerOnline(plan.getDestination()));
1037 
1038           if (hold) {
1039             LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
1040               "try=" + i + " of " + this.maximumAttempts, t);
1041 
1042             if (maxWaitTime < 0) {
1043               maxWaitTime = EnvironmentEdgeManager.currentTime()
1044                 + this.server.getConfiguration().getLong(
1045                   "hbase.regionserver.rpc.startup.waittime", 60000);
1046             }
1047             try {
1048               long now = EnvironmentEdgeManager.currentTime();
1049               if (now < maxWaitTime) {
1050                 if (LOG.isDebugEnabled()) {
1051                   LOG.debug("Server is not yet up; waiting up to "
1052                     + (maxWaitTime - now) + "ms", t);
1053                 }
1054                 Thread.sleep(100);
1055                 i--; // reset the try count
1056               } else {
1057                 LOG.debug("Server is not up for a while; try a new one", t);
1058                 needNewPlan = true;
1059               }
1060             } catch (InterruptedException ie) {
1061               LOG.warn("Failed to assign "
1062                   + region.getRegionNameAsString() + " since interrupted", ie);
1063               regionStates.updateRegionState(region, State.FAILED_OPEN);
1064               Thread.currentThread().interrupt();
1065               return;
1066             }
1067           } else if (retry) {
1068             i--; // we want to retry as many times as needed as long as the RS is not dead.
1069             if (LOG.isDebugEnabled()) {
1070               LOG.debug(assignMsg + ", trying to assign to the same region server due ", t);
1071             }
1072           } else {
1073             needNewPlan = true;
1074             LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
1075                 " try=" + i + " of " + this.maximumAttempts, t);
1076           }
1077         }
1078 
1079         if (i == this.maximumAttempts) {
1080           // Don't reset the region state or get a new plan any more.
1081           // This is the last try.
1082           continue;
1083         }
1084 
1085         // If region opened on destination of present plan, reassigning to new
1086         // RS may cause double assignments. In case of RegionAlreadyInTransitionException
1087         // reassigning to same RS.
1088         if (needNewPlan) {
1089           // Force a new plan and reassign. Will return null if no servers.
1090           // The new plan could be the same as the existing plan since we don't
1091           // exclude the server of the original plan, which should not be
1092           // excluded since it could be the only server up now.
1093           RegionPlan newPlan = null;
1094           try {
1095             newPlan = getRegionPlan(region, true);
1096           } catch (HBaseIOException e) {
1097             LOG.warn("Failed to get region plan", e);
1098           }
1099           if (newPlan == null) {
1100             regionStates.updateRegionState(region, State.FAILED_OPEN);
1101             LOG.warn("Unable to find a viable location to assign region " +
1102                 region.getRegionNameAsString());
1103             return;
1104           }
1105 
1106           if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
1107             // Clean out the plan we failed to execute and one that doesn't look like
1108             // it'll succeed anyway; we need a new plan!
1109             // Transition back to OFFLINE
1110             regionStates.updateRegionState(region, State.OFFLINE);
1111             plan = newPlan;
1112           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
1113               previousException instanceof FailedServerException) {
1114             try {
1115               LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
1116                 " to the same failed server.");
1117               Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1118                 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
1119             } catch (InterruptedException ie) {
1120               LOG.warn("Failed to assign "
1121                   + region.getRegionNameAsString() + " since interrupted", ie);
1122               regionStates.updateRegionState(region, State.FAILED_OPEN);
1123               Thread.currentThread().interrupt();
1124               return;
1125             }
1126           }
1127         }
1128       }
1129       // Run out of attempts
1130       regionStates.updateRegionState(region, State.FAILED_OPEN);
1131     } finally {
1132       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
1133     }
1134   }
1135 
1136   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
1137     if (this.tableStateManager.isTableState(region.getTable(),
1138             TableState.State.DISABLED,
1139             TableState.State.DISABLING) || replicasToClose.contains(region)) {
1140       LOG.info("Table " + region.getTable() + " is disabled or disabling;"
1141         + " skipping assign of " + region.getRegionNameAsString());
1142       offlineDisabledRegion(region);
1143       return true;
1144     }
1145     return false;
1146   }
1147 
1148   /**
1149    * @param region the region to assign
1150    * @param forceNewPlan If true, then if an existing plan exists, a new plan
1151    * will be generated.
1152    * @return Plan for passed <code>region</code> (If none currently, it creates one or
1153    * if no servers to assign, it returns null).
1154    */
1155   private RegionPlan getRegionPlan(final HRegionInfo region,
1156       final boolean forceNewPlan) throws HBaseIOException {
1157     // Pick up an existing plan or make a new one
1158     final String encodedName = region.getEncodedName();
1159     final List<ServerName> destServers =
1160       serverManager.createDestinationServersList();
1161 
1162     if (destServers.isEmpty()){
1163       LOG.warn("Can't move " + encodedName +
1164         ", there is no destination server available.");
1165       return null;
1166     }
1167 
1168     RegionPlan randomPlan = null;
1169     boolean newPlan = false;
1170     RegionPlan existingPlan;
1171 
1172     synchronized (this.regionPlans) {
1173       existingPlan = this.regionPlans.get(encodedName);
1174 
1175       if (existingPlan != null && existingPlan.getDestination() != null) {
1176         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
1177           + " destination server is " + existingPlan.getDestination() +
1178             " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
1179       }
1180 
1181       if (forceNewPlan
1182           || existingPlan == null
1183           || existingPlan.getDestination() == null
1184           || !destServers.contains(existingPlan.getDestination())) {
1185         newPlan = true;
1186         randomPlan = new RegionPlan(region, null,
1187             balancer.randomAssignment(region, destServers));
1188         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
1189           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
1190           regions.add(region);
1191           try {
1192             processFavoredNodes(regions);
1193           } catch (IOException ie) {
1194             LOG.warn("Ignoring exception in processFavoredNodes " + ie);
1195           }
1196         }
1197         this.regionPlans.put(encodedName, randomPlan);
1198       }
1199     }
1200 
1201     if (newPlan) {
1202       if (randomPlan.getDestination() == null) {
1203         LOG.warn("Can't find a destination for " + encodedName);
1204         return null;
1205       }
1206       if (LOG.isDebugEnabled()) {
1207         LOG.debug("No previous transition plan found (or ignoring " +
1208           "an existing plan) for " + region.getRegionNameAsString() +
1209           "; generated random plan=" + randomPlan + "; " + destServers.size() +
1210           " (online=" + serverManager.getOnlineServers().size() +
1211           ") available servers, forceNewPlan=" + forceNewPlan);
1212       }
1213       return randomPlan;
1214     }
1215     if (LOG.isDebugEnabled()) {
1216       LOG.debug("Using pre-existing plan for " +
1217         region.getRegionNameAsString() + "; plan=" + existingPlan);
1218     }
1219     return existingPlan;
1220   }
1221 
1222   /**
1223    * Unassigns the specified region.
1224    * <p>
1225    * Updates the RegionState and sends the CLOSE RPC unless region is being
1226    * split by regionserver; then the unassign fails (silently) because we
1227    * presume the region being unassigned no longer exists (it's been split out
1228    * of existence). TODO: What to do if split fails and is rolled back and
1229    * parent is revivified?
1230    * <p>
1231    * If a RegionPlan is already set, it will remain.
1232    *
1233    * @param region region to be unassigned
1234    */
1235   public void unassign(HRegionInfo region) {
1236     unassign(region, null);
1237   }
1238 
1239 
1240   /**
1241    * Unassigns the specified region.
1242    * <p>
1243    * Updates the RegionState and sends the CLOSE RPC unless region is being
1244    * split by regionserver; then the unassign fails (silently) because we
1245    * presume the region being unassigned no longer exists (it's been split out
1246    * of existence). TODO: What to do if split fails and is rolled back and
1247    * parent is revivified?
1248    * <p>
1249    * If a RegionPlan is already set, it will remain.
1250    *
1251    * @param region region to be unassigned
1252    * @param dest the destination server of the region
1253    */
1254   public void unassign(HRegionInfo region, ServerName dest) {
1255     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
1256     LOG.debug("Starting unassign of " + region.getRegionNameAsString()
1257       + " (offlining), current state: " + regionStates.getRegionState(region));
1258 
1259     String encodedName = region.getEncodedName();
1260     // Grab the state of this region and synchronize on it
1261     // We need a lock here as we're going to do a put later and we don't want
1262     // multiple states to be created concurrently
1263     ReentrantLock lock = locker.acquireLock(encodedName);
1264     RegionState state = regionStates.getRegionTransitionState(encodedName);
1265     try {
1266       if (state == null || state.isFailedClose()) {
1267         if (state == null) {
1268           // Region is not in transition.
1269           // We can unassign it only if it's not SPLIT/MERGED.
1270           state = regionStates.getRegionState(encodedName);
1271           if (state != null && state.isUnassignable()) {
1272             LOG.info("Attempting to unassign " + state + ", ignored");
1273             // Offline region will be reassigned below
1274             return;
1275           }
1276           if (state == null || state.getServerName() == null) {
1277             // We don't know where the region is, offline it.
1278             // No need to send CLOSE RPC
1279             LOG.warn("Attempting to unassign a region not in RegionStates: "
1280               + region.getRegionNameAsString() + ", offlined");
1281             regionOffline(region);
1282             return;
1283           }
1284         }
1285         state = regionStates.updateRegionState(
1286           region, State.PENDING_CLOSE);
1287       } else if (state.isFailedOpen()) {
1288         // The region is not open yet
1289         regionOffline(region);
1290         return;
1291       } else {
1292         LOG.debug("Attempting to unassign " +
1293           region.getRegionNameAsString() + " but it is " +
1294           "already in transition (" + state.getState() + ")");
1295         return;
1296       }
1297 
1298       unassign(region, state.getServerName(), dest);
1299     } finally {
1300       lock.unlock();
1301 
1302       // Region is expected to be reassigned afterwards
1303       if (!replicasToClose.contains(region)
1304           && regionStates.isRegionInState(region, State.OFFLINE)) {
1305         assign(region);
1306       }
1307     }
1308   }
1309 
1310   /**
1311    * Used by unit tests. Returns the number of regions opened so far in the life
1312    * of the master. Increases by one every time the master opens a region.
1313    * @return the counter value of the number of regions opened so far
1314    */
1315   public int getNumRegionsOpened() {
1316     return numRegionsOpened.get();
1317   }
1318 
1319   /**
1320    * Waits until the specified region has completed assignment.
1321    * <p>
1322    * If the region is already assigned, returns immediately.  Otherwise, the method
1323    * blocks until the region is assigned.
1324    * @param regionInfo region to wait on assignment for
     * @return true once the region is online; false if the open failed or the master is stopping
1325    * @throws InterruptedException
1326    */
1327   public boolean waitForAssignment(HRegionInfo regionInfo)
1328       throws InterruptedException {
1329     while (!regionStates.isRegionOnline(regionInfo)) {
1330       if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN)
1331           || this.server.isStopped()) {
1332         return false;
1333       }
1334 
1335       // We should receive a notification, but it's
1336       //  better to have a timeout to recheck the condition here:
1337       //  it lowers the impact of a race condition if any
1338       regionStates.waitForUpdate(100);
1339     }
1340     return true;
1341   }
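
  // A minimal usage sketch (hypothetical caller code, not part of this class): callers typically
  // trigger an assign and then block on it, e.g.
  //   am.assign(hri);
  //   if (!am.waitForAssignment(hri)) {
  //     // the open failed permanently (FAILED_OPEN) or the master is stopping
  //   }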
1342 
1343   /**
1344    * Assigns the hbase:meta region.
1345    * <p>
1346    * Assumes that hbase:meta is currently closed and is not being actively served by
1347    * any RegionServer.
1348    */
1349   public void assignMeta() throws KeeperException {
1350     regionStates.updateRegionState(HRegionInfo.FIRST_META_REGIONINFO, State.OFFLINE);
1351     assign(HRegionInfo.FIRST_META_REGIONINFO);
1352   }
1353 
1354   /**
1355    * Assigns specified regions retaining assignments, if any.
1356    * <p>
1357    * This is a synchronous call and will return once every region has been
1358    * assigned.  If anything fails, an exception is thrown.
1359    * @throws InterruptedException
1360    * @throws IOException
1361    */
1362   public void assign(Map<HRegionInfo, ServerName> regions)
1363         throws IOException, InterruptedException {
1364     if (regions == null || regions.isEmpty()) {
1365       return;
1366     }
1367     List<ServerName> servers = serverManager.createDestinationServersList();
1368     if (servers == null || servers.isEmpty()) {
1369       throw new IOException("Found no destination server to assign region(s)");
1370     }
1371 
1372     // Reuse existing assignment info
1373     Map<ServerName, List<HRegionInfo>> bulkPlan =
1374       balancer.retainAssignment(regions, servers);
1375 
1376     assign(regions.size(), servers.size(),
1377       "retainAssignment=true", bulkPlan);
1378   }
1379 
1380   /**
1381    * Assigns specified regions round robin, if any.
1382    * <p>
1383    * This is a synchronous call and will return once every region has been
1384    * assigned.  If anything fails, an exception is thrown.
1385    * @throws InterruptedException
1386    * @throws IOException
1387    */
1388   public void assign(List<HRegionInfo> regions)
1389         throws IOException, InterruptedException {
1390     if (regions == null || regions.isEmpty()) {
1391       return;
1392     }
1393 
1394     List<ServerName> servers = serverManager.createDestinationServersList();
1395     if (servers == null || servers.isEmpty()) {
1396       throw new IOException("Found no destination server to assign region(s)");
1397     }
1398 
1399     // Generate a round-robin bulk assignment plan
1400     Map<ServerName, List<HRegionInfo>> bulkPlan
1401       = balancer.roundRobinAssignment(regions, servers);
1402     processFavoredNodes(regions);
1403 
1404     assign(regions.size(), servers.size(),
1405       "round-robin=true", bulkPlan);
1406   }
1407 
1408   private void assign(int regions, int totalServers,
1409       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
1410           throws InterruptedException, IOException {
1411 
1412     int servers = bulkPlan.size();
1413     if (servers == 1 || (regions < bulkAssignThresholdRegions
1414         && servers < bulkAssignThresholdServers)) {
1415 
1416       // Don't use bulk assignment.  This can be more efficient in a small
1417       // cluster, especially a mini cluster used for testing, so that tests won't time out
1418       if (LOG.isTraceEnabled()) {
1419         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
1420           " region(s) to " + servers + " server(s)");
1421       }
1422       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
1423         if (!assign(plan.getKey(), plan.getValue()) && !server.isStopped()) {
1424           for (HRegionInfo region: plan.getValue()) {
1425             if (!regionStates.isRegionOnline(region)) {
1426               invokeAssign(region);
1427             }
1428           }
1429         }
1430       }
1431     } else {
1432       LOG.info("Bulk assigning " + regions + " region(s) across "
1433         + totalServers + " server(s), " + message);
1434 
1435       // Use a fixed-count thread pool for bulk assigning.
1436       BulkAssigner ba = new GeneralBulkAssigner(
1437         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
1438       ba.bulkAssign();
1439       LOG.info("Bulk assigning done");
1440     }
1441   }
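
  // To illustrate the branch above: a plan that maps to a single server, or one that covers fewer
  // regions than bulkAssignThresholdRegions and fewer servers than bulkAssignThresholdServers, is
  // executed inline (with a per-region invokeAssign retry on failure); anything larger is handed
  // to GeneralBulkAssigner's thread pool. The threshold fields themselves are initialized
  // elsewhere in this class (not shown in this section).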
1442 
1443   /**
1444    * Assigns all user regions, if any exist.  Used during cluster startup.
1445    * <p>
1446    * This is a synchronous call and will return once every region has been
1447    * assigned.  If anything fails, an exception is thrown and the cluster
1448    * should be shut down.
1449    * @throws InterruptedException
1450    * @throws IOException
1451    */
1452   private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
1453       throws IOException, InterruptedException {
1454     if (allRegions == null || allRegions.isEmpty()) return;
1455 
1456     // Determine what type of assignment to do on startup
1457     boolean retainAssignment = server.getConfiguration().
1458       getBoolean("hbase.master.startup.retainassign", true);
1459 
1460     Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
1461     if (retainAssignment) {
1462       assign(allRegions);
1463     } else {
1464       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
1465       assign(regions);
1466     }
1467 
1468     for (HRegionInfo hri : regionsFromMetaScan) {
1469       TableName tableName = hri.getTable();
1470       if (!tableStateManager.isTableState(tableName,
1471               TableState.State.ENABLED)) {
1472         setEnabledTable(tableName);
1473       }
1474     }
1475     // assign all the replicas that were not recorded in the meta
1476     assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server));
1477   }
1478 
1479   /**
1480    * Get a list of replica regions that are not yet recorded in meta. We might not
1481    * have recorded the locations of the replicas because the replicas were never
1482    * online, the master restarted in the middle of assigning, ZK data was erased,
1483    * etc.
1484    * @param regionsRecordedInMeta the set of regions we know are recorded in meta,
1485    * either as a default replica or as the location of a replica
1486    * @param master the master services, used to look up table descriptors
1487    * @return list of replica regions
1488    * @throws IOException
1489    */
1490   public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
1491       Set<HRegionInfo> regionsRecordedInMeta, MasterServices master) throws IOException {
1492     List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
1493     for (HRegionInfo hri : regionsRecordedInMeta) {
1494       TableName table = hri.getTable();
1495       HTableDescriptor htd = master.getTableDescriptors().get(table);
1496       // look at the HTD for the replica count. That's the source of truth
1497       int desiredRegionReplication = htd.getRegionReplication();
1498       for (int i = 0; i < desiredRegionReplication; i++) {
1499         HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
1500         if (regionsRecordedInMeta.contains(replica)) continue;
1501         regionsNotRecordedInMeta.add(replica);
1502       }
1503     }
1504     return regionsNotRecordedInMeta;
1505   }
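
  // Worked example (sketch): for a table whose descriptor declares a region replication of 3, the
  // loop above probes replica ids 0, 1 and 2 for every region found in meta. Replica id 0 is the
  // default region itself and is normally already in regionsRecordedInMeta, so in practice only
  // the missing secondary replicas (ids 1 and 2) end up in the returned list.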
1506 
1507   /**
1508    * Rebuild the list of user regions and assignment information.
1509    * <p>
1510    * Returns the set of servers that, according to meta, hosted regions but
1511    * are not currently online.
1512    * @return set of servers not online that hosted some regions per meta
1513    * @throws IOException
1514    */
1515   Set<ServerName> rebuildUserRegions() throws
1516           IOException, KeeperException {
1517     Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
1518             TableState.State.DISABLED, TableState.State.ENABLING);
1519 
1520     Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
1521             TableState.State.DISABLED,
1522             TableState.State.DISABLING,
1523             TableState.State.ENABLING);
1524 
1525     // Region assignment from META
1526     List<Result> results = MetaTableAccessor.fullScanOfMeta(server.getShortCircuitConnection());
1527     // Get the servers currently online, including any new but slow-to-check-in region server that joined the cluster
1528     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
1529     // Set of offline servers to be returned
1530     Set<ServerName> offlineServers = new HashSet<ServerName>();
1531     // Iterate regions in META
1532     for (Result result : results) {
1533       if (result == null) {
1534         LOG.debug("null result from meta - ignoring but this is strange.");
1535         continue;
1536       }
1537       // Keep track of replicas to close. These were the replicas of the originally
1538       // unmerged regions. The master should have closed them already, but it might
1539       // not have, e.g. because it crashed.
1540       PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
1541       if (p.getFirst() != null && p.getSecond() != null) {
1542         int numReplicas = ((MasterServices)server).getTableDescriptors().get(p.getFirst().
1543             getTable()).getRegionReplication();
1544         for (HRegionInfo merge : p) {
1545           for (int i = 1; i < numReplicas; i++) {
1546             replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
1547           }
1548         }
1549       }
1550       RegionLocations rl = MetaTableAccessor.getRegionLocations(result);
1551       if (rl == null) continue;
1552       HRegionLocation[] locations = rl.getRegionLocations();
1553       if (locations == null) continue;
1554       for (HRegionLocation hrl : locations) {
1555         HRegionInfo regionInfo = hrl.getRegionInfo();
1556         if (regionInfo == null) continue;
1557         int replicaId = regionInfo.getReplicaId();
1558         State state = RegionStateStore.getRegionState(result, replicaId);
1559         // Keep track of replicas to close. These were the replicas of the split parents
1560         // from the previous life of the master. The master should have closed them
1561         // already, but it might not have, e.g. because it crashed.
1562         if (replicaId == 0 && state.equals(State.SPLIT)) {
1563           for (HRegionLocation h : locations) {
1564             replicasToClose.add(h.getRegionInfo());
1565           }
1566         }
1567         ServerName lastHost = hrl.getServerName();
1568         ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
1569         regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
1570         if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
1571           // Region is not open (either offline or in transition), skip
1572           continue;
1573         }
1574         TableName tableName = regionInfo.getTable();
1575         if (!onlineServers.contains(regionLocation)) {
1576           // Region is located on a server that isn't online
1577           offlineServers.add(regionLocation);
1578         } else if (!disabledOrEnablingTables.contains(tableName)) {
1579           // Region is being served and on an active server
1580           // add only if region not in disabled or enabling table
1581           regionStates.regionOnline(regionInfo, regionLocation);
1582           balancer.regionOnline(regionInfo, regionLocation);
1583         }
1584         // Need to enable the table if it is not disabled, disabling or enabling;
1585         // this is used during rolling restarts
1586         if (!disabledOrDisablingOrEnabling.contains(tableName)
1587           && !getTableStateManager().isTableState(tableName,
1588                 TableState.State.ENABLED)) {
1589           setEnabledTable(tableName);
1590         }
1591       }
1592     }
1593     return offlineServers;
1594   }
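
  // To summarize the rebuild above: every region found in meta gets a RegionState created for it;
  // regions that are OPEN on a live server (and whose table is not disabled or enabling) are
  // marked online in regionStates and in the balancer; replicas of split or merged parents are
  // queued in replicasToClose; tables that should be enabled get their state set to ENABLED; and
  // servers referenced by meta that are no longer online are returned for shutdown handling.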
1595 
1596   /**
1597    * Recover the tables that were not fully moved to DISABLED state. These
1598    * tables were in DISABLING state when the master restarted/switched.
1599    *
1600    * @throws KeeperException
1601    * @throws TableNotFoundException
1602    * @throws IOException
1603    */
1604   private void recoverTableInDisablingState()
1605           throws KeeperException, IOException {
1606     Set<TableName> disablingTables =
1607             tableStateManager.getTablesInStates(TableState.State.DISABLING);
1608     if (disablingTables.size() != 0) {
1609       for (TableName tableName : disablingTables) {
1610         // Recover by calling DisableTableHandler
1611         LOG.info("The table " + tableName
1612             + " is in DISABLING state.  Hence recovering by moving the table"
1613             + " to DISABLED state.");
1614         new DisableTableHandler(this.server, tableName,
1615             this, tableLockManager, true).prepare().process();
1616       }
1617     }
1618   }
1619 
1620   /**
1621    * Recover the tables that were not fully moved to ENABLED state. These tables
1622    * were in ENABLING state when the master restarted/switched.
1623    *
1624    * @throws KeeperException
1625    * @throws org.apache.hadoop.hbase.TableNotFoundException
1626    * @throws IOException
1627    */
1628   private void recoverTableInEnablingState()
1629           throws KeeperException, IOException {
1630     Set<TableName> enablingTables = tableStateManager.
1631             getTablesInStates(TableState.State.ENABLING);
1632     if (enablingTables.size() != 0) {
1633       for (TableName tableName : enablingTables) {
1634         // Recover by calling EnableTableHandler
1635         LOG.info("The table " + tableName
1636             + " is in ENABLING state.  Hence recovering by moving the table"
1637             + " to ENABLED state.");
1638         // Enable the table synchronously during master startup;
1639         // no need to invoke coprocessors
1640         EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
1641           this, tableLockManager, true);
1642         try {
1643           eth.prepare();
1644         } catch (TableNotFoundException e) {
1645           LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
1646           continue;
1647         }
1648         eth.process();
1649       }
1650     }
1651   }
1652 
1653   /**
1654    * Processes the list of regions in transition at startup.
1655    */
1656   void processRegionsInTransition(Collection<RegionState> regionStates) {
1657     // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
1658     // in case the RPC call is not sent out yet before the master was shut down
1659     // since we update the state before we send the RPC call. We can't update
1660     // the state after the RPC call. Otherwise, we don't know what's happened
1661     // to the region if the master dies right after the RPC call is out.
1662     for (RegionState regionState: regionStates) {
1663       if (!serverManager.isServerOnline(regionState.getServerName())) {
1664         continue; // SSH will handle it
1665       }
1666       RegionState.State state = regionState.getState();
1667       LOG.info("Processing " + regionState);
1668       switch (state) {
1669       case CLOSED:
1670         invokeAssign(regionState.getRegion());
1671         break;
1672       case PENDING_OPEN:
1673         retrySendRegionOpen(regionState);
1674         break;
1675       case PENDING_CLOSE:
1676         retrySendRegionClose(regionState);
1677         break;
1678       default:
1679         // No process for other states
1680       }
1681     }
1682   }
1683 
1684   /**
1685    * At master failover, for a region in PENDING_OPEN state, make sure the
1686    * sendRegionOpen RPC call is sent to the target regionserver.
1687    */
1688   private void retrySendRegionOpen(final RegionState regionState) {
1689     this.executorService.submit(
1690       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
1691         @Override
1692         public void process() throws IOException {
1693           HRegionInfo hri = regionState.getRegion();
1694           ServerName serverName = regionState.getServerName();
1695           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
1696           try {
1697             if (!regionState.equals(regionStates.getRegionState(hri))) {
1698               return; // Region is not in the expected state any more
1699             }
1700             for (int i = 1; i <= maximumAttempts; i++) {
1701               if (!serverManager.isServerOnline(serverName)
1702                   || server.isStopped() || server.isAborted()) {
1703                 return; // No longer needed
1704               }
1705               try {
1706                 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1707                 if (shouldAssignRegionsWithFavoredNodes) {
1708                   favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
1709                 }
1710                 serverManager.sendRegionOpen(serverName, hri, favoredNodes);
1711                 return; // we're done
1712               } catch (Throwable t) {
1713                 if (t instanceof RemoteException) {
1714                   t = ((RemoteException) t).unwrapRemoteException();
1715                 }
1716                 if (t instanceof FailedServerException && i < maximumAttempts) {
1717                 // If the server is in the failed server list, there is no point in
1718                 // retrying too soon. Retry after the failed_server_expiry time
1719                   try {
1720                     Configuration conf = this.server.getConfiguration();
1721                     long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1722                       RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1723                     if (LOG.isDebugEnabled()) {
1724                       LOG.debug(serverName + " is on failed server list; waiting "
1725                         + sleepTime + "ms", t);
1726                     }
1727                     Thread.sleep(sleepTime);
1728                     continue;
1729                   } catch (InterruptedException ie) {
1730                     LOG.warn("Failed to assign "
1731                       + hri.getRegionNameAsString() + " since interrupted", ie);
1732                     regionStates.updateRegionState(hri, State.FAILED_OPEN);
1733                     Thread.currentThread().interrupt();
1734                     return;
1735                   }
1736                 }
1737                 if (serverManager.isServerOnline(serverName)
1738                     && t instanceof java.net.SocketTimeoutException) {
1739                   i--; // don't count this attempt against the retry limit
1740                 } else {
1741                   LOG.info("Got exception in retrying sendRegionOpen for "
1742                     + regionState + "; try=" + i + " of " + maximumAttempts, t);
1743                 }
1744                 Threads.sleep(100);
1745               }
1746             }
1747             // Ran out of attempts
1748             regionStates.updateRegionState(hri, State.FAILED_OPEN);
1749           } finally {
1750             lock.unlock();
1751           }
1752         }
1753       });
1754   }
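
  // Retry strategy used above: up to maximumAttempts tries; if the target is on the failed-server
  // list the task sleeps for the configured failed-server expiry (plus 1 ms) before retrying,
  // other failures wait a flat 100 ms, and socket timeouts against a still-online server are not
  // counted toward the limit. retrySendRegionClose below follows the same pattern.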
1755 
1756   /**
1757    * At master failover, for a region in PENDING_CLOSE state, make sure the
1758    * sendRegionClose RPC call is sent to the target regionserver.
1759    */
1760   private void retrySendRegionClose(final RegionState regionState) {
1761     this.executorService.submit(
1762       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
1763         @Override
1764         public void process() throws IOException {
1765           HRegionInfo hri = regionState.getRegion();
1766           ServerName serverName = regionState.getServerName();
1767           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
1768           try {
1769             if (!regionState.equals(regionStates.getRegionState(hri))) {
1770               return; // Region is not in the expected state any more
1771             }
1772             for (int i = 1; i <= maximumAttempts; i++) {
1773               if (!serverManager.isServerOnline(serverName)
1774                   || server.isStopped() || server.isAborted()) {
1775                 return; // No longer needed
1776               }
1777               try {
1778                 serverManager.sendRegionClose(serverName, hri, null);
1779                 return; // Done.
1780               } catch (Throwable t) {
1781                 if (t instanceof RemoteException) {
1782                   t = ((RemoteException) t).unwrapRemoteException();
1783                 }
1784                 if (t instanceof FailedServerException && i < maximumAttempts) {
1785                 // If the server is in the failed server list, there is no point in
1786                 // retrying too soon. Retry after the failed_server_expiry time
1787                   try {
1788                     Configuration conf = this.server.getConfiguration();
1789                     long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1790                       RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1791                     if (LOG.isDebugEnabled()) {
1792                       LOG.debug(serverName + " is on failed server list; waiting "
1793                         + sleepTime + "ms", t);
1794                     }
1795                     Thread.sleep(sleepTime);
1796                     continue;
1797                   } catch (InterruptedException ie) {
1798                     LOG.warn("Failed to unassign "
1799                       + hri.getRegionNameAsString() + " since interrupted", ie);
1800                     regionStates.updateRegionState(hri, RegionState.State.FAILED_CLOSE);
1801                     Thread.currentThread().interrupt();
1802                     return;
1803                   }
1804                 }
1805                 if (serverManager.isServerOnline(serverName)
1806                     && t instanceof java.net.SocketTimeoutException) {
1807                   i--; // don't count this attempt against the retry limit
1808                 } else {
1809                   LOG.info("Got exception in retrying sendRegionClose for "
1810                     + regionState + "; try=" + i + " of " + maximumAttempts, t);
1811                 }
1812                 Threads.sleep(100);
1813               }
1814             }
1815             // Ran out of attempts
1816             regionStates.updateRegionState(hri, State.FAILED_CLOSE);
1817           } finally {
1818             lock.unlock();
1819           }
1820         }
1821       });
1822   }
1823 
1824   /**
1825    * Set the regions-in-transition metrics.
1826    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
1827    * This iterator is not fail-fast, which may lead to stale reads; but that's better than
1828    * creating a copy of the map for metrics computation, as this method is invoked
1829    * at a frequent interval.
1830    */
1831   public void updateRegionsInTransitionMetrics() {
1832     long currentTime = System.currentTimeMillis();
1833     int totalRITs = 0;
1834     int totalRITsOverThreshold = 0;
1835     long oldestRITTime = 0;
1836     int ritThreshold = this.server.getConfiguration().
1837       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
1838     for (RegionState state: regionStates.getRegionsInTransition().values()) {
1839       totalRITs++;
1840       long ritTime = currentTime - state.getStamp();
1841       if (ritTime > ritThreshold) { // more than the threshold
1842         totalRITsOverThreshold++;
1843       }
1844       if (oldestRITTime < ritTime) {
1845         oldestRITTime = ritTime;
1846       }
1847     }
1848     if (this.metricsAssignmentManager != null) {
1849       this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
1850       this.metricsAssignmentManager.updateRITCount(totalRITs);
1851       this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
1852     }
1853   }
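
  // For example, with the default METRICS_RIT_STUCK_WARNING_THRESHOLD of 60000 ms used above, a
  // region that has been in transition for 61 seconds counts toward totalRITsOverThreshold, while
  // one in transition for 30 seconds only contributes to totalRITs and, possibly, to oldestRITTime.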
1854 
1855   /**
1856    * @param region Region whose plan we are to clear.
1857    */
1858   private void clearRegionPlan(final HRegionInfo region) {
1859     synchronized (this.regionPlans) {
1860       this.regionPlans.remove(region.getEncodedName());
1861     }
1862   }
1863 
1864   /**
1865    * Wait on region to clear regions-in-transition.
1866    * @param hri Region to wait on.
1867    * @throws IOException
1868    */
1869   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
1870       throws IOException, InterruptedException {
1871     waitOnRegionToClearRegionsInTransition(hri, -1L);
1872   }
1873 
1874   /**
1875    * Wait on region to clear regions-in-transition or time out
1876    * @param hri Region to wait on.
1877    * @param timeOut Milliseconds to wait for the region to be out of transition state.
1878    * @return True when the region clears regions-in-transition before the timeout, otherwise false
1879    * @throws InterruptedException
1880    */
1881   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
1882       throws InterruptedException {
1883     if (!regionStates.isRegionInTransition(hri)) return true;
1884     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
1885         + timeOut;
1886     // There is already a timeout monitor on regions in transition so I
1887     // should not have to have one here too?
1888     LOG.info("Waiting for " + hri.getEncodedName() +
1889         " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
1890     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
1891       regionStates.waitForUpdate(100);
1892       if (EnvironmentEdgeManager.currentTime() > end) {
1893         LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
1894         return false;
1895       }
1896     }
1897     if (this.server.isStopped()) {
1898       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
1899       return false;
1900     }
1901     return true;
1902   }
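
  // A minimal usage sketch (hypothetical timeout value): callers that cannot block indefinitely
  // pass an explicit timeout, e.g.
  //   if (!am.waitOnRegionToClearRegionsInTransition(hri, 30000)) {
  //     // still in transition after 30 s, or the master is stopping
  //   }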
1903 
1904   void invokeAssign(HRegionInfo regionInfo) {
1905     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
1906   }
1907 
1908   void invokeUnAssign(HRegionInfo regionInfo) {
1909     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
1910   }
1911 
1912   public boolean isCarryingMeta(ServerName serverName) {
1913     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
1914   }
1915 
1916   /**
1917    * Check if the shutdown server carries the specific region.
1918    * @return whether the serverName currently hosts the region
1919    */
1920   private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
1921     RegionState regionState = regionStates.getRegionTransitionState(hri);
1922     ServerName transitionAddr = regionState != null? regionState.getServerName(): null;
1923     if (transitionAddr != null) {
1924       boolean matchTransitionAddr = transitionAddr.equals(serverName);
1925       LOG.debug("Checking region=" + hri.getRegionNameAsString()
1926         + ", transitioning on server=" + transitionAddr
1927         + ", server being checked: " + serverName
1928         + ", matches=" + matchTransitionAddr);
1929       return matchTransitionAddr;
1930     }
1931 
1932     ServerName assignedAddr = regionStates.getRegionServerOfRegion(hri);
1933     boolean matchAssignedAddr = serverName.equals(assignedAddr);
1934     LOG.debug("based on AM, current region=" + hri.getRegionNameAsString()
1935       + " is on server=" + assignedAddr + ", server being checked: "
1936       + serverName);
1937     return matchAssignedAddr;
1938   }
1939 
1940   /**
1941    * Process shutdown server removing any assignments.
1942    * @param sn Server that went down.
1943    * @return list of regions in transition on this server
1944    */
1945   public List<HRegionInfo> processServerShutdown(final ServerName sn) {
1946     // Clean out any existing assignment plans for this server
1947     synchronized (this.regionPlans) {
1948       for (Iterator <Map.Entry<String, RegionPlan>> i =
1949           this.regionPlans.entrySet().iterator(); i.hasNext();) {
1950         Map.Entry<String, RegionPlan> e = i.next();
1951         ServerName otherSn = e.getValue().getDestination();
1952         // The name will be null if the region is planned for a random assign.
1953         if (otherSn != null && otherSn.equals(sn)) {
1954           // Use the iterator's remove() or we'll get a ConcurrentModificationException
1955           i.remove();
1956         }
1957       }
1958     }
1959     List<HRegionInfo> rits = regionStates.serverOffline(sn);
1960     for (Iterator<HRegionInfo> it = rits.iterator(); it.hasNext(); ) {
1961       HRegionInfo hri = it.next();
1962       String encodedName = hri.getEncodedName();
1963 
1964       // We need a lock on the region as we could update it
1965       Lock lock = locker.acquireLock(encodedName);
1966       try {
1967         RegionState regionState =
1968           regionStates.getRegionTransitionState(encodedName);
1969         if (regionState == null
1970             || (regionState.getServerName() != null && !regionState.isOnServer(sn))
1971             || !RegionStates.isOneOfStates(regionState, State.PENDING_OPEN,
1972                 State.OPENING, State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) {
1973           LOG.info("Skip " + regionState + " since it is not opening/failed_close"
1974             + " on the dead server any more: " + sn);
1975           it.remove();
1976         } else {
1977           if (tableStateManager.isTableState(hri.getTable(),
1978                   TableState.State.DISABLED, TableState.State.DISABLING)) {
1979             regionStates.regionOffline(hri);
1980             it.remove();
1981             continue;
1982           }
1983           // Mark the region offline and assign it again by SSH
1984           regionStates.updateRegionState(hri, State.OFFLINE);
1985         }
1986       } finally {
1987         lock.unlock();
1988       }
1989     }
1990     return rits;
1991   }
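
  // Net effect of the method above: region plans that targeted the dead server are dropped;
  // regions of disabled/disabling tables are offlined and dropped from the list; regions still
  // tied to the dead server in a recoverable state are marked OFFLINE so the server shutdown
  // handler can reassign them; everything else is removed from the returned list.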
1992 
1993   /**
1994    * @param plan Plan to execute.
1995    */
1996   public void balance(final RegionPlan plan) {
1997     HRegionInfo hri = plan.getRegionInfo();
1998     TableName tableName = hri.getTable();
1999     if (tableStateManager.isTableState(tableName,
2000             TableState.State.DISABLED, TableState.State.DISABLING)) {
2001       LOG.info("Ignored moving region of disabling/disabled table "
2002         + tableName);
2003       return;
2004     }
2005 
2006     // Move the region only if it's assigned
2007     String encodedName = hri.getEncodedName();
2008     ReentrantLock lock = locker.acquireLock(encodedName);
2009     try {
2010       if (!regionStates.isRegionOnline(hri)) {
2011         RegionState state = regionStates.getRegionState(encodedName);
2012         LOG.info("Ignored moving region not assigned: " + hri + ", "
2013           + (state == null ? "not in region states" : state));
2014         return;
2015       }
2016       synchronized (this.regionPlans) {
2017         this.regionPlans.put(plan.getRegionName(), plan);
2018       }
2019       unassign(hri, plan.getDestination());
2020     } finally {
2021       lock.unlock();
2022     }
2023   }
2024 
2025   public void stop() {
2026     // Shutdown the threadpool executor service
2027     threadPoolExecutorService.shutdownNow();
2028     regionStateStore.stop();
2029   }
2030 
2031   protected void setEnabledTable(TableName tableName) {
2032     try {
2033       this.tableStateManager.setTableState(tableName,
2034               TableState.State.ENABLED);
2035     } catch (IOException e) {
2036       // here we can abort as it is the start up flow
2037       String errorMsg = "Unable to ensure that the table " + tableName
2038           + " will be" + " enabled because of a ZooKeeper issue";
2039       LOG.error(errorMsg);
2040       this.server.abort(errorMsg, e);
2041     }
2042   }
2043 
2044   private String onRegionFailedOpen(final RegionState current,
2045       final HRegionInfo hri, final ServerName serverName) {
2046     // The region must be opening on this server.
2047     // If current state is failed_open on the same server,
2048     // it could be a reportRegionTransition RPC retry.
2049     if (current == null || !current.isOnServer(serverName)
2050         || !(current.isOpening() || current.isFailedOpen())) {
2051       return hri.getShortNameToLog() + " is not opening on " + serverName;
2052     }
2053 
2054     if (current.isFailedOpen()) {
2055       return null;
2056     }
2057 
2058     String encodedName = hri.getEncodedName();
2059     AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
2060     if (failedOpenCount == null) {
2061       failedOpenCount = new AtomicInteger();
2062       // No need to use putIfAbsent, or extra synchronization since
2063       // this whole handleRegion block is locked on the encoded region
2064       // name, and failedOpenTracker is updated only in this block
2065       failedOpenTracker.put(encodedName, failedOpenCount);
2066     }
2067     if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
2068       regionStates.updateRegionState(hri, State.FAILED_OPEN);
2069       // remove the tracking info to save memory, also reset
2070       // the count for next open initiative
2071       failedOpenTracker.remove(encodedName);
2072     } else {
2073       // Handle this the same as if it were opened and then closed.
2074       RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
2075       if (regionState != null) {
2076         // When there is more than one region server, a new RS is selected as the
2077         // destination and the region plan is updated accordingly. (HBASE-5546)
2078         if (getTableStateManager().isTableState(hri.getTable(),
2079                 TableState.State.DISABLED, TableState.State.DISABLING) ||
2080                 replicasToClose.contains(hri)) {
2081           offlineDisabledRegion(hri);
2082           return null;
2083         }
2084         regionStates.updateRegionState(hri, RegionState.State.CLOSED);
2085         // This below has to do w/ online enable/disable of a table
2086         removeClosedRegion(hri);
2087         try {
2088           getRegionPlan(hri, true);
2089         } catch (HBaseIOException e) {
2090           LOG.warn("Failed to get region plan", e);
2091         }
2092         invokeAssign(hri);
2093       }
2094     }
2095     // Null means no error
2096     return null;
2097   }
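
  // The failed-open bookkeeping above works as follows: each failed open increments a per-region
  // counter in failedOpenTracker; once the counter reaches maximumAttempts the region is parked in
  // FAILED_OPEN and the counter is dropped, otherwise the region is treated as CLOSED, a fresh
  // plan is requested via getRegionPlan(hri, true) (ignoring any existing plan) and another
  // assign is scheduled.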
2098 
2099   private String onRegionOpen(final RegionState current, final HRegionInfo hri,
2100       final ServerName serverName, final RegionStateTransition transition) {
2101     // The region must be opening on this server.
2102     // If current state is already opened on the same server,
2103     // it could be a reportRegionTransition RPC retry.
2104     if (current == null || !current.isOnServer(serverName)
2105         || !(current.isOpening() || current.isOpened())) {
2106       return hri.getShortNameToLog() + " is not opening on " + serverName;
2107     }
2108 
2109     if (current.isOpened()) {
2110       return null;
2111     }
2112 
2113     long openSeqNum = transition.hasOpenSeqNum()
2114       ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM;
2115     if (openSeqNum < 0) {
2116       return "Newly opened region has invalid open seq num " + openSeqNum;
2117     }
2118     regionOnline(hri, serverName, openSeqNum);
2119 
2120     // reset the count, if any
2121     failedOpenTracker.remove(hri.getEncodedName());
2122     if (getTableStateManager().isTableState(hri.getTable(),
2123             TableState.State.DISABLED, TableState.State.DISABLING)) {
2124       invokeUnAssign(hri);
2125     }
2126     return null;
2127   }
2128 
2129   private String onRegionClosed(final RegionState current,
2130       final HRegionInfo hri, final ServerName serverName) {
2131     // We didn't check if the region is already closed/offline on the server
2132     // as we did for other transitions to handle reportRegionTransition RPC retry.
2133     // There are two reasons. 1. Closed/offline states are transient. Region will be
2134     // usually assigned right after closed. When an RPC retry comes in, the region may
2135     // already have moved away from closed state. 2. On the region server side, we
2136     // don't care much about the response for this transition. We only make sure
2137     // master has got and processed this report, either successfully or not.
2138     if (current == null || !current.isOnServer(serverName) || !current.isClosing()) {
2139       return hri.getShortNameToLog() + " is not closing on " + serverName;
2140     }
2141     if (getTableStateManager().isTableState(hri.getTable(), TableState.State.DISABLED,
2142         TableState.State.DISABLING) || replicasToClose.contains(hri)) {
2143       offlineDisabledRegion(hri);
2144       return null;
2145     }
2146 
2147     regionStates.updateRegionState(hri, RegionState.State.CLOSED);
2148     sendRegionClosedNotification(hri);
2149     // This below has to do w/ online enable/disable of a table
2150     removeClosedRegion(hri);
2151     invokeAssign(hri);
2152     return null;
2153   }
2154 
2155   private String onRegionReadyToSplit(final RegionState current, final HRegionInfo hri,
2156       final ServerName serverName, final RegionStateTransition transition) {
2157     // The region must be opened on this server.
2158     // If current state is already splitting on the same server,
2159     // it could be a reportRegionTransition RPC retry.
2160     if (current == null || !current.isOnServer(serverName)
2161         || !(current.isOpened() || current.isSplitting())) {
2162       return hri.getShortNameToLog() + " is not opened on " + serverName;
2163     }
2164 
2165     if (current.isSplitting()) {
2166       return null;
2167     }
2168 
2169     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2170     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2171     RegionState rs_a = regionStates.getRegionState(a);
2172     RegionState rs_b = regionStates.getRegionState(b);
2173     if (rs_a != null || rs_b != null) {
2174       return "Some daughter region already exists. "
2175         + "a=" + rs_a + ", b=" + rs_b;
2176     }
2177 
2178     // The server holding the region is not updated at this stage.
2179     // That is done after the PONR (point of no return).
2180     regionStates.updateRegionState(hri, State.SPLITTING);
2181     regionStates.createRegionState(
2182       a, State.SPLITTING_NEW, serverName, null);
2183     regionStates.createRegionState(
2184       b, State.SPLITTING_NEW, serverName, null);
2185     return null;
2186   }
2187 
2188   private String onRegionSplitPONR(final RegionState current, final HRegionInfo hri,
2189       final ServerName serverName, final RegionStateTransition transition) {
2190     // The region must be splitting on this server, and the daughters must be in
2191     // splitting_new state. To check RPC retry, we use server holding info.
2192     if (current == null || !current.isOnServer(serverName) || !current.isSplitting()) {
2193       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2194     }
2195 
2196     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2197     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2198     RegionState rs_a = regionStates.getRegionState(a);
2199     RegionState rs_b = regionStates.getRegionState(b);
2200     if (rs_a == null || rs_b == null || !rs_a.isOnServer(serverName)
2201         || !rs_b.isOnServer(serverName) || !rs_a.isSplittingNew()
2202         || !rs_b.isSplittingNew()) {
2203       return "Some daughter is not known to be splitting on " + serverName
2204         + ", a=" + rs_a + ", b=" + rs_b;
2205     }
2206 
2207     if (!regionStates.isRegionOnServer(hri, serverName)) {
2208       return null;
2209     }
2210 
2211     try {
2212       regionStates.splitRegion(hri, a, b, serverName);
2213     } catch (IOException ioe) {
2214       LOG.info("Failed to record split region " + hri.getShortNameToLog(), ioe);
2215       return "Failed to record the splitting in meta";
2216     }
2217     return null;
2218   }
2219 
2220   private String onRegionSplit(final RegionState current, final HRegionInfo hri,
2221       final ServerName serverName, final RegionStateTransition transition) {
2222     // The region must be splitting on this server, and the daughters must be in
2223     // splitting_new state.
2224     // If current state is already split on the same server,
2225     // it could be a reportRegionTransition RPC retry.
2226     if (current == null || !current.isOnServer(serverName)
2227         || !(current.isSplitting() || current.isSplit())) {
2228       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2229     }
2230 
2231     if (current.isSplit()) {
2232       return null;
2233     }
2234 
2235     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2236     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2237     RegionState rs_a = regionStates.getRegionState(a);
2238     RegionState rs_b = regionStates.getRegionState(b);
2239     if (rs_a == null || rs_b == null || !rs_a.isOnServer(serverName)
2240         || !rs_b.isOnServer(serverName) || !rs_a.isSplittingNew()
2241         || !rs_b.isSplittingNew()) {
2242       return "Some daughter is not known to be splitting on " + serverName
2243         + ", a=" + rs_a + ", b=" + rs_b;
2244     }
2245 
2246     if (TEST_SKIP_SPLIT_HANDLING) {
2247       return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
2248     }
2249     regionOffline(hri, State.SPLIT);
2250     regionOnline(a, serverName, 1);
2251     regionOnline(b, serverName, 1);
2252 
2253     // User could disable the table before master knows the new region.
2254     if (getTableStateManager().isTableState(hri.getTable(),
2255         TableState.State.DISABLED, TableState.State.DISABLING)) {
2256       invokeUnAssign(a);
2257       invokeUnAssign(b);
2258     } else {
2259       Callable<Object> splitReplicasCallable = new Callable<Object>() {
2260         @Override
2261         public Object call() {
2262           doSplittingOfReplicas(hri, a, b);
2263           return null;
2264         }
2265       };
2266       threadPoolExecutorService.submit(splitReplicasCallable);
2267     }
2268     return null;
2269   }
2270 
2271   private String onRegionSplitReverted(final RegionState current, final HRegionInfo hri,
2272       final ServerName serverName, final RegionStateTransition transition) {
2273     // The region must be splitting on this server, and the daughters must be in
2274     // splitting_new state.
2275     // If the region is in open state, it could be an RPC retry.
2276     if (current == null || !current.isOnServer(serverName)
2277         || !(current.isSplitting() || current.isOpened())) {
2278       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2279     }
2280 
2281     if (current.isOpened()) {
2282       return null;
2283     }
2284 
2285     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2286     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2287     RegionState rs_a = regionStates.getRegionState(a);
2288     RegionState rs_b = regionStates.getRegionState(b);
2289     if (rs_a == null || rs_b == null || !rs_a.isOnServer(serverName)
2290         || !rs_b.isOnServer(serverName) || !rs_a.isSplittingNew()
2291         || !rs_b.isSplittingNew()) {
2292       return "Some daughter is not known to be splitting on " + serverName
2293         + ", a=" + rs_a + ", b=" + rs_b;
2294     }
2295 
2296     regionOnline(hri, serverName);
2297     regionOffline(a);
2298     regionOffline(b);
2299     if (getTableStateManager().isTableState(hri.getTable(),
2300         TableState.State.DISABLED, TableState.State.DISABLING)) {
2301       invokeUnAssign(hri);
2302     }
2303     return null;
2304   }
2305 
2306   private String onRegionReadyToMerge(final RegionState current, final HRegionInfo hri,
2307       final ServerName serverName, final RegionStateTransition transition) {
2308     // The region must be new, and the daughters must be open on this server.
2309     // If the region is in merge_new state, it could be an RPC retry.
2310     if (current != null && (!current.isOnServer(serverName)
2311         || !current.isMergingNew())) {
2312       return "Merging daughter region already exists, p=" + current;
2313     }
2314 
2315     if (current != null) {
2316       return null;
2317     }
2318 
2319     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2320     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2321     Set<String> encodedNames = new HashSet<String>(2);
2322     encodedNames.add(a.getEncodedName());
2323     encodedNames.add(b.getEncodedName());
2324     Map<String, Lock> locks = locker.acquireLocks(encodedNames);
2325     try {
2326       RegionState rs_a = regionStates.getRegionState(a);
2327       RegionState rs_b = regionStates.getRegionState(b);
2328       if (rs_a == null || rs_b == null || !rs_a.isOnServer(serverName)
2329           || !rs_b.isOnServer(serverName) || !rs_a.isOpened()
2330           || !rs_b.isOpened()) {
2331         return "Some daughter is not in a state to merge on " + serverName
2332           + ", a=" + rs_a + ", b=" + rs_b;
2333       }
2334 
2335       regionStates.updateRegionState(a, State.MERGING);
2336       regionStates.updateRegionState(b, State.MERGING);
2337       regionStates.createRegionState(
2338         hri, State.MERGING_NEW, serverName, null);
2339       return null;
2340     } finally {
2341       for (Lock lock: locks.values()) {
2342         lock.unlock();
2343       }
2344     }
2345   }
2346 
2347   private String onRegionMergePONR(final RegionState current, final HRegionInfo hri,
2348       final ServerName serverName, final RegionStateTransition transition) {
2349     // The region must be in merging_new state, and the daughters must be
2350     // merging. To check RPC retry, we use server holding info.
2351     if (current == null || !current.isOnServer(serverName) || !current.isMergingNew()) {
2352       return hri.getShortNameToLog() + " is not merging on " + serverName;
2353     }
2354 
2355     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2356     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2357     RegionState rs_a = regionStates.getRegionState(a);
2358     RegionState rs_b = regionStates.getRegionState(b);
2359     if (rs_a == null || rs_b == null || !rs_a.isOnServer(serverName)
2360         || !rs_b.isOnServer(serverName) || !rs_a.isMerging()
2361         || !rs_b.isMerging()) {
2362       return "Some daughter is not known to be merging on " + serverName
2363         + ", a=" + rs_a + ", b=" + rs_b;
2364     }
2365 
2366     if (regionStates.isRegionOnServer(hri, serverName)) {
2367       return null;
2368     }
2369 
2370     try {
2371       regionStates.mergeRegions(hri, a, b, serverName);
2372     } catch (IOException ioe) {
2373       LOG.info("Failed to record merged region " + hri.getShortNameToLog(), ioe);
2374       return "Failed to record the merging in meta";
2375     }
2376     return null;
2377   }
2378 
2379   private String onRegionMerged(final RegionState current, final HRegionInfo hri,
2380       final ServerName serverName, final RegionStateTransition transition) {
2381     // The region must be in merging_new state, and the daughters must be
2382     // merging on this server.
2383     // If current state is already opened on the same server,
2384     // it could be a reportRegionTransition RPC retry.
2385     if (current == null || !current.isOnServer(serverName)
2386         || !(current.isMergingNew() || current.isOpened())) {
2387       return hri.getShortNameToLog() + " is not merging on " + serverName;
2388     }
2389 
2390     if (current.isOpened()) {
2391       return null;
2392     }
2393 
2394     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2395     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2396     RegionState rs_a = regionStates.getRegionState(a);
2397     RegionState rs_b = regionStates.getRegionState(b);
2398     if (rs_a == null || rs_b == null || !rs_a.isOnServer(serverName)
2399         || !rs_b.isOnServer(serverName) || !rs_a.isMerging()
2400         || !rs_b.isMerging()) {
2401       return "Some daughter is not known to be merging on " + serverName
2402         + ", a=" + rs_a + ", b=" + rs_b;
2403     }
2404 
2405     regionOffline(a, State.MERGED);
2406     regionOffline(b, State.MERGED);
2407     regionOnline(hri, serverName, 1);
2408 
2409     // User could disable the table before master knows the new region.
2410     if (getTableStateManager().isTableState(hri.getTable(),
2411         TableState.State.DISABLED, TableState.State.DISABLING)) {
2412       invokeUnAssign(hri);
2413     } else {
2414       Callable<Object> mergeReplicasCallable = new Callable<Object>() {
2415         @Override
2416         public Object call() {
2417           doMergingOfReplicas(hri, a, b);
2418           return null;
2419         }
2420       };
2421       threadPoolExecutorService.submit(mergeReplicasCallable);
2422     }
2423     return null;
2424   }
2425 
2426   private String onRegionMergeReverted(final RegionState current, final HRegionInfo hri,
2427       final ServerName serverName, final RegionStateTransition transition) {
2428     // The region must be in merging_new state, and the daughters must be
2429     // merging on this server.
2430     // If the region is in offline state, it could be an RPC retry.
2431     if (current == null || !current.isOnServer(serverName)
2432         || !(current.isMergingNew() || current.isOffline())) {
2433       return hri.getShortNameToLog() + " is not merging on " + serverName;
2434     }
2435 
2436     if (current.isOffline()) {
2437       return null;
2438     }
2439 
2440     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2441     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2442     RegionState rs_a = regionStates.getRegionState(a);
2443     RegionState rs_b = regionStates.getRegionState(b);
2444     if (rs_a == null || rs_b == null || !rs_a.isOnServer(serverName)
2445         || !rs_b.isOnServer(serverName) || !rs_a.isMerging()
2446         || !rs_b.isMerging()) {
2447       return "Some daughter is not known to be merging on " + serverName
2448         + ", a=" + rs_a + ", b=" + rs_b;
2449     }
2450 
2451     regionOnline(a, serverName);
2452     regionOnline(b, serverName);
2453     regionOffline(hri);
2454 
2455     if (getTableStateManager().isTableState(hri.getTable(),
2456         TableState.State.DISABLED, TableState.State.DISABLING)) {
2457       invokeUnAssign(a);
2458       invokeUnAssign(b);
2459     }
2460     return null;
2461   }
2462 
2463   private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
2464       final HRegionInfo hri_b) {
2465     // Close replicas for the original unmerged regions. Create/assign new replicas
2466     // for the merged parent.
2467     List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
2468     unmergedRegions.add(hri_a);
2469     unmergedRegions.add(hri_b);
2470     Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
2471     Collection<List<HRegionInfo>> c = map.values();
2472     for (List<HRegionInfo> l : c) {
2473       for (HRegionInfo h : l) {
2474         if (!RegionReplicaUtil.isDefaultReplica(h)) {
2475           LOG.debug("Unassigning un-merged replica " + h);
2476           unassign(h);
2477         }
2478       }
2479     }
2480     int numReplicas = 1;
2481     try {
2482       numReplicas = ((MasterServices)server).getTableDescriptors().get(mergedHri.getTable()).
2483           getRegionReplication();
2484     } catch (IOException e) {
2485       LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
2486           " due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
2487           "will not be done");
2488     }
2489     List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2490     for (int i = 1; i < numReplicas; i++) {
2491       regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
2492     }
2493     try {
2494       assign(regions);
2495     } catch (IOException ioe) {
2496       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
2497                 ioe.getMessage());
2498     } catch (InterruptedException ie) {
2499       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
2500                 ie.getMessage());
2501     }
2502   }
2503 
2504   private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
2505       final HRegionInfo hri_b) {
2506     // create new regions for the replica, and assign them to match with the
2507     // current replica assignments. If replica1 of parent is assigned to RS1,
2508     // the replica1s of daughters will be on the same machine
2509     int numReplicas = 1;
2510     try {
2511       numReplicas = ((MasterServices)server).getTableDescriptors().get(parentHri.getTable()).
2512           getRegionReplication();
2513     } catch (IOException e) {
2514       LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
2515           " due to " + e.getMessage() + ". The assignment of daughter replicas " +
2516           "will not be done");
2517     }
2518     // unassign the old replicas
2519     List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
2520     parentRegion.add(parentHri);
2521     Map<ServerName, List<HRegionInfo>> currentAssign =
2522         regionStates.getRegionAssignments(parentRegion);
2523     Collection<List<HRegionInfo>> c = currentAssign.values();
2524     for (List<HRegionInfo> l : c) {
2525       for (HRegionInfo h : l) {
2526         if (!RegionReplicaUtil.isDefaultReplica(h)) {
2527           LOG.debug("Unassigning parent's replica " + h);
2528           unassign(h);
2529         }
2530       }
2531     }
2532     // assign daughter replicas
2533     Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
2534     for (int i = 1; i < numReplicas; i++) {
2535       prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
2536       prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
2537     }
2538     try {
2539       assign(map);
2540     } catch (IOException e) {
2541       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
2542     } catch (InterruptedException e) {
2543       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
2544     }
2545   }
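
  // Hedged, illustrative sketch (not part of the original class): the co-location
  // intent described above, i.e. daughter replica i follows the server of parent
  // replica i, expressed as a plain placement map. All names here are hypothetical;
  // only RegionReplicaUtil.getRegionInfoForReplica() comes from the real API.
  private static Map<HRegionInfo, ServerName> exampleColocateDaughterReplicas(
      final HRegionInfo parent, final HRegionInfo daughterA, final HRegionInfo daughterB,
      final int regionReplication, final Map<HRegionInfo, ServerName> parentReplicaLocations) {
    Map<HRegionInfo, ServerName> placement = new HashMap<HRegionInfo, ServerName>();
    for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
      ServerName sn = parentReplicaLocations.get(
          RegionReplicaUtil.getRegionInfoForReplica(parent, replicaId));
      if (sn == null) {
        continue; // the real code falls back to a random online server instead
      }
      placement.put(RegionReplicaUtil.getRegionInfoForReplica(daughterA, replicaId), sn);
      placement.put(RegionReplicaUtil.getRegionInfoForReplica(daughterB, replicaId), sn);
    }
    return placement;
  }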
2546 
2547   private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
2548       int replicaId, Map<HRegionInfo, ServerName> map) {
2549     HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
2550     HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
2551         replicaId);
2552     LOG.debug("Created replica region for daughter " + daughterReplica);
2553     ServerName sn;
2554     if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
2555       map.put(daughterReplica, sn);
2556     } else {
2557       List<ServerName> servers = serverManager.getOnlineServersList();
2558       sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
2559       map.put(daughterReplica, sn);
2560     }
2561   }
2562 
2563   public Set<HRegionInfo> getReplicasToClose() {
2564     return replicasToClose;
2565   }
2566 
2567   /**
2568    * A region is offline.  The new state should be the specified one,
2569    * if not null.  If the specified state is null, the new state is Offline.
2570    * The specified state can be Split/Merged/Offline/null only.
2571    */
2572   private void regionOffline(final HRegionInfo regionInfo, final State state) {
2573     regionStates.regionOffline(regionInfo, state);
2574     removeClosedRegion(regionInfo);
2575     // remove the region plan as well just in case.
2576     clearRegionPlan(regionInfo);
2577     balancer.regionOffline(regionInfo);
2578 
2579     // Tell our listeners that a region was closed
2580     sendRegionClosedNotification(regionInfo);
2581     // also note that all the replicas of the primary should be closed
2582     if (state != null && (state.equals(State.SPLIT) || state.equals(State.MERGED))) {
2583       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
2584       c.add(regionInfo);
2585       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
2586       Collection<List<HRegionInfo>> allReplicas = map.values();
2587       for (List<HRegionInfo> list : allReplicas) {
2588         replicasToClose.addAll(list);
2589       }
2590     }
2600   }
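
  // Hedged, illustrative sketch (not part of the original class): per the javadoc of
  // regionOffline(HRegionInfo, State) above, callers are expected to pass only
  // Split/Merged/Offline or null (null defaults to Offline). For instance, offlining
  // the parent of a completed split could look like the hypothetical snippet below;
  // passing State.SPLIT is what makes the method also queue the parent's replicas
  // for closing.
  private void exampleOfflineSplitParent(final HRegionInfo parent) {
    regionOffline(parent, State.SPLIT);
  }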
2601 
2602   private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
2603       final ServerName serverName) {
2604     if (!this.listeners.isEmpty()) {
2605       for (AssignmentListener listener : this.listeners) {
2606         listener.regionOpened(regionInfo, serverName);
2607       }
2608     }
2609   }
2610 
2611   private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
2612     if (!this.listeners.isEmpty()) {
2613       for (AssignmentListener listener : this.listeners) {
2614         listener.regionClosed(regionInfo);
2615       }
2616     }
2617   }
2618 
2619   /**
2620    * Try to update some region states. If the state machine prevents
2621    * such an update, an error message is returned to explain the reason.
2622    *
2623    * It's expected that each transition carries just one region for
2624    * opening/closing, and three regions for splitting/merging.
2625    * These regions should be on the server that requested the change.
2626    *
2627    * Region state machine. Only these transitions
2628    * are expected to be triggered by a region server.
2629    *
2630    * On the state transition:
2631    *  (1) Open/Close should be initiated by master
2632    *      (a) Master sets the region to pending_open/pending_close
2633    *        in memory and hbase:meta after sending the request
2634    *        to the region server
2635    *      (b) Region server reports back to the master
2636    *        after open/close is done (either success/failure)
2637    *      (c) If the region server has a problem reporting the status
2638    *        to the master, it must be because the master is down or there is a
2639    *        temporary network issue. Otherwise, the region server should
2640    *        abort, since it must be a bug. If the master is not accessible,
2641    *        the region server should keep trying until the server is
2642    *        stopped or until the status is reported to the (new) master
2643    *      (d) If a region server dies in the middle of opening/closing
2644    *        a region, SSH picks it up and finishes it
2645    *      (e) If the master dies in the middle, the new master recovers
2646    *        the state during initialization from hbase:meta. A region server
2647    *        can report any transition that has not been reported to
2648    *        the previous active master yet
2649    *  (2) Split/merge is initiated by region servers
2650    *      (a) To split a region, a region server sends a request
2651    *        to the master to try to set the region to splitting, together with
2652    *        the two daughters (to be created) to splitting_new. If approved
2653    *        by the master, the split can then move ahead
2654    *      (b) To merge two regions, a region server sends a request to
2655    *        the master to try to set the new merged region (to be created) to
2656    *        merging_new, together with the two regions (to be merged) to merging.
2657    *        If the master approves, the merge can then move ahead
2658    *      (c) Once the split/merge is done, the region server
2659    *        reports the status back to the master, either success or failure.
2660    *      (d) Other scenarios should be handled similarly to
2661    *        region open/close
2662    */
2663   protected String onRegionTransition(final ServerName serverName,
2664       final RegionStateTransition transition) {
2665     TransitionCode code = transition.getTransitionCode();
2666     HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
2667     Lock lock = locker.acquireLock(hri.getEncodedName());
2668     try {
2669       RegionState current = regionStates.getRegionState(hri);
2670       if (LOG.isDebugEnabled()) {
2671         LOG.debug("Got transition " + code + " for "
2672           + (current != null ? current.toString() : hri.getShortNameToLog())
2673           + " from " + serverName);
2674       }
2675       String errorMsg = null;
2676       switch (code) {
2677       case OPENED:
2678         errorMsg = onRegionOpen(current, hri, serverName, transition);
2679         break;
2680       case FAILED_OPEN:
2681         errorMsg = onRegionFailedOpen(current, hri, serverName);
2682         break;
2683       case CLOSED:
2684         errorMsg = onRegionClosed(current, hri, serverName);
2685         break;
2686       case READY_TO_SPLIT:
2687         errorMsg = onRegionReadyToSplit(current, hri, serverName, transition);
2688         break;
2689       case SPLIT_PONR:
2690         errorMsg = onRegionSplitPONR(current, hri, serverName, transition);
2691         break;
2692       case SPLIT:
2693         errorMsg = onRegionSplit(current, hri, serverName, transition);
2694         break;
2695       case SPLIT_REVERTED:
2696         errorMsg = onRegionSplitReverted(current, hri, serverName, transition);
2697         break;
2698       case READY_TO_MERGE:
2699         errorMsg = onRegionReadyToMerge(current, hri, serverName, transition);
2700         break;
2701       case MERGE_PONR:
2702         errorMsg = onRegionMergePONR(current, hri, serverName, transition);
2703         break;
2704       case MERGED:
2705         errorMsg = onRegionMerged(current, hri, serverName, transition);
2706         break;
2707       case MERGE_REVERTED:
2708         errorMsg = onRegionMergeReverted(current, hri, serverName, transition);
2709         break;
2710 
2711       default:
2712         errorMsg = "Unexpected transition code " + code;
2713       }
2714       if (errorMsg != null) {
2715         LOG.error("Failed to transition region from " + current + " on "
2716           + code + " by " + serverName + ": " + errorMsg);
2717       }
2718       return errorMsg;
2719     } finally {
2720       lock.unlock();
2721     }
2722   }
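
  // Hedged, illustrative sketch (not part of the original class): onRegionTransition()
  // returns null when the reported transition is accepted and a human-readable error
  // message when the state machine rejects it, so a caller on the report-transition
  // RPC path could react roughly like this. The method name is hypothetical; only
  // onRegionTransition() itself comes from this class.
  private void exampleHandleReportedTransition(final ServerName serverName,
      final RegionStateTransition transition) {
    String errorMsg = onRegionTransition(serverName, transition);
    if (errorMsg != null) {
      // The rejection is propagated back so the region server can retry or abort.
      LOG.warn("Rejected transition " + transition.getTransitionCode()
        + " reported by " + serverName + ": " + errorMsg);
    }
  }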
2723 
2724   /**
2725    * @return Instance of load balancer
2726    */
2727   public LoadBalancer getBalancer() {
2728     return this.balancer;
2729   }
2730 
2731   public Map<ServerName, List<HRegionInfo>>
2732     getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
2733     return getRegionStates().getRegionAssignments(infos);
2734   }
2735 }