1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.master;
20  
21  import com.google.common.annotations.VisibleForTesting;
22  
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.Collection;
26  import java.util.Collections;
27  import java.util.HashMap;
28  import java.util.HashSet;
29  import java.util.Iterator;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.NavigableMap;
33  import java.util.Random;
34  import java.util.Set;
35  import java.util.TreeMap;
36  import java.util.concurrent.Callable;
37  import java.util.concurrent.ConcurrentHashMap;
38  import java.util.concurrent.CopyOnWriteArrayList;
39  import java.util.concurrent.TimeUnit;
40  import java.util.concurrent.atomic.AtomicBoolean;
41  import java.util.concurrent.atomic.AtomicInteger;
42  import java.util.concurrent.locks.Lock;
43  import java.util.concurrent.locks.ReentrantLock;
44  
45  import org.apache.commons.logging.Log;
46  import org.apache.commons.logging.LogFactory;
47  import org.apache.hadoop.conf.Configuration;
48  import org.apache.hadoop.fs.FileStatus;
49  import org.apache.hadoop.fs.FileSystem;
50  import org.apache.hadoop.fs.Path;
51  import org.apache.hadoop.hbase.CoordinatedStateException;
52  import org.apache.hadoop.hbase.HBaseIOException;
53  import org.apache.hadoop.hbase.HConstants;
54  import org.apache.hadoop.hbase.HRegionInfo;
55  import org.apache.hadoop.hbase.HRegionLocation;
56  import org.apache.hadoop.hbase.HTableDescriptor;
57  import org.apache.hadoop.hbase.MetaTableAccessor;
58  import org.apache.hadoop.hbase.NotServingRegionException;
59  import org.apache.hadoop.hbase.RegionLocations;
60  import org.apache.hadoop.hbase.RegionStateListener;
61  import org.apache.hadoop.hbase.ServerName;
62  import org.apache.hadoop.hbase.TableName;
63  import org.apache.hadoop.hbase.TableNotFoundException;
64  import org.apache.hadoop.hbase.classification.InterfaceAudience;
65  import org.apache.hadoop.hbase.client.MasterSwitchType;
66  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
67  import org.apache.hadoop.hbase.client.Result;
68  import org.apache.hadoop.hbase.client.TableState;
69  import org.apache.hadoop.hbase.executor.EventHandler;
70  import org.apache.hadoop.hbase.executor.EventType;
71  import org.apache.hadoop.hbase.executor.ExecutorService;
72  import org.apache.hadoop.hbase.ipc.FailedServerException;
73  import org.apache.hadoop.hbase.ipc.RpcClient;
74  import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
75  import org.apache.hadoop.hbase.master.RegionState.State;
76  import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
77  import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
78  import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
79  import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
80  import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
81  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
82  import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
83  import org.apache.hadoop.hbase.quotas.QuotaExceededException;
84  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
85  import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
86  import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
87  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
88  import org.apache.hadoop.hbase.util.FSUtils;
89  import org.apache.hadoop.hbase.util.KeyLocker;
90  import org.apache.hadoop.hbase.util.Pair;
91  import org.apache.hadoop.hbase.util.PairOfSameType;
92  import org.apache.hadoop.hbase.util.Threads;
93  import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
94  import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
95  import org.apache.hadoop.ipc.RemoteException;
96  import org.apache.hadoop.util.StringUtils;
97  import org.apache.zookeeper.KeeperException;
98  
99  /**
100  * Manages and performs region assignment.
101  * Related communications with region servers are all done over RPC.
102  */
103 @InterfaceAudience.Private
104 public class AssignmentManager {
105   private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
106 
107   protected final MasterServices server;
108 
109   private ServerManager serverManager;
110 
111   private boolean shouldAssignRegionsWithFavoredNodes;
112 
113   private LoadBalancer balancer;
114 
115   private final MetricsAssignmentManager metricsAssignmentManager;
116 
117   private final TableLockManager tableLockManager;
118 
119   private AtomicInteger numRegionsOpened = new AtomicInteger(0);
120 
121   final private KeyLocker<String> locker = new KeyLocker<String>();
122 
123   Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
124 
125   /**
126    * Map of regions to reopen after the schema of a table is changed. Key -
127    * encoded region name, value - HRegionInfo
128    */
129   private final Map <String, HRegionInfo> regionsToReopen;
130 
131   /*
132   * Maximum number of times we recurse an assignment/unassignment.
133    * See below in {@link #assign()} and {@link #unassign()}.
134    */
135   private final int maximumAttempts;
136 
137   /**
138   * The sleep time the assignment will wait before retrying a failed hbase:meta
139   * assignment, e.g. when no region plan is available or the region plan is bad
140    */
141   private final long sleepTimeBeforeRetryingMetaAssignment;
142 
143   /** Plans for region movement. Key is the encoded version of a region name*/
144   // TODO: When do plans get cleaned out?  Ever? In server open and in server
145   // shutdown processing -- St.Ack
146   // All access to this Map must be synchronized.
147   final NavigableMap<String, RegionPlan> regionPlans =
148     new TreeMap<String, RegionPlan>();
149 
150   private final TableStateManager tableStateManager;
151 
152   private final ExecutorService executorService;
153 
154   // Thread pool executor service. TODO, consolidate with executorService?
155   private java.util.concurrent.ExecutorService threadPoolExecutorService;
156 
157   private final RegionStates regionStates;
158 
159   // The thresholds for using bulk assignment. Bulk assignment is used
160   // only if assigning at least this many regions to at least this
161   // many servers. If assigning fewer regions to fewer servers,
162   // bulk assigning may not be as efficient.
163   private final int bulkAssignThresholdRegions;
164   private final int bulkAssignThresholdServers;
165   private final int bulkPerRegionOpenTimeGuesstimate;
166 
167   // Should bulk assignment wait till all regions are assigned,
168   // or until it times out?  This is useful to measure bulk assignment
169   // performance, but not needed in most use cases.
170   private final boolean bulkAssignWaitTillAllAssigned;
171 
172   /**
173    * Indicator that AssignmentManager has recovered the region states so
174    * that ServerShutdownHandler can be fully enabled and re-assign regions
175   * of dead servers. This ensures that when re-assignment happens,
176   * AssignmentManager has proper region states.
177    *
178    * Protected to ease testing.
179    */
180   protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
181 
182   /**
183   * A map to track the number of times a region fails to open in a row.
184    * So that we don't try to open a region forever if the failure is
185    * unrecoverable.  We don't put this information in region states
186    * because we don't expect this to happen frequently; we don't
187    * want to copy this information over during each state transition either.
188    */
189   private final ConcurrentHashMap<String, AtomicInteger>
190     failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
191 
192   // In case not using ZK for region assignment, region states
193   // are persisted in meta with a state store
194   private final RegionStateStore regionStateStore;
195 
196   /**
197    * For testing only!  Set to true to skip handling of split.
198    */
199   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
200   public static boolean TEST_SKIP_SPLIT_HANDLING = false;
201 
202   /** Listeners that are called on assignment events. */
203   private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
204 
205   private RegionStateListener regionStateListener;
206 
207   /**
208    * Constructs a new assignment manager.
209    *
210   * @param server instance of HMaster this AM is running inside
211    * @param serverManager serverManager for associated HMaster
212    * @param balancer implementation of {@link LoadBalancer}
213    * @param service Executor service
214    * @param metricsMaster metrics manager
215    * @param tableLockManager TableLock manager
216    * @throws IOException
217    */
218   public AssignmentManager(MasterServices server, ServerManager serverManager,
219       final LoadBalancer balancer,
220       final ExecutorService service, MetricsMaster metricsMaster,
221       final TableLockManager tableLockManager,
222       final TableStateManager tableStateManager)
223           throws IOException {
224     this.server = server;
225     this.serverManager = serverManager;
226     this.executorService = service;
227     this.regionStateStore = new RegionStateStore(server);
228     this.regionsToReopen = Collections.synchronizedMap
229                            (new HashMap<String, HRegionInfo> ());
230     Configuration conf = server.getConfiguration();
231     // Only read favored nodes if using the favored nodes load balancer.
232     this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
233            HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
234            FavoredNodeLoadBalancer.class);
235 
236     this.tableStateManager = tableStateManager;
237 
238     // This is the max attempts, not retries, so it should be at least 1.
239     this.maximumAttempts = Math.max(1,
240       this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
241     this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
242         "hbase.meta.assignment.retry.sleeptime", 1000L);
243     this.balancer = balancer;
244     int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
245     this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
246       maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
247     this.regionStates = new RegionStates(
248       server, tableStateManager, serverManager, regionStateStore);
249 
250     this.bulkAssignWaitTillAllAssigned =
251       conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
252     this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
253     this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
254     this.bulkPerRegionOpenTimeGuesstimate =
255       conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
256 
257     this.metricsAssignmentManager = new MetricsAssignmentManager();
258     this.tableLockManager = tableLockManager;
259   }
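  /*
   * The constructor above pulls its tuning knobs from the master Configuration.
   * A minimal sketch of overriding them (illustrative only; the keys and
   * defaults mirror the code above):
   *
   *   Configuration conf = HBaseConfiguration.create();
   *   conf.setInt("hbase.assignment.maximum.attempts", 10);          // attempts, not retries
   *   conf.setLong("hbase.meta.assignment.retry.sleeptime", 1000L);  // ms between meta retries
   *   conf.setInt("hbase.assignment.threads.max", 30);               // AM thread pool size
   *   conf.setBoolean("hbase.bulk.assignment.waittillallassigned", false);
   *   conf.setInt("hbase.bulk.assignment.threshold.regions", 7);
   *   conf.setInt("hbase.bulk.assignment.threshold.servers", 3);
   *   conf.setInt("hbase.bulk.assignment.perregion.open.time", 10000);
   */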
260 
261   MetricsAssignmentManager getAssignmentManagerMetrics() {
262     return this.metricsAssignmentManager;
263   }
264
265   /**
266    * Add the listener to the notification list.
267    * @param listener The AssignmentListener to register
268    */
269   public void registerListener(final AssignmentListener listener) {
270     this.listeners.add(listener);
271   }
272
273   /**
274    * Remove the listener from the notification list.
275    * @param listener The AssignmentListener to unregister
276    */
277   public boolean unregisterListener(final AssignmentListener listener) {
278     return this.listeners.remove(listener);
279   }
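  /*
   * A minimal usage sketch for the listener hooks above (illustrative only;
   * "myListener" stands for any existing AssignmentListener implementation):
   *
   *   AssignmentListener myListener = ...; // reacts to region open/close events
   *   assignmentManager.registerListener(myListener);
   *   // ... later, when no longer interested in assignment notifications:
   *   assignmentManager.unregisterListener(myListener);
   */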
280
281   /**
282   * @return Instance of TableStateManager.
283    */
284   public TableStateManager getTableStateManager() {
285     // These are 'expensive' to make, involving a trip to the zk ensemble,
286     // so allow sharing.
287     return this.tableStateManager;
288   }
289
290   /**
291    * This SHOULD not be public. It is public now
292    * because of some unit tests.
293    *
294    * TODO: make it package private and keep RegionStates in the master package
295    */
296   public RegionStates getRegionStates() {
297     return regionStates;
298   }
299
300   /**
301    * Used in some tests to mock up region state in meta
302    */
303   @VisibleForTesting
304   RegionStateStore getRegionStateStore() {
305     return regionStateStore;
306   }
307 
308   public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
309     return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
310   }
311
312   /**
313    * Add a regionPlan for the specified region.
314    * @param encodedName
315    * @param plan
316    */
317   public void addPlan(String encodedName, RegionPlan plan) {
318     synchronized (regionPlans) {
319       regionPlans.put(encodedName, plan);
320     }
321   }
322
323   /**
324    * Add a map of region plans.
325    */
326   public void addPlans(Map<String, RegionPlan> plans) {
327     synchronized (regionPlans) {
328       regionPlans.putAll(plans);
329     }
330   }
331
332   /**
333    * Set the list of regions that will be reopened
334    * because of an update in table schema
335    *
336    * @param regions
337    *          list of regions that should be tracked for reopen
338    */
339   public void setRegionsToReopen(List <HRegionInfo> regions) {
340     for(HRegionInfo hri : regions) {
341       regionsToReopen.put(hri.getEncodedName(), hri);
342     }
343   }
344
345   /**
346   * Used by the client to check whether all regions have received the schema updates
347    *
348    * @param tableName
349    * @return Pair indicating the status of the alter command
350    * @throws IOException
351    */
352   public Pair<Integer, Integer> getReopenStatus(TableName tableName)
353       throws IOException {
354     List<HRegionInfo> hris;
355     if (TableName.META_TABLE_NAME.equals(tableName)) {
356       hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
357     } else {
358       hris = MetaTableAccessor.getTableRegions(server.getConnection(), tableName, true);
359     }
360
361     Integer pending = 0;
362     for (HRegionInfo hri : hris) {
363       String name = hri.getEncodedName();
364       // No lock needed; concurrent access is ok: sequential consistency is respected.
365       if (regionsToReopen.containsKey(name)
366           || regionStates.isRegionInTransition(name)) {
367         pending++;
368       }
369     }
370     return new Pair<Integer, Integer>(pending, hris.size());
371   }
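  /*
   * A minimal sketch of how a caller might interpret the pair returned by
   * getReopenStatus() above (illustrative only): the first element is the
   * number of regions still pending reopen or in transition, the second is
   * the total region count of the table.
   *
   *   Pair<Integer, Integer> status = assignmentManager.getReopenStatus(tableName);
   *   boolean alterDone = status.getFirst() == 0;  // nothing left to reopen
   *   int totalRegions = status.getSecond();
   */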
372
373   /**
374    * Used by ServerShutdownHandler to make sure AssignmentManager has completed
375    * the failover cleanup before re-assigning regions of dead servers. So that
376    * when re-assignment happens, AssignmentManager has proper region states.
377    */
378   public boolean isFailoverCleanupDone() {
379     return failoverCleanupDone.get();
380   }
381
382   /**
383    * To avoid racing with AM, external entities may need to lock a region,
384    * for example, when SSH checks what regions to skip re-assigning.
385    */
386   public Lock acquireRegionLock(final String encodedName) {
387     return locker.acquireLock(encodedName);
388   }
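  /*
   * The lock returned by acquireRegionLock() follows the usual try/finally
   * discipline, mirroring how this class itself uses the KeyLocker (see
   * assign(HRegionInfo, boolean) below). Illustrative sketch:
   *
   *   Lock lock = assignmentManager.acquireRegionLock(hri.getEncodedName());
   *   try {
   *     // examine or update state for this region without racing the AM
   *   } finally {
   *     lock.unlock();
   *   }
   */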
389
390   /**
391    * Now, failover cleanup is completed. Notify server manager to
392    * process queued up dead servers processing, if any.
393    */
394   void failoverCleanupDone() {
395     failoverCleanupDone.set(true);
396     serverManager.processQueuedDeadServers();
397   }
398
399   /**
400    * Called on startup.
401    * Figures out whether this is a fresh cluster start or we are joining an extant running cluster.
402    * @throws IOException
403    * @throws KeeperException
404    * @throws InterruptedException
405    * @throws CoordinatedStateException
406    */
407   void joinCluster()
408   throws IOException, KeeperException, InterruptedException, CoordinatedStateException {
409     long startTime = System.currentTimeMillis();
410     // Concurrency note: In the below the accesses on regionsInTransition are
411     // outside of a synchronization block where usually all accesses to RIT are
412     // synchronized.  The presumption is that in this case it is safe since this
413     // method is being played by a single thread on startup.
414
415     // TODO: Regions that have a null location and are not in regionsInTransitions
416     // need to be handled.
417
418     // Scan hbase:meta to build list of existing regions, servers, and assignment
419     // Returns servers who have not checked in (assumed dead) that some regions
420     // were assigned to (according to the meta)
421     Set<ServerName> deadServers = rebuildUserRegions();
422 
423     // This method will assign all user regions if a clean server startup or
424     // it will reconstruct master state and cleanup any leftovers from previous master process.
425     boolean failover = processDeadServersAndRegionsInTransition(deadServers);
426
427     recoverTableInDisablingState();
428     recoverTableInEnablingState();
429     LOG.info("Joined the cluster in " + (System.currentTimeMillis()
430       - startTime) + "ms, failover=" + failover);
431   }
432
433   /**
434    * Processes all regions that are in transition in zookeeper and also
435    * processes the list of dead servers.
436    * Used by the master when joining a cluster.  If we figure this is a clean cluster
437    * startup, will assign all user regions.
438    * @param deadServers Set of servers that are offline, probably legitimately, that were carrying
439    * regions according to a scan of hbase:meta. Can be null.
440    * @throws IOException
441    * @throws InterruptedException
442    */
443   boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
444   throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
445     // TODO Needed? List<String> nodes = ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
446     boolean failover = !serverManager.getDeadServers().isEmpty();
447     if (failover) {
448       // This may not be a failover actually, especially if meta is on this master.
449       if (LOG.isDebugEnabled()) {
450         LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
451       }
452       // Check if there are any regions on these servers
453       failover = false;
454       for (ServerName serverName : serverManager.getDeadServers().copyServerNames()) {
455         if (regionStates.getRegionAssignments().values().contains(serverName)) {
456           LOG.debug("Found regions on dead server: " + serverName);
457           failover = true;
458           break;
459         }
460       }
461     }
462     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
463     if (!failover) {
464       // If any one region except meta is assigned, it's a failover.
465       for (Map.Entry<HRegionInfo, ServerName> en:
466           regionStates.getRegionAssignments().entrySet()) {
467         HRegionInfo hri = en.getKey();
468         if (!hri.isMetaTable()
469             && onlineServers.contains(en.getValue())) {
470           LOG.debug("Found region " + hri + " out on cluster");
471           failover = true;
472           break;
473         }
474       }
475     }
476     if (!failover) {
477       // If any region except meta is in transition on a live server, it's a failover.
478       Set<RegionState> regionsInTransition = regionStates.getRegionsInTransition();
479       if (!regionsInTransition.isEmpty()) {
480         for (RegionState regionState: regionsInTransition) {
481           ServerName serverName = regionState.getServerName();
482           if (!regionState.getRegion().isMetaRegion()
483               && serverName != null && onlineServers.contains(serverName)) {
484             LOG.debug("Found " + regionState + " for region " +
485               regionState.getRegion().getRegionNameAsString() + " for server " +
486                 serverName + " in RITs");
487             failover = true;
488             break;
489           }
490         }
491       }
492     }
493     if (!failover) {
494       // If we get here, we have a full cluster restart. It is a failover only
495       // if some WALs are not split yet. Meta WALs, if any, should have
496       // been split already. We can walk through those queued dead servers;
497       // if they don't have any WALs, this restart should be considered a clean one
498       Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
499       if (!queuedDeadServers.isEmpty()) {
500         Configuration conf = server.getConfiguration();
501         Path rootdir = FSUtils.getRootDir(conf);
502         FileSystem fs = rootdir.getFileSystem(conf);
503         for (ServerName serverName: queuedDeadServers) {
504           // In the case of a clean exit, the shutdown handler would have presplit any WALs and
505           // removed empty directories.
506           Path logDir = new Path(rootdir,
507             AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
508           Path splitDir = logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
509           if (checkWals(fs, logDir) || checkWals(fs, splitDir)) {
510             LOG.debug("Found queued dead server " + serverName);
511             failover = true;
512             break;
513           }
514         }
515         if (!failover) {
516           // We figured that it's not a failover, so no need to
517           // work on these re-queued dead servers any more.
518           LOG.info("AM figured that it's not a failover and cleaned up "
519             + queuedDeadServers.size() + " queued dead servers");
520           serverManager.removeRequeuedDeadServers();
521         }
522       }
523     }
524
525     Set<TableName> disabledOrDisablingOrEnabling = null;
526     Map<HRegionInfo, ServerName> allRegions = null;
527
528     if (!failover) {
529       disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
530         TableState.State.DISABLED, TableState.State.DISABLING,
531         TableState.State.ENABLING);
532
533       // Clean re/start, mark all user regions closed before reassignment
534       allRegions = regionStates.closeAllUserRegions(
535         disabledOrDisablingOrEnabling);
536     }
537
538     // Now region states are restored
539     regionStateStore.start();
540
541     if (failover) {
542       if (deadServers != null && !deadServers.isEmpty()) {
543         for (ServerName serverName: deadServers) {
544           if (!serverManager.isServerDead(serverName)) {
545             serverManager.expireServer(serverName); // Let SSH do region re-assign
546           }
547         }
548       }
549       processRegionsInTransition(regionStates.getRegionsInTransition());
550     }
551
552     // Now we can safely claim failover cleanup completed and enable
553     // ServerShutdownHandler for further processing. The nodes (below)
554     // in transition, if any, are for regions not related to those
555     // dead servers at all, and can be done in parallel to SSH.
556     failoverCleanupDone();
557     if (!failover) {
558       // Fresh cluster startup.
559       LOG.info("Clean cluster startup. Don't reassign user regions");
560       assignAllUserRegions(allRegions);
561     } else {
562       LOG.info("Failover! Reassign user regions");
563     }
564     // unassign replicas of the split parents and the merged regions
565     // the daughter replicas are opened in assignAllUserRegions if they were
566     // not already opened.
567     for (HRegionInfo h : replicasToClose) {
568       unassign(h);
569     }
570     replicasToClose.clear();
571     return failover;
572   }
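  // Summary of the failover detection above: the restart is treated as a
  // failover if (1) a known dead server was still carrying regions, or (2) any
  // non-meta region is assigned to an online server, or (3) any non-meta region
  // is in transition on a live server, or (4) a queued dead server still has
  // non-empty WAL files to split. Otherwise the restart is handled as a clean
  // startup and all user regions are assigned from scratch.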
573
574   private boolean checkWals(FileSystem fs, Path dir) throws IOException {
575     if (!fs.exists(dir)) {
576       LOG.debug(dir + " doesn't exist");
577       return false;
578     }
579     if (!fs.getFileStatus(dir).isDirectory()) {
580       LOG.warn(dir + " is not a directory");
581       return false;
582     }
583     FileStatus[] files = FSUtils.listStatus(fs, dir);
584     if (files == null || files.length == 0) {
585       LOG.debug(dir + " has no files");
586       return false;
587     }
588     for (int i = 0; i < files.length; i++) {
589       if (files[i].isFile() && files[i].getLen() > 0) {
590         LOG.debug(dir + " has a non-empty file: " + files[i].getPath());
591         return true;
592       } else if (files[i].isDirectory() && checkWals(fs, files[i].getPath())) { // recurse into the sub-directory, not the same dir
593         LOG.debug(dir + " is a directory and has a non-empty file: " + files[i].getPath());
594         return true;
595       }
596     }
597     LOG.debug("Found 0 non-empty wal files for: " + dir);
598     return false;
599   }
600
601   /**
602    * When a region is closed, it should be removed from the regionsToReopen
603    * @param hri HRegionInfo of the region which was closed
604    */
605   public void removeClosedRegion(HRegionInfo hri) {
606     if (regionsToReopen.remove(hri.getEncodedName()) != null) {
607       LOG.debug("Removed region from reopening regions because it was closed");
608     }
609   }
610
611   // TODO: processFavoredNodes might throw an exception, e.g., if the
612   // meta could not be contacted/updated. We need to decide how seriously to treat
613   // this problem; should we fail the current assignment? We should be able
614   // to recover from this problem eventually (if the meta couldn't be updated
615   // things should work normally and eventually get fixed up).
616   void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
617     if (!shouldAssignRegionsWithFavoredNodes) return;
618     // The AM gets the favored nodes info for each region and updates the meta
619     // table with that info
620     Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
621         new HashMap<HRegionInfo, List<ServerName>>();
622     for (HRegionInfo region : regions) {
623       regionToFavoredNodes.put(region,
624           ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
625     }
626     FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
627       this.server.getConnection());
628   }
629
630   /**
631    * Marks the region as online.  Removes it from regions in transition and
632    * updates the in-memory assignment information.
633    * <p>
634    * Used when a region has been successfully opened on a region server.
635    * @param regionInfo
636    * @param sn
637    */
638   void regionOnline(HRegionInfo regionInfo, ServerName sn) {
639     regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
640   }
641 
642   void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
643     numRegionsOpened.incrementAndGet();
644     regionStates.regionOnline(regionInfo, sn, openSeqNum);
645 
646     // Remove plan if one.
647     clearRegionPlan(regionInfo);
648     balancer.regionOnline(regionInfo, sn);
649 
650     // Tell our listeners that a region was opened
651     sendRegionOpenedNotification(regionInfo, sn);
652   }
653
654   /**
655    * Marks the region as offline.  Removes it from regions in transition and
656    * removes in-memory assignment information.
657    * <p>
658    * Used when a region has been closed and should remain closed.
659    * @param regionInfo
660    */
661   public void regionOffline(final HRegionInfo regionInfo) {
662     regionOffline(regionInfo, null);
663   }
664
665   public void offlineDisabledRegion(HRegionInfo regionInfo) {
666     replicasToClose.remove(regionInfo);
667     regionOffline(regionInfo);
668   }
669
670   // Assignment methods
671
672   /**
673    * Assigns the specified region.
674    * <p>
675    * If a RegionPlan is available with a valid destination then it will be used
676    * to determine which server the region is assigned to.  If no RegionPlan is
677    * available, region will be assigned to a random available server.
678    * <p>
679    * Updates the RegionState and sends the OPEN RPC.
680    * <p>
681    * This will only succeed if the region is in transition and in a CLOSED or
682    * OFFLINE state, or not in transition at all, and, of course, the
683    * chosen server is up and running (it may have just crashed!).
684    *
685    * @param region region to be assigned
686    */
687   public void assign(HRegionInfo region) {
688     assign(region, false);
689   }
690
691   /**
692    * Use care with forceNewPlan. It could cause double assignment.
693    */
694   public void assign(HRegionInfo region, boolean forceNewPlan) {
695     if (isDisabledorDisablingRegionInRIT(region)) {
696       return;
697     }
698     String encodedName = region.getEncodedName();
699     Lock lock = locker.acquireLock(encodedName);
700     try {
701       RegionState state = forceRegionStateToOffline(region, forceNewPlan);
702       if (state != null) {
703         if (regionStates.wasRegionOnDeadServer(encodedName)) {
704           LOG.info("Skip assigning " + region.getRegionNameAsString()
705             + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
706             + " is dead but not processed yet");
707           return;
708         }
709         assign(state, forceNewPlan);
710       }
711     } finally {
712       lock.unlock();
713     }
714   }
715
716   /**
717    * Bulk assign regions to <code>destination</code>.
718    * @param destination
719    * @param regions Regions to assign.
720    * @return true if successful
721    */
722   boolean assign(final ServerName destination, final List<HRegionInfo> regions)
723     throws InterruptedException {
724     long startTime = EnvironmentEdgeManager.currentTime();
725     try {
726       int regionCount = regions.size();
727       if (regionCount == 0) {
728         return true;
729       }
730       LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
731       Set<String> encodedNames = new HashSet<String>(regionCount);
732       for (HRegionInfo region : regions) {
733         encodedNames.add(region.getEncodedName());
734       }
735
736       List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
737       Map<String, Lock> locks = locker.acquireLocks(encodedNames);
738       try {
739         Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regionCount);
740         List<RegionState> states = new ArrayList<RegionState>(regionCount);
741         for (HRegionInfo region : regions) {
742           String encodedName = region.getEncodedName();
743           if (!isDisabledorDisablingRegionInRIT(region)) {
744             RegionState state = forceRegionStateToOffline(region, false);
745             boolean onDeadServer = false;
746             if (state != null) {
747               if (regionStates.wasRegionOnDeadServer(encodedName)) {
748                 LOG.info("Skip assigning " + region.getRegionNameAsString()
749                   + ", its host " + regionStates.getLastRegionServerOfRegion(encodedName)
750                   + " is dead but not processed yet");
751                 onDeadServer = true;
752               } else {
753                 RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
754                 plans.put(encodedName, plan);
755                 states.add(state);
756                 continue;
757               }
758             }
759             // Reassign if the region wasn't on a dead server
760             if (!onDeadServer) {
761               LOG.info("failed to force region state to offline, "
762                 + "will reassign later: " + region);
763               failedToOpenRegions.add(region); // assign individually later
764             }
765           }
766           // Release the lock, this region is excluded from bulk assign because
767           // we can't update its state, or set its znode to offline.
768           Lock lock = locks.remove(encodedName);
769           lock.unlock();
770         }
771 
772         if (server.isStopped()) {
773           return false;
774         }
775 
776         // Add region plans, so we can updateTimers when one region is opened so
777         // that unnecessary timeout on RIT is reduced.
778         this.addPlans(plans);
779
780         List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos =
781           new ArrayList<Pair<HRegionInfo, List<ServerName>>>(states.size());
782         for (RegionState state: states) {
783           HRegionInfo region = state.getRegion();
784           regionStates.updateRegionState(
785             region, State.PENDING_OPEN, destination);
786           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
787           if (this.shouldAssignRegionsWithFavoredNodes) {
788             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
789           }
790           regionOpenInfos.add(new Pair<HRegionInfo, List<ServerName>>(
791             region, favoredNodes));
792         }
793
794         // Move on to open regions.
795         try {
796           // Send OPEN RPC. If it fails on an IOE or RemoteException,
797           // regions will be assigned individually.
798           Configuration conf = server.getConfiguration();
799           long maxWaitTime = System.currentTimeMillis() +
800             conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000);
801           for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
802             try {
803               List<RegionOpeningState> regionOpeningStateList = serverManager
804                 .sendRegionOpen(destination, regionOpenInfos);
805               for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
806                 RegionOpeningState openingState = regionOpeningStateList.get(k);
807                 if (openingState != RegionOpeningState.OPENED) {
808                   HRegionInfo region = regionOpenInfos.get(k).getFirst();
809                   LOG.info("Got opening state " + openingState
810                     + ", will reassign later: " + region);
811                   // Failed opening this region, reassign it later
812                   forceRegionStateToOffline(region, true);
813                   failedToOpenRegions.add(region);
814                 }
815               }
816               break;
817             } catch (IOException e) {
818               if (e instanceof RemoteException) {
819                 e = ((RemoteException)e).unwrapRemoteException();
820               }
821               if (e instanceof RegionServerStoppedException) {
822                 LOG.warn("The region server was shut down", e);
823                 // No need to retry, the region server is a goner.
824                 return false;
825               } else if (e instanceof ServerNotRunningYetException) {
826                 long now = System.currentTimeMillis();
827                 if (now < maxWaitTime) {
828                   if (LOG.isDebugEnabled()) {
829                     LOG.debug("Server is not yet up; waiting up to " +
830                       (maxWaitTime - now) + "ms", e);
831                   }
832                   Thread.sleep(100);
833                   i--; // reset the try count
834                   continue;
835                 }
836               } else if (e instanceof java.net.SocketTimeoutException
837                   && this.serverManager.isServerOnline(destination)) {
838                 // In case socket is timed out and the region server is still online,
839                 // the openRegion RPC could have been accepted by the server and
840                 // just the response didn't go through.  So we will retry to
841                 // open the region on the same server.
842                 if (LOG.isDebugEnabled()) {
843                   LOG.debug("Bulk assigner openRegion() to " + destination
844                     + " has timed out, but the regions might"
845                     + " already be opened on it.", e);
846                 }
847                 // wait and reset the re-try count, server might be just busy.
848                 Thread.sleep(100);
849                 i--;
850                 continue;
851               } else if (e instanceof FailedServerException && i < maximumAttempts) {
852                 // In case the server is in the failed server list, no point to
853                 // retry too soon. Retry after the failed_server_expiry time
854                 long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
855                   RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
856                 if (LOG.isDebugEnabled()) {
857                   LOG.debug(destination + " is on failed server list; waiting "
858                     + sleepTime + "ms", e);
859                 }
860                 Thread.sleep(sleepTime);
861                 continue;
862               }
863               throw e;
864             }
865           }
866         } catch (IOException e) {
867           // Can be a socket timeout, EOF, NoRouteToHost, etc
868           LOG.info("Unable to communicate with " + destination
869             + " in order to assign regions, ", e);
870           for (RegionState state: states) {
871             HRegionInfo region = state.getRegion();
872             forceRegionStateToOffline(region, true);
873           }
874           return false;
875         }
876       } finally {
877         for (Lock lock : locks.values()) {
878           lock.unlock();
879         }
880       }
881
882       if (!failedToOpenRegions.isEmpty()) {
883         for (HRegionInfo region : failedToOpenRegions) {
884           if (!regionStates.isRegionOnline(region)) {
885             invokeAssign(region);
886           }
887         }
888       }
889
890       // wait for assignment completion
891       ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
892       for (HRegionInfo region: regions) {
893         if (!region.getTable().isSystemTable()) {
894           userRegionSet.add(region);
895         }
896       }
897       if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
898             System.currentTimeMillis())) {
899         LOG.debug("some user regions are still in transition: " + userRegionSet);
900       }
901       LOG.debug("Bulk assigning done for " + destination);
902       return true;
903     } finally {
904       metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
905     }
906   }
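  // Note on the bulk assign above: regions whose OPEN RPC does not come back as
  // OPENED are forced back to OFFLINE and re-queued for individual assignment;
  // a stopped region server aborts the whole bulk attempt, while
  // ServerNotRunningYetException, socket timeouts against a still-online server,
  // and FailedServerException are retried with the waits shown in the code.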
907
908   /**
909    * Send CLOSE RPC if the server is online, otherwise, offline the region.
910    *
911    * The RPC will be sent only to the region server found in the region state
912    * if it is passed in; otherwise, to the src server specified. If region
913    * state is not specified, we don't update the region state at all; instead
914    * we just send the RPC call. This is useful for some cleanup without
915    * messing around with the region states (see handleRegion, on the region-opened-
916    * on-an-unexpected-server scenario, for an example).
917    */
918   private void unassign(final HRegionInfo region,
919       final ServerName server, final ServerName dest) {
920     for (int i = 1; i <= this.maximumAttempts; i++) {
921       if (this.server.isStopped() || this.server.isAborted()) {
922         LOG.debug("Server stopped/aborted; skipping unassign of " + region);
923         return;
924       }
925       if (!serverManager.isServerOnline(server)) {
926         LOG.debug("Offline " + region.getRegionNameAsString()
927           + ", no need to unassign since it's on a dead server: " + server);
928         regionStates.updateRegionState(region, State.OFFLINE);
929         return;
930       }
931       try {
932         // Send CLOSE RPC
933         if (serverManager.sendRegionClose(server, region, dest)) {
934           LOG.debug("Sent CLOSE to " + server + " for region " +
935             region.getRegionNameAsString());
936           return;
937         }
938         // This never happens. Currently regionserver close always returns true.
939         // TODO: this can now happen (0.96) if there is an exception in a coprocessor
940         LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
941           region.getRegionNameAsString());
942       } catch (Throwable t) {
943         long sleepTime = 0;
944         Configuration conf = this.server.getConfiguration();
945         if (t instanceof RemoteException) {
946           t = ((RemoteException)t).unwrapRemoteException();
947         }
948         if (t instanceof RegionServerAbortedException
949             || t instanceof RegionServerStoppedException
950             || t instanceof ServerNotRunningYetException) {
951           // RS is aborting, we cannot offline the region since the region may need to do WAL
952           // recovery. Until we see the RS expiration, we should retry.
953           sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
954             RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
955
956         } else if (t instanceof NotServingRegionException) {
957           LOG.debug("Offline " + region.getRegionNameAsString()
958             + ", it's not any more on " + server, t);
959           regionStates.updateRegionState(region, State.OFFLINE);
960           return;
961         } else if (t instanceof FailedServerException && i < maximumAttempts) {
962           // In case the server is in the failed server list, no point to
963           // retry too soon. Retry after the failed_server_expiry time
964           sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
965           RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
966           if (LOG.isDebugEnabled()) {
967             LOG.debug(server + " is on failed server list; waiting " + sleepTime + "ms", t);
968           }
969        }
970        try {
971          if (sleepTime > 0) {
972            Thread.sleep(sleepTime);
973          }
974        } catch (InterruptedException ie) {
975          LOG.warn("Interrupted unassign " + region.getRegionNameAsString(), ie);
976          Thread.currentThread().interrupt();
977          regionStates.updateRegionState(region, State.FAILED_CLOSE);
978          return;
979        }
980        LOG.info("Server " + server + " returned " + t + " for "
981          + region.getRegionNameAsString() + ", try=" + i
982          + " of " + this.maximumAttempts, t);
983       }
984     }
985     // Run out of attempts
986     regionStates.updateRegionState(region, State.FAILED_CLOSE);
987   }
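  // Note on the unassign retry loop above: a region server that is aborting,
  // stopping, or not yet running gets a failed-server-expiry backoff and another
  // attempt; NotServingRegionException simply marks the region OFFLINE; and once
  // maximumAttempts is exhausted the region is left in FAILED_CLOSE.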
988
989   /**
990    * Set region to OFFLINE unless it is opening and forceNewPlan is false.
991    */
992   private RegionState forceRegionStateToOffline(
993       final HRegionInfo region, final boolean forceNewPlan) {
994     RegionState state = regionStates.getRegionState(region);
995     if (state == null) {
996       LOG.warn("Assigning but not in region states: " + region);
997       state = regionStates.createRegionState(region);
998     }
999 
1000     if (forceNewPlan && LOG.isDebugEnabled()) {
1001       LOG.debug("Force region state offline " + state);
1002     }
1003
1004     switch (state.getState()) {
1005     case OPEN:
1006     case OPENING:
1007     case PENDING_OPEN:
1008     case CLOSING:
1009     case PENDING_CLOSE:
1010       if (!forceNewPlan) {
1011         LOG.debug("Skip assigning " +
1012           region + ", it is already " + state);
1013         return null;
1014       }
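      // Intentional fall-through when forceNewPlan is true: the region drops
      // into the FAILED_CLOSE/FAILED_OPEN handling below, which force-unassigns
      // it before it can be offered for assignment again.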
1015     case FAILED_CLOSE:
1016     case FAILED_OPEN:
1017       regionStates.updateRegionState(region, State.PENDING_CLOSE);
1018       unassign(region, state.getServerName(), null);
1019       state = regionStates.getRegionState(region);
1020       if (!state.isOffline() && !state.isClosed()) {
1021         // If the region isn't offline, we can't re-assign
1022         // it now. It will be assigned automatically after
1023         // the regionserver reports it's closed.
1024         return null;
1025       }
1026     case OFFLINE:
1027     case CLOSED:
1028       break;
1029     default:
1030       LOG.error("Trying to assign region " + region
1031         + ", which is " + state);
1032       return null;
1033     }
1034     return state;
1035   }
1036
1037   /**
1038    * Caller must hold lock on the passed <code>state</code> object.
1039    * @param state
1040    * @param forceNewPlan
1041    */
1042   private void assign(RegionState state, boolean forceNewPlan) {
1043     long startTime = EnvironmentEdgeManager.currentTime();
1044     try {
1045       Configuration conf = server.getConfiguration();
1046       RegionPlan plan = null;
1047       long maxWaitTime = -1;
1048       HRegionInfo region = state.getRegion();
1049       Throwable previousException = null;
1050       for (int i = 1; i <= maximumAttempts; i++) {
1051         if (server.isStopped() || server.isAborted()) {
1052           LOG.info("Skip assigning " + region.getRegionNameAsString()
1053             + ", the server is stopped/aborted");
1054           return;
1055         }
1056
1057         if (plan == null) { // Get a server for the region at first
1058           try {
1059             plan = getRegionPlan(region, forceNewPlan);
1060           } catch (HBaseIOException e) {
1061             LOG.warn("Failed to get region plan", e);
1062           }
1063         }
1064
1065         if (plan == null) {
1066           LOG.warn("Unable to determine a plan to assign " + region);
1067
1068           // For meta region, we have to keep retrying until succeeding
1069           if (region.isMetaRegion()) {
1070             if (i == maximumAttempts) {
1071               i = 0; // re-set attempt count to 0 for at least 1 retry
1072
1073               LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
1074                 " after maximumAttempts (" + this.maximumAttempts +
1075                 "). Resetting attempt count and continuing to retry.");
1076             }
1077             waitForRetryingMetaAssignment();
1078             continue;
1079           }
1080
1081           regionStates.updateRegionState(region, State.FAILED_OPEN);
1082           return;
1083         }
1084         LOG.info("Assigning " + region.getRegionNameAsString() +
1085             " to " + plan.getDestination());
1086         // Transition RegionState to PENDING_OPEN
1087        regionStates.updateRegionState(region,
1088           State.PENDING_OPEN, plan.getDestination());
1089
1090         boolean needNewPlan = false;
1091         final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
1092             " to " + plan.getDestination();
1093         try {
1094           List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1095           if (this.shouldAssignRegionsWithFavoredNodes) {
1096             favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
1097           }
1098           serverManager.sendRegionOpen(plan.getDestination(), region, favoredNodes);
1099           return; // we're done
1100         } catch (Throwable t) {
1101           if (t instanceof RemoteException) {
1102             t = ((RemoteException) t).unwrapRemoteException();
1103           }
1104           previousException = t;
1105
1106           // Should we wait a little before retrying? If the server is starting, the answer is yes.
1107           boolean hold = (t instanceof ServerNotRunningYetException);
1108
1109           // In case socket is timed out and the region server is still online,
1110           // the openRegion RPC could have been accepted by the server and
1111           // just the response didn't go through.  So we will retry to
1112           // open the region on the same server.
1113           boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
1114               && this.serverManager.isServerOnline(plan.getDestination()));
1115 
1116           if (hold) {
1117             LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
1118               "try=" + i + " of " + this.maximumAttempts, t);
1119
1120             if (maxWaitTime < 0) {
1121               maxWaitTime = EnvironmentEdgeManager.currentTime()
1122                 + this.server.getConfiguration().getLong(
1123                   "hbase.regionserver.rpc.startup.waittime", 60000);
1124             }
1125             try {
1126               long now = EnvironmentEdgeManager.currentTime();
1127               if (now < maxWaitTime) {
1128                 if (LOG.isDebugEnabled()) {
1129                   LOG.debug("Server is not yet up; waiting up to "
1130                     + (maxWaitTime - now) + "ms", t);
1131                 }
1132                 Thread.sleep(100);
1133                 i--; // reset the try count
1134               } else {
1135                 LOG.debug("Server is not up for a while; try a new one", t);
1136                 needNewPlan = true;
1137               }
1138             } catch (InterruptedException ie) {
1139               LOG.warn("Failed to assign "
1140                   + region.getRegionNameAsString() + " since interrupted", ie);
1141               regionStates.updateRegionState(region, State.FAILED_OPEN);
1142               Thread.currentThread().interrupt();
1143               return;
1144             }
1145           } else if (retry) {
1146             i--; // we want to retry as many times as needed as long as the RS is not dead.
1147             if (LOG.isDebugEnabled()) {
1148               LOG.debug(assignMsg + ", trying to assign to the same region server", t);
1149             }
1150           } else {
1151             needNewPlan = true;
1152             LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
1153                 " try=" + i + " of " + this.maximumAttempts, t);
1154           }
1155         }
1156
1157         if (i == this.maximumAttempts) {
1158           // For meta region, we have to keep retrying until succeeding
1159           if (region.isMetaRegion()) {
1160             i = 0; // re-set attempt count to 0 for at least 1 retry
1161             LOG.warn(assignMsg +
1162                 ", assigning the hbase:meta region has reached maximumAttempts (" +
1163                 this.maximumAttempts + ").  Resetting attempt count and continuing to retry.");
1164             waitForRetryingMetaAssignment();
1165           }
1166           else {
1167             // Don't reset the region state or get a new plan any more.
1168             // This is the last try.
1169             continue;
1170           }
1171         }
1172
1173         // If the region opened on the destination of the present plan, reassigning to a new
1174         // RS may cause double assignments. In case of RegionAlreadyInTransitionException,
1175         // reassign to the same RS.
1176         if (needNewPlan) {
1177           // Force a new plan and reassign. Will return null if no servers.
1178           // The new plan could be the same as the existing plan since we don't
1179           // exclude the server of the original plan, which should not be
1180           // excluded since it could be the only server up now.
1181           RegionPlan newPlan = null;
1182           try {
1183             newPlan = getRegionPlan(region, true);
1184           } catch (HBaseIOException e) {
1185             LOG.warn("Failed to get region plan", e);
1186           }
1187           if (newPlan == null) {
1188             regionStates.updateRegionState(region, State.FAILED_OPEN);
1189             LOG.warn("Unable to find a viable location to assign region " +
1190                 region.getRegionNameAsString());
1191             return;
1192           }
1193
1194           if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
1195             // Clean out the plan we failed to execute and one that doesn't look like it'll
1196             // succeed anyways; we need a new plan!
1197             // Transition back to OFFLINE
1198             regionStates.updateRegionState(region, State.OFFLINE);
1199             plan = newPlan;
1200           } else if(plan.getDestination().equals(newPlan.getDestination()) &&
1201               previousException instanceof FailedServerException) {
1202             try {
1203               LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
1204                 " to the same failed server.");
1205               Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1206                 RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
1207             } catch (InterruptedException ie) {
1208               LOG.warn("Failed to assign "
1209                   + region.getRegionNameAsString() + " since interrupted", ie);
1210               regionStates.updateRegionState(region, State.FAILED_OPEN);
1211               Thread.currentThread().interrupt();
1212               return;
1213             }
1214           }
1215         }
1216       }
1217       // Run out of attempts
1218       regionStates.updateRegionState(region, State.FAILED_OPEN);
1219     } finally {
1220       metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
1221     }
1222   }
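  // Summary of the retry policy in assign(RegionState, boolean) above: hbase:meta
  // is retried indefinitely (the attempt counter is reset and the thread sleeps
  // via waitForRetryingMetaAssignment); a server that is not yet running is
  // waited on up to hbase.regionserver.rpc.startup.waittime; a socket timeout
  // against a still-online server retries the same plan; other failures force a
  // new plan, and exhausting maximumAttempts leaves the region FAILED_OPEN.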
1223
1224   private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
1225     if (this.tableStateManager.isTableState(region.getTable(),
1226             TableState.State.DISABLED,
1227             TableState.State.DISABLING) || replicasToClose.contains(region)) {
1228       LOG.info("Table " + region.getTable() + " is disabled or disabling;"
1229         + " skipping assign of " + region.getRegionNameAsString());
1230       offlineDisabledRegion(region);
1231       return true;
1232     }
1233     return false;
1234   }
1235
1236   /**
1237    * @param region the region to assign
1238    * @param forceNewPlan If true, then if an existing plan exists, a new plan
1239    * will be generated.
1240    * @return Plan for the passed <code>region</code> (If none currently exists, it creates one;
1241    * if there are no servers to assign to, it returns null).
1242    */
1243   private RegionPlan getRegionPlan(final HRegionInfo region,
1244       final boolean forceNewPlan) throws HBaseIOException {
1245     // Pickup existing plan or make a new one
1246     final String encodedName = region.getEncodedName();
1247     final List<ServerName> destServers =
1248       serverManager.createDestinationServersList();
1249
1250     if (destServers.isEmpty()){
1251       LOG.warn("Can't move " + encodedName +
1252         ", there is no destination server available.");
1253       return null;
1254     }
1255 
1256     RegionPlan randomPlan = null;
1257     boolean newPlan = false;
1258     RegionPlan existingPlan;
1259
1260     synchronized (this.regionPlans) {
1261       existingPlan = this.regionPlans.get(encodedName);
1262
1263       if (existingPlan != null && existingPlan.getDestination() != null) {
1264         LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
1265           + "; destination server is " + existingPlan.getDestination() +
1266             "; accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
1267       }
1268
1269       if (forceNewPlan
1270           || existingPlan == null
1271           || existingPlan.getDestination() == null
1272           || !destServers.contains(existingPlan.getDestination())) {
1273         newPlan = true;
1274         try {
1275           randomPlan = new RegionPlan(region, null,
1276               balancer.randomAssignment(region, destServers));
1277         } catch (IOException ex) {
1278           LOG.warn("Failed to create new plan.", ex);
1279           return null;
1280         }
1281         if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
1282           List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
1283           regions.add(region);
1284           try {
1285             processFavoredNodes(regions);
1286           } catch (IOException ie) {
1287             LOG.warn("Ignoring exception in processFavoredNodes", ie);
1288           }
1289         }
1290         this.regionPlans.put(encodedName, randomPlan);
1291       }
1292     }
1293
1294     if (newPlan) {
1295       if (randomPlan.getDestination() == null) {
1296         LOG.warn("Can't find a destination for " + encodedName);
1297         return null;
1298       }
1299       if (LOG.isDebugEnabled()) {
1300         LOG.debug("No previous transition plan found (or ignoring " +
1301           "an existing plan) for " + region.getRegionNameAsString() +
1302           "; generated random plan=" + randomPlan + "; " + destServers.size() +
1303           " (online=" + serverManager.getOnlineServers().size() +
1304           ") available servers, forceNewPlan=" + forceNewPlan);
1305       }
1306       return randomPlan;
1307     }
1308     if (LOG.isDebugEnabled()) {
1309       LOG.debug("Using pre-existing plan for " +
1310         region.getRegionNameAsString() + "; plan=" + existingPlan);
1311     }
1312     return existingPlan;
1313   }
1314
1315   /**
1316    * Wait for some time before retrying meta table region assignment
1317    */
1318   private void waitForRetryingMetaAssignment() {
1319     try {
1320       Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
1321     } catch (InterruptedException e) {
1322       LOG.error("Got exception while waiting for hbase:meta assignment", e);
1323       Thread.currentThread().interrupt();
1324     }
1325   }
1326
1327   /**
1328    * Unassigns the specified region.
1329    * <p>
1330    * Updates the RegionState and sends the CLOSE RPC unless the region is being
1331    * split by the regionserver; then the unassign fails (silently) because we
1332    * presume the region being unassigned no longer exists (it's been split out
1333    * of existence). TODO: What to do if the split fails and is rolled back and
1334    * the parent is revivified?
1335    * <p>
1336    * If a RegionPlan is already set, it will remain.
1337    *
1338    * @param region the region to be unassigned
1339    */
1340   public void unassign(HRegionInfo region) {
1341     unassign(region, null);
1342   }
1343
1344
1345   /**
1346    * Unassigns the specified region.
1347    * <p>
1348    * Updates the RegionState and sends the CLOSE RPC unless the region is being
1349    * split by the regionserver; then the unassign fails (silently) because we
1350    * presume the region being unassigned no longer exists (it's been split out
1351    * of existence). TODO: What to do if the split fails and is rolled back and
1352    * the parent is revivified?
1353    * <p>
1354    * If a RegionPlan is already set, it will remain.
1355    *
1356    * @param region the region to be unassigned
1357    * @param dest the destination server of the region
1358    */
1359   public void unassign(HRegionInfo region, ServerName dest) {
1360     // TODO: Method needs refactoring.  Ugly buried returns throughout.  Beware!
1361     LOG.debug("Starting unassign of " + region.getRegionNameAsString()
1362       + " (offlining), current state: " + regionStates.getRegionState(region));
1363
1364     String encodedName = region.getEncodedName();
1365     // Grab the state of this region and synchronize on it
1366     // We need a lock here as we're going to do a put later and we don't want
1367     // multiple states to be created for the same region
1368     ReentrantLock lock = locker.acquireLock(encodedName);
1369     RegionState state = regionStates.getRegionTransitionState(encodedName);
1370     try {
1371       if (state == null || state.isFailedClose()) {
1372         if (state == null) {
1373           // Region is not in transition.
1374           // We can unassign it only if it's not SPLIT/MERGED.
1375           state = regionStates.getRegionState(encodedName);
1376           if (state != null && state.isUnassignable()) {
1377             LOG.info("Attempting to unassign " + state + ", ignored");
1378             // Offline region will be reassigned below
1379             return;
1380           }
1381           if (state == null || state.getServerName() == null) {
1382             // We don't know where the region is, offline it.
1383             // No need to send CLOSE RPC
1384             LOG.warn("Attempting to unassign a region not in RegionStates "
1385               + region.getRegionNameAsString() + ", offlined");
1386             regionOffline(region);
1387             return;
1388           }
1389         }
1390         state = regionStates.updateRegionState(
1391           region, State.PENDING_CLOSE);
1392       } else if (state.isFailedOpen()) {
1393         // The region is not open yet
1394         regionOffline(region);
1395         return;
1396       } else {
1397         LOG.debug("Attempting to unassign " +
1398           region.getRegionNameAsString() + " but it is " +
1399           "already in transition (" + state.getState() + ")");
1400         return;
1401       }
1402 
1403       unassign(region, state.getServerName(), dest);
1404     } finally {
1405       lock.unlock();
1406
1407       // Region is expected to be reassigned afterwards
1408       if (!replicasToClose.contains(region)
1409           && regionStates.isRegionInState(region, State.OFFLINE)) {
1410         assign(region);
1411       }
1412     }
1413   }
1414
1415   /**
1416    * Used by unit tests. Return the number of regions opened so far in the life
1417    * of the master. Increases by one every time the master opens a region
1418    * @return the counter value of the number of regions opened so far
1419    */
1420   public int getNumRegionsOpened() {
1421     return numRegionsOpened.get();
1422   }
1423
1424   /**
1425    * Waits until the specified region has completed assignment.
1426    * <p>
1427    * If the region is already assigned, returns immediately.  Otherwise, method
1428    * blocks until the region is assigned.
1429    * @param regionInfo region to wait on assignment for
1430    * @return true if the region is assigned, false otherwise.
1431    * @throws InterruptedException
1432    */
1433   public boolean waitForAssignment(HRegionInfo regionInfo)
1434       throws InterruptedException {
1435     ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
1436     regionSet.add(regionInfo);
1437     return waitForAssignment(regionSet, true, Long.MAX_VALUE);
1438   }
1439
1440   /**
1441    * Waits until the specified regions have completed assignment, or the deadline is reached.
1442    */
1443   protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
1444       final boolean waitTillAllAssigned, final int reassigningRegions,
1445       final long minEndTime) throws InterruptedException {
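         // The deadline is the caller's minimum end time plus a per-region open-time
         // guess (bulkPerRegionOpenTimeGuesstimate) scaled by the number of regions
         // being reassigned; if the sum overflows, wait without a deadline.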
1446     long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
1447     if (deadline < 0) { // Overflow
1448       deadline = Long.MAX_VALUE; // wait forever
1449     }
1450     return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
1451   }
1452
1453   /**
1454    * Waits until the specified regions have completed assignment, or the deadline is reached.
1455    * @param regionSet set of regions to wait on. The set is modified and the assigned regions removed
1456    * @param waitTillAllAssigned true if we should wait for all the regions to be assigned
1457    * @param deadline the timestamp after which the wait is aborted
1458    * @return true if all the regions are assigned, false otherwise.
1459    * @throws InterruptedException
1460    */
1461   protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
1462       final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
1463     // We're not synchronizing on regionsInTransition now because we don't use any iterator.
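         // Each pass drops regions that have come online (or are splitting/split/
         // merging/merged, so no longer assignable as-is) and counts those stuck in
         // FAILED_OPEN; once every remaining region has failed to open, waiting longer
         // is pointless.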
1464     while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
1465       int failedOpenCount = 0;
1466       Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
1467       while (regionInfoIterator.hasNext()) {
1468         HRegionInfo hri = regionInfoIterator.next();
1469         if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
1470             State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
1471           regionInfoIterator.remove();
1472         } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
1473           failedOpenCount++;
1474         }
1475       }
1476       if (!waitTillAllAssigned) {
1477         // No need to wait; let assignment go on asynchronously
1478         break;
1479       }
1480       if (!regionSet.isEmpty()) {
1481         if (failedOpenCount == regionSet.size()) {
1482           // all the regions we are waiting on had an error on open.
1483           break;
1484         }
1485         regionStates.waitForUpdate(100);
1486       }
1487     }
1488     return regionSet.isEmpty();
1489   }
1490
1491   /**
1492    * Assigns the hbase:meta region or a replica.
1493    * <p>
1494    * Assumes that hbase:meta is currently closed and is not being actively served by
1495    * any RegionServer.
1496    * @param hri TODO
1497    */
1498   public void assignMeta(HRegionInfo hri) throws KeeperException {
1499     regionStates.updateRegionState(hri, State.OFFLINE);
1500     assign(hri);
1501   }
1502
1503   /**
1504    * Assigns specified regions retaining assignments, if any.
1505    * <p>
1506    * This is a synchronous call and will return once every region has been
1507    * assigned.  If anything fails, an exception is thrown
1508    * @throws InterruptedException
1509    * @throws IOException
1510    */
1511   public void assign(Map<HRegionInfo, ServerName> regions)
1512         throws IOException, InterruptedException {
1513     if (regions == null || regions.isEmpty()) {
1514       return;
1515     }
1516     List<ServerName> servers = serverManager.createDestinationServersList();
1517     if (servers == null || servers.isEmpty()) {
1518       throw new IOException("Found no destination server to assign region(s)");
1519     }
1520
1521     // Reuse existing assignment info
1522     Map<ServerName, List<HRegionInfo>> bulkPlan =
1523       balancer.retainAssignment(regions, servers);
1524     if (bulkPlan == null) {
1525       throw new IOException("Unable to determine a plan to assign region(s)");
1526     }
1527
1528     processBogusAssignments(bulkPlan);
1529
1530     assign(regions.size(), servers.size(),
1531       "retainAssignment=true", bulkPlan);
1532   }
1533
1534   /**
1535    * Assigns specified regions round robin, if any.
1536    * <p>
1537    * This is a synchronous call and will return once every region has been
1538    * assigned.  If anything fails, an exception is thrown
1539    * @throws InterruptedException
1540    * @throws IOException
1541    */
1542   public void assign(List<HRegionInfo> regions)
1543         throws IOException, InterruptedException {
1544     if (regions == null || regions.isEmpty()) {
1545       return;
1546     }
1547
1548     List<ServerName> servers = serverManager.createDestinationServersList();
1549     if (servers == null || servers.isEmpty()) {
1550       throw new IOException("Found no destination server to assign region(s)");
1551     }
1552
1553     // Generate a round-robin bulk assignment plan
1554     Map<ServerName, List<HRegionInfo>> bulkPlan = balancer.roundRobinAssignment(regions, servers);
1555     if (bulkPlan == null) {
1556       throw new IOException("Unable to determine a plan to assign region(s)");
1557     }
1558
1559     processBogusAssignments(bulkPlan);
1560
1561     processFavoredNodes(regions);
1562     assign(regions.size(), servers.size(), "round-robin=true", bulkPlan);
1563   }
1564
1565   private void assign(int regions, int totalServers,
1566       String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
1567           throws InterruptedException, IOException {
1568
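         // Small plans (a single destination server, or region and server counts below
         // the bulk-assign thresholds) are assigned directly below; larger plans go
         // through the GeneralBulkAssigner in the else branch.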
1569     int servers = bulkPlan.size();
1570     if (servers == 1 || (regions < bulkAssignThresholdRegions
1571         && servers < bulkAssignThresholdServers)) {
1572
1573       // Not using bulk assignment.  This could be more efficient in a small
1574       // cluster, especially a mini cluster for testing, so that tests won't time out
1575       if (LOG.isTraceEnabled()) {
1576         LOG.trace("Not using bulk assignment since we are assigning only " + regions +
1577           " region(s) to " + servers + " server(s)");
1578       }
1579
1580       // invoke assignment (async)
1581       ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
1582       for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
1583         if (!assign(plan.getKey(), plan.getValue()) && !server.isStopped()) {
1584           for (HRegionInfo region: plan.getValue()) {
1585             if (!regionStates.isRegionOnline(region)) {
1586               invokeAssign(region);
1587               if (!region.getTable().isSystemTable()) {
1588                 userRegionSet.add(region);
1589               }
1590             }
1591           }
1592         }
1593       }
1594
1595       // wait for assignment completion
1596       if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
1597             System.currentTimeMillis())) {
1598         LOG.debug("some user regions are still in transition: " + userRegionSet);
1599       }
1600     } else {
1601       LOG.info("Bulk assigning " + regions + " region(s) across "
1602         + totalServers + " server(s), " + message);
1603
1604       // Use a fixed-count thread pool for assigning.
1605       BulkAssigner ba = new GeneralBulkAssigner(
1606         this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
1607       ba.bulkAssign();
1608       LOG.info("Bulk assigning done");
1609     }
1610   }
1611
1612   /**
1613    * Assigns all user regions, if any exist.  Used during cluster startup.
1614    * <p>
1615    * This is a synchronous call and will return once every region has been
1616    * assigned.  If anything fails, an exception is thrown and the cluster
1617    * should be shutdown.
1618    * @throws InterruptedException
1619    * @throws IOException
1620    */
1621   private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
1622       throws IOException, InterruptedException {
1623     if (allRegions == null || allRegions.isEmpty()) return;
1624
1625     // Determine what type of assignment to do on startup
1626     boolean retainAssignment = server.getConfiguration().
1627       getBoolean("hbase.master.startup.retainassign", true);
1628
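         // With retainAssignment the balancer tries to put each region back on the
         // server recorded in meta; otherwise the regions are spread round-robin
         // across the currently live servers.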
1629     Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
1630     if (retainAssignment) {
1631       assign(allRegions);
1632     } else {
1633       List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
1634       assign(regions);
1635     }
1636
1637     for (HRegionInfo hri : regionsFromMetaScan) {
1638       TableName tableName = hri.getTable();
1639       if (!tableStateManager.isTableState(tableName,
1640               TableState.State.ENABLED)) {
1641         setEnabledTable(tableName);
1642       }
1643     }
1644     // assign all the replicas that were not recorded in the meta
1645     assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server));
1646   }
1647
1648   /**
1649    * Get a list of replica regions that are not recorded in meta yet.
1650    * We might not have recorded the locations for the replicas since the
1651    * replicas may not have been online yet, the master restarted in the
1652    * middle of assigning, ZK was erased, etc.
1653    * @param regionsRecordedInMeta the list of regions we know are recorded in meta
1654    * either as a default, or, as the location of a replica
1655    * @param master
1656    * @return list of replica regions
1657    * @throws IOException
1658    */
1659   public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
1660       Set<HRegionInfo> regionsRecordedInMeta, MasterServices master)throws IOException {
1661     List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
1662     for (HRegionInfo hri : regionsRecordedInMeta) {
1663       TableName table = hri.getTable();
1664       HTableDescriptor htd = master.getTableDescriptors().get(table);
1665       // look at the HTD for the replica count. That's the source of truth
1666       int desiredRegionReplication = htd.getRegionReplication();
1667       for (int i = 0; i < desiredRegionReplication; i++) {
1668         HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
1669         if (regionsRecordedInMeta.contains(replica)) continue;
1670         regionsNotRecordedInMeta.add(replica);
1671       }
1672     }
1673     return regionsNotRecordedInMeta;
1674   }
1675
1676   /**
1677    * Rebuild the list of user regions and assignment information.
1678    * Updates regionstates with findings as we go through list of regions.
1679    * @return set of servers not online that hosted some regions according to a scan of hbase:meta
1680    * @throws IOException
1681    */
1682   Set<ServerName> rebuildUserRegions() throws
1683           IOException, KeeperException {
1684     Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
1685             TableState.State.DISABLED, TableState.State.ENABLING);
1686
1687     Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
1688             TableState.State.DISABLED,
1689             TableState.State.DISABLING,
1690             TableState.State.ENABLING);
1691
1692     // Region assignment from META
1693     List<Result> results = MetaTableAccessor.fullScanRegions(server.getConnection());
1694     // Get any new but slow to checkin region server that joined the cluster
1695     Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
1696     // Set of offline servers to be returned
1697     Set<ServerName> offlineServers = new HashSet<ServerName>();
1698     // Iterate regions in META
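         // For each meta row: remember merged/split replicas that may still need
         // closing, seed RegionStates with the last known state and location, and mark
         // regions online when they sit on a live server and their table is not
         // disabled or enabling.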
1699     for (Result result : results) {
1700       if (result == null) {
1701         LOG.debug("null result from meta - ignoring but this is strange.");
1702         continue;
1703       }
1704       // Keep track of replicas to close. These were the replicas of the original
1705       // (pre-merge) regions. The master might have closed them before, but it may
1706       // not have, perhaps because it crashed.
1707       PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
1708       if (p.getFirst() != null && p.getSecond() != null) {
1709         int numReplicas = ((MasterServices)server).getTableDescriptors().get(p.getFirst().
1710             getTable()).getRegionReplication();
1711         for (HRegionInfo merge : p) {
1712           for (int i = 1; i < numReplicas; i++) {
1713             replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
1714           }
1715         }
1716       }
1717       RegionLocations rl =  MetaTableAccessor.getRegionLocations(result);
1718       if (rl == null) {
1719         continue;
1720       }
1721       HRegionLocation[] locations = rl.getRegionLocations();
1722       if (locations == null) {
1723         continue;
1724       }
1725       for (HRegionLocation hrl : locations) {
1726         if (hrl == null) continue;
1727         HRegionInfo regionInfo = hrl.getRegionInfo();
1728         if (regionInfo == null) continue;
1729         int replicaId = regionInfo.getReplicaId();
1730         State state = RegionStateStore.getRegionState(result, replicaId);
1731         // Keep track of replicas to close. These were the replicas of the split parents
1732         // from the previous life of the master. The master should have closed them
1733         // before, but it may not have been able to, perhaps because it crashed.
1734         if (replicaId == 0 && state.equals(State.SPLIT)) {
1735           for (HRegionLocation h : locations) {
1736             replicasToClose.add(h.getRegionInfo());
1737           }
1738         }
1739         ServerName lastHost = hrl.getServerName();
1740         ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
1741         regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
1742         if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
1743           // Region is not open (either offline or in transition), skip
1744           continue;
1745         }
1746         TableName tableName = regionInfo.getTable();
1747         if (!onlineServers.contains(regionLocation)) {
1748           // Region is located on a server that isn't online
1749           offlineServers.add(regionLocation);
1750         } else if (!disabledOrEnablingTables.contains(tableName)) {
1751           // Region is being served and on an active server
1752           // add only if region not in disabled or enabling table
1753           regionStates.regionOnline(regionInfo, regionLocation);
1754           balancer.regionOnline(regionInfo, regionLocation);
1755         }
1756         // need to enable the table if not disabled or disabling or enabling
1757         // this will be used in rolling restarts
1758         if (!disabledOrDisablingOrEnabling.contains(tableName)
1759           && !getTableStateManager().isTableState(tableName,
1760                 TableState.State.ENABLED)) {
1761           setEnabledTable(tableName);
1762         }
1763       }
1764     }
1765     return offlineServers;
1766   }
1767
1768   /**
1769    * Recover the tables that were not fully moved to DISABLED state. These
1770    * tables were in DISABLING state when the master restarted/switched.
1771    *
1772    * @throws KeeperException
1773    * @throws TableNotFoundException
1774    * @throws IOException
1775    */
1776   private void recoverTableInDisablingState()
1777           throws KeeperException, IOException {
1778     Set<TableName> disablingTables =
1779             tableStateManager.getTablesInStates(TableState.State.DISABLING);
1780     if (disablingTables.size() != 0) {
1781       for (TableName tableName : disablingTables) {
1782         // Recover by calling DisableTableHandler
1783         LOG.info("The table " + tableName
1784             + " is in DISABLING state.  Hence recovering by moving the table"
1785             + " to DISABLED state.");
1786         new DisableTableHandler(this.server, tableName,
1787             this, tableLockManager, true).prepare().process();
1788       }
1789     }
1790   }
1791
1792   /**
1793    * Recover the tables that were not fully moved to ENABLED state. These tables
1794    * were in ENABLING state when the master restarted/switched.
1795    *
1796    * @throws KeeperException
1797    * @throws org.apache.hadoop.hbase.TableNotFoundException
1798    * @throws IOException
1799    */
1800   private void recoverTableInEnablingState()
1801           throws KeeperException, IOException {
1802     Set<TableName> enablingTables = tableStateManager.
1803             getTablesInStates(TableState.State.ENABLING);
1804     if (enablingTables.size() != 0) {
1805       for (TableName tableName : enablingTables) {
1806         // Recover by calling EnableTableHandler
1807         LOG.info("The table " + tableName
1808             + " is in ENABLING state.  Hence recovering by moving the table"
1809             + " to ENABLED state.");
1810         // enableTable in sync way during master startup,
1811         // no need to invoke coprocessor
1812         EnableTableHandler eth = new EnableTableHandler(this.server, tableName,
1813           this, tableLockManager, true);
1814         try {
1815           eth.prepare();
1816         } catch (TableNotFoundException e) {
1817           LOG.warn("Table " + tableName + " not found in hbase:meta to recover.");
1818           continue;
1819         }
1820         eth.process();
1821       }
1822     }
1823   }
1824
1825   /**
1826    * Processes list of regions in transition at startup
1827    */
1828   void processRegionsInTransition(Collection<RegionState> regionsInTransition) {
1829     // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
1830     // in case the RPC call is not sent out yet before the master was shut down
1831     // since we update the state before we send the RPC call. We can't update
1832     // the state after the RPC call. Otherwise, we don't know what's happened
1833     // to the region if the master dies right after the RPC call is out.
1834     for (RegionState regionState: regionsInTransition) {
1835       LOG.info("Processing " + regionState);
1836       ServerName serverName = regionState.getServerName();
1837       // Server could be null in case of FAILED_OPEN when master cannot find a region plan. In that
1838       // case, try assigning it here.
1839       if (serverName != null && !serverManager.getOnlineServers().containsKey(serverName)) {
1840         LOG.info("Server " + serverName + " isn't online. SSH will handle this");
1841         continue; // SSH will handle it
1842       }
1843       HRegionInfo regionInfo = regionState.getRegion();
1844       RegionState.State state = regionState.getState();
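           // CLOSED regions just get re-assigned; PENDING_OPEN/PENDING_CLOSE have their
           // RPC re-sent; FAILED_OPEN/FAILED_CLOSE are pushed back through unassign;
           // other states need no action here.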
1845       switch (state) {
1846       case CLOSED:
1847         invokeAssign(regionState.getRegion());
1848         break;
1849       case PENDING_OPEN:
1850         retrySendRegionOpen(regionState);
1851         break;
1852       case PENDING_CLOSE:
1853         retrySendRegionClose(regionState);
1854         break;
1855       case FAILED_CLOSE:
1856       case FAILED_OPEN:
1857         invokeUnAssign(regionInfo);
1858         break;
1859       default:
1860           // No process for other states
1861           break;
1862       }
1863     }
1864   }
1865
1866   /**
1867    * At master failover, for a pending_open region, make sure the
1868    * sendRegionOpen RPC call is sent to the target regionserver.
1869    */
1870   private void retrySendRegionOpen(final RegionState regionState) {
1871     this.executorService.submit(
1872       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
1873         @Override
1874         public void process() throws IOException {
1875           HRegionInfo hri = regionState.getRegion();
1876           ServerName serverName = regionState.getServerName();
1877           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
1878           try {
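                 // Keep re-sending the open RPC until it succeeds, the attempts run out,
                 // the target server goes away, or the master stops; a FailedServerException
                 // backs off for the failed-server expiry interval before retrying.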
1879             for (int i = 1; i <= maximumAttempts; i++) {
1880               if (!serverManager.isServerOnline(serverName)
1881                   || server.isStopped() || server.isAborted()) {
1882                 return; // No need any more
1883               }
1884               try {
1885                 if (!regionState.equals(regionStates.getRegionState(hri))) {
1886                   return; // Region is not in the expected state any more
1887                 }
1888                 List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
1889                 if (shouldAssignRegionsWithFavoredNodes) {
1890                   favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
1891                 }
1892                 serverManager.sendRegionOpen(serverName, hri, favoredNodes);
1893                 return; // we're done
1894               } catch (Throwable t) {
1895                 if (t instanceof RemoteException) {
1896                   t = ((RemoteException) t).unwrapRemoteException();
1897                 }
1898                 if (t instanceof FailedServerException && i < maximumAttempts) {
1899                   // In case the server is in the failed server list, there is no point
1900                   // in retrying too soon. Retry after the failed_server_expiry time
1901                   try {
1902                     Configuration conf = this.server.getConfiguration();
1903                     long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1904                       RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1905                     if (LOG.isDebugEnabled()) {
1906                       LOG.debug(serverName + " is on failed server list; waiting "
1907                         + sleepTime + "ms", t);
1908                     }
1909                     Thread.sleep(sleepTime);
1910                     continue;
1911                   } catch (InterruptedException ie) {
1912                     LOG.warn("Failed to assign "
1913                       + hri.getRegionNameAsString() + " since interrupted", ie);
1914                     regionStates.updateRegionState(hri, State.FAILED_OPEN);
1915                     Thread.currentThread().interrupt();
1916                     return;
1917                   }
1918                 }
1919                 if (serverManager.isServerOnline(serverName)
1920                     && t instanceof java.net.SocketTimeoutException) {
1921                   i--; // Don't count this attempt against the retry limit
1922                 } else {
1923                   LOG.info("Got exception in retrying sendRegionOpen for "
1924                     + regionState + "; try=" + i + " of " + maximumAttempts, t);
1925                 }
1926                 Threads.sleep(100);
1927               }
1928             }
1929             // Run out of attempts
1930             regionStates.updateRegionState(hri, State.FAILED_OPEN);
1931           } finally {
1932             lock.unlock();
1933           }
1934         }
1935       });
1936   }
1937
1938   /**
1939    * At master failover, for a pending_close region, make sure the
1940    * sendRegionClose RPC call is sent to the target regionserver.
1941    */
1942   private void retrySendRegionClose(final RegionState regionState) {
1943     this.executorService.submit(
1944       new EventHandler(server, EventType.M_MASTER_RECOVERY) {
1945         @Override
1946         public void process() throws IOException {
1947           HRegionInfo hri = regionState.getRegion();
1948           ServerName serverName = regionState.getServerName();
1949           ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
1950           try {
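                 // Same retry scheme as retrySendRegionOpen, but re-sending the close RPC
                 // and ending in FAILED_CLOSE once the attempts are exhausted.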
1951             for (int i = 1; i <= maximumAttempts; i++) {
1952               if (!serverManager.isServerOnline(serverName)
1953                   || server.isStopped() || server.isAborted()) {
1954                 return; // No need any more
1955               }
1956               try {
1957                 if (!regionState.equals(regionStates.getRegionState(hri))) {
1958                   return; // Region is not in the expected state any more
1959                 }
1960                 serverManager.sendRegionClose(serverName, hri, null);
1961                 return; // Done.
1962               } catch (Throwable t) {
1963                 if (t instanceof RemoteException) {
1964                   t = ((RemoteException) t).unwrapRemoteException();
1965                 }
1966                 if (t instanceof FailedServerException && i < maximumAttempts) {
1967                   // In case the server is in the failed server list, there is no point
1968                   // in retrying too soon. Retry after the failed_server_expiry time
1969                   try {
1970                     Configuration conf = this.server.getConfiguration();
1971                     long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
1972                       RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
1973                     if (LOG.isDebugEnabled()) {
1974                       LOG.debug(serverName + " is on failed server list; waiting "
1975                         + sleepTime + "ms", t);
1976                     }
1977                     Thread.sleep(sleepTime);
1978                     continue;
1979                   } catch (InterruptedException ie) {
1980                     LOG.warn("Failed to unassign "
1981                       + hri.getRegionNameAsString() + " since interrupted", ie);
1982                     regionStates.updateRegionState(hri, RegionState.State.FAILED_CLOSE);
1983                     Thread.currentThread().interrupt();
1984                     return;
1985                   }
1986                 }
1987                 if (serverManager.isServerOnline(serverName)
1988                     && t instanceof java.net.SocketTimeoutException) {
1989                   i--; // Don't count this attempt against the retry limit
1990                 } else {
1991                   LOG.info("Got exception in retrying sendRegionClose for "
1992                     + regionState + "; try=" + i + " of " + maximumAttempts, t);
1993                 }
1994                 Threads.sleep(100);
1995               }
1996             }
1997             // Run out of attempts
1998             regionStates.updateRegionState(hri, State.FAILED_CLOSE);
1999           } finally {
2000             lock.unlock();
2001           }
2002         }
2003       });
2004   }
2005
2006   /**
2007    * Set regions-in-transition metrics.
2008    * This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
2009    * This iterator is not fail fast, which may lead to stale reads; but that's better than
2010    * creating a copy of the map for metrics computation, as this method will be invoked
2011    * at a frequent interval.
2012    */
2013   public void updateRegionsInTransitionMetrics() {
2014     long currentTime = System.currentTimeMillis();
2015     int totalRITs = 0;
2016     int totalRITsOverThreshold = 0;
2017     long oldestRITTime = 0;
2018     int ritThreshold = this.server.getConfiguration().
2019       getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
2020     for (RegionState state: regionStates.getRegionsInTransition()) {
2021       totalRITs++;
2022       long ritTime = currentTime - state.getStamp();
2023       if (ritTime > ritThreshold) { // more than the threshold
2024         totalRITsOverThreshold++;
2025       }
2026       if (oldestRITTime < ritTime) {
2027         oldestRITTime = ritTime;
2028       }
2029     }
2030     if (this.metricsAssignmentManager != null) {
2031       this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
2032       this.metricsAssignmentManager.updateRITCount(totalRITs);
2033       this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
2034     }
2035   }
2036
2037   /**
2038    * @param region Region whose plan we are to clear.
2039    */
2040   private void clearRegionPlan(final HRegionInfo region) {
2041     synchronized (this.regionPlans) {
2042       this.regionPlans.remove(region.getEncodedName());
2043     }
2044   }
2045
2046   /**
2047    * Wait on region to clear regions-in-transition.
2048    * @param hri Region to wait on.
2049    * @throws IOException
2050    */
2051   public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
2052       throws IOException, InterruptedException {
2053     waitOnRegionToClearRegionsInTransition(hri, -1L);
2054   }
2055
2056   /**
2057    * Wait on region to clear regions-in-transition or time out
2058    * @param hri
2059    * @param timeOut Milliseconds to wait for current region to be out of transition state.
2060    * @return True when a region clears regions-in-transition before timeout otherwise false
2061    * @throws InterruptedException
2062    */
2063   public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
2064       throws InterruptedException {
2065     if (!regionStates.isRegionInTransition(hri)) {
2066       return true;
2067     }
2068     long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
2069         + timeOut;
2070     // There is already a timeout monitor on regions in transition so I
2071     // should not have to have one here too?
2072     LOG.info("Waiting for " + hri.getEncodedName() +
2073         " to leave regions-in-transition, timeOut=" + timeOut + " ms.");
2074     while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
2075       regionStates.waitForUpdate(100);
2076       if (EnvironmentEdgeManager.currentTime() > end) {
2077         LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
2078         return false;
2079       }
2080     }
2081     if (this.server.isStopped()) {
2082       LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
2083       return false;
2084     }
2085     return true;
2086   }
2087
2088   void invokeAssign(HRegionInfo regionInfo) {
2089     threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
2090   }
2091
2092   void invokeUnAssign(HRegionInfo regionInfo) {
2093     threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
2094   }
2095
2096   public boolean isCarryingMeta(ServerName serverName) {
2097     return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
2098   }
2099
2100   public boolean isCarryingMetaReplica(ServerName serverName, int replicaId) {
2101     return isCarryingRegion(serverName,
2102         RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId));
2103   }
2104
2105   public boolean isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) {
2106     return isCarryingRegion(serverName, metaHri);
2107   }
2108
2109   /**
2110    * Check if the shutdown server carries the specific region.
2111    * @return whether the serverName currently hosts the region
2112    */
2113   private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
2114     RegionState regionState = regionStates.getRegionTransitionState(hri);
2115     ServerName transitionAddr = regionState != null? regionState.getServerName(): null;
2116     if (transitionAddr != null) {
2117       boolean matchTransitionAddr = transitionAddr.equals(serverName);
2118       LOG.debug("Checking region=" + hri.getRegionNameAsString()
2119         + ", transitioning on server=" + transitionAddr
2120         + " server being checked: " + serverName
2121         + ", matches=" + matchTransitionAddr);
2122       return matchTransitionAddr;
2123     }
2124
2125     ServerName assignedAddr = regionStates.getRegionServerOfRegion(hri);
2126     boolean matchAssignedAddr = serverName.equals(assignedAddr);
2127     LOG.debug("based on AM, current region=" + hri.getRegionNameAsString()
2128       + " is on server=" + assignedAddr + ", server being checked: "
2129       + serverName);
2130     return matchAssignedAddr;
2131   }
2132
2133   /**
2134    * Clean out crashed server removing any assignments.
2135    * @param sn Server that went down.
2136    * @return list of regions in transition on this server
2137    */
2138   public List<HRegionInfo> cleanOutCrashedServerReferences(final ServerName sn) {
2139     // Clean out any existing assignment plans for this server
2140     synchronized (this.regionPlans) {
2141       for (Iterator <Map.Entry<String, RegionPlan>> i = this.regionPlans.entrySet().iterator();
2142           i.hasNext();) {
2143         Map.Entry<String, RegionPlan> e = i.next();
2144         ServerName otherSn = e.getValue().getDestination();
2145         // The name will be null if the region is planned for a random assign.
2146         if (otherSn != null && otherSn.equals(sn)) {
2147           // Use iterator's remove else we'll get CME
2148           i.remove();
2149         }
2150       }
2151     }
2152     List<HRegionInfo> rits = regionStates.serverOffline(sn);
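         // For each region that was in transition on the dead server: drop it from the
         // list if it has since moved on, offline it outright if its table is
         // disabled/disabling, otherwise mark it OFFLINE so it gets reassigned.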
2153     for (Iterator<HRegionInfo> it = rits.iterator(); it.hasNext(); ) {
2154       HRegionInfo hri = it.next();
2155       String encodedName = hri.getEncodedName();
2156
2157       // We need a lock on the region as we could update it
2158       Lock lock = locker.acquireLock(encodedName);
2159       try {
2160         RegionState regionState = regionStates.getRegionTransitionState(encodedName);
2161         if (regionState == null
2162             || (regionState.getServerName() != null && !regionState.isOnServer(sn))
2163             || !RegionStates.isOneOfStates(regionState, State.PENDING_OPEN,
2164                 State.OPENING, State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) {
2165           LOG.info("Skip " + regionState + " since it is not opening/failed_close"
2166             + " on the dead server any more: " + sn);
2167           it.remove();
2168         } else {
2169           if (tableStateManager.isTableState(hri.getTable(),
2170                   TableState.State.DISABLED, TableState.State.DISABLING)) {
2171             regionStates.regionOffline(hri);
2172             it.remove();
2173             continue;
2174           }
2175           // Mark the region offline and assign it again by SSH
2176           regionStates.updateRegionState(hri, State.OFFLINE);
2177         }
2178       } finally {
2179         lock.unlock();
2180       }
2181     }
2182     return rits;
2183   }
2184
2185   /**
2186    * @param plan Plan to execute.
2187    */
2188   public void balance(final RegionPlan plan) {
2189
2190     HRegionInfo hri = plan.getRegionInfo();
2191     TableName tableName = hri.getTable();
2192     if (tableStateManager.isTableState(tableName,
2193             TableState.State.DISABLED, TableState.State.DISABLING)) {
2194       LOG.info("Ignored moving region of disabling/disabled table "
2195         + tableName);
2196       return;
2197     }
2198
2199     // Move the region only if it's assigned
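         // The plan is recorded in regionPlans so the assign that follows the unassign
         // reuses the chosen destination.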
2200     String encodedName = hri.getEncodedName();
2201     ReentrantLock lock = locker.acquireLock(encodedName);
2202     try {
2203       if (!regionStates.isRegionOnline(hri)) {
2204         RegionState state = regionStates.getRegionState(encodedName);
2205         LOG.info("Ignored moving region not assigned: " + hri + ", "
2206           + (state == null ? "not in region states" : state));
2207         return;
2208       }
2209       synchronized (this.regionPlans) {
2210         this.regionPlans.put(plan.getRegionName(), plan);
2211       }
2212       unassign(hri, plan.getDestination());
2213     } finally {
2214       lock.unlock();
2215     }
2216   }
2217
2218   public void stop() {
2219     // Shutdown the threadpool executor service
2220     threadPoolExecutorService.shutdownNow();
2221     regionStateStore.stop();
2222   }
2223
2224   protected void setEnabledTable(TableName tableName) {
2225     try {
2226       this.tableStateManager.setTableState(tableName,
2227               TableState.State.ENABLED);
2228     } catch (IOException e) {
2229       // here we can abort as it is the start up flow
2230       String errorMsg = "Unable to ensure that the table " + tableName
2231           + " will be" + " enabled because of a ZooKeeper issue";
2232       LOG.error(errorMsg, e);
2233       this.server.abort(errorMsg, e);
2234     }
2235   }
2236
2237   @edu.umd.cs.findbugs.annotations.SuppressWarnings(
2238       value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
2239       justification="Worth fixing but not the end of the world.")
2240   private String onRegionFailedOpen(final RegionState current,
2241       final HRegionInfo hri, final ServerName serverName) {
2242     // The region must be opening on this server.
2243     // If current state is failed_open on the same server,
2244     // it could be a reportRegionTransition RPC retry.
2245     if (current == null || !current.isOpeningOrFailedOpenOnServer(serverName)) {
2246       return hri.getShortNameToLog() + " is not opening on " + serverName;
2247     }
2248
2249     // Just return in case of retrying
2250     if (current.isFailedOpen()) {
2251       return null;
2252     }
2253
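         // Count consecutive failed opens for this region; once maximumAttempts is
         // reached (meta excepted) the region is parked in FAILED_OPEN, otherwise it is
         // treated as CLOSED, re-planned and re-assigned below.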
2254     String encodedName = hri.getEncodedName();
2255     // FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION Worth fixing!!!
2256     AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
2257     if (failedOpenCount == null) {
2258       failedOpenCount = new AtomicInteger();
2259       // No need to use putIfAbsent, or extra synchronization since
2260       // this whole handleRegion block is locked on the encoded region
2261       // name, and failedOpenTracker is updated only in this block
2262       failedOpenTracker.put(encodedName, failedOpenCount);
2263     }
2264     if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion()) {
2265       regionStates.updateRegionState(hri, State.FAILED_OPEN);
2266       // remove the tracking info to save memory, also reset
2267       // the count for next open initiative
2268       failedOpenTracker.remove(encodedName);
2269     } else {
2270       if (hri.isMetaRegion() && failedOpenCount.get() >= maximumAttempts) {
2271         // Log a warning message if a meta region failedOpenCount exceeds maximumAttempts
2272         // so that we are aware of potential problem if it persists for a long time.
2273         LOG.warn("Failed to open the hbase:meta region " +
2274             hri.getRegionNameAsString() + " after " +
2275             failedOpenCount.get() + " retries. Continue retrying.");
2276       }
2277
2278       // Handle this the same as if it were opened and then closed.
2279       RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
2280       if (regionState != null) {
2281         // When there is more than one region server, a new RS is selected as the
2282         // destination and the region plan is updated accordingly. (HBASE-5546)
2283         if (getTableStateManager().isTableState(hri.getTable(),
2284                 TableState.State.DISABLED, TableState.State.DISABLING) ||
2285                 replicasToClose.contains(hri)) {
2286           offlineDisabledRegion(hri);
2287           return null;
2288         }
2289         regionStates.updateRegionState(hri, RegionState.State.CLOSED);
2290         // This below has to do w/ online enable/disable of a table
2291         removeClosedRegion(hri);
2292         try {
2293           getRegionPlan(hri, true);
2294         } catch (HBaseIOException e) {
2295           LOG.warn("Failed to get region plan", e);
2296         }
2297         invokeAssign(hri);
2298       }
2299     }
2300     // Null means no error
2301     return null;
2302   }
2303
2304   private String onRegionOpen(final RegionState current, final HRegionInfo hri,
2305       final ServerName serverName, final RegionStateTransition transition) {
2306     // The region must be opening on this server.
2307     // If current state is already opened on the same server,
2308     // it could be a reportRegionTransition RPC retry.
2309     if (current == null || !current.isOpeningOrOpenedOnServer(serverName)) {
2310       return hri.getShortNameToLog() + " is not opening on " + serverName;
2311     }
2312
2313     // Just return in case of retrying
2314     if (current.isOpened()) {
2315       return null;
2316     }
2317
2318     long openSeqNum = transition.hasOpenSeqNum()
2319       ? transition.getOpenSeqNum() : HConstants.NO_SEQNUM;
2320     if (openSeqNum < 0) {
2321       return "Newly opened region has invalid open seq num " + openSeqNum;
2322     }
2323     regionOnline(hri, serverName, openSeqNum);
2324
2325     // reset the count, if any
2326     failedOpenTracker.remove(hri.getEncodedName());
2327     if (getTableStateManager().isTableState(hri.getTable(),
2328             TableState.State.DISABLED, TableState.State.DISABLING)) {
2329       invokeUnAssign(hri);
2330     }
2331     return null;
2332   }
2333
2334   private String onRegionClosed(final RegionState current,
2335       final HRegionInfo hri, final ServerName serverName) {
2336     // Region will be usually assigned right after closed. When a RPC retry comes
2337     // in, the region may already have moved away from closed state. However, on the
2338     // region server side, we don't care much about the response for this transition.
2339     // We only make sure master has got and processed this report, either
2340     // successfully or not. So this is fine, not a problem at all.
2341     if (current == null || !current.isClosingOrClosedOnServer(serverName)) {
2342       return hri.getShortNameToLog() + " is not closing on " + serverName;
2343     }
2344
2345     // Just return in case of retrying
2346     if (current.isClosed()) {
2347       return null;
2348     }
2349
2350     if (getTableStateManager().isTableState(hri.getTable(), TableState.State.DISABLED,
2351         TableState.State.DISABLING) || replicasToClose.contains(hri)) {
2352       offlineDisabledRegion(hri);
2353       return null;
2354     }
2355
2356     regionStates.updateRegionState(hri, RegionState.State.CLOSED);
2357     sendRegionClosedNotification(hri);
2358     // This below has to do w/ online enable/disable of a table
2359     removeClosedRegion(hri);
2360     invokeAssign(hri);
2361     return null;
2362   }
2363
2364   private String onRegionReadyToSplit(final RegionState current, final HRegionInfo hri,
2365       final ServerName serverName, final RegionStateTransition transition) {
2366     // The region must be opened on this server.
2367     // If current state is already splitting on the same server,
2368     // it could be a reportRegionTransition RPC retry.
2369     if (current == null || !current.isSplittingOrOpenedOnServer(serverName)) {
2370       return hri.getShortNameToLog() + " is not opening on " + serverName;
2371     }
2372
2373     if (!((HMaster)server).getSplitOrMergeTracker().isSplitOrMergeEnabled(
2374             MasterSwitchType.SPLIT)) {
2375       return "split switch is off!";
2376     }
2377
2378     // Just return in case of retrying
2379     if (current.isSplitting()) {
2380       return null;
2381     }
2382
2383     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2384     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2385     RegionState rs_a = regionStates.getRegionState(a);
2386     RegionState rs_b = regionStates.getRegionState(b);
2387     if (rs_a != null || rs_b != null) {
2388       return "Some daughter region already exists. "
2389         + "a=" + rs_a + ", b=" + rs_b;
2390     }
2391
2392     // Server holding is not updated at this stage.
2393     // It is done after PONR.
2394     regionStates.updateRegionState(hri, State.SPLITTING);
2395     regionStates.createRegionState(
2396       a, State.SPLITTING_NEW, serverName, null);
2397     regionStates.createRegionState(
2398       b, State.SPLITTING_NEW, serverName, null);
2399     return null;
2400   }
2401
2402   private String onRegionSplitPONR(final RegionState current, final HRegionInfo hri,
2403       final ServerName serverName, final RegionStateTransition transition) {
2404     // The region must be splitting on this server, and the daughters must be in
2405     // splitting_new state. To check RPC retry, we use server holding info.
2406     if (current == null || !current.isSplittingOnServer(serverName)) {
2407       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2408     }
2409
2410     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2411     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2412     RegionState rs_a = regionStates.getRegionState(a);
2413     RegionState rs_b = regionStates.getRegionState(b);
2414
2415     // Master could have restarted and lost the new region
2416     // states, if so, they must be lost together
2417     if (rs_a == null && rs_b == null) {
2418       rs_a = regionStates.createRegionState(
2419         a, State.SPLITTING_NEW, serverName, null);
2420       rs_b = regionStates.createRegionState(
2421         b, State.SPLITTING_NEW, serverName, null);
2422     }
2423
2424     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2425         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2426       return "Some daughter is not known to be splitting on " + serverName
2427         + ", a=" + rs_a + ", b=" + rs_b;
2428     }
2429
2430     // Just return in case of retrying
2431     if (!regionStates.isRegionOnServer(hri, serverName)) {
2432       return null;
2433     }
2434
2435     try {
2436       regionStates.splitRegion(hri, a, b, serverName);
2437     } catch (IOException ioe) {
2438       LOG.info("Failed to record split region " + hri.getShortNameToLog(), ioe);
2439       return "Failed to record the splitting in meta";
2440     }
2441     return null;
2442   }
2443
2444   private String onRegionSplit(final RegionState current, final HRegionInfo hri,
2445       final ServerName serverName, final RegionStateTransition transition) {
2446     // The region must be splitting on this server, and the daughters must be in
2447     // splitting_new state.
2448     // If current state is already split on the same server,
2449     // it could be a reportRegionTransition RPC retry.
2450     if (current == null || !current.isSplittingOrSplitOnServer(serverName)) {
2451       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2452     }
2453
2454     // Just return in case of retrying
2455     if (current.isSplit()) {
2456       return null;
2457     }
2458
2459     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2460     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2461     RegionState rs_a = regionStates.getRegionState(a);
2462     RegionState rs_b = regionStates.getRegionState(b);
2463     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2464         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2465       return "Some daughter is not known to be splitting on " + serverName
2466         + ", a=" + rs_a + ", b=" + rs_b;
2467     }
2468
2469     if (TEST_SKIP_SPLIT_HANDLING) {
2470       return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
2471     }
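         // Retire the parent as SPLIT and bring both daughters online; replica handling
         // for the daughters is done asynchronously below unless the table has been
         // disabled in the meantime.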
2472     regionOffline(hri, State.SPLIT);
2473     regionOnline(a, serverName, 1);
2474     regionOnline(b, serverName, 1);
2475
2476     // User could disable the table before master knows the new region.
2477     if (getTableStateManager().isTableState(hri.getTable(),
2478         TableState.State.DISABLED, TableState.State.DISABLING)) {
2479       invokeUnAssign(a);
2480       invokeUnAssign(b);
2481     } else {
2482       Callable<Object> splitReplicasCallable = new Callable<Object>() {
2483         @Override
2484         public Object call() {
2485           doSplittingOfReplicas(hri, a, b);
2486           return null;
2487         }
2488       };
2489       threadPoolExecutorService.submit(splitReplicasCallable);
2490     }
2491     return null;
2492   }
2493
2494   private String onRegionSplitReverted(final RegionState current, final HRegionInfo hri,
2495       final ServerName serverName, final RegionStateTransition transition) {
2496     // The region must be splitting on this server, and the daughters must be in
2497     // splitting_new state.
2498     // If the region is in open state, it could be an RPC retry.
2499     if (current == null || !current.isSplittingOrOpenedOnServer(serverName)) {
2500       return hri.getShortNameToLog() + " is not splitting on " + serverName;
2501     }
2502
2503     // Just return in case of retrying
2504     if (current.isOpened()) {
2505       return null;
2506     }
2507
2508     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2509     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2510     RegionState rs_a = regionStates.getRegionState(a);
2511     RegionState rs_b = regionStates.getRegionState(b);
2512     if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
2513         || rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
2514       return "Some daughter is not known to be splitting on " + serverName
2515         + ", a=" + rs_a + ", b=" + rs_b;
2516     }
2517
2518     regionOnline(hri, serverName);
2519     regionOffline(a);
2520     regionOffline(b);
2521     if (getTableStateManager().isTableState(hri.getTable(),
2522         TableState.State.DISABLED, TableState.State.DISABLING)) {
2523       invokeUnAssign(hri);
2524     }
2525     return null;
2526   }
2527
2528   private String onRegionReadyToMerge(final RegionState current, final HRegionInfo hri,
2529       final ServerName serverName, final RegionStateTransition transition) {
2530     // The region must be new, and the daughters must be open on this server.
2531     // If the region is already in merging_new state, it could be an RPC retry.
2532     if (current != null && !current.isMergingNewOnServer(serverName)) {
2533       return "Merging daughter region already exists, p=" + current;
2534     }
2535
2536     if (!((HMaster)server).getSplitOrMergeTracker().isSplitOrMergeEnabled(
2537             MasterSwitchType.MERGE)) {
2538       return "merge switch is off!";
2539     }
2540     // Just return in case of retrying
2541     if (current != null) {
2542       return null;
2543     }
2544
2545     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2546     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2547     Set<String> encodedNames = new HashSet<String>(2);
2548     encodedNames.add(a.getEncodedName());
2549     encodedNames.add(b.getEncodedName());
2550     Map<String, Lock> locks = locker.acquireLocks(encodedNames);
2551     try {
2552       RegionState rs_a = regionStates.getRegionState(a);
2553       RegionState rs_b = regionStates.getRegionState(b);
2554       if (rs_a == null || !rs_a.isOpenedOnServer(serverName)
2555           || rs_b == null || !rs_b.isOpenedOnServer(serverName)) {
2556         return "Some daughter is not in a state to merge on " + serverName
2557           + ", a=" + rs_a + ", b=" + rs_b;
2558       }
2559
2560       regionStates.updateRegionState(a, State.MERGING);
2561       regionStates.updateRegionState(b, State.MERGING);
2562       regionStates.createRegionState(
2563         hri, State.MERGING_NEW, serverName, null);
2564       return null;
2565     } finally {
2566       for (Lock lock: locks.values()) {
2567         lock.unlock();
2568       }
2569     }
2570   }
2571
2572   private String onRegionMergePONR(final RegionState current, final HRegionInfo hri,
2573       final ServerName serverName, final RegionStateTransition transition) {
2574     // The region must be in merging_new state, and the daughters must be
2575     // merging. To detect an RPC retry, we check which server holds the region.
2576     if (current != null && !current.isMergingNewOnServer(serverName)) {
2577       return hri.getShortNameToLog() + " is not merging on " + serverName;
2578     }
2579
2580     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2581     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2582     RegionState rs_a = regionStates.getRegionState(a);
2583     RegionState rs_b = regionStates.getRegionState(b);
2584     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2585         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2586       return "Some daughter is not known to be merging on " + serverName
2587         + ", a=" + rs_a + ", b=" + rs_b;
2588     }
2589
2590     // Master could have restarted and lost the new region state
2591     if (current == null) {
2592       regionStates.createRegionState(
2593         hri, State.MERGING_NEW, serverName, null);
2594     }
2595
2596     // Just return in case of retrying
2597     if (regionStates.isRegionOnServer(hri, serverName)) {
2598       return null;
2599     }
2600
2601     try {
2602       regionStates.mergeRegions(hri, a, b, serverName);
2603     } catch (IOException ioe) {
2604       LOG.info("Failed to record merged region " + hri.getShortNameToLog(), ioe);
2605       return "Failed to record the merging in meta";
2606     }
2607     return null;
2608   }
2609
2610   private String onRegionMerged(final RegionState current, final HRegionInfo hri,
2611       final ServerName serverName, final RegionStateTransition transition) {
2612     // The region must be in merging_new state, and the daughters must be
2613     // merging on this server.
2614     // If current state is already opened on the same server,
2615     // it could be a reportRegionTransition RPC retry.
2616     if (current == null || !current.isMergingNewOrOpenedOnServer(serverName)) {
2617       return hri.getShortNameToLog() + " is not merging on " + serverName;
2618     }
2619
2620     // Just return in case of retrying
2621     if (current.isOpened()) {
2622       return null;
2623     }
2624
2625     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2626     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2627     RegionState rs_a = regionStates.getRegionState(a);
2628     RegionState rs_b = regionStates.getRegionState(b);
2629     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2630         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2631       return "Some daughter is not known to be merging on " + serverName
2632         + ", a=" + rs_a + ", b=" + rs_b;
2633     }
2634
2635     regionOffline(a, State.MERGED);
2636     regionOffline(b, State.MERGED);
2637     regionOnline(hri, serverName, 1);
2638
2639     // User could disable the table before master knows the new region.
2640     if (getTableStateManager().isTableState(hri.getTable(),
2641         TableState.State.DISABLED, TableState.State.DISABLING)) {
2642       invokeUnAssign(hri);
2643     } else {
2644       Callable<Object> mergeReplicasCallable = new Callable<Object>() {
2645         @Override
2646         public Object call() {
2647           doMergingOfReplicas(hri, a, b);
2648           return null;
2649         }
2650       };
2651       threadPoolExecutorService.submit(mergeReplicasCallable);
2652     }
2653     return null;
2654   }
2655
2656   private String onRegionMergeReverted(final RegionState current, final HRegionInfo hri,
2657       final ServerName serverName, final RegionStateTransition transition) {
2658     // The region must be in merging_new state, and the daughters must be
2659     // merging on this server.
2660     // If the region is in offline state, it could be an RPC retry.
2661     if (current == null || !current.isMergingNewOrOfflineOnServer(serverName)) {
2662       return hri.getShortNameToLog() + " is not merging on " + serverName;
2663     }
2664
2665     // Just return in case of retrying
2666     if (current.isOffline()) {
2667       return null;
2668     }
2669
2670     final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
2671     final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
2672     RegionState rs_a = regionStates.getRegionState(a);
2673     RegionState rs_b = regionStates.getRegionState(b);
2674     if (rs_a == null || !rs_a.isMergingOnServer(serverName)
2675         || rs_b == null || !rs_b.isMergingOnServer(serverName)) {
2676       return "Some daughter is not known to be merging on " + serverName
2677         + ", a=" + rs_a + ", b=" + rs_b;
2678     }
2679
2680     regionOnline(a, serverName);
2681     regionOnline(b, serverName);
2682     regionOffline(hri);
2683
2684     if (getTableStateManager().isTableState(hri.getTable(),
2685         TableState.State.DISABLED, TableState.State.DISABLING)) {
2686       invokeUnAssign(a);
2687       invokeUnAssign(b);
2688     }
2689     return null;
2690   }
2691
2692   private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
2693       final HRegionInfo hri_b) {
2694     // Close the replicas of the original unmerged regions, then create/assign new
2695     // replicas for the merged parent.
2696     List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
2697     unmergedRegions.add(hri_a);
2698     unmergedRegions.add(hri_b);
2699     Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
2700     Collection<List<HRegionInfo>> c = map.values();
2701     for (List<HRegionInfo> l : c) {
2702       for (HRegionInfo h : l) {
2703         if (!RegionReplicaUtil.isDefaultReplica(h)) {
2704           LOG.debug("Unassigning un-merged replica " + h);
2705           unassign(h);
2706         }
2707       }
2708     }
2709     int numReplicas = 1;
2710     try {
2711       numReplicas = ((MasterServices)server).getTableDescriptors().get(mergedHri.getTable()).
2712           getRegionReplication();
2713     } catch (IOException e) {
2714       LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
2715           " due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
2716           "will not be done");
2717     }
2718     List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
2719     for (int i = 1; i < numReplicas; i++) {
2720       regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
2721     }
2722     try {
2723       assign(regions);
2724     } catch (IOException ioe) {
2725       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
2726                 ioe.getMessage());
2727     } catch (InterruptedException ie) {
2728       LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
2729                 ie.getMessage());
2730     }
2731   }
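
     // Editor's note: the sketch below is illustrative and not part of the original class. It
     // documents where the replica count used above comes from: a table whose descriptor has a
     // region replication of 3 has one default replica (id 0) and two secondary replicas
     // (ids 1 and 2) per region, and only the secondaries are re-created after the merge.
     // The helper name is hypothetical and the method is unused.
     @VisibleForTesting
     static int secondaryReplicaCountForIllustration(final HTableDescriptor htd) {
       // e.g. a descriptor with region replication 3 -> 2 secondary replicas per region
       return Math.max(0, htd.getRegionReplication() - 1);
     }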
2732
2733   private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
2734       final HRegionInfo hri_b) {
2735     // Create new regions for the replicas and assign them to match the current
2736     // replica assignments: if replica 1 of the parent is assigned to RS1, then
2737     // replica 1 of each daughter will be placed on the same server.
2738     int numReplicas = 1;
2739     try {
2740       numReplicas = ((MasterServices)server).getTableDescriptors().get(parentHri.getTable()).
2741           getRegionReplication();
2742     } catch (IOException e) {
2743       LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
2744           " due to " + e.getMessage() + ". The assignment of daughter replicas " +
2745           "will not be done");
2746     }
2747     // unassign the old replicas
2748     List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
2749     parentRegion.add(parentHri);
2750     Map<ServerName, List<HRegionInfo>> currentAssign =
2751         regionStates.getRegionAssignments(parentRegion);
2752     Collection<List<HRegionInfo>> c = currentAssign.values();
2753     for (List<HRegionInfo> l : c) {
2754       for (HRegionInfo h : l) {
2755         if (!RegionReplicaUtil.isDefaultReplica(h)) {
2756           LOG.debug("Unassigning parent's replica " + h);
2757           unassign(h);
2758         }
2759       }
2760     }
2761     // assign daughter replicas
2762     Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
2763     for (int i = 1; i < numReplicas; i++) {
2764       prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
2765       prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
2766     }
2767     try {
2768       assign(map);
2769     } catch (IOException e) {
2770       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
2771     } catch (InterruptedException e) {
2772       LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
2773     }
2774   }
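
     // Editor's note: an illustrative sketch, not part of the original class, of the co-location
     // rule described at the top of doSplittingOfReplicas for a single replica id. Replica 1 of
     // each daughter is mapped to whichever server currently hosts replica 1 of the parent, so
     // the daughter replicas come up next to the parent replica they inherit from. The method
     // name is hypothetical and the method is unused.
     @VisibleForTesting
     Map<HRegionInfo, ServerName> colocationPlanForIllustration(final HRegionInfo parent,
         final HRegionInfo daughterA, final HRegionInfo daughterB) {
       Map<HRegionInfo, ServerName> plan = new HashMap<HRegionInfo, ServerName>();
       HRegionInfo parentReplica1 = RegionReplicaUtil.getRegionInfoForReplica(parent, 1);
       ServerName host = regionStates.getRegionServerOfRegion(parentReplica1);
       if (host != null) {
         plan.put(RegionReplicaUtil.getRegionInfoForReplica(daughterA, 1), host);
         plan.put(RegionReplicaUtil.getRegionInfoForReplica(daughterB, 1), host);
       }
       // (the original code falls back to a random online server when the parent replica
       // is currently unassigned)
       return plan;
     }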
2775
2776   private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
2777       int replicaId, Map<HRegionInfo, ServerName> map) {
2778     HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
2779     HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
2780         replicaId);
2781     LOG.debug("Created replica region for daughter " + daughterReplica);
2782     ServerName sn;
2783     if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
2784       map.put(daughterReplica, sn);
2785     } else {
2786       List<ServerName> servers = serverManager.getOnlineServersList();
2787       sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
2788       map.put(daughterReplica, sn);
2789     }
2790   }
2791
2792   public Set<HRegionInfo> getReplicasToClose() {
2793     return replicasToClose;
2794   }
2795
2796   /**
2797    * Marks a region as offline.  The new state is the specified one if not null;
2798    * otherwise the new state is Offline.
2799    * The specified state can only be Split, Merged, Offline or null.
2800    */
2801   private void regionOffline(final HRegionInfo regionInfo, final State state) {
2802     regionStates.regionOffline(regionInfo, state);
2803     removeClosedRegion(regionInfo);
2804     // remove the region plan as well just in case.
2805     clearRegionPlan(regionInfo);
2806     balancer.regionOffline(regionInfo);
2807
2808     // Tell our listeners that a region was closed
2809     sendRegionClosedNotification(regionInfo);
2810     // also note that all the replicas of the primary should be closed
2811     if (state != null && (state.equals(State.SPLIT) || state.equals(State.MERGED))) {
2812       Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
2813       c.add(regionInfo);
2814       Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
2815       Collection<List<HRegionInfo>> allReplicas = map.values();
2816       for (List<HRegionInfo> list : allReplicas) {
2817         replicasToClose.addAll(list);
2818       }
2819     }
2829   }
2830
2831   private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
2832       final ServerName serverName) {
2833     if (!this.listeners.isEmpty()) {
2834       for (AssignmentListener listener : this.listeners) {
2835         listener.regionOpened(regionInfo, serverName);
2836       }
2837     }
2838   }
2839
2840   private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
2841     if (!this.listeners.isEmpty()) {
2842       for (AssignmentListener listener : this.listeners) {
2843         listener.regionClosed(regionInfo);
2844       }
2845     }
2846   }
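
     // Editor's note: an illustrative sketch, not part of the original class, of a listener that
     // would receive the notifications sent by the two methods above. It only logs the events;
     // the class name is hypothetical, and wiring it up is assumed to go through this class's
     // listener registration method declared earlier in the file.
     @VisibleForTesting
     static class LoggingAssignmentListener implements AssignmentListener {
       @Override
       public void regionOpened(final HRegionInfo regionInfo, final ServerName serverName) {
         LOG.debug("Region " + regionInfo.getShortNameToLog() + " opened on " + serverName);
       }

       @Override
       public void regionClosed(final HRegionInfo regionInfo) {
         LOG.debug("Region " + regionInfo.getShortNameToLog() + " closed");
       }
     }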
2847
2848   /**
2849    * Try to update some region states. If the state machine prevents
2850    * such update, an error message is returned to explain the reason.
2851    *
2852    * It's expected that each transition carries just one region for
2853    * opening/closing, and 3 regions for splitting/merging.
2854    * These regions should be on the server that requested the change.
2855    *
2856    * Region state machine. Only these transitions
2857    * are expected to be triggered by a region server.
2858    *
2859    * On the state transition:
2860    *  (1) Open/Close should be initiated by master
2861    *      (a) Master sets the region to pending_open/pending_close
2862    *        in memory and hbase:meta after sending the request
2863    *        to the region server
2864    *      (b) Region server reports back to the master
2865    *        after open/close is done (either success/failure)
2866    *      (c) If the region server fails to report the status
2867    *        to the master, it must be because the master is down or there is
2868    *        a temporary network issue. Otherwise, the region server should
2869    *        abort since it must be a bug. If the master is not accessible,
2870    *        the region server should keep trying until it is
2871    *        stopped or the status is reported to the (new) master
2872    *      (d) If region server dies in the middle of opening/closing
2873    *        a region, SSH picks it up and finishes it
2874    *      (e) If master dies in the middle, the new master recovers
2875    *        the state during initialization from hbase:meta. Region server
2876    *        can report any transition that has not been reported to
2877    *        the previous active master yet
2878    *  (2) Split/merge is initiated by region servers
2879    *      (a) To split a region, a region server sends a request
2880    *        to master to try to set a region to splitting, together with
2881    *        two daughters (to be created) to splitting new. If approved
2882    *        two daughters (to be created) to splitting_new. If approved
2883    *      (b) To merge two regions, a region server sends a request to
2884    *        master to try to set the new merged region (to be created) to
2885    *        merging_new, together with two regions (to be merged) to merging.
2886    *        If it is ok with the master, the merge can then move ahead
2887    *      (c) Once the splitting/merging is done, the region server
2888    *        reports the status (either success or failure) back to the master.
2889    *      (d) Other scenarios should be handled similarly to those for
2890    *        region open/close
2891    */
2892   protected String onRegionTransition(final ServerName serverName,
2893       final RegionStateTransition transition) {
2894     TransitionCode code = transition.getTransitionCode();
2895     HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
2896     Lock lock = locker.acquireLock(hri.getEncodedName());
2897     try {
2898       RegionState current = regionStates.getRegionState(hri);
2899       if (LOG.isDebugEnabled()) {
2900         LOG.debug("Got transition " + code + " for "
2901           + (current != null ? current.toString() : hri.getShortNameToLog())
2902           + " from " + serverName);
2903       }
2904       String errorMsg = null;
2905       switch (code) {
2906       case OPENED:
2907         errorMsg = onRegionOpen(current, hri, serverName, transition);
2908         break;
2909       case FAILED_OPEN:
2910         errorMsg = onRegionFailedOpen(current, hri, serverName);
2911         break;
2912       case CLOSED:
2913         errorMsg = onRegionClosed(current, hri, serverName);
2914         break;
2915       case READY_TO_SPLIT:
2916         try {
2917           regionStateListener.onRegionSplit(hri);
2918           errorMsg = onRegionReadyToSplit(current, hri, serverName, transition);
2919         } catch (IOException exp) {
2920           if (exp instanceof QuotaExceededException) {
2921             server.getRegionNormalizer().planSkipped(hri, PlanType.SPLIT);
2922           }
2923           errorMsg = StringUtils.stringifyException(exp);
2924         }
2925         break;
2926       case SPLIT_PONR:
2927         errorMsg = onRegionSplitPONR(current, hri, serverName, transition);
2928         break;
2929       case SPLIT:
2930         errorMsg = onRegionSplit(current, hri, serverName, transition);
2931         break;
2932       case SPLIT_REVERTED:
2933         errorMsg = onRegionSplitReverted(current, hri, serverName, transition);
2934         if (org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
2935           try {
2936             regionStateListener.onRegionSplitReverted(hri);
2937           } catch (IOException exp) {
2938             LOG.warn(StringUtils.stringifyException(exp));
2939           }
2940         }
2941         break;
2942       case READY_TO_MERGE:
2943         errorMsg = onRegionReadyToMerge(current, hri, serverName, transition);
2944         break;
2945       case MERGE_PONR:
2946         errorMsg = onRegionMergePONR(current, hri, serverName, transition);
2947         break;
2948       case MERGED:
2949         try {
2950           errorMsg = onRegionMerged(current, hri, serverName, transition);
2951           regionStateListener.onRegionMerged(hri);
2952         } catch (IOException exp) {
2953           errorMsg = StringUtils.stringifyException(exp);
2954         }
2955         break;
2956       case MERGE_REVERTED:
2957         errorMsg = onRegionMergeReverted(current, hri, serverName, transition);
2958         break;
2959
2960       default:
2961         errorMsg = "Unexpected transition code " + code;
2962       }
2963       if (errorMsg != null) {
2964         LOG.info("Could not transition region from " + current + " on "
2965           + code + " by " + serverName + ": " + errorMsg);
2966       }
2967       return errorMsg;
2968     } finally {
2969       lock.unlock();
2970     }
2971   }
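
     // Editor's note: an illustrative sketch, not part of the original class, showing the shape
     // of the payload a region server would send for the split transitions handled above. It
     // follows the index convention used throughout this file (region 0 is the parent, regions 1
     // and 2 are the daughters) and assumes the generated protobuf builder for
     // RegionStateTransition; the method name is hypothetical and the method is unused.
     @VisibleForTesting
     static RegionStateTransition splitTransitionForIllustration(final HRegionInfo parent,
         final HRegionInfo daughterA, final HRegionInfo daughterB) {
       return RegionStateTransition.newBuilder()
         .setTransitionCode(TransitionCode.READY_TO_SPLIT)
         .addRegionInfo(HRegionInfo.convert(parent))     // index 0: the region being split
         .addRegionInfo(HRegionInfo.convert(daughterA))  // index 1: first daughter to be created
         .addRegionInfo(HRegionInfo.convert(daughterB))  // index 2: second daughter to be created
         .build();
     }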
2972
2973   private void processBogusAssignments(Map<ServerName, List<HRegionInfo>> bulkPlan) {
2974     if (bulkPlan.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
2975       // Found no plan for some regions, put those regions in RIT
2976       for (HRegionInfo hri : bulkPlan.get(LoadBalancer.BOGUS_SERVER_NAME)) {
2977         regionStates.updateRegionState(hri, State.FAILED_OPEN);
2978       }
2979       bulkPlan.remove(LoadBalancer.BOGUS_SERVER_NAME);
2980     }
2981   }
2982
2983   /**
2984    * @return Instance of load balancer
2985    */
2986   public LoadBalancer getBalancer() {
2987     return this.balancer;
2988   }
2989
2990   public Map<ServerName, List<HRegionInfo>>
2991     getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
2992     return getRegionStates().getRegionAssignments(infos);
2993   }
2994
2995   void setRegionStateListener(RegionStateListener listener) {
2996     this.regionStateListener = listener;
2997   }
2998 }