1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.HashMap;
28  import java.util.HashSet;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.NavigableMap;
33  import java.util.Random;
34  import java.util.Set;
35  import java.util.TreeMap;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.ConcurrentHashMap;
38  import java.util.concurrent.CopyOnWriteArrayList;
39  import java.util.concurrent.ScheduledThreadPoolExecutor;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.atomic.AtomicBoolean;
42  import java.util.concurrent.atomic.AtomicInteger;
43  import java.util.concurrent.locks.Lock;
44  import java.util.concurrent.locks.ReentrantLock;
45
46  import org.apache.commons.logging.Log;
47  import org.apache.commons.logging.LogFactory;
48  import org.apache.hadoop.conf.Configuration;
49  import org.apache.hadoop.fs.FileStatus;
50  import org.apache.hadoop.fs.FileSystem;
51  import org.apache.hadoop.fs.Path;
52  import org.apache.hadoop.hbase.CoordinatedStateException;
53  import org.apache.hadoop.hbase.HBaseIOException;
54  import org.apache.hadoop.hbase.HConstants;
55  import org.apache.hadoop.hbase.HRegionInfo;
56  import org.apache.hadoop.hbase.HRegionLocation;
57  import org.apache.hadoop.hbase.HTableDescriptor;
58  import org.apache.hadoop.hbase.MetaTableAccessor;
59  import org.apache.hadoop.hbase.NotServingRegionException;
60  import org.apache.hadoop.hbase.RegionLocations;
61  import org.apache.hadoop.hbase.RegionStateListener;
62  import org.apache.hadoop.hbase.ServerName;
63  import org.apache.hadoop.hbase.TableName;
64  import org.apache.hadoop.hbase.TableNotFoundException;
65  import org.apache.hadoop.hbase.classification.InterfaceAudience;
66  import org.apache.hadoop.hbase.client.MasterSwitchType;
67  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
68  import org.apache.hadoop.hbase.client.Result;
69  import org.apache.hadoop.hbase.client.TableState;
70  import org.apache.hadoop.hbase.executor.EventHandler;
71  import org.apache.hadoop.hbase.executor.EventType;
72  import org.apache.hadoop.hbase.executor.ExecutorService;
73  import org.apache.hadoop.hbase.ipc.FailedServerException;
74  import org.apache.hadoop.hbase.ipc.RpcClient;
75  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
76  import org.apache.hadoop.hbase.master.RegionState.State;
77  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
78  import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
79  import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
80  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
81  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
82  import org.apache.hadoop.hbase.quotas.QuotaExceededException;
83  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
84  import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
85  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
86  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
87  import org.apache.hadoop.hbase.util.FSUtils;
88  import org.apache.hadoop.hbase.util.KeyLocker;
89  import org.apache.hadoop.hbase.util.Pair;
90  import org.apache.hadoop.hbase.util.PairOfSameType;
91  import org.apache.hadoop.hbase.util.RetryCounter;
92  import org.apache.hadoop.hbase.util.Threads;
93  import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
94  import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
95  import org.apache.hadoop.ipc.RemoteException;
96  import org.apache.hadoop.util.StringUtils;
97  import org.apache.zookeeper.KeeperException;
98
99  /**
100  * Manages and performs region assignment.
101  * Related communications with regionserver are all done over RPC.
102  */
103 @InterfaceAudience.Private
104 public class AssignmentManager {
105   private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
106 
107   protected final MasterServices server;
108 
109   private ServerManager serverManager;
110 
111   private boolean shouldAssignRegionsWithFavoredNodes;
112 
113   private LoadBalancer balancer;
114 
115   private final MetricsAssignmentManager metricsAssignmentManager;
116 
117   private final TableLockManager tableLockManager;
118 
119   private AtomicInteger numRegionsOpened = new AtomicInteger(0);
120 
121   final private KeyLocker<String> locker = new KeyLocker<String>();
122 
123   Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
124
125   /**
126    * Map of regions to reopen after the schema of a table is changed. Key -
127    * encoded region name, value - HRegionInfo
128    */
129   private final Map <String, HRegionInfo> regionsToReopen;
130
131   /*
132    * Maximum times we recurse an assignment/unassignment.
133    * See below in {@link #assign()} and {@link #unassign()}.
134    */
135   private final int maximumAttempts;
136
137   /**
138    * The sleep time the assignment will wait before retrying an hbase:meta assignment
139    * that failed because no region plan was available or the plan was bad.
140    */
141   private final long sleepTimeBeforeRetryingMetaAssignment;
142
143   /** Plans for region movement. Key is the encoded version of a region name */
144   // TODO: When do plans get cleaned out?  Ever? In server open and in server
145   // shutdown processing -- St.Ack
146   // All access to this Map must be synchronized.
147   final NavigableMap<String, RegionPlan> regionPlans =
148     new TreeMap<String, RegionPlan>();
149 
150   private final TableStateManager tableStateManager;
151 
152   private final ExecutorService executorService;
153
154   private java.util.concurrent.ExecutorService threadPoolExecutorService;
155   private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
156 
157   private final RegionStates regionStates;
158
159   // The threshold to use bulk assigning. Using bulk assignment
160   // only if assigning at least this many regions to at least this
161   // many servers. If assigning fewer regions to fewer servers,
162   // bulk assigning may not be as efficient.
163   private final int bulkAssignThresholdRegions;
164   private final int bulkAssignThresholdServers;
165   private final int bulkPerRegionOpenTimeGuesstimate;
166
167   // Should bulk assignment wait till all regions are assigned,
168   // or until it times out?  This is useful to measure bulk assignment
169   // performance, but not needed in most use cases.
170   private final boolean bulkAssignWaitTillAllAssigned;
171
172   /**
173    * Indicator that AssignmentManager has recovered the region states so
174    * that ServerShutdownHandler can be fully enabled and re-assign regions
175    * of dead servers. This way, when re-assignment happens, AssignmentManager
176    * has proper region states.
177    *
178    * Protected to ease testing.
179    */
180   protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
181
182   /**
183    * A map tracking how many times in a row a region has failed to open,
184    * so that we don't try to open a region forever if the failure is
185    * unrecoverable.  We don't put this information in region states
186    * because we don't expect this to happen frequently; we don't
187    * want to copy this information over during each state transition either.
188    */
189   private final ConcurrentHashMap<String, AtomicInteger>
190     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
191
192   // In case not using ZK for region assignment, region states
193   // are persisted in meta with a state store
194   private final RegionStateStore regionStateStore;
195
196   /**
197    * For testing only!  Set to true to skip handling of split.
198    */
199   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
200   public static boolean TEST_SKIP_SPLIT_HANDLING = false;
201
202   /** Listeners that are called on assignment events. */
203   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
204 
205   private RegionStateListener regionStateListener;
206
207   private RetryCounter.BackoffPolicy backoffPolicy;
208   private RetryCounter.RetryConfig retryConfig;
209   /**
210    * Constructs a new assignment manager.
211    *
212    * @param server instance of HMaster this AM running inside
213    * @param serverManager serverManager for associated HMaster
214    * @param balancer implementation of {@link LoadBalancer}
215    * @param service Executor service
216    * @param metricsMaster metrics manager
217    * @param tableLockManager TableLock manager
218    * @throws IOException
219    */
220   public AssignmentManager(MasterServices server, ServerManager serverManager,
221       final LoadBalancer balancer,
222       final ExecutorService service, MetricsMaster metricsMaster,
223       final TableLockManager tableLockManager,
224       final TableStateManager tableStateManager)
225           throws IOException {
226     this.server = server;
227     this.serverManager = serverManager;
228     this.executorService = service;
229     this.regionStateStore = new RegionStateStore(server);
230     this.regionsToReopen = Collections.synchronizedMap
231                            (new HashMap<String, HRegionInfo> ());
232     Configuration conf = server.getConfiguration();
233     // Only read favored nodes if using the favored nodes load balancer.
234     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
235            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
236            FavoredNodeLoadBalancer.class);
237
238     this.tableStateManager = tableStateManager;
239
240     // This is the max attempts, not retries, so it should be at least 1.
241     this.maximumAttempts = Math.max(1,
242       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
243     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
244         "hbase.meta.assignment.retry.sleeptime", 1000l);
245     this.balancer = balancer;
246     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
247
248     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
249         maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
250
251     this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1,
252         Threads.newDaemonThreadFactory("AM.Scheduler"));
253
254     this.regionStates = new RegionStates(
255       server, tableStateManager, serverManager, regionStateStore);
256
257     this.bulkAssignWaitTillAllAssigned =
258       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
259     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
260     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
261     this.bulkPerRegionOpenTimeGuesstimate =
262       conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
263
264     this.metricsAssignmentManager = new MetricsAssignmentManager();
265     this.tableLockManager = tableLockManager;
266
267     // Configurations for retrying opening a region on receiving a FAILED_OPEN
268     this.retryConfig = new RetryCounter.RetryConfig();
269     this.retryConfig.setSleepInterval(conf.getLong("hbase.assignment.retry.sleep.initial", 0l));
270     // Set the max time limit to the initial sleep interval so we use a constant time sleep strategy
271     // if the user does not set a max sleep limit
272     this.retryConfig.setMaxSleepTime(conf.getLong("hbase.assignment.retry.sleep.max",
273         retryConfig.getSleepInterval()));
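        // Note: with the defaults above (initial sleep 0ms and max sleep capped at the
        // initial sleep), the exponential backoff effectively degenerates to a constant,
        // immediate retry; both knobs must be set to get a real backoff curve.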
274     this.backoffPolicy = getBackoffPolicy();
275   }
276
277   /**
278    * Returns the backoff policy used for Failed Region Open retries
279    * @return the backoff policy used for Failed Region Open retries
280    */
281   RetryCounter.BackoffPolicy getBackoffPolicy() {
282     return new RetryCounter.ExponentialBackoffPolicyWithLimit();
283   }
284
285   MetricsAssignmentManager getAssignmentManagerMetrics() {
286     return this.metricsAssignmentManager;
287   }
288
289   /**
290    * Add the listener to the notification list.
291    * @param listener The AssignmentListener to register
292    */
293   public void registerListener(final AssignmentListener listener) {
294     this.listeners.add(listener);
295   }
296
297   /**
298    * Remove the listener from the notification list.
299    * @param listener The AssignmentListener to unregister
300    */
301   public boolean unregisterListener(final AssignmentListener listener) {
302     return this.listeners.remove(listener);
303   }
304
305   /**
306    * @return Instance of TableStateManager.
307    */
308   public TableStateManager getTableStateManager() {
309     // These are 'expensive' to make involving trip to zk ensemble so allow
310     // sharing.
311     return this.tableStateManager;
312   }
313
314   /**
315    * This SHOULD not be public. It is public now
316    * because of some unit tests.
317    *
318    * TODO: make it package private and keep RegionStates in the master package
319    */
320   public RegionStates getRegionStates() {
321     return regionStates;
322   }
323
324   /**
325    * Used in some tests to mock up region state in meta
326    */
327   @VisibleForTesting
328   RegionStateStore getRegionStateStore() {
329     return regionStateStore;
330   }
331
332   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
333     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
334   }
335
336   /**
337    * Add a regionPlan for the specified region.
338    * @param encodedName
339    * @param plan
340    */
341   public void addPlan(String encodedName, RegionPlan plan) {
342     synchronized (regionPlans) {
343       regionPlans.put(encodedName, plan);
344     }
345   }
346
347   /**
348    * Add a map of region plans.
349    */
350   public void addPlans(Map<String, RegionPlan> plans) {
351     synchronized (regionPlans) {
352       regionPlans.putAll(plans);
353     }
354   }
355
356   /**
357    * Set the list of regions that will be reopened
358    * because of an update in table schema
359    *
360    * @param regions
361    *          list of regions that should be tracked for reopen
362    */
363   public void setRegionsToReopen(List <HRegionInfo> regions) {
364     for(HRegionInfo hri : regions) {
365       regionsToReopen.put(hri.getEncodedName(), hri);
366     }
367   }
368
369   /**
370    * Used by the client to identify if all regions have the schema updates
371    *
372    * @param tableName
373    * @return Pair of (pending region count, total region count) indicating the status of the alter command
374    * @throws IOException
375    */
376   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
377       throws IOException {
378     List<HRegionInfo> hris;
379     if (TableName.META_TABLE_NAME.equals(tableName)) {
380       hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
381     } else {
382       hris = MetaTableAccessor.getTableRegions(server.getConnection(), tableName, true);
383     }
384
385     Integer pending = 0;
386     for (HRegionInfo hri : hris) {
387       String name = hri.getEncodedName();
388       // no lock concurrent access ok: sequential consistency respected.
389       if (regionsToReopen.containsKey(name)
390           || regionStates.isRegionInTransition(name)) {
391         pending++;
392       }
393     }
394     return new Pair<Integer, Integer>(pending, hris.size());
395   }
396
397   /**
398    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
399    * the failover cleanup before re-assigning regions of dead servers. This way,
400    * when re-assignment happens, AssignmentManager has proper region states.
401    */
402   public boolean isFailoverCleanupDone() {
403     return failoverCleanupDone.get();
404   }
405
406   /**
407    * To avoid racing with AM, external entities may need to lock a region,
408    * for example, when SSH checks what regions to skip re-assigning.
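       * <p>
       * For example (an illustrative sketch; the variable names are hypothetical):
       * <pre>
       *   Lock lock = am.acquireRegionLock(hri.getEncodedName());
       *   try {
       *     // check or update state for this region without racing an assign/unassign
       *   } finally {
       *     lock.unlock();
       *   }
       * </pre>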
409    */
410   public Lock acquireRegionLock(final String encodedName) {
411     return locker.acquireLock(encodedName);
412   }
413
414   /**
415    * Now, failover cleanup is completed. Notify server manager to
416    * process queued-up dead servers, if any.
417    */
418   void failoverCleanupDone() {
419     failoverCleanupDone.set(true);
420     serverManager.processQueuedDeadServers();
421   }
422
423   /**
424    * Called on startup.
425    * Figures whether this is a fresh cluster start or we are joining an extant running cluster.
426    * @throws IOException
427    * @throws KeeperException
428    * @throws InterruptedException
429    * @throws CoordinatedStateException
430    */
431   void joinCluster()
432   throws IOException, KeeperException, InterruptedException, CoordinatedStateException {
433     long startTime = System.currentTimeMillis();
434     // Concurrency note: In the below the accesses on regionsInTransition are
435     // outside of a synchronization block where usually all accesses to RIT are
436     // synchronized.  The presumption is that in this case it is safe since this
437     // method is run by a single thread on startup.
438
439     // TODO: Regions that have a null location and are not in regionsInTransitions
440     // need to be handled.
441
442     // Scan hbase:meta to build list of existing regions, servers, and assignment
443     // Returns servers who have not checked in (assumed dead) that some regions
444     // were assigned to (according to the meta)
445     Set<ServerName> deadServers = rebuildUserRegions();
446
447     // This method will assign all user regions if a clean server startup or
448     // it will reconstruct master state and cleanup any leftovers from previous master process.
449     boolean failover = processDeadServersAndRegionsInTransition(deadServers);
450
451     LOG.info("Joined the cluster in " + (System.currentTimeMillis()
452       - startTime) + "ms, failover=" + failover);
453   }
454
455   /**
456    * Processes all regions that are in transition in zookeeper and also
457    * processes the list of dead servers.
458    * Used by a master joining a cluster.  If we figure this is a clean cluster
459    * startup, will assign all user regions.
460    * @param deadServers Set of servers that are offline probably legitimately that were carrying
461    * regions according to a scan of hbase:meta. Can be null.
462    * @throws IOException
463    * @throws InterruptedException
464    */
465   boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
466   throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
467     // TODO Needed? List<String> nodes = ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
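        // Failover detection below is a cascade of checks; each runs only if the previous
        // ones found nothing: 1) a dead server still carrying regions, 2) a user region
        // assigned to an online server, 3) a user region in transition on a live server,
        // 4) a queued dead server with leftover non-empty WALs. If none of these hit,
        // this is treated as a clean cluster startup.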
468     boolean failover = !serverManager.getDeadServers().isEmpty();
469     if (failover) {
470       // This may not be a failover actually, especially if meta is on this master.
471       if (LOG.isDebugEnabled()) {
472         LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
473       }
474       // Check if there are any regions on these servers
475       failover = false;
476       for (ServerName serverName : serverManager.getDeadServers().copyServerNames()) {
477         if (regionStates.getRegionAssignments().values().contains(serverName)) {
478           LOG.debug("Found regions on dead server: " + serverName);
479           failover = true;
480           break;
481         }
482       }
483     }
484     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
485     if (!failover) {
486       // If any one region except meta is assigned, it's a failover.
487       for (Map.Entry<HRegionInfo, ServerName> en:
488           regionStates.getRegionAssignments().entrySet()) {
489         HRegionInfo hri = en.getKey();
490         if (!hri.isMetaTable()
491             && onlineServers.contains(en.getValue())) {
492           LOG.debug("Found region " + hri + " out on cluster");
493           failover = true;
494           break;
495         }
496       }
497     }
498     if (!failover) {
499       // If any region except meta is in transition on a live server, it's a failover.
500       Set<RegionState> regionsInTransition = regionStates.getRegionsInTransition();
501       if (!regionsInTransition.isEmpty()) {
502         for (RegionState regionState: regionsInTransition) {
503           ServerName serverName = regionState.getServerName();
504           if (!regionState.getRegion().isMetaRegion()
505               && serverName != null && onlineServers.contains(serverName)) {
506             LOG.debug("Found " + regionState + " for region " +
507               regionState.getRegion().getRegionNameAsString() + " for server " +
508                 serverName + " in RITs");
509             failover = true;
510             break;
511           }
512         }
513       }
514     }
515     if (!failover) {
516       // If we get here, we have a full cluster restart. It is a failover only
517       // if there are some WALs that are not split yet. For meta WALs, they should have
518       // been split already, if any. We can walk through those queued dead servers;
519       // if they don't have any WALs, this restart should be considered a clean one.
520       Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
521       if (!queuedDeadServers.isEmpty()) {
522         Configuration conf = server.getConfiguration();
523         Path rootdir = FSUtils.getRootDir(conf);
524         FileSystem fs = rootdir.getFileSystem(conf);
525         for (ServerName serverName: queuedDeadServers) {
526           // In the case of a clean exit, the shutdown handler would have presplit any WALs and
527           // removed empty directories.
528           Path logDir = new Path(rootdir,
529             AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
530           Path splitDir = logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
531           if (checkWals(fs, logDir) || checkWals(fs, splitDir)) {
532             LOG.debug("Found queued dead server " + serverName);
533             failover = true;
534             break;
535           }
536         }
537         if (!failover) {
538           // We figured that it's not a failover, so no need to
539           // work on these re-queued dead servers any more.
540           LOG.info("AM figured that it's not a failover and cleaned up "
541             + queuedDeadServers.size() + " queued dead servers");
542           serverManager.removeRequeuedDeadServers();
543         }
544       }
545     }
546
547     Set<TableName> disabledOrDisablingOrEnabling = null;
548     Map<HRegionInfo, ServerName> allRegions = null;
549
550     if (!failover) {
551       disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
552         TableState.State.DISABLED, TableState.State.DISABLING,
553         TableState.State.ENABLING);
554
555       // Clean re/start, mark all user regions closed before reassignment
556       allRegions = regionStates.closeAllUserRegions(
557         disabledOrDisablingOrEnabling);
558     }
559
560     // Now region states are restored
561     regionStateStore.start();
562
563     if (failover) {
564       if (deadServers != null && !deadServers.isEmpty()) {
565         for (ServerName serverName: deadServers) {
566           if (!serverManager.isServerDead(serverName)) {
567             serverManager.expireServer(serverName); // Let SSH do region re-assign
568           }
569         }
570       }
571       processRegionsInTransition(regionStates.getRegionsInTransition());
572     }
573
574     // Now we can safely claim failover cleanup completed and enable
575     // ServerShutdownHandler for further processing. The nodes (below)
576     // in transition, if any, are for regions not related to those
577     // dead servers at all, and can be done in parallel to SSH.
578     failoverCleanupDone();
579     if (!failover) {
580       // Fresh cluster startup.
581       LOG.info("Clean cluster startup. Don't reassign user regions");
582       assignAllUserRegions(allRegions);
583     } else {
584       LOG.info("Failover! Reassign user regions");
585     }
586     // unassign replicas of the split parents and the merged regions
587     // the daughter replicas are opened in assignAllUserRegions if it was
588     // not already opened.
589     for (HRegionInfo h : replicasToClose) {
590       unassign(h);
591     }
592     replicasToClose.clear();
593     return failover;
594   }
595
596   private boolean checkWals(FileSystem fs, Path dir) throws IOException {
597     if (!fs.exists(dir)) {
598       LOG.debug(dir + " doesn't exist");
599       return false;
600     }
601     if (!fs.getFileStatus(dir).isDirectory()) {
602       LOG.warn(dir + " is not a directory");
603       return false;
604     }
605     FileStatus[] files = FSUtils.listStatus(fs, dir);
606     if (files == null || files.length == 0) {
607       LOG.debug(dir + " has no files");
608       return false;
609     }
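        // Recurse through the directory tree: the WAL dir counts as non-empty if it, or
        // any subdirectory, contains at least one file with length > 0.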
610     for (int i = 0; i < files.length; i++) {
611       if (files[i].isFile() && files[i].getLen() > 0) {
612         LOG.debug(dir + " has a non-empty file: " + files[i].getPath());
613         return true;
614       } else if (files[i].isDirectory() && checkWals(fs, files[i].getPath())) {
615         LOG.debug(dir + " is a directory and has a non-empty file: " + files[i].getPath());
616         return true;
617       }
618     }
619     LOG.debug("Found 0 non-empty WAL files for: " + dir);
620     return false;
621   }
622
623   /**
624    * When a region is closed, it should be removed from the regionsToReopen
625    * @param hri HRegionInfo of the region which was closed
626    */
627   public void removeClosedRegion(HRegionInfo hri) {
628     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
629       LOG.debug("Removed region from reopening regions because it was closed");
630     }
631   }
632
633   // TODO: processFavoredNodes might throw an exception, e.g., if the
634   // meta could not be contacted/updated. We need to decide how seriously to treat
635   // this problem. Should we fail the current assignment? We should be able
636   // to recover from this problem eventually (if the meta couldn't be updated,
637   // things should work normally and eventually get fixed up).
638   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
639     if (!shouldAssignRegionsWithFavoredNodes) return;
640     // The AM gets the favored nodes info for each region and updates the meta
641     // table with that info
642     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
643         new HashMap<HRegionInfo, List<ServerName>>();
644     for (HRegionInfo region : regions) {
645       regionToFavoredNodes.put(region,
646           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
647     }
648     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
649       this.server.getConnection());
650   }
651
652   /**
653    * Marks the region as online.  Removes it from regions in transition and
654    * updates the in-memory assignment information.
655    * <p>
656    * Used when a region has been successfully opened on a region server.
657    * @param regionInfo
658    * @param sn
659    */
660   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
661     regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
662   }
663
664   void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
665     numRegionsOpened.incrementAndGet();
666     regionStates.regionOnline(regionInfo, sn, openSeqNum);
667
668     // Remove plan if one.
669     clearRegionPlan(regionInfo);
670     balancer.regionOnline(regionInfo, sn);
671
672     // Tell our listeners that a region was opened
673     sendRegionOpenedNotification(regionInfo, sn);
674   }
675
676   /**
677    * Marks the region as offline.  Removes it from regions in transition and
678    * removes in-memory assignment information.
679    * <p>
680    * Used when a region has been closed and should remain closed.
681    * @param regionInfo
682    */
683   public void regionOffline(final HRegionInfo regionInfo) {
684     regionOffline(regionInfo, null);
685   }
686
687   public void offlineDisabledRegion(HRegionInfo regionInfo) {
688     replicasToClose.remove(regionInfo);
689     regionOffline(regionInfo);
690   }
691
692   // Assignment methods
693
694   /**
695    * Assigns the specified region.
696    * <p>
697    * If a RegionPlan is available with a valid destination then it will be used
698    * to determine what server the region is assigned to.  If no RegionPlan is
699    * available, region will be assigned to a random available server.
700    * <p>
701    * Updates the RegionState and sends the OPEN RPC.
702    * <p>
703    * This will only succeed if the region is in transition and in a CLOSED or
704    * OFFLINE state or not in transition, and of course, the
705    * chosen server is up and running (It may have just crashed!).
706    *
707    * @param region the region to be assigned
708    */
709   public void assign(HRegionInfo region) {
710     assign(region, false);
711   }
712
713   /**
714    * Use care with forceNewPlan. It could cause double assignment.
715    */
716   public void assign(HRegionInfo region, boolean forceNewPlan) {
717     if (isDisabledorDisablingRegionInRIT(region)) {
718       return;
719     }
720     String encodedName = region.getEncodedName();
721     Lock lock = locker.acquireLock(encodedName);
722     try {
723       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
724       if (state != null) {
725         if (regionStates.wasRegionOnDeadServer(encodedName)) {
726           LOG.info("Skip assigning " + region.getRegionNameAsString()
727             + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
728             + " is dead but not processed yet");
729           return;
730         }
731         assign(state, forceNewPlan);
732       }
733     } finally {
734       lock.unlock();
735     }
736   }
737
738   /**
739    * Bulk assign regions to <code>destination</code>.
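       * <p>
       * Regions that cannot be included in the bulk RPC (their state cannot be forced
       * offline, or the destination reports a non-OPENED opening state for them) are
       * re-queued for individual assignment. Returns false if the destination cannot be
       * reached or the master is stopping.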
740    * @param destination
741    * @param regions Regions to assign.
742    * @return true if successful
743    */
744   boolean assign(final ServerName destination, final List<HRegionInfo> regions)
745     throws InterruptedException {
746     long startTime = EnvironmentEdgeManager.currentTime();
747     try {
748       int regionCount = regions.size();
749       if (regionCount == 0) {
750         return true;
751       }
752       LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
753       Set<String> encodedNames = new HashSet<String>(regionCount);
754       for (HRegionInfo region : regions) {
755         encodedNames.add(region.getEncodedName());
756       }
757
758       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
759       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
760       try {
761         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regionCount);
762         List<RegionState> states = new ArrayList<RegionState>(regionCount);
763         for (HRegionInfo region : regions) {
764           String encodedName = region.getEncodedName();
765           if (!isDisabledorDisablingRegionInRIT(region)) {
766             RegionState state = forceRegionStateToOffline(region, false);
767             boolean onDeadServer = false;
768             if (state != null) {
769               if (regionStates.wasRegionOnDeadServer(encodedName)) {
770                 LOG.info("Skip assigning " + region.getRegionNameAsString()
771                   + ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
772                   + " is dead but not processed yet");
773                 onDeadServer = true;
774               } else {
775                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
776                 plans.put(encodedName, plan);
777                 states.add(state);
778                 continue;
779               }
780             }
781             // Reassign if the region wasn't on a dead server
782             if (!onDeadServer) {
783               LOG.info("failed to force region state to offline, "
784                 + "will reassign later: " + region);
785               failedToOpenRegions.add(region); // assign individually later
786             }
787           }
788           // Release the lock, this region is excluded from bulk assign because
789           // we can't update its state, or set its znode to offline.
790           Lock lock = locks.remove(encodedName);
791           lock.unlock();
792         }
793
794         if (server.isStopped()) {
795           return false;
796         }
797
798         // Add region plans, so we can updateTimers when one region is opened so
799         // that unnecessary timeout on RIT is reduced.
800         this.addPlans(plans);
801
802         List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos =
803           new ArrayList<Pair<HRegionInfo, List<ServerName>>>(states.size());
804         for (RegionState state: states) {
805           HRegionInfo region = state.getRegion();
806           regionStates.updateRegionState(
807             region, State.PENDING_OPEN, destination);
808           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
809           if (this.shouldAssignRegionsWithFavoredNodes) {
810             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
811           }
812           regionOpenInfos.add(new Pair<HRegionInfo, List<ServerName>>(
813             region, favoredNodes));
814         }
815
816         // Move on to open regions.
817         try {
818           // Send OPEN RPC. If it fails on a IOE or RemoteException,
819           // regions will be assigned individually.
820           Configuration conf = server.getConfiguration();
821           long maxWaitTime = System.currentTimeMillis() +
822             conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000);
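              // Retry loop: if the destination is not yet up, or the open RPC merely timed
              // out while the server is still online, the attempt counter is reset; if the
              // destination is on the failed-server list we only sleep before retrying, so
              // that attempt still counts against maximumAttempts.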
823           for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
824             try {
825               List<RegionOpeningState> regionOpeningStateList = serverManager
826                 .sendRegionOpen(destination, regionOpenInfos);
827               for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
828                 RegionOpeningState openingState = regionOpeningStateList.get(k);
829                 if (openingState != RegionOpeningState.OPENED) {
830                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
831                   LOG.info("Got opening state " + openingState
832                     + ", will reassign later: " + region);
833                   // Failed opening this region, reassign it later
834                   forceRegionStateToOffline(region, true);
835                   failedToOpenRegions.add(region);
836                 }
837               }
838               break;
839             } catch (IOException e) {
840               if (e instanceof RemoteException) {
841                 e = ((RemoteException)e).unwrapRemoteException();
842               }
843               if (e instanceof RegionServerStoppedException) {
844                 LOG.warn("The region server was shut down, ", e);
845                 // No need to retry, the region server is a goner.
846                 return false;
847               } else if (e instanceof ServerNotRunningYetException) {
848                 long now = System.currentTimeMillis();
849                 if (now < maxWaitTime) {
850                   if (LOG.isDebugEnabled()) {
851                     LOG.debug("Server is not yet up; waiting up to " +
852                       (maxWaitTime - now) + "ms", e);
853                   }
854                   Thread.sleep(100);
855                   i--; // reset the try count
856                   continue;
857                 }
858               } else if (e instanceof java.net.SocketTimeoutException
859                   && this.serverManager.isServerOnline(destination)) {
860                 // In case socket is timed out and the region server is still online,
861                 // the openRegion RPC could have been accepted by the server and
862                 // just the response didn't go through.  So we will retry to
863                 // open the region on the same server.
864                 if (LOG.isDebugEnabled()) {
865                   LOG.debug("Bulk assigner openRegion() to " + destination
866                     + " has timed out, but the regions might"
867                     + " already be opened on it.", e);
868                 }
869                 // wait and reset the re-try count, server might be just busy.
870                 Thread.sleep(100);
871                 i--;
872                 continue;
873               } else if (e instanceof FailedServerException && i < maximumAttempts) {
874                 // In case the server is in the failed server list, no point to
875                 // retry too soon. Retry after the failed_server_expiry time
876                 long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
877                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
878                 if (LOG.isDebugEnabled()) {
879                   LOG.debug(destination + " is on failed server list; waiting "
880                     + sleepTime + "ms", e);
881                 }
882                 Thread.sleep(sleepTime);
883                 continue;
884               }
885               throw e;
886             }
887           }
888         } catch (IOException e) {
889           // Can be a socket timeout, EOF, NoRouteToHost, etc
890           LOG.info("Unable to communicate with " + destination
891             + " in order to assign regions, ", e);
892           for (RegionState state: states) {
893             HRegionInfo region = state.getRegion();
894             forceRegionStateToOffline(region, true);
895           }
896           return false;
897         }
898       } finally {
899         for (Lock lock : locks.values()) {
900           lock.unlock();
901         }
902       }
903
904       if (!failedToOpenRegions.isEmpty()) {
905         for (HRegionInfo region : failedToOpenRegions) {
906           if (!regionStates.isRegionOnline(region)) {
907             invokeAssign(region);
908           }
909         }
910       }
911
912       // wait for assignment completion
913       ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
914       for (HRegionInfo region: regions) {
915         if (!region.getTable().isSystemTable()) {
916           userRegionSet.add(region);
917         }
918       }
919       if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
920             System.currentTimeMillis())) {
921         LOG.debug("some user regions are still in transition: " + userRegionSet);
922       }
923       LOG.debug("Bulk assigning done for " + destination);
924       return true;
925     } finally {
926       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
927     }
928   }
929
930   /**
931    * Send CLOSE RPC if the server is online, otherwise, offline the region.
932    *
933    * The RPC will be sent only to the region server found in the region state
934    * if it is passed in, otherwise, to the src server specified. If region
935    * state is not specified, we don't update region state at all, instead
936    * we just send the RPC call. This is useful for some cleanup without
937    * messing with the region states (see handleRegion and the scenario where a
938    * region is opened on an unexpected server, for an example).
939    */
940   private void unassign(final HRegionInfo region,
941       final ServerName server, final ServerName dest) {
942     for (int i = 1; i <= this.maximumAttempts; i++) {
943       if (this.server.isStopped() || this.server.isAborted()) {
944         LOG.debug("Server stopped/aborted; skipping unassign of " + region);
945         return;
946       }
947       if (!serverManager.isServerOnline(server)) {
948         LOG.debug("Offline " + region.getRegionNameAsString()
949           + ", no need to unassign since it's on a dead server: " + server);
950         regionStates.updateRegionState(region, State.OFFLINE);
951         return;
952       }
953       try {
954         // Send CLOSE RPC
955         if (serverManager.sendRegionClose(server, region, dest)) {
956           LOG.debug("Sent CLOSE to " + server + " for region " +
957             region.getRegionNameAsString());
958           return;
959         }
960         // This never happens. Currently regionserver close always returns true.
961         // TODO: this can now happen (0.96) if there is an exception in a coprocessor.
962         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
963           region.getRegionNameAsString());
964       } catch (Throwable t) {
965         long sleepTime = 0;
966         Configuration conf = this.server.getConfiguration();
967         if (t instanceof RemoteException) {
968           t = ((RemoteException)t).unwrapRemoteException();
969         }
970         if (t instanceof RegionServerAbortedException
971             || t instanceof RegionServerStoppedException
972             || t instanceof ServerNotRunningYetException) {
973           // RS is aborting, we cannot offline the region since the region may need to do WAL
974           // recovery. Until we see the RS expiration, we should retry.
975           sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
976             RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
977
978         } else if (t instanceof NotServingRegionException) {
979           LOG.debug("Offline " + region.getRegionNameAsString()
980             + ", it's not any more on " + server, t);
981           regionStates.updateRegionState(region, State.OFFLINE);
982           return;
983         } else if (t instanceof FailedServerException && i < maximumAttempts) {
984           // In case the server is in the failed server list, no point to
985           // retry too soon. Retry after the failed_server_expiry time
986           sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
987           RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
988           if (LOG.isDebugEnabled()) {
989             LOG.debug(server + " is on failed server list; waiting " + sleepTime + "ms", t);
990           }
991        }
992        try {
993          if (sleepTime > 0) {
994            Thread.sleep(sleepTime);
995          }
996        } catch (InterruptedException ie) {
997          LOG.warn("Interrupted unassign " + region.getRegionNameAsString(), ie);
998          Thread.currentThread().interrupt();
999          regionStates.updateRegionState(region, State.FAILED_CLOSE);
1000          return;
1001        }
1002        LOG.info("Server " + server + " returned " + t + " for "
1003          + region.getRegionNameAsString() + ", try=" + i
1004          + " of " + this.maximumAttempts, t);
1005       }
1006     }
1007     // Run out of attempts
1008     regionStates.updateRegionState(region, State.FAILED_CLOSE);
1009   }
1010
1011   /**
1012    * Set region to OFFLINE unless it is already opening or closing and forceNewPlan is false.
1013    */
1014   private RegionState forceRegionStateToOffline(
1015       final HRegionInfo region, final boolean forceNewPlan) {
1016     RegionState state = regionStates.getRegionState(region);
1017     if (state == null) {
1018       LOG.warn("Assigning but not in region states: " + region);
1019       state = regionStates.createRegionState(region);
1020     }
1021
1022     if (forceNewPlan && LOG.isDebugEnabled()) {
1023       LOG.debug("Force region state offline " + state);
1024     }
1025
1026     switch (state.getState()) {
1027     case OPEN:
1028     case OPENING:
1029     case PENDING_OPEN:
1030     case CLOSING:
1031     case PENDING_CLOSE:
1032       if (!forceNewPlan) {
1033         LOG.debug("Skip assigning " +
1034           region + ", it is already " + state);
1035         return null;
1036       }
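           // Intentional fall-through: with forceNewPlan set, these in-flight states are
           // handled like FAILED_CLOSE/FAILED_OPEN below, i.e. the region is force-closed first.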
1037     case FAILED_CLOSE:
1038     case FAILED_OPEN:
1039       regionStates.updateRegionState(region, State.PENDING_CLOSE);
1040       unassign(region, state.getServerName(), null);
1041       state = regionStates.getRegionState(region);
1042       if (!state.isOffline() && !state.isClosed()) {
1043         // If the region isn't offline, we can't re-assign
1044         // it now. It will be assigned automatically after
1045         // the regionserver reports it's closed.
1046         return null;
1047       }
1048     case OFFLINE:
1049     case CLOSED:
1050       break;
1051     default:
1052       LOG.error("Trying to assign region " + region
1053         + ", which is " + state);
1054       return null;
1055     }
1056     return state;
1057   }
1058
1059   /**
1060    * Caller must hold lock on the passed <code>state</code> object.
1061    * @param state
1062    * @param forceNewPlan
1063    */
1064   private void assign(RegionState state, boolean forceNewPlan) {
1065     long startTime = EnvironmentEdgeManager.currentTime();
1066     try {
1067       Configuration conf = server.getConfiguration();
1068       RegionPlan plan = null;
1069       long maxWaitTime = -1;
1070       HRegionInfo region = state.getRegion();
1071       Throwable previousException = null;
1072       for (int i = 1; i <= maximumAttempts; i++) {
1073         if (server.isStopped() || server.isAborted()) {
1074           LOG.info("Skip assigning " + region.getRegionNameAsString()
1075             + ", the server is stopped/aborted");
1076           return;
1077         }
1078
1079         if (plan == null) { // Get a server for the region at first
1080           try {
1081             plan = getRegionPlan(region, forceNewPlan);
1082           } catch (HBaseIOException e) {
1083             LOG.warn("Failed to get region plan", e);
1084           }
1085         }
1086
1087         if (plan == null) {
1088           LOG.warn("Unable to determine a plan to assign " + region);
1089
1090           // For meta region, we have to keep retrying until succeeding
1091           if (region.isMetaRegion()) {
1092             if (i == maximumAttempts) {
1093               i = 0; // re-set attempt count to 0 for at least 1 retry
1094
1095               LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
1096                 " after maximumAttempts (" + this.maximumAttempts +
1097                 "). Reset attempts count and continue retrying.");
1098             }
1099             waitForRetryingMetaAssignment();
1100             continue;
1101           }
1102
1103           regionStates.updateRegionState(region, State.FAILED_OPEN);
1104           return;
1105         }
1106         LOG.info("Assigning " + region.getRegionNameAsString() +
1107             " to " + plan.getDestination());
1108         // Transition RegionState to PENDING_OPEN
1109        regionStates.updateRegionState(region,
1110           State.PENDING_OPEN, plan.getDestination());
1111
1112         boolean needNewPlan = false;
1113         final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
1114             " to " + plan.getDestination();
1115         try {
1116           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1117           if (this.shouldAssignRegionsWithFavoredNodes) {
1118             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1119           }
1120           serverManager.sendRegionOpen(plan.getDestination(), region, favoredNodes);
1121           return; // we're done
1122         } catch (Throwable t) {
1123           if (t instanceof RemoteException) {
1124             t = ((RemoteException) t).unwrapRemoteException();
1125           }
1126           previousException = t;
1127
1128           // Should we wait a little before retrying? If the server is starting it's yes.
1129           boolean hold = (t instanceof ServerNotRunningYetException);
1130
1131           // In case socket is timed out and the region server is still online,
1132           // the openRegion RPC could have been accepted by the server and
1133           // just the response didn't go through.  So we will retry to
1134           // open the region on the same server.
1135           boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
1136               && this.serverManager.isServerOnline(plan.getDestination()));
1137
1138           if (hold) {
1139             LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
1140               "try=" + i + " of " + this.maximumAttempts, t);
1141
1142             if (maxWaitTime < 0) {
1143               maxWaitTime = EnvironmentEdgeManager.currentTime()
1144                 + this.server.getConfiguration().getLong(
1145                   "hbase.regionserver.rpc.startup.waittime", 60000);
1146             }
1147             try {
1148               long now = EnvironmentEdgeManager.currentTime();
1149               if (now < maxWaitTime) {
1150                 if (LOG.isDebugEnabled()) {
1151                   LOG.debug("Server is not yet up; waiting up to "
1152                     + (maxWaitTime - now) + "ms", t);
1153                 }
1154                 Thread.sleep(100);
1155                 i--; // reset the try count
1156               } else {
1157                 LOG.debug("Server is not up for a while; try a new one", t);
1158                 needNewPlan = true;
1159               }
1160             } catch (InterruptedException ie) {
1161               LOG.warn("Failed to assign "
1162                   + region.getRegionNameAsString() + " since interrupted", ie);
1163               regionStates.updateRegionState(region, State.FAILED_OPEN);
1164               Thread.currentThread().interrupt();
1165               return;
1166             }
1167           } else if (retry) {
1168             i--; // we want to retry as many times as needed as long as the RS is not dead.
1169             if (LOG.isDebugEnabled()) {
1170               LOG.debug(assignMsg + ", trying to assign to the same region server", t);
1171             }
1172           } else {
1173             needNewPlan = true;
1174             LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
1175                 " try=" + i + " of " + this.maximumAttempts, t);
1176           }
1177         }
1178
1179         if (i == this.maximumAttempts) {
1180           // For meta region, we have to keep retrying until succeeding
1181           if (region.isMetaRegion()) {
1182             i = 0; // re-set attempt count to 0 for at least 1 retry
1183             LOG.warn(assignMsg +
1184                 ", trying to assign a hbase:meta region reached maximumAttempts (" +
1185                 this.maximumAttempts + ").  Reset attempt counts and continue retrying.");
1186             waitForRetryingMetaAssignment();
1187           }
1188           else {
1189             // Don't reset the region state or get a new plan any more.
1190             // This is the last try.
1191             continue;
1192           }
1193         }
1194
1195         // If region opened on destination of present plan, reassigning to new
1196         // RS may cause double assignments. In case of RegionAlreadyInTransitionException
1197         // reassigning to same RS.
1198         if (needNewPlan) {
1199           // Force a new plan and reassign. Will return null if no servers.
1200           // The new plan could be the same as the existing plan since we don't
1201           // exclude the server of the original plan, which should not be
1202           // excluded since it could be the only server up now.
1203           RegionPlan newPlan = null;
1204           try {
1205             newPlan = getRegionPlan(region, true);
1206           } catch (HBaseIOException e) {
1207             LOG.warn("Failed to get region plan", e);
1208           }
1209           if (newPlan == null) {
1210             regionStates.updateRegionState(region, State.FAILED_OPEN);
1211             LOG.warn("Unable to find a viable location to assign region " +
1212                 region.getRegionNameAsString());
1213             return;
1214           }
1215
1216           if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
1217             // Clean out the plan we failed to execute and one that doesn't look like it'll
1218             // succeed anyways; we need a new plan!
1219             // Transition back to OFFLINE
1220             regionStates.updateRegionState(region, State.OFFLINE);
1221             plan = newPlan;
1222           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
1223               previousException instanceof FailedServerException) {
1224             try {
1225               LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
1226                 " to the same failed server.");
1227               Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1228                 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
1229             } catch (InterruptedException ie) {
1230               LOG.warn("Failed to assign "
1231                   + region.getRegionNameAsString() + " since interrupted", ie);
1232               regionStates.updateRegionState(region, State.FAILED_OPEN);
1233               Thread.currentThread().interrupt();
1234               return;
1235             }
1236           }
1237         }
1238       }
1239       // Run out of attempts
1240       regionStates.updateRegionState(region, State.FAILED_OPEN);
1241     } finally {
1242       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
1243     }
1244   }
1245
1246   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
1247     if (this.tableStateManager.isTableState(region.getTable(),
1248             TableState.State.DISABLED,
1249             TableState.State.DISABLING) || replicasToClose.contains(region)) {
1250       LOG.info("Table " + region.getTable() + " is disabled or disabling;"
1251         + " skipping assign of " + region.getRegionNameAsString());
1252       offlineDisabledRegion(region);
1253       return true;
1254     }
1255     return false;
1256   }
1257
1258   /**
1259    * @param region the region to assign
1260    * @param forceNewPlan If true, then if an existing plan exists, a new plan
1261    * will be generated.
1262    * @return Plan for passed <code>region</code> (If none currently, it creates one or
1263    * if no servers to assign, it returns null).
1264    */
1265   private RegionPlan getRegionPlan(final HRegionInfo region,
1266       final boolean forceNewPlan) throws HBaseIOException {
1267     // Pickup existing plan or make a new one
1268     final String encodedName = region.getEncodedName();
1269     final List<ServerName> destServers =
1270       serverManager.createDestinationServersList();
1271
1272     if (destServers.isEmpty()){
1273       LOG.warn("Can't move " + encodedName +
1274         ", there is no destination server available.");
1275       return null;
1276     }
1277
1278     RegionPlan randomPlan = null;
1279     boolean newPlan = false;
1280     RegionPlan existingPlan;
1281
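         // Decide under the regionPlans lock: reuse the existing plan only when its destination
         // is still in the current destination-server list; otherwise ask the balancer for a
         // random assignment, refresh favored nodes when enabled, and cache the new plan.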
1282     synchronized (this.regionPlans) {
1283       existingPlan = this.regionPlans.get(encodedName);
1284
1285       if (existingPlan != null && existingPlan.getDestination() != null) {
1286         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
1287           + " destination server is " + existingPlan.getDestination() +
1288             " accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
1289       }
1290
1291       if (forceNewPlan
1292           || existingPlan == null
1293           || existingPlan.getDestination() == null
1294           || !destServers.contains(existingPlan.getDestination())) {
1295         newPlan = true;
1296         try {
1297           randomPlan = new RegionPlan(region, null,
1298               balancer.randomAssignment(region, destServers));
1299         } catch (IOException ex) {
1300           LOG.warn("Failed to create new plan.",ex);
1301           return null;
1302         }
1303         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
1304           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
1305           regions.add(region);
1306           try {
1307             processFavoredNodes(regions);
1308           } catch (IOException ie) {
1309             LOG.warn("Ignoring exception in processFavoredNodes " + ie);
1310           }
1311         }
1312         this.regionPlans.put(encodedName, randomPlan);
1313       }
1314     }
1315
1316     if (newPlan) {
1317       if (randomPlan.getDestination() == null) {
1318         LOG.warn("Can't find a destination for " + encodedName);
1319         return null;
1320       }
1321       if (LOG.isDebugEnabled()) {
1322         LOG.debug("No previous transition plan found (or ignoring " +
1323           "an existing plan) for " + region.getRegionNameAsString() +
1324           "; generated random plan=" + randomPlan + "; " + destServers.size() +
1325           " (online=" + serverManager.getOnlineServers().size() +
1326           ") available servers, forceNewPlan=" + forceNewPlan);
1327       }
1328       return randomPlan;
1329     }
1330     if (LOG.isDebugEnabled()) {
1331       LOG.debug("Using pre-existing plan for " +
1332         region.getRegionNameAsString() + "; plan=" + existingPlan);
1333     }
1334     return existingPlan;
1335   }
1336
1337   /**
1338    * Wait for some time before retrying meta table region assignment
1339    */
1340   private void waitForRetryingMetaAssignment() {
1341     try {
1342       Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
1343     } catch (InterruptedException e) {
1344       LOG.error("Got exception while waiting for hbase:meta assignment");
1345       Thread.currentThread().interrupt();
1346     }
1347   }
1348
1349   /**
1350    * Unassigns the specified region.
1351    * <p>
1352    * Updates the RegionState and sends the CLOSE RPC unless the region is being
1353    * split by the regionserver; in that case the unassign fails (silently) because we
1354    * presume the region being unassigned no longer exists (it's been split out
1355    * of existence). TODO: What to do if the split fails and is rolled back and
1356    * the parent is revivified?
1357    * <p>
1358    * If a RegionPlan is already set, it will remain.
1359    *
1360    * @param region the region to be unassigned
1361    */
1362   public void unassign(HRegionInfo region) {
1363     unassign(region, null);
1364   }
1365
1366
1367   /**
1368    * Unassigns the specified region.
1369    * <p>
1370    * Updates the RegionState and sends the CLOSE RPC unless the region is being
1371    * split by the regionserver; in that case the unassign fails (silently) because we
1372    * presume the region being unassigned no longer exists (it's been split out
1373    * of existence). TODO: What to do if the split fails and is rolled back and
1374    * the parent is revivified?
1375    * <p>
1376    * If a RegionPlan is already set, it will remain.
1377    *
1378    * @param region the region to be unassigned
1379    * @param dest the destination server of the region
1380    */
1381   public void unassign(HRegionInfo region, ServerName dest) {
1382     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
1383     LOG.debug("Starting unassign of " + region.getRegionNameAsString()
1384       + " (offlining), current state: " + regionStates.getRegionState(region));
1385
1386     String encodedName = region.getEncodedName();
1387     // Grab the state of this region and synchronize on it
1388     // We need a lock here as we're going to do a put later and we don't want
1389     // concurrent state creation
1390     ReentrantLock lock = locker.acquireLock(encodedName);
1391     RegionState state = regionStates.getRegionTransitionState(encodedName);
1392     try {
1393       if (state == null || state.isFailedClose()) {
1394         if (state == null) {
1395           // Region is not in transition.
1396           // We can unassign it only if it's not SPLIT/MERGED.
1397           state = regionStates.getRegionState(encodedName);
1398           if (state != null && state.isUnassignable()) {
1399             LOG.info("Attempting to unassign " + state + ", ignored");
1400             // Offline region will be reassigned below
1401             return;
1402           }
1403           if (state == null || state.getServerName() == null) {
1404             // We don't know where the region is, offline it.
1405             // No need to send CLOSE RPC
1406             LOG.warn("Attempting to unassign a region not in RegionStates "
1407               + region.getRegionNameAsString() + ", offlined");
1408             regionOffline(region);
1409             return;
1410           }
1411         }
1412         state = regionStates.updateRegionState(
1413           region, State.PENDING_CLOSE);
1414       } else if (state.isFailedOpen()) {
1415         // The region is not open yet
1416         regionOffline(region);
1417         return;
1418       } else {
1419         LOG.debug("Attempting to unassign " +
1420           region.getRegionNameAsString() + " but it is " +
1421           "already in transition (" + state.getState());
1422         return;
1423       }
1424
1425       unassign(region, state.getServerName(), dest);
1426     } finally {
1427       lock.unlock();
1428
1429       // Region is expected to be reassigned afterwards
1430       if (!replicasToClose.contains(region)
1431           && regionStates.isRegionInState(region, State.OFFLINE)) {
1432         assign(region);
1433       }
1434     }
1435   }
1436
1437   /**
1438    * Used by unit tests. Return the number of regions opened so far in the life
1439    * of the master. Increases by one every time the master opens a region.
1440    * @return the counter value of the number of regions opened so far
1441    */
1442   public int getNumRegionsOpened() {
1443     return numRegionsOpened.get();
1444   }
1445
1446   /**
1447    * Waits until the specified region has completed assignment.
1448    * <p>
1449    * If the region is already assigned, returns immediately.  Otherwise, method
1450    * blocks until the region is assigned.
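        * <p>
        * A minimal usage sketch (illustrative only; {@code am} and {@code hri} are
        * placeholders for an AssignmentManager and an HRegionInfo in scope):
        * <pre>
        *   if (!am.waitForAssignment(hri)) {
        *     // the wait ended without the region coming online
        *   }
        * </pre>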
1451    * @param regionInfo region to wait on assignment for
1452    * @return true if the region is assigned, false otherwise.
1453    * @throws InterruptedException
1454    */
1455   public boolean waitForAssignment(HRegionInfo regionInfo)
1456       throws InterruptedException {
1457     ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
1458     regionSet.add(regionInfo);
1459     return waitForAssignment(regionSet, true, Long.MAX_VALUE);
1460   }
1461
1462   /**
1463    * Waits until the specified regions have completed assignment, or the deadline is reached.
1464    */
1465   protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
1466       final boolean waitTillAllAssigned, final int reassigningRegions,
1467       final long minEndTime) throws InterruptedException {
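         // Derive the deadline from the caller-supplied floor plus a per-region open-time
         // estimate; a negative result means overflow and is treated below as "no deadline".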
1468     long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
1469     if (deadline < 0) { // Overflow
1470       deadline = Long.MAX_VALUE; // wait forever
1471     }
1472     return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
1473   }
1474
1475   /**
1476    * Waits until the specified regions have completed assignment, or the deadline is reached.
1477    * @param regionSet set of regions to wait on; the set is modified and the assigned regions removed
1478    * @param waitTillAllAssigned true if we should wait for all the regions to be assigned
1479    * @param deadline the timestamp after which the wait is aborted
1480    * @return true if all the regions are assigned, false otherwise.
1481    * @throws InterruptedException
1482    */
1483   protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
1484       final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
1485     // We're not synchronizing on regionsInTransition now because we don't use any iterator.
1486     while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
1487       int failedOpenCount = 0;
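           // Drop regions that have reached a terminal-enough state (online, split, or merged)
           // and count the ones stuck in FAILED_OPEN so we can bail out if every remaining
           // region has failed to open.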
1488       Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
1489       while (regionInfoIterator.hasNext()) {
1490         HRegionInfo hri = regionInfoIterator.next();
1491         if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
1492             State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
1493           regionInfoIterator.remove();
1494         } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
1495           failedOpenCount++;
1496         }
1497       }
1498       if (!waitTillAllAssigned) {
1499         // No need to wait, let assignment go on asynchronously
1500         break;
1501       }
1502       if (!regionSet.isEmpty()) {
1503         if (failedOpenCount == regionSet.size()) {
1504           // all the regions we are waiting on had an error on open.
1505           break;
1506         }
1507         regionStates.waitForUpdate(100);
1508       }
1509     }
1510     return regionSet.isEmpty();
1511   }
1512
1513   /**
1514    * Assigns the hbase:meta region or a replica.
1515    * <p>
1516    * Assumes that hbase:meta is currently closed and is not being actively served by
1517    * any RegionServer.
1518    * @param hri TODO
1519    */
1520   public void assignMeta(HRegionInfo hri) throws KeeperException {
1521     regionStates.updateRegionState(hri, State.OFFLINE);
1522     assign(hri);
1523   }
1524
1525   /**
1526    * Assigns specified regions retaining assignments, if any.
1527    * <p>
1528    * This is a synchronous call and will return once every region has been
1529    * assigned.  If anything fails, an exception is thrown.
1530    * @throws InterruptedException
1531    * @throws IOException
1532    */
1533   public void assign(Map<HRegionInfo, ServerName> regions)
1534         throws IOException, InterruptedException {
1535     if (regions == null || regions.isEmpty()) {
1536       return;
1537     }
1538     List<ServerName> servers = serverManager.createDestinationServersList();
1539     if (servers == null || servers.isEmpty()) {
1540       throw new IOException("Found no destination server to assign region(s)");
1541     }
1542
1543     // Reuse existing assignment info
1544     Map<ServerName, List<HRegionInfo>> bulkPlan =
1545       balancer.retainAssignment(regions, servers);
1546     if (bulkPlan == null) {
1547       throw new IOException("Unable to determine a plan to assign region(s)");
1548     }
1549
1550     processBogusAssignments(bulkPlan);
1551
1552     assign(regions.size(), servers.size(),
1553       "retainAssignment=true", bulkPlan);
1554   }
1555
1556   /**
1557    * Assigns the specified regions round-robin, if any.
1558    * <p>
1559    * This is a synchronous call and will return once every region has been
1560    * assigned.  If anything fails, an exception is thrown.
1561    * @throws InterruptedException
1562    * @throws IOException
1563    */
1564   public void assign(List<HRegionInfo> regions)
1565         throws IOException, InterruptedException {
1566     if (regions == null || regions.isEmpty()) {
1567       return;
1568     }
1569
1570     List<ServerName> servers = serverManager.createDestinationServersList();
1571     if (servers == null || servers.isEmpty()) {
1572       throw new IOException("Found no destination server to assign region(s)");
1573     }
1574
1575     // Generate a round-robin bulk assignment plan
1576     Map<ServerName, List<HRegionInfo>> bulkPlan = balancer.roundRobinAssignment(regions, servers);
1577     if (bulkPlan == null) {
1578       throw new IOException("Unable to determine a plan to assign region(s)");
1579     }
1580
1581     processBogusAssignments(bulkPlan);
1582
1583     processFavoredNodes(regions);
1584     assign(regions.size(), servers.size(), "round-robin=true", bulkPlan);
1585   }
1586
1587   private void assign(int regions, int totalServers,
1588       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
1589           throws InterruptedException, IOException {
1590
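         // Small assignments (a single target server, or below both the region and server
         // thresholds) are submitted individually and waited on; larger ones go through
         // the GeneralBulkAssigner thread pool below.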
1591     int servers = bulkPlan.size();
1592     if (servers == 1 || (regions < bulkAssignThresholdRegions
1593         && servers < bulkAssignThresholdServers)) {
1594
1595       // Not using bulk assignment.  This could be more efficient in a small
1596       // cluster, especially a mini cluster for testing, so that tests won't time out
1597       if (LOG.isTraceEnabled()) {
1598         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
1599           " region(s) to " + servers + " server(s)");
1600       }
1601
1602       // invoke assignment (async)
1603       ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
1604       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
1605         if (!assign(plan.getKey(), plan.getValue()) && !server.isStopped()) {
1606           for (HRegionInfo region: plan.getValue()) {
1607             if (!regionStates.isRegionOnline(region)) {
1608               invokeAssign(region);
1609               if (!region.getTable().isSystemTable()) {
1610                 userRegionSet.add(region);
1611               }
1612             }
1613           }
1614         }
1615       }
1616
1617       // wait for assignment completion
1618       if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
1619             System.currentTimeMillis())) {
1620         LOG.debug("some user regions are still in transition: " + userRegionSet);
1621       }
1622     } else {
1623       LOG.info("Bulk assigning " + regions + " region(s) across "
1624         + totalServers + " server(s), " + message);
1625
1626       // Use fixed count thread pool assigning.
1627       BulkAssigner ba = new GeneralBulkAssigner(
1628         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
1629       ba.bulkAssign();
1630       LOG.info("Bulk assigning done");
1631     }
1632   }
1633
1634   /**
1635    * Assigns all user regions, if any exist.  Used during cluster startup.
1636    * <p>
1637    * This is a synchronous call and will return once every region has been
1638    * assigned.  If anything fails, an exception is thrown and the cluster
1639    * should be shutdown.
1640    * @throws InterruptedException
1641    * @throws IOException
1642    */
1643   private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
1644       throws IOException, InterruptedException {
1645     if (allRegions == null || allRegions.isEmpty()) return;
1646
1647     // Determine what type of assignment to do on startup
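         // "hbase.master.startup.retainassign" (default true) chooses between retaining the
         // previous region locations read from hbase:meta and doing a fresh round-robin assignment.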
1648     boolean retainAssignment = server.getConfiguration().
1649       getBoolean("hbase.master.startup.retainassign", true);
1650
1651     Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
1652     if (retainAssignment) {
1653       assign(allRegions);
1654     } else {
1655       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
1656       assign(regions);
1657     }
1658
1659     for (HRegionInfo hri : regionsFromMetaScan) {
1660       TableName tableName = hri.getTable();
1661       if (!tableStateManager.isTableState(tableName,
1662               TableState.State.ENABLED)) {
1663         setEnabledTable(tableName);
1664       }
1665     }
1666     // assign all the replicas that were not recorded in the meta
1667     assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server));
1668   }
1669
1670   /**
1671    * Get number of replicas of a table
1672    */
1673   private static int getNumReplicas(MasterServices master, TableName table) {
1674     int numReplica = 1;
1675     try {
1676       HTableDescriptor htd = master.getTableDescriptors().get(table);
1677       if (htd == null) {
1678         LOG.warn("master can not get TableDescriptor from table '" + table);
1679       } else {
1680         numReplica = htd.getRegionReplication();
1681       }
1682     } catch (IOException e){
1683       LOG.warn("Couldn't get the replication attribute of the table " + table + " due to "
1684           + e.getMessage());
1685     }
1686     return numReplica;
1687   }
1688
1689   /**
1690    * Get a list of replica regions that are not recorded in meta yet.
1691    * We might not have recorded the locations for the replicas since the replicas
1692    * may not have been online yet, the master restarted in the middle of assigning,
1693    * ZK was erased, etc.
1694    * @param regionsRecordedInMeta the list of regions we know are recorded in meta
1695    * either as a default, or, as the location of a replica
1696    * @param master
1697    * @return list of replica regions
1698    * @throws IOException
1699    */
1700   public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
1701       Set<HRegionInfo> regionsRecordedInMeta, MasterServices master)throws IOException {
1702     List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
1703     for (HRegionInfo hri : regionsRecordedInMeta) {
1704       TableName table = hri.getTable();
1705       if(master.getTableDescriptors().get(table) == null)
1706         continue;
1707       int  desiredRegionReplication = getNumReplicas(master, table);
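           // The default replica (id 0) comes from the meta scan itself, so the contains()
           // check below filters it out along with any replicas already recorded in meta.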
1708       for (int i = 0; i < desiredRegionReplication; i++) {
1709         HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
1710         if (regionsRecordedInMeta.contains(replica)) continue;
1711         regionsNotRecordedInMeta.add(replica);
1712       }
1713     }
1714     return regionsNotRecordedInMeta;
1715   }
1716
1717   /**
1718    * Rebuild the list of user regions and assignment information.
1719    * Updates regionstates with findings as we go through list of regions.
1720    * @return set of servers not online that hosted some regions according to a scan of hbase:meta
1721    * @throws IOException
1722    */
1723   Set<ServerName> rebuildUserRegions() throws
1724           IOException, KeeperException {
1725     Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
1726             TableState.State.DISABLED, TableState.State.ENABLING);
1727
1728     Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
1729             TableState.State.DISABLED,
1730             TableState.State.DISABLING,
1731             TableState.State.ENABLING);
1732
1733     // Region assignment from META
1734     List<Result> results = MetaTableAccessor.fullScanRegions(server.getConnection());
1735     // Get any new but slow to checkin region server that joined the cluster
1736     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
1737     // Set of offline servers to be returned
1738     Set<ServerName> offlineServers = new HashSet<ServerName>();
1739     // Iterate regions in META
1740     for (Result result : results) {
1741       if (result == null) {
1742         LOG.debug("Null result from meta - ignoring but this is strange.");
1743         continue;
1744       }
1745       // Keep track of replicas to close. These were the replicas of the original,
1746       // pre-merge regions. The master might have closed them before, but it may not
1747       // have, perhaps because it crashed.
1748       PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
1749       if (p.getFirst() != null && p.getSecond() != null) {
1750         int numReplicas = getNumReplicas(server, p.getFirst().getTable());
1751         for (HRegionInfo merge : p) {
1752           for (int i = 1; i < numReplicas; i++) {
1753             replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
1754           }
1755         }
1756       }
1757       RegionLocations rl =  MetaTableAccessor.getRegionLocations(result);
1758       if (rl == null) {
1759         continue;
1760       }
1761       HRegionLocation[] locations = rl.getRegionLocations();
1762       if (locations == null) {
1763         continue;
1764       }
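             // For every replica location in this meta row: record its state in regionStates,
             // note servers that are no longer online, and mark the region online for the
             // balancer when its server is live and the table is not disabled/enabling.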
1765       for (HRegionLocation hrl : locations) {
1766         if (hrl == null) continue;
1767         HRegionInfo regionInfo = hrl.getRegionInfo();
1768         if (regionInfo == null) continue;
1769         int replicaId = regionInfo.getReplicaId();
1770         State state = RegionStateStore.getRegionState(result, replicaId);
1771         // Keep track of replicas to close. These were the replicas of the split parents
1772         // from the previous life of the master. The master should have closed them before,
1773         // but it may not have, perhaps because it crashed.
1774         if (replicaId == 0 && state.equals(State.SPLIT)) {
1775           for (HRegionLocation h : locations) {
1776             replicasToClose.add(h.getRegionInfo());
1777           }
1778         }
1779         ServerName lastHost = hrl.getServerName();
1780         ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
1781         regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
1782         if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
1783           // Region is not open (either offline or in transition), skip
1784           continue;
1785         }
1786         TableName tableName = regionInfo.getTable();
1787         if (!onlineServers.contains(regionLocation)) {
1788           // Region is located on a server that isn't online
1789           offlineServers.add(regionLocation);
1790         } else if (!disabledOrEnablingTables.contains(tableName)) {
1791           // Region is being served and on an active server
1792           // add only if region not in disabled or enabling table
1793           regionStates.regionOnline(regionInfo, regionLocation);
1794           balancer.regionOnline(regionInfo, regionLocation);
1795         }
1796         // need to enable the table if not disabled or disabling or enabling
1797         // this will be used in rolling restarts
1798         if (!disabledOrDisablingOrEnabling.contains(tableName)
1799           && !getTableStateManager().isTableState(tableName,
1800                 TableState.State.ENABLED)) {
1801           setEnabledTable(tableName);
1802         }
1803       }
1804     }
1805     return offlineServers;
1806   }
1807
1808   /**
1809    * Processes list of regions in transition at startup
1810    */
1811   void processRegionsInTransition(Collection<RegionState> regionsInTransition) {
1812     // We need to send the RPC call again for PENDING_OPEN/PENDING_CLOSE regions
1813     // in case the RPC call was not sent out before the master was shut down,
1814     // since we update the state before we send the RPC call. We can't update
1815     // the state after the RPC call; otherwise, we wouldn't know what happened
1816     // to the region if the master died right after the RPC call went out.
1817     for (RegionState regionState: regionsInTransition) {
1818       LOG.info("Processing " + regionState);
1819       ServerName serverName = regionState.getServerName();
1820       // Server could be null in case of FAILED_OPEN when master cannot find a region plan. In that
1821       // case, try assigning it here.
1822       if (serverName != null && !serverManager.getOnlineServers().containsKey(serverName)) {
1823         LOG.info("Server " + serverName + " isn't online. SSH will handle this");
1824         continue; // SSH will handle it
1825       }
1826       HRegionInfo regionInfo = regionState.getRegion();
1827       RegionState.State state = regionState.getState();
1828       switch (state) {
1829       case CLOSED:
1830         invokeAssign(regionState.getRegion());
1831         break;
1832       case PENDING_OPEN:
1833         retrySendRegionOpen(regionState);
1834         break;
1835       case PENDING_CLOSE:
1836         retrySendRegionClose(regionState);
1837         break;
1838       case FAILED_CLOSE:
1839       case FAILED_OPEN:
1840         invokeUnAssign(regionInfo);
1841         break;
1842       default:
1843           // No processing needed for other states
1844           break;
1845       }
1846     }
1847   }
1848
1849   /**
1850    * At master failover, for pending_open region, make sure
1851    * sendRegionOpen RPC call is sent to the target regionserver
1852    */
1853   private void retrySendRegionOpen(final RegionState regionState) {
1854     this.executorService.submit(
1855       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
1856         @Override
1857         public void process() throws IOException {
1858           HRegionInfo hri = regionState.getRegion();
1859           ServerName serverName = regionState.getServerName();
1860           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
1861           try {
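                 // Retry the open RPC up to maximumAttempts times. A FailedServerException
                 // waits out the failed-server expiry before retrying; a socket timeout
                 // against a still-online server does not consume an attempt.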
1862             for (int i = 1; i <= maximumAttempts; i++) {
1863               if (!serverManager.isServerOnline(serverName)
1864                   || server.isStopped() || server.isAborted()) {
1865                 return; // No need any more
1866               }
1867               try {
1868                 if (!regionState.equals(regionStates.getRegionState(hri))) {
1869                   return; // Region is not in the expected state any more
1870                 }
1871                 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1872                 if (shouldAssignRegionsWithFavoredNodes) {
1873                   favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
1874                 }
1875                 serverManager.sendRegionOpen(serverName, hri, favoredNodes);
1876                 return; // we're done
1877               } catch (Throwable t) {
1878                 if (t instanceof RemoteException) {
1879                   t = ((RemoteException) t).unwrapRemoteException();
1880                 }
1881                 if (t instanceof FailedServerException && i < maximumAttempts) {
1882                   // In case the server is in the failed server list, there is no point
1883                   // in retrying too soon. Retry after the failed_server_expiry time
1884                   try {
1885                     Configuration conf = this.server.getConfiguration();
1886                     long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1887                       RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1888                     if (LOG.isDebugEnabled()) {
1889                       LOG.debug(serverName + " is on failed server list; waiting "
1890                         + sleepTime + "ms", t);
1891                     }
1892                     Thread.sleep(sleepTime);
1893                     continue;
1894                   } catch (InterruptedException ie) {
1895                     LOG.warn("Failed to assign "
1896                       + hri.getRegionNameAsString() + " since interrupted", ie);
1897                     regionStates.updateRegionState(hri, State.FAILED_OPEN);
1898                     Thread.currentThread().interrupt();
1899                     return;
1900                   }
1901                 }
1902                 if (serverManager.isServerOnline(serverName)
1903                     && t instanceof java.net.SocketTimeoutException) {
1904                   i--; // don't count this attempt against the retry limit
1905                 } else {
1906                   LOG.info("Got exception in retrying sendRegionOpen for "
1907                     + regionState + "; try=" + i + " of " + maximumAttempts, t);
1908                 }
1909                 Threads.sleep(100);
1910               }
1911             }
1912             // Run out of attempts
1913             regionStates.updateRegionState(hri, State.FAILED_OPEN);
1914           } finally {
1915             lock.unlock();
1916           }
1917         }
1918       });
1919   }
1920
1921   /**
1922    * At master failover, for pending_close region, make sure
1923    * sendRegionClose RPC call is sent to the target regionserver
1924    */
1925   private void retrySendRegionClose(final RegionState regionState) {
1926     this.executorService.submit(
1927       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
1928         @Override
1929         public void process() throws IOException {
1930           HRegionInfo hri = regionState.getRegion();
1931           ServerName serverName = regionState.getServerName();
1932           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
1933           try {
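                 // Same retry scheme as retrySendRegionOpen, but sending the close RPC and
                 // ending in FAILED_CLOSE if all attempts are exhausted.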
1934             for (int i = 1; i <= maximumAttempts; i++) {
1935               if (!serverManager.isServerOnline(serverName)
1936                   || server.isStopped() || server.isAborted()) {
1937                 return; // No need any more
1938               }
1939               try {
1940                 if (!regionState.equals(regionStates.getRegionState(hri))) {
1941                   return; // Region is not in the expected state any more
1942                 }
1943                 serverManager.sendRegionClose(serverName, hri, null);
1944                 return; // Done.
1945               } catch (Throwable t) {
1946                 if (t instanceof RemoteException) {
1947                   t = ((RemoteException) t).unwrapRemoteException();
1948                 }
1949                 if (t instanceof FailedServerException && i < maximumAttempts) {
1950                   // In case the server is in the failed server list, there is no point
1951                   // in retrying too soon. Retry after the failed_server_expiry time
1952                   try {
1953                     Configuration conf = this.server.getConfiguration();
1954                     long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1955                       RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1956                     if (LOG.isDebugEnabled()) {
1957                       LOG.debug(serverName + " is on failed server list; waiting "
1958                         + sleepTime + "ms", t);
1959                     }
1960                     Thread.sleep(sleepTime);
1961                     continue;
1962                   } catch (InterruptedException ie) {
1963                     LOG.warn("Failed to unassign "
1964                       + hri.getRegionNameAsString() + " since interrupted", ie);
1965                     regionStates.updateRegionState(hri, RegionState.State.FAILED_CLOSE);
1966                     Thread.currentThread().interrupt();
1967                     return;
1968                   }
1969                 }
1970                 if (serverManager.isServerOnline(serverName)
1971                     && t instanceof java.net.SocketTimeoutException) {
1972                   i--; // don't count this attempt against the retry limit
1973                 } else {
1974                   LOG.info("Got exception in retrying sendRegionClose for "
1975                     + regionState + "; try=" + i + " of " + maximumAttempts, t);
1976                 }
1977                 Threads.sleep(100);
1978               }
1979             }
1980             // Run out of attempts
1981             regionStates.updateRegionState(hri, State.FAILED_CLOSE);
1982           } finally {
1983             lock.unlock();
1984           }
1985         }
1986       });
1987   }
1988
1989   /**
1990    * Set regions-in-transition metrics.
1991    * This takes an iterator on the regions-in-transition map (CLSM), and is not synchronized.
1992    * The iterator is not fail-fast, which may lead to stale reads; but that's better than
1993    * creating a copy of the map for metrics computation, as this method is invoked
1994    * at a frequent interval.
1995    */
1996   public void updateRegionsInTransitionMetrics() {
1997     long currentTime = System.currentTimeMillis();
1998     int totalRITs = 0;
1999     int totalRITsOverThreshold = 0;
2000     long oldestRITTime = 0;
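         // Regions in transition longer than this threshold (default 60000 ms) are counted
         // as over-threshold for the metrics reported below.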
2001     int ritThreshold = this.server.getConfiguration().
2002       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
2003     for (RegionState state: regionStates.getRegionsInTransition()) {
2004       totalRITs++;
2005       long ritTime = currentTime - state.getStamp();
2006       if (ritTime > ritThreshold) { // more than the threshold
2007         totalRITsOverThreshold++;
2008       }
2009       if (oldestRITTime < ritTime) {
2010         oldestRITTime = ritTime;
2011       }
2012     }
2013     if (this.metricsAssignmentManager != null) {
2014       this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
2015       this.metricsAssignmentManager.updateRITCount(totalRITs);
2016       this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
2017     }
2018   }
2019
2020   /**
2021    * @param region Region whose plan we are to clear.
2022    */
2023   private void clearRegionPlan(final HRegionInfo region) {
2024     synchronized (this.regionPlans) {
2025       this.regionPlans.remove(region.getEncodedName());
2026     }
2027   }
2028
2029   /**
2030    * Wait on region to clear regions-in-transition.
2031    * @param hri Region to wait on.
2032    * @throws IOException
2033    */
2034   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
2035       throws IOException, InterruptedException {
2036     waitOnRegionToClearRegionsInTransition(hri, -1L);
2037   }
2038
2039   /**
2040    * Wait on region to clear regions-in-transition or time out
2041    * @param hri region to wait on
2042    * @param timeOut Milliseconds to wait for current region to be out of transition state.
2043    * @return True when a region clears regions-in-transition before timeout otherwise false
2044    * @throws InterruptedException
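        * <p>
        * Illustrative call (names are placeholders, not from this class):
        * {@code am.waitOnRegionToClearRegionsInTransition(hri, 30000)} waits at most
        * 30 seconds for {@code hri} to leave the regions-in-transition set.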
2045    */
2046   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
2047       throws InterruptedException {
2048     if (!regionStates.isRegionInTransition(hri)) {
2049       return true;
2050     }
2051     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
2052         + timeOut;
2053     // There is already a timeout monitor on regions in transition so I
2054     // should not have to have one here too?
2055     LOG.info("Waiting for " + hri.getEncodedName() +
2056         " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
2057     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
2058       regionStates.waitForUpdate(100);
2059       if (EnvironmentEdgeManager.currentTime() > end) {
2060         LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
2061         return false;
2062       }
2063     }
2064     if (this.server.isStopped()) {
2065       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
2066       return false;
2067     }
2068     return true;
2069   }
2070
2071   void invokeAssign(HRegionInfo regionInfo) {
2072     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
2073   }
2074
2075   void invokeAssignLater(HRegionInfo regionInfo, long sleepMillis) {
2076     scheduledThreadPoolExecutor.schedule(new DelayedAssignCallable(
2077         new AssignCallable(this, regionInfo)), sleepMillis, TimeUnit.MILLISECONDS);
2078   }
2079
2080   void invokeUnAssign(HRegionInfo regionInfo) {
2081     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
2082   }
2083
2084   public boolean isCarryingMeta(ServerName serverName) {
2085     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
2086   }
2087
2088   public boolean isCarryingMetaReplica(ServerName serverName, int replicaId) {
2089     return isCarryingRegion(serverName,
2090         RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId));
2091   }
2092
2093   public boolean isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) {
2094     return isCarryingRegion(serverName, metaHri);
2095   }
2096
2097   /**
2098    * Check if the shutdown server carries the specific region.
2099    * @return whether the serverName currently hosts the region
2100    */
2101   private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
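         // Prefer the in-transition server (the region may be moving); fall back to the
         // currently assigned server recorded in RegionStates.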
2102     RegionState regionState = regionStates.getRegionTransitionState(hri);
2103     ServerName transitionAddr = regionState != null? regionState.getServerName(): null;
2104     if (transitionAddr != null) {
2105       boolean matchTransitionAddr = transitionAddr.equals(serverName);
2106       LOG.debug("Checking region=" + hri.getRegionNameAsString()
2107         + ", transitioning on server=" + matchTransitionAddr
2108         + " server being checked: " + serverName
2109         + ", matches=" + matchTransitionAddr);
2110       return matchTransitionAddr;
2111     }
2112
2113     ServerName assignedAddr = regionStates.getRegionServerOfRegion(hri);
2114     boolean matchAssignedAddr = serverName.equals(assignedAddr);
2115     LOG.debug("based on AM, current region=" + hri.getRegionNameAsString()
2116       + " is on server=" + assignedAddr + ", server being checked: "
2117       + serverName);
2118     return matchAssignedAddr;
2119   }
2120
2121   /**
2122    * Clean out crashed server removing any assignments.
2123    * @param sn Server that went down.
2124    * @return list of regions in transition on this server
2125    */
2126   public List<HRegionInfo> cleanOutCrashedServerReferences(final ServerName sn) {
2127     // Clean out any existing assignment plans for this server
2128     synchronized (this.regionPlans) {
2129       for (Iterator <Map.Entry<String, RegionPlan>> i = this.regionPlans.entrySet().iterator();
2130           i.hasNext();) {
2131         Map.Entry<String, RegionPlan> e = i.next();
2132         ServerName otherSn = e.getValue().getDestination();
2133         // The name will be null if the region is planned for a random assign.
2134         if (otherSn != null && otherSn.equals(sn)) {
2135           // Use iterator's remove else we'll get CME
2136           i.remove();
2137         }
2138       }
2139     }
2140     List<HRegionInfo> rits = regionStates.serverOffline(sn);
2141     for (Iterator<HRegionInfo> it = rits.iterator(); it.hasNext(); ) {
2142       HRegionInfo hri = it.next();
2143       String encodedName = hri.getEncodedName();
2144
2145       // We need a lock on the region as we could update it
2146       Lock lock = locker.acquireLock(encodedName);
2147       try {
2148         RegionState regionState = regionStates.getRegionTransitionState(encodedName);
2149         if (regionState == null
2150             || (regionState.getServerName() != null && !regionState.isOnServer(sn))
2151             || !RegionStates.isOneOfStates(regionState, State.PENDING_OPEN,
2152                 State.OPENING, State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) {
2153           LOG.info("Skip " + regionState + " since it is not opening/failed_close"
2154             + " on the dead server any more: " + sn);
2155           it.remove();
2156         } else {
2157           if (tableStateManager.isTableState(hri.getTable(),
2158                   TableState.State.DISABLED, TableState.State.DISABLING)) {
2159             regionStates.regionOffline(hri);
2160             it.remove();
2161             continue;
2162           }
2163           // Mark the region offline and assign it again by SSH
2164           regionStates.updateRegionState(hri, State.OFFLINE);
2165         }
2166       } finally {
2167         lock.unlock();
2168       }
2169     }
2170     return rits;
2171   }
2172
2173   /**
2174    * @param plan Plan to execute.
2175    */
2176   public void balance(final RegionPlan plan) {
2177
2178     HRegionInfo hri = plan.getRegionInfo();
2179     TableName tableName = hri.getTable();
2180     if (tableStateManager.isTableState(tableName,
2181             TableState.State.DISABLED, TableState.State.DISABLING)) {
2182       LOG.info("Ignored moving region of disabling/disabled table "
2183         + tableName);
2184       return;
2185     }
2186
2187     // Move the region only if it's assigned
2188     String encodedName = hri.getEncodedName();
2189     ReentrantLock lock = locker.acquireLock(encodedName);
2190     try {
2191       if (!regionStates.isRegionOnline(hri)) {
2192         RegionState state = regionStates.getRegionState(encodedName);
2193         LOG.info("Ignored moving region not assigned: " + hri + ", "
2194           + (state == null ? "not in region states" : state));
2195         return;
2196       }
2197       synchronized (this.regionPlans) {
2198         this.regionPlans.put(plan.getRegionName(), plan);
2199       }
2200       unassign(hri, plan.getDestination());
2201     } finally {
2202       lock.unlock();
2203     }
2204   }
2205
2206   public void stop() {
2207     // Shutdown the threadpool executor service
2208     threadPoolExecutorService.shutdownNow();
2209     regionStateStore.stop();
2210   }
2211
2212   protected void setEnabledTable(TableName tableName) {
2213     try {
2214       this.tableStateManager.setTableState(tableName,
2215               TableState.State.ENABLED);
2216     } catch (IOException e) {
2217       // Here we can abort as it is the startup flow
2218       String errorMsg = "Unable to ensure that the table " + tableName
2219           + " will be enabled because of a ZooKeeper issue";
2220       LOG.error(errorMsg);
2221       this.server.abort(errorMsg, e);
2222     }
2223   }
2224
2225   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
2226       value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
2227       justification="Worth fixing but not the end of the world.")
2228   private String onRegionFailedOpen(final RegionState current,
2229       final HRegionInfo hri, final ServerName serverName) {
2230     // The region must be opening on this server.
2231     // If current state is failed_open on the same server,
2232     // it could be a reportRegionTransition RPC retry.
2233     if (current == null || !current.isOpeningOrFailedOpenOnServer(serverName)) {
2234       return hri.getShortNameToLog() + " is not opening on " + serverName;
2235     }
2236
2237     // Just return in case of retrying
2238     if (current.isFailedOpen()) {
2239       return null;
2240     }
2241
2242     String encodedName = hri.getEncodedName();
2243     // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION Worth fixing!!!
2244     AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
2245     if (failedOpenCount == null) {
2246       failedOpenCount = new AtomicInteger();
2247       // No need to use putIfAbsent, or extra synchronization since
2248       // this whole handleRegion block is locked on the encoded region
2249       // name, and failedOpenTracker is updated only in this block
2250       failedOpenTracker.put(encodedName, failedOpenCount);
2251     }
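         // Give up (FAILED_OPEN) after maximumAttempts for user regions; hbase:meta keeps
         // retrying forever, warning once the attempt count passes the limit. Otherwise the
         // failure is treated like a close and the region is reassigned after a backoff.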
2252     if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion()) {
2253       regionStates.updateRegionState(hri, State.FAILED_OPEN);
2254       // remove the tracking info to save memory, also reset
2255       // the count for next open initiative
2256       failedOpenTracker.remove(encodedName);
2257     } else {
2258       if (hri.isMetaRegion() && failedOpenCount.get() >= maximumAttempts) {
2259         // Log a warning message if a meta region failedOpenCount exceeds maximumAttempts
2260         // so that we are aware of potential problem if it persists for a long time.
2261         LOG.warn("Failed to open the hbase:meta region " +
2262             hri.getRegionNameAsString() + " after " +
2263             failedOpenCount.get() + " retries. Continue retrying.");
2264       }
2265
2266       // Handle this the same as if it were opened and then closed.
2267       RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
2268       if (regionState != null) {
2269         // When there is more than one region server, a new RS is selected as the
2270         // destination and the region plan is updated accordingly. (HBASE-5546)
2271         if (getTableStateManager().isTableState(hri.getTable(),
2272                 TableState.State.DISABLED, TableState.State.DISABLING) ||
2273                 replicasToClose.contains(hri)) {
2274           offlineDisabledRegion(hri);
2275           return null;
2276         }
2277         regionStates.updateRegionState(hri, RegionState.State.CLOSED);
2278         // This below has to do w/ online enable/disable of a table
2279         removeClosedRegion(hri);
2280         try {
2281           getRegionPlan(hri, true);
2282         } catch (HBaseIOException e) {
2283           LOG.warn("Failed to get region plan", e);
2284         }
2285         // Have the current thread sleep a bit before resubmitting the RPC request
2286         long sleepTime = backoffPolicy.getBackoffTime(retryConfig,
2287             failedOpenTracker.get(encodedName).get());
2288         invokeAssignLater(hri, sleepTime);
2289       }
2290     }
2291     // Null means no error
2292     return null;
2293   }
2294
2295   private String onRegionOpen(final RegionState current, final HRegionInfo hri,
2296       final ServerName serverName, final RegionStateTransition transition) {
2297     // The region must be opening on this server.
2298     // If current state is already opened on the same server,
2299     // it could be a reportRegionTransition RPC retry.
2300     if (current == null || !current.isOpeningOrOpenedOnServer(serverName)) {
2301       return hri.getShortNameToLog() + " is not opening on " + serverName;
2302     }
2303
2304     // Just return in case of retrying
2305     if (current.isOpened()) {
2306       return null;
2307     }
2308
2309     long openSeqNum = transition.hasOpenSeqNum()
2310       ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM;
2311     if (openSeqNum < 0) {
2312       return "Newly opened region has invalid open seq num " + openSeqNum;
2313     }
2314     regionOnline(hri, serverName, openSeqNum);
2315
2316     // reset the count, if any
2317     failedOpenTracker.remove(hri.getEncodedName());
2318     if (getTableStateManager().isTableState(hri.getTable(),
2319             TableState.State.DISABLED, TableState.State.DISABLING)) {
2320       invokeUnAssign(hri);
2321     }
2322     return null;
2323   }
2324
2325   private String onRegionClosed(final RegionState current,
2326       final HRegionInfo hri, final ServerName serverName) {
2327     // Region will be usually assigned right after closed. When a RPC retry comes
2328     // in, the region may already have moved away from closed state. However, on the
2329     // region server side, we don't care much about the response for this transition.
2330     // We only make sure master has got and processed this report, either
2331     // successfully or not. So this is fine, not a problem at all.
2332     if (current == null || !current.isClosingOrClosedOnServer(serverName)) {
2333       return hri.getShortNameToLog() + " is not closing on " + serverName;
2334     }
2335
2336     // Just return in case of retrying
2337     if (current.isClosed()) {
2338       return null;
2339     }
2340
2341     if (getTableStateManager().isTableState(hri.getTable(), TableState.State.DISABLED,
2342         TableState.State.DISABLING) || replicasToClose.contains(hri)) {
2343       offlineDisabledRegion(hri);
2344       return null;
2345     }
2346
2347     regionStates.updateRegionState(hri, RegionState.State.CLOSED);
2348     sendRegionClosedNotification(hri);
2349     // This below has to do w/ online enable/disable of a table
2350     removeClosedRegion(hri);
2351     invokeAssign(hri);
2352     return null;
2353   }
2354
2355   private String onRegionReadyToSplit(final RegionState current, final HRegionInfo hri,
2356       final ServerName serverName, final RegionStateTransition transition) {
2357     // The region must be opened on this server.
2358     // If current state is already splitting on the same server,
2359     // it could be a reportRegionTransition RPC retry.
2360     if (current == null || !current.isSplittingOrOpenedOnServer(serverName)) {
2361       return hri.getShortNameToLog() + " is not opening on " + serverName;
2362     }
2363
2364     if (!((HMaster)server).getSplitOrMergeTracker().isSplitOrMergeEnabled(
2365             MasterSwitchType.SPLIT)) {
2366       return "split switch is off!";
2367     }
2368
2369     // Just return in case of retrying
2370     if (current.isSplitting()) {
2371       return null;
2372     }
2373
2374     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2375     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2376     RegionState rs_a = regionStates.getRegionState(a);
2377     RegionState rs_b = regionStates.getRegionState(b);
2378     if (rs_a != null || rs_b != null) {
2379       return "Some daughter is already existing. "
2380         + "a=" + rs_a + ", b=" + rs_b;
2381     }
2382
2383     // Server holding is not updated at this stage.
2384     // It is done after PONR.
2385     regionStates.updateRegionState(hri, State.SPLITTING);
2386     regionStates.createRegionState(
2387       a, State.SPLITTING_NEW, serverName, null);
2388     regionStates.createRegionState(
2389       b, State.SPLITTING_NEW, serverName, null);
2390     return null;
2391   }
2392
2393   private String onRegionSplitPONR(final RegionState current, final HRegionInfo hri,
2394       final ServerName serverName, final RegionStateTransition transition) {
2395     // The region must be splitting on this server, and the daughters must be in
2396     // splitting_new state. To check RPC retry, we use server holding info.
2397     if (current == null || !current.isSplittingOnServer(serverName)) {
2398       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2399     }
2400
2401     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2402     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2403     RegionState rs_a = regionStates.getRegionState(a);
2404     RegionState rs_b = regionStates.getRegionState(b);
2405
2406     // Master could have restarted and lost the new region
2407     // states, if so, they must be lost together
2408     if (rs_a == null && rs_b == null) {
2409       rs_a = regionStates.createRegionState(
2410         a, State.SPLITTING_NEW, serverName, null);
2411       rs_b = regionStates.createRegionState(
2412         b, State.SPLITTING_NEW, serverName, null);
2413     }
2414
2415     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2416         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2417       return "Some daughter is not known to be splitting on " + serverName
2418         + ", a=" + rs_a + ", b=" + rs_b;
2419     }
2420
2421     // Just return in case of retrying
2422     if (!regionStates.isRegionOnServer(hri, serverName)) {
2423       return null;
2424     }
2425
2426     try {
2427       regionStates.splitRegion(hri, a, b, serverName);
2428     } catch (IOException ioe) {
2429       LOG.info("Failed to record split region " + hri.getShortNameToLog());
2430       return "Failed to record the splitting in meta";
2431     }
2432     return null;
2433   }
2434
2435   private String onRegionSplit(final RegionState current, final HRegionInfo hri,
2436       final ServerName serverName, final RegionStateTransition transition) {
2437     // The region must be splitting on this server, and the daughters must be in
2438     // splitting_new state.
2439     // If current state is already split on the same server,
2440     // it could be a reportRegionTransition RPC retry.
2441     if (current == null || !current.isSplittingOrSplitOnServer(serverName)) {
2442       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2443     }
2444
2445     // Just return in case of retrying
2446     if (current.isSplit()) {
2447       return null;
2448     }
2449
2450     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2451     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2452     RegionState rs_a = regionStates.getRegionState(a);
2453     RegionState rs_b = regionStates.getRegionState(b);
2454     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2455         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2456       return "Some daughter is not known to be splitting on " + serverName
2457         + ", a=" + rs_a + ", b=" + rs_b;
2458     }
2459
2460     if (TEST_SKIP_SPLIT_HANDLING) {
2461       return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
2462     }
2463     regionOffline(hri, State.SPLIT);
2464     regionOnline(a, serverName, 1);
2465     regionOnline(b, serverName, 1);
2466
2467     // User could disable the table before master knows the new region.
2468     if (getTableStateManager().isTableState(hri.getTable(),
2469         TableState.State.DISABLED, TableState.State.DISABLING)) {
2470       invokeUnAssign(a);
2471       invokeUnAssign(b);
2472     } else {
2473       Callable<Object> splitReplicasCallable = new Callable<Object>() {
2474         @Override
2475         public Object call() {
2476           doSplittingOfReplicas(hri, a, b);
2477           return null;
2478         }
2479       };
2480       threadPoolExecutorService.submit(splitReplicasCallable);
2481     }
2482     return null;
2483   }
2484
2485   private String onRegionSplitReverted(final RegionState current, final HRegionInfo hri,
2486       final ServerName serverName, final RegionStateTransition transition) {
2487     // The region must be splitting on this server, and the daughters must be in
2488     // splitting_new state.
2489     // If the region is in open state, it could be an RPC retry.
2490     if (current == null || !current.isSplittingOrOpenedOnServer(serverName)) {
2491       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2492     }
2493
2494     // Just return in case of retrying
2495     if (current.isOpened()) {
2496       return null;
2497     }
2498
2499     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2500     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2501     RegionState rs_a = regionStates.getRegionState(a);
2502     RegionState rs_b = regionStates.getRegionState(b);
2503     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2504         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2505       return "Some daughter is not known to be splitting on " + serverName
2506         + ", a=" + rs_a + ", b=" + rs_b;
2507     }
2508
2509     regionOnline(hri, serverName);
2510     regionOffline(a);
2511     regionOffline(b);
2512     if (getTableStateManager().isTableState(hri.getTable(),
2513         TableState.State.DISABLED, TableState.State.DISABLING)) {
2514       invokeUnAssign(hri);
2515     }
2516     return null;
2517   }
2518
2519   private String onRegionReadyToMerge(final RegionState current, final HRegionInfo hri,
2520       final ServerName serverName, final RegionStateTransition transition) {
2521     // The region must be new, and the daughters must be open on this server.
2522     // If the region is in merge_new state, it could be an RPC retry.
2523     if (current != null && !current.isMergingNewOnServer(serverName)) {
2524       return "Merging daughter region already exists, p=" + current;
2525     }
2526
2527     if (!((HMaster)server).getSplitOrMergeTracker().isSplitOrMergeEnabled(
2528             MasterSwitchType.MERGE)) {
2529       return "merge switch is off!";
2530     }
2531     // Just return in case of retrying
2532     if (current != null) {
2533       return null;
2534     }
2535
2536     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2537     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2538     Set<String> encodedNames = new HashSet<String>(2);
2539     encodedNames.add(a.getEncodedName());
2540     encodedNames.add(b.getEncodedName());
2541     Map<String, Lock> locks = locker.acquireLocks(encodedNames);
2542     try {
2543       RegionState rs_a = regionStates.getRegionState(a);
2544       RegionState rs_b = regionStates.getRegionState(b);
2545       if (rs_a == null || !rs_a.isOpenedOnServer(serverName)
2546           || rs_b == null || !rs_b.isOpenedOnServer(serverName)) {
2547         return "Some daughter is not in a state to merge on " + serverName
2548           + ", a=" + rs_a + ", b=" + rs_b;
2549       }
2550
2551       regionStates.updateRegionState(a, State.MERGING);
2552       regionStates.updateRegionState(b, State.MERGING);
2553       regionStates.createRegionState(
2554         hri, State.MERGING_NEW, serverName, null);
2555       return null;
2556     } finally {
2557       for (Lock lock: locks.values()) {
2558         lock.unlock();
2559       }
2560     }
2561   }
2562
2563   private String onRegionMergePONR(final RegionState current, final HRegionInfo hri,
2564       final ServerName serverName, final RegionStateTransition transition) {
2565     // The region must be in MERGING_NEW state, and the two regions being merged must be
2566     // in MERGING state. To detect an RPC retry, we check which server holds the region.
2567     if (current != null && !current.isMergingNewOnServer(serverName)) {
2568       return hri.getShortNameToLog() + " is not merging on " + serverName;
2569     }
2570
2571     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2572     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2573     RegionState rs_a = regionStates.getRegionState(a);
2574     RegionState rs_b = regionStates.getRegionState(b);
2575     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2576         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2577       return "Some daughter is not known to be merging on " + serverName
2578         + ", a=" + rs_a + ", b=" + rs_b;
2579     }
2580
2581     // Master could have restarted and lost the new region state
2582     if (current == null) {
2583       regionStates.createRegionState(
2584         hri, State.MERGING_NEW, serverName, null);
2585     }
2586
2587     // Just return in case of retrying
2588     if (regionStates.isRegionOnServer(hri, serverName)) {
2589       return null;
2590     }
2591
2592     try {
2593       regionStates.mergeRegions(hri, a, b, serverName);
2594     } catch (IOException ioe) {
2595       LOG.info("Failed to record merged region " + hri.getShortNameToLog(), ioe);
2596       return "Failed to record the merging in meta";
2597     }
2598     return null;
2599   }
2600
2601   private String onRegionMerged(final RegionState current, final HRegionInfo hri,
2602       final ServerName serverName, final RegionStateTransition transition) {
2603     // The region must be in MERGING_NEW state, and the two regions being merged must be
2604     // in MERGING state on this server.
2605     // If current state is already opened on the same server,
2606     // it could be a reportRegionTransition RPC retry.
2607     if (current == null || !current.isMergingNewOrOpenedOnServer(serverName)) {
2608       return hri.getShortNameToLog() + " is not merging on " + serverName;
2609     }
2610
2611     // Just return in case of retrying
2612     if (current.isOpened()) {
2613       return null;
2614     }
2615
2616     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2617     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2618     RegionState rs_a = regionStates.getRegionState(a);
2619     RegionState rs_b = regionStates.getRegionState(b);
2620     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2621         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2622       return "Some daughter is not known to be merging on " + serverName
2623         + ", a=" + rs_a + ", b=" + rs_b;
2624     }
2625
2626     regionOffline(a, State.MERGED);
2627     regionOffline(b, State.MERGED);
2628     regionOnline(hri, serverName, 1);
2629
2630     // The user could disable the table before the master learns about the new merged region.
2631     if (getTableStateManager().isTableState(hri.getTable(),
2632         TableState.State.DISABLED, TableState.State.DISABLING)) {
2633       invokeUnAssign(hri);
2634     } else {
2635       Callable<Object> mergeReplicasCallable = new Callable<Object>() {
2636         @Override
2637         public Object call() {
2638           doMergingOfReplicas(hri, a, b);
2639           return null;
2640         }
2641       };
2642       threadPoolExecutorService.submit(mergeReplicasCallable);
2643     }
2644     return null;
2645   }
2646
2647   private String onRegionMergeReverted(final RegionState current, final HRegionInfo hri,
2648       final ServerName serverName, final RegionStateTransition transition) {
2649     // The region must be in MERGING_NEW state, and the two regions being merged must be
2650     // in MERGING state on this server.
2651     // If the region is in offline state, it could be an RPC retry.
2652     if (current == null || !current.isMergingNewOrOfflineOnServer(serverName)) {
2653       return hri.getShortNameToLog() + " is not merging on " + serverName;
2654     }
2655
2656     // Just return in case of retrying
2657     if (current.isOffline()) {
2658       return null;
2659     }
2660
2661     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2662     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2663     RegionState rs_a = regionStates.getRegionState(a);
2664     RegionState rs_b = regionStates.getRegionState(b);
2665     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2666         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2667       return "Some daughter is not known to be merging on " + serverName
2668         + ", a=" + rs_a + ", b=" + rs_b;
2669     }
2670
2671     regionOnline(a, serverName);
2672     regionOnline(b, serverName);
2673     regionOffline(hri);
2674
2675     if (getTableStateManager().isTableState(hri.getTable(),
2676         TableState.State.DISABLED, TableState.State.DISABLING)) {
2677       invokeUnAssign(a);
2678       invokeUnAssign(b);
2679     }
2680     return null;
2681   }
2682
2683   private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
2684       final HRegionInfo hri_b) {
2685     // Close the replicas of the original, unmerged regions, then create and assign new
2686     // replicas for the merged region.
2687     List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
2688     unmergedRegions.add(hri_a);
2689     unmergedRegions.add(hri_b);
2690     Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
2691     Collection<List<HRegionInfo>> c = map.values();
2692     for (List<HRegionInfo> l : c) {
2693       for (HRegionInfo h : l) {
2694         if (!RegionReplicaUtil.isDefaultReplica(h)) {
2695           LOG.debug("Unassigning un-merged replica " + h);
2696           unassign(h);
2697         }
2698       }
2699     }
2700     int numReplicas = getNumReplicas(server, mergedHri.getTable());
2701     List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2702     for (int i = 1; i < numReplicas; i++) {
2703       regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
2704     }
2705     try {
2706       assign(regions);
2707     } catch (IOException ioe) {
2708       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
2709                 ioe.getMessage());
2710     } catch (InterruptedException ie) {
2711       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
2712                 ie.getMessage());
2713     }
2714   }
2715
2716   private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
2717       final HRegionInfo hri_b) {
2718     // Create new replica regions for the daughters, and assign them so that they match
2719     // the current replica assignments: if replica 1 of the parent is assigned to RS1,
2720     // replica 1 of each daughter will be placed on the same server.
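         // Note: if a parent replica is not currently assigned to any server,
         // prepareDaughterReplicaForAssignment below falls back to a random online server
         // for the corresponding daughter replica.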
2721     int numReplicas = getNumReplicas(server, parentHri.getTable());
2722     // unassign the old replicas
2723     List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
2724     parentRegion.add(parentHri);
2725     Map<ServerName, List<HRegionInfo>> currentAssign =
2726         regionStates.getRegionAssignments(parentRegion);
2727     Collection<List<HRegionInfo>> c = currentAssign.values();
2728     for (List<HRegionInfo> l : c) {
2729       for (HRegionInfo h : l) {
2730         if (!RegionReplicaUtil.isDefaultReplica(h)) {
2731           LOG.debug("Unassigning parent's replica " + h);
2732           unassign(h);
2733         }
2734       }
2735     }
2736     // assign daughter replicas
2737     Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
2738     for (int i = 1; i < numReplicas; i++) {
2739       prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
2740       prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
2741     }
2742     try {
2743       assign(map);
2744     } catch (IOException e) {
2745       LOG.warn("Failed to assign replica(s) of the daughter(s) of " + parentHri, e);
2746     } catch (InterruptedException e) {
2747       LOG.warn("Failed to assign replica(s) of the daughter(s) of " + parentHri, e);
2748     }
2749   }
2750
2751   private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
2752       int replicaId, Map<HRegionInfo, ServerName> map) {
2753     HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
2754     HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
2755         replicaId);
2756     LOG.debug("Created replica region for daughter " + daughterReplica);
2757     ServerName sn = regionStates.getRegionServerOfRegion(parentReplica);
2758     if (sn != null) {
2759       map.put(daughterReplica, sn);
2760     } else {
2761       List<ServerName> servers = serverManager.getOnlineServersList();
2762       sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
2763       map.put(daughterReplica, sn);
2764     }
2765   }
2766
2767   public Set<HRegionInfo> getReplicasToClose() {
2768     return replicasToClose;
2769   }
2770
2771   public Map<String, AtomicInteger> getFailedOpenTracker() { return failedOpenTracker; }
2772
2773   /**
2774    * Marks a region as offline. The new state is the specified one if it is
2775    * not null; otherwise the new state is OFFLINE.
2776    * The specified state may only be SPLIT, MERGED, OFFLINE or null.
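        *
        * <p>Illustrative usage, mirroring the split/merge handlers above in this class
        * (variable names are placeholders):
        * <pre>
        *   regionOffline(parent, State.SPLIT);     // parent region of a completed split
        *   regionOffline(oldRegion, State.MERGED); // a region that was merged away
        * </pre>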
2777    */
2778   private void regionOffline(final HRegionInfo regionInfo, final State state) {
2779     regionStates.regionOffline(regionInfo, state);
2780     removeClosedRegion(regionInfo);
2781     // remove the region plan as well just in case.
2782     clearRegionPlan(regionInfo);
2783     balancer.regionOffline(regionInfo);
2784
2785     // Tell our listeners that a region was closed
2786     sendRegionClosedNotification(regionInfo);
2787     // also note that all the replicas of the primary should be closed
2788     if (state != null && state.equals(State.SPLIT)) {
2789       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
2790       c.add(regionInfo);
2791       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
2792       Collection<List<HRegionInfo>> allReplicas = map.values();
2793       for (List<HRegionInfo> list : allReplicas) {
2794         replicasToClose.addAll(list);
2795       }
2796     }
2797     else if (state != null && state.equals(State.MERGED)) {
2798       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
2799       c.add(regionInfo);
2800       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
2801       Collection<List<HRegionInfo>> allReplicas = map.values();
2802       for (List<HRegionInfo> list : allReplicas) {
2803         replicasToClose.addAll(list);
2804       }
2805     }
2806   }
2807
2808   private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
2809       final ServerName serverName) {
2810     if (!this.listeners.isEmpty()) {
2811       for (AssignmentListener listener : this.listeners) {
2812         listener.regionOpened(regionInfo, serverName);
2813       }
2814     }
2815   }
2816
2817   private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
2818     if (!this.listeners.isEmpty()) {
2819       for (AssignmentListener listener : this.listeners) {
2820         listener.regionClosed(regionInfo);
2821       }
2822     }
2823   }
2824
2825   /**
2826    * Try to update some region states. If the state machine prevents
2827    * such update, an error message is returned to explain the reason.
2828    *
2829    * Each transition is expected to carry just one region for
2830    * opening/closing, and three regions for splitting/merging.
2831    * These regions should be on the server that requested the change.
2832    *
2833    * Region state machine. Only these transitions
2834    * are expected to be triggered by a region server.
2835    *
2836    * On the state transition:
2837    *  (1) Open/Close should be initiated by master
2838    *      (a) Master sets the region to pending_open/pending_close
2839    *        in memory and hbase:meta after sending the request
2840    *        to the region server
2841    *      (b) Region server reports back to the master
2842    *        after open/close is done (either success/failure)
2843    *      (c) If the region server cannot report the status to the master,
2844    *        it must be because the master is down or there is a temporary
2845    *        network issue; otherwise the region server should abort, since
2846    *        it must be a bug. If the master is not accessible, the region
2847    *        server should keep trying until it is stopped or the status has
2848    *        been reported to the (new) master
2849    *      (d) If region server dies in the middle of opening/closing
2850    *        a region, SSH picks it up and finishes it
2851    *      (e) If master dies in the middle, the new master recovers
2852    *        the state during initialization from hbase:meta. Region server
2853    *        can report any transition that has not been reported to
2854    *        the previous active master yet
2855    *  (2) Split/merge is initiated by region servers
2856    *      (a) To split a region, a region server sends a request to the
2857    *        master to set the region to SPLITTING, together with the two
2858    *        daughters (to be created) set to SPLITTING_NEW. If approved
2859    *        by the master, the split can then move ahead
2860    *      (b) To merge two regions, a region server sends a request to the
2861    *        master to set the new merged region (to be created) to
2862    *        MERGING_NEW, together with the two regions (to be merged) set
2863    *        to MERGING. If the master approves, the merge can then move ahead
2864    *      (c) Once the splitting/merging is done, the region server
2865    *        reports the status (success or failure) back to the master.
2866    *      (d) Other scenarios should be handled similarly as for
2867    *        region open/close
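        *
        * <p>As an illustration of the above (a sketch, not an exhaustive contract), the
        * transition codes a region server reports for a successful split and merge,
        * in the order handled by the switch below, are:
        * <pre>
        *   split: READY_TO_SPLIT -> SPLIT_PONR -> SPLIT  (or SPLIT_REVERTED if rolled back)
        *   merge: READY_TO_MERGE -> MERGE_PONR -> MERGED (or MERGE_REVERTED if rolled back)
        * </pre>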
2868    */
2869   protected String onRegionTransition(final ServerName serverName,
2870       final RegionStateTransition transition) {
2871     TransitionCode code = transition.getTransitionCode();
2872     HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
2873     Lock lock = locker.acquireLock(hri.getEncodedName());
2874     try {
2875       RegionState current = regionStates.getRegionState(hri);
2876       if (LOG.isDebugEnabled()) {
2877         LOG.debug("Got transition " + code + " for "
2878           + (current != null ? current.toString() : hri.getShortNameToLog())
2879           + " from " + serverName);
2880       }
2881       String errorMsg = null;
2882       switch (code) {
2883       case OPENED:
2884         errorMsg = onRegionOpen(current, hri, serverName, transition);
2885         break;
2886       case FAILED_OPEN:
2887         errorMsg = onRegionFailedOpen(current, hri, serverName);
2888         break;
2889       case CLOSED:
2890         errorMsg = onRegionClosed(current, hri, serverName);
2891         break;
2892       case READY_TO_SPLIT:
2893         try {
2894           regionStateListener.onRegionSplit(hri);
2895           errorMsg = onRegionReadyToSplit(current, hri, serverName, transition);
2896         } catch (IOException exp) {
2897           if (exp instanceof QuotaExceededException) {
2898             server.getRegionNormalizer().planSkipped(hri, PlanType.SPLIT);
2899           }
2900           errorMsg = StringUtils.stringifyException(exp);
2901         }
2902         break;
2903       case SPLIT_PONR:
2904         errorMsg = onRegionSplitPONR(current, hri, serverName, transition);
2905         break;
2906       case SPLIT:
2907         errorMsg = onRegionSplit(current, hri, serverName, transition);
2908         break;
2909       case SPLIT_REVERTED:
2910         errorMsg = onRegionSplitReverted(current, hri, serverName, transition);
2911         if (org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
2912           try {
2913             regionStateListener.onRegionSplitReverted(hri);
2914           } catch (IOException exp) {
2915             LOG.warn(StringUtils.stringifyException(exp));
2916           }
2917         }
2918         break;
2919       case READY_TO_MERGE:
2920         errorMsg = onRegionReadyToMerge(current, hri, serverName, transition);
2921         break;
2922       case MERGE_PONR:
2923         errorMsg = onRegionMergePONR(current, hri, serverName, transition);
2924         break;
2925       case MERGED:
2926         try {
2927           errorMsg = onRegionMerged(current, hri, serverName, transition);
2928           regionStateListener.onRegionMerged(hri);
2929         } catch (IOException exp) {
2930           errorMsg = StringUtils.stringifyException(exp);
2931         }
2932         break;
2933       case MERGE_REVERTED:
2934         errorMsg = onRegionMergeReverted(current, hri, serverName, transition);
2935         break;
2936
2937       default:
2938         errorMsg = "Unexpected transition code " + code;
2939       }
2940       if (errorMsg != null) {
2941         LOG.info("Could not transition region from " + current + " on "
2942           + code + " by " + serverName + ": " + errorMsg);
2943       }
2944       return errorMsg;
2945     } finally {
2946       lock.unlock();
2947     }
2948   }
2949
2950   private void processBogusAssignments(Map<ServerName, List<HRegionInfo>> bulkPlan) {
2951     if (bulkPlan.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
2952       // Found no plan for some regions, put those regions in RIT
2953       for (HRegionInfo hri : bulkPlan.get(LoadBalancer.BOGUS_SERVER_NAME)) {
2954         regionStates.updateRegionState(hri, State.FAILED_OPEN);
2955       }
2956       bulkPlan.remove(LoadBalancer.BOGUS_SERVER_NAME);
2957     }
2958   }
2959
2960   /**
2961    * @return Instance of load balancer
2962    */
2963   public LoadBalancer getBalancer() {
2964     return this.balancer;
2965   }
2966
2967   public Map<ServerName, List<HRegionInfo>>
2968     getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
2969     return getRegionStates().getRegionAssignments(infos);
2970   }
2971
2972   void setRegionStateListener(RegionStateListener listener) {
2973     this.regionStateListener = listener;
2974   }
2975
2976   private class DelayedAssignCallable implements Runnable {
2977     Callable<?> callable;
2978     public DelayedAssignCallable(Callable<?> callable) {
2979       this.callable = callable;
2980     }
2981
2982     @Override
2983     public void run() {
2984       threadPoolExecutorService.submit(callable);
2985     }
2986   }
2987 }