View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master;
19  
20  import java.io.IOException;
21  import java.util.ArrayList;
22  import java.util.Collection;
23  import java.util.Collections;
24  import java.util.HashMap;
25  import java.util.HashSet;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Set;
30  import java.util.TreeMap;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.HRegionInfo;
37  import org.apache.hadoop.hbase.HTableDescriptor;
38  import org.apache.hadoop.hbase.MetaTableAccessor;
39  import org.apache.hadoop.hbase.RegionTransition;
40  import org.apache.hadoop.hbase.Server;
41  import org.apache.hadoop.hbase.ServerLoad;
42  import org.apache.hadoop.hbase.ServerName;
43  import org.apache.hadoop.hbase.TableName;
44  import org.apache.hadoop.hbase.TableStateManager;
45  import org.apache.hadoop.hbase.classification.InterfaceAudience;
46  import org.apache.hadoop.hbase.client.RegionReplicaUtil;
47  import org.apache.hadoop.hbase.master.RegionState.State;
48  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
49  import org.apache.hadoop.hbase.util.Bytes;
50  import org.apache.hadoop.hbase.util.ConfigUtil;
51  import org.apache.hadoop.hbase.util.FSUtils;
52  import org.apache.hadoop.hbase.util.Pair;
53  import org.apache.hadoop.hbase.zookeeper.ZKAssign;
54  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
55  import org.apache.zookeeper.KeeperException;
56  
57  import com.google.common.annotations.VisibleForTesting;
58  import com.google.common.base.Preconditions;
59  
60  /**
 * Region state accountant. It holds the states of all regions in memory.
 * Under normal conditions, it should match the meta table and the true region states.
63   *
64   * This map is used by AssignmentManager to track region states.
65   */
66  @InterfaceAudience.Private
67  public class RegionStates {
  private static final Log LOG = LogFactory.getLog(RegionStates.class);

  /**
   * Regions currently in transition, keyed by encoded region name.
   */
  final HashMap<String, RegionState> regionsInTransition =
    new HashMap<String, RegionState>();

  /**
   * Encoded region name to region state map.
   * All the regions should be in this map.
   */
  private final Map<String, RegionState> regionStates =
    new HashMap<String, RegionState>();

  /**
   * Per-table index of region states: table name -> (encoded region name -> state).
   */
  private final Map<TableName, Map<String, RegionState>> regionStatesTableIndex =
      new HashMap<TableName, Map<String, RegionState>>();

  /**
   * Server to regions assignment map.
   * Contains the set of regions currently assigned to a given server.
   */
  private final Map<ServerName, Set<HRegionInfo>> serverHoldings =
    new HashMap<ServerName, Set<HRegionInfo>>();

  /**
   * Maintains the mapping from the default replica of a region to the set of
   * replicas registered for it (the default replica itself is added to the set too).
   */
  private final Map<HRegionInfo, Set<HRegionInfo>> defaultReplicaToOtherReplicas =
    new HashMap<HRegionInfo, Set<HRegionInfo>>();

  /**
   * Region to server assignment map.
   * Contains the server a given region is currently assigned to.
   */
  private final TreeMap<HRegionInfo, ServerName> regionAssignments =
    new TreeMap<HRegionInfo, ServerName>();

  /**
   * Encoded region name to server assignment map, used for re-assignment.
   * Contains the server a given region was last known to be assigned
   * to and which has not completed log splitting, so the region is not
   * assignable yet.
   * If a region is currently assigned, the server info in this
   * map should be the same as that in regionAssignments.
   * However, the info in regionAssignments is cleared when the region
   * goes offline, while the info in lastAssignments is cleared when
   * the region is closed or the server is dead and processed.
   */
  private final HashMap<String, ServerName> lastAssignments =
    new HashMap<String, ServerName>();

  /**
   * Encoded region name to server assignment map, used to clean up
   * serverHoldings when a region comes online on a new server.
   * When the region goes offline from the previous server, we clean up
   * regionAssignments so that it has the latest assignment map, but we
   * do not clean up serverHoldings to match the meta. We need this map
   * to find the old server whose serverHoldings needs cleanup, given
   * a moved region.
   */
  private final HashMap<String, ServerName> oldAssignments =
    new HashMap<String, ServerName>();

  /**
   * Maps a host:port pair string to the latest start code
   * of a region server which is known to be dead. It is dead
   * to us, but the server manager may not know it yet.
   */
  private final HashMap<String, Long> deadServers =
    new HashMap<String, Long>();

  /**
   * Maps a dead server to the time when its log split was done.
   * Since log splitting is not ordered, we have to remember
   * all processed instances. The map is cleaned up based
   * on a configured time. By default, we assume a dead
   * server should be done with log splitting in two hours.
   */
  private final HashMap<ServerName, Long> processedServers =
    new HashMap<ServerName, Long>();
  // Time (System.currentTimeMillis) of the last purge of processedServers.
  private long lastProcessedServerCleanTime;

  private final TableStateManager tableStateManager;
  private final RegionStateStore regionStateStore;
  private final ServerManager serverManager;
  private final Server server;
  private final boolean useZK; // True when assignment is ZK based.

  // Configuration key: the maximum time to keep log split info in processedServers.
  static final String LOG_SPLIT_TIME = "hbase.master.maximum.logsplit.keeptime";
  static final long DEFAULT_LOG_SPLIT_TIME = 7200000L; // 2 hours
162 
163   RegionStates(final Server master, final TableStateManager tableStateManager,
164       final ServerManager serverManager, final RegionStateStore regionStateStore) {
165     this.tableStateManager = tableStateManager;
166     this.regionStateStore = regionStateStore;
167     this.serverManager = serverManager;
168     this.server = master;
169     this.useZK = ConfigUtil.useZKForAssignment(server.getConfiguration());
170   }
171 
172   /**
173    * @return a copy of the region assignment map
174    */
175   public synchronized Map<HRegionInfo, ServerName> getRegionAssignments() {
176     return new TreeMap<HRegionInfo, ServerName>(regionAssignments);
177   }
178 
179   /**
180    * Return the replicas (including default) for the regions grouped by ServerName
181    * @param regions
182    * @return a pair containing the groupings as a map
183    */
184   synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignments(
185     Collection<HRegionInfo> regions) {
186     Map<ServerName, List<HRegionInfo>> map = new HashMap<ServerName, List<HRegionInfo>>();
187     for (HRegionInfo region : regions) {
188       HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(region);
189       Set<HRegionInfo> allReplicas = defaultReplicaToOtherReplicas.get(defaultReplica);
190       if (allReplicas != null) {
191         for (HRegionInfo hri : allReplicas) {
192           ServerName server = regionAssignments.get(hri);
193           if (server != null) {
194             List<HRegionInfo> regionsOnServer = map.get(server);
195             if (regionsOnServer == null) {
196               regionsOnServer = new ArrayList<HRegionInfo>(1);
197               map.put(server, regionsOnServer);
198             }
199             regionsOnServer.add(hri);
200           }
201         }
202       }
203     }
204     return map;
205   }
206 
207   public synchronized ServerName getRegionServerOfRegion(HRegionInfo hri) {
208     return regionAssignments.get(hri);
209   }
210 
211   /**
212    * Get regions in transition and their states
213    */
214   @SuppressWarnings("unchecked")
215   public synchronized Map<String, RegionState> getRegionsInTransition() {
216     return (Map<String, RegionState>)regionsInTransition.clone();
217   }
218 
219   /**
220    * @return True if specified region in transition.
221    */
222   public synchronized boolean isRegionInTransition(final HRegionInfo hri) {
223     return regionsInTransition.containsKey(hri.getEncodedName());
224   }
225 
226   /**
227    * @return True if specified region in transition.
228    */
229   public synchronized boolean isRegionInTransition(final String encodedName) {
230     return regionsInTransition.containsKey(encodedName);
231   }
232 
233   /**
234    * @return True if any region in transition.
235    */
236   public synchronized boolean isRegionsInTransition() {
237     return !regionsInTransition.isEmpty();
238   }
239 
240   /**
241    * @return True if specified region assigned, and not in transition.
242    */
243   public synchronized boolean isRegionOnline(final HRegionInfo hri) {
244     return !isRegionInTransition(hri) && regionAssignments.containsKey(hri);
245   }
246 
247   /**
248    * @return True if specified region offline/closed, but not in transition.
249    * If the region is not in the map, it is offline to us too.
250    */
251   public synchronized boolean isRegionOffline(final HRegionInfo hri) {
252     return getRegionState(hri) == null || (!isRegionInTransition(hri)
253       && isRegionInState(hri, State.OFFLINE, State.CLOSED));
254   }
255 
256   /**
257    * @return True if specified region is in one of the specified states.
258    */
259   public boolean isRegionInState(
260       final HRegionInfo hri, final State... states) {
261     return isRegionInState(hri.getEncodedName(), states);
262   }
263 
264   /**
265    * @return True if specified region is in one of the specified states.
266    */
267   public boolean isRegionInState(
268       final String encodedName, final State... states) {
269     RegionState regionState = getRegionState(encodedName);
270     return isOneOfStates(regionState, states);
271   }
272 
273   /**
274    * Wait for the state map to be updated by assignment manager.
275    */
276   public synchronized void waitForUpdate(
277       final long timeout) throws InterruptedException {
278     this.wait(timeout);
279   }
280 
281   /**
282    * Get region transition state
283    */
284   public RegionState getRegionTransitionState(final HRegionInfo hri) {
285     return getRegionTransitionState(hri.getEncodedName());
286   }
287 
288   /**
289    * Get region transition state
290    */
291   public synchronized RegionState
292       getRegionTransitionState(final String encodedName) {
293     return regionsInTransition.get(encodedName);
294   }
295 
296   /**
297    * Add a list of regions to RegionStates. If a region is split
298    * and offline, its state will be SPLIT. Otherwise, its state will
299    * be OFFLINE. Region already in RegionStates will be skipped.
300    */
301   public void createRegionStates(
302       final List<HRegionInfo> hris) {
303     for (HRegionInfo hri: hris) {
304       createRegionState(hri);
305     }
306   }
307 
308   /**
309    * Add a region to RegionStates. If the region is split
310    * and offline, its state will be SPLIT. Otherwise, its state will
311    * be OFFLINE. If it is already in RegionStates, this call has
312    * no effect, and the original state is returned.
313    */
314   public RegionState createRegionState(final HRegionInfo hri) {
315     return createRegionState(hri, null, null, null);
316   }
317 
318   /**
319    * Add a region to RegionStates with the specified state.
320    * If the region is already in RegionStates, this call has
321    * no effect, and the original state is returned.
322    *
323    * @param hri the region info to create a state for
324    * @param newState the state to the region in set to
325    * @param serverName the server the region is transitioning on
326    * @param lastHost the last server that hosts the region
327    * @return the current state
328    */
329   public synchronized RegionState createRegionState(final HRegionInfo hri,
330       State newState, ServerName serverName, ServerName lastHost) {
331     if (newState == null || (newState == State.OPEN && serverName == null)) {
332       newState =  State.OFFLINE;
333     }
334     if (hri.isOffline() && hri.isSplit()) {
335       newState = State.SPLIT;
336       serverName = null;
337     }
338     String encodedName = hri.getEncodedName();
339     RegionState regionState = regionStates.get(encodedName);
340     if (regionState != null) {
341       LOG.warn("Tried to create a state for a region already in RegionStates, "
342         + "used existing: " + regionState + ", ignored new: " + newState);
343     } else {
344       regionState = new RegionState(hri, newState, serverName);
345       putRegionState(regionState);
346       if (newState == State.OPEN) {
347         if (!serverName.equals(lastHost)) {
348           LOG.warn("Open region's last host " + lastHost
349             + " should be the same as the current one " + serverName
350             + ", ignored the last and used the current one");
351           lastHost = serverName;
352         }
353         lastAssignments.put(encodedName, lastHost);
354         regionAssignments.put(hri, lastHost);
355       } else if (!regionState.isUnassignable()) {
356         regionsInTransition.put(encodedName, regionState);
357       }
358       if (lastHost != null && newState != State.SPLIT) {
359         addToReplicaMapping(hri);
360         addToServerHoldings(lastHost, hri);
361         if (newState != State.OPEN) {
362           oldAssignments.put(encodedName, lastHost);
363         }
364       }
365     }
366     return regionState;
367   }
368 
369   private RegionState putRegionState(RegionState regionState) {
370     HRegionInfo hri = regionState.getRegion();
371     String encodedName = hri.getEncodedName();
372     TableName table = hri.getTable();
373     RegionState oldState = regionStates.put(encodedName, regionState);
374     Map<String, RegionState> map = regionStatesTableIndex.get(table);
375     if (map == null) {
376       map = new HashMap<String, RegionState>();
377       regionStatesTableIndex.put(table, map);
378     }
379     map.put(encodedName, regionState);
380     return oldState;
381   }
382 
383   /**
384    * Set the region state to CLOSED
385    */
386   public RegionState setRegionStateTOCLOSED(
387       final byte[] regionName,
388       final ServerName serverName) {
389     HRegionInfo regionInfo = getRegionInfo(regionName);
390     return setRegionStateTOCLOSED(regionInfo, serverName);
391   }
392 
393   /**
394    * Set the region state to CLOSED
395    */
396   public RegionState setRegionStateTOCLOSED(
397       final HRegionInfo regionInfo,
398       final ServerName serverName) {
399     ServerName sn = serverName;
400     if (sn == null) {
401       RegionState regionState = getRegionState(regionInfo.getEncodedName());
402       if (regionState != null) {
403         sn = regionState.getServerName();
404       }
405       // TODO: if sn is null, should we dig into
406       // lastAssignments.get(regionInfo.getEncodedName() to get the server name?
407       // For now, I just keep the same logic that works in the past
408     }
409     // We have to make sure that the last region server is set to be the same as the
410     // current RS.  If we don't do that, we could run into situation that both AM and SSH
411     // think other would do the assignment work; at the end, neither does the work and
412     // region remains RIT.
413     // See HBASE-13330 and HBASE-17023
414     setLastRegionServerOfRegion(sn, regionInfo.getEncodedName());
415     return updateRegionState(regionInfo, State.CLOSED, sn);
416   }
417 
418   /**
419    * Update a region state. It will be put in transition if not already there.
420    */
421   public RegionState updateRegionState(
422       final HRegionInfo hri, final State state) {
423     RegionState regionState = getRegionState(hri.getEncodedName());
424     return updateRegionState(hri, state,
425       regionState == null ? null : regionState.getServerName());
426   }
427 
428   /**
429    * Update a region state. It will be put in transition if not already there.
430    *
431    * If we can't find the region info based on the region name in
432    * the transition, log a warning and return null.
433    */
434   public RegionState updateRegionState(
435       final RegionTransition transition, final State state) {
436     byte [] regionName = transition.getRegionName();
437     HRegionInfo regionInfo = getRegionInfo(regionName);
438     if (regionInfo == null) {
439       String prettyRegionName = HRegionInfo.prettyPrint(
440         HRegionInfo.encodeRegionName(regionName));
441       LOG.warn("Failed to find region " + prettyRegionName
442         + " in updating its state to " + state
443         + " based on region transition " + transition);
444       return null;
445     }
446     return updateRegionState(regionInfo, state,
447       transition.getServerName());
448   }
449 
450   /**
451    * Transition a region state to OPEN from OPENING/PENDING_OPEN
452    */
453   public synchronized RegionState transitionOpenFromPendingOpenOrOpeningOnServer(
454       final RegionTransition transition, final RegionState fromState, final ServerName sn) {
455     if(fromState.isPendingOpenOrOpeningOnServer(sn)){
456       return updateRegionState(transition, State.OPEN);
457     }
458     return null;
459   }
460 
461   /**
462    * Update a region state. It will be put in transition if not already there.
463    */
464   public RegionState updateRegionState(
465       final HRegionInfo hri, final State state, final ServerName serverName) {
466     return updateRegionState(hri, state, serverName, HConstants.NO_SEQNUM);
467   }
468 
469   public void regionOnline(final HRegionInfo hri, final ServerName serverName) {
470     regionOnline(hri, serverName, HConstants.NO_SEQNUM);
471   }
472 
473   /**
474    * A region is online, won't be in transition any more.
475    * We can't confirm it is really online on specified region server
476    * because it hasn't been put in region server's online region list yet.
477    */
478   public void regionOnline(final HRegionInfo hri, final ServerName serverName, long openSeqNum) {
479     String encodedName = hri.getEncodedName();
480     if (!serverManager.isServerOnline(serverName)) {
481       // This is possible if the region server dies before master gets a
482       // chance to handle ZK event in time. At this time, if the dead server
483       // is already processed by SSH, we should ignore this event.
484       // If not processed yet, ignore and let SSH deal with it.
485       LOG.warn("Ignored, " + encodedName + " was opened on a dead server: " + serverName);
486       return;
487     }
488     updateRegionState(hri, State.OPEN, serverName, openSeqNum);
489 
490     synchronized (this) {
491       regionsInTransition.remove(encodedName);
492       ServerName oldServerName = regionAssignments.put(hri, serverName);
493       if (!serverName.equals(oldServerName)) {
494         if (LOG.isDebugEnabled()) {
495           LOG.debug("Onlined " + hri.getShortNameToLog() + " on " + serverName);
496         }
497         addToServerHoldings(serverName, hri);
498         addToReplicaMapping(hri);
499         if (oldServerName == null) {
500           oldServerName = oldAssignments.remove(encodedName);
501         }
502         if (oldServerName != null
503             && !oldServerName.equals(serverName)
504             && serverHoldings.containsKey(oldServerName)) {
505           LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName);
506           removeFromServerHoldings(oldServerName, hri);
507         }
508       }
509     }
510   }
511 
512   private void addToServerHoldings(ServerName serverName, HRegionInfo hri) {
513     Set<HRegionInfo> regions = serverHoldings.get(serverName);
514     if (regions == null) {
515       regions = new HashSet<HRegionInfo>();
516       serverHoldings.put(serverName, regions);
517     }
518     regions.add(hri);
519   }
520 
521   private void addToReplicaMapping(HRegionInfo hri) {
522     HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(hri);
523     Set<HRegionInfo> replicas =
524         defaultReplicaToOtherReplicas.get(defaultReplica);
525     if (replicas == null) {
526       replicas = new HashSet<HRegionInfo>();
527       defaultReplicaToOtherReplicas.put(defaultReplica, replicas);
528     }
529     replicas.add(hri);
530   }
531 
532   private void removeFromServerHoldings(ServerName serverName, HRegionInfo hri) {
533     Set<HRegionInfo> oldRegions = serverHoldings.get(serverName);
534     oldRegions.remove(hri);
535     if (oldRegions.isEmpty()) {
536       serverHoldings.remove(serverName);
537     }
538   }
539 
540   private void removeFromReplicaMapping(HRegionInfo hri) {
541     HRegionInfo defaultReplica = RegionReplicaUtil.getRegionInfoForDefaultReplica(hri);
542     Set<HRegionInfo> replicas = defaultReplicaToOtherReplicas.get(defaultReplica);
543     if (replicas != null) {
544       replicas.remove(hri);
545       if (replicas.isEmpty()) {
546         defaultReplicaToOtherReplicas.remove(defaultReplica);
547       }
548     }
549   }
550 
551   /**
552    * Used in some unit tests
553    */
554   @VisibleForTesting
555   synchronized boolean existsInServerHoldings(final ServerName serverName,
556       final HRegionInfo hri) {
557     Set<HRegionInfo> oldRegions = serverHoldings.get(serverName);
558     if (oldRegions != null) {
559       return oldRegions.contains(hri);
560     }
561     return false;
562   }
563 
564   /**
565    * A dead server's wals have been split so that all the regions
566    * used to be open on it can be safely assigned now. Mark them assignable.
567    */
568   public synchronized void logSplit(final ServerName serverName) {
569     for (Iterator<Map.Entry<String, ServerName>> it
570         = lastAssignments.entrySet().iterator(); it.hasNext();) {
571       Map.Entry<String, ServerName> e = it.next();
572       if (e.getValue().equals(serverName)) {
573         it.remove();
574       }
575     }
576     long now = System.currentTimeMillis();
577     if (LOG.isDebugEnabled()) {
578       LOG.debug("Adding to log splitting servers " + serverName);
579     }
580     processedServers.put(serverName, Long.valueOf(now));
581     Configuration conf = server.getConfiguration();
582     long obsoleteTime = conf.getLong(LOG_SPLIT_TIME, DEFAULT_LOG_SPLIT_TIME);
583     // Doesn't have to be very accurate about the clean up time
584     if (now > lastProcessedServerCleanTime + obsoleteTime) {
585       lastProcessedServerCleanTime = now;
586       long cutoff = now - obsoleteTime;
587       for (Iterator<Map.Entry<ServerName, Long>> it
588           = processedServers.entrySet().iterator(); it.hasNext();) {
589         Map.Entry<ServerName, Long> e = it.next();
590         if (e.getValue().longValue() < cutoff) {
591           if (LOG.isDebugEnabled()) {
592             LOG.debug("Removed from log splitting servers " + e.getKey());
593           }
594           it.remove();
595         }
596       }
597     }
598   }
599 
600   /**
601    * Log split is done for a given region, so it is assignable now.
602    */
603   public void logSplit(final HRegionInfo region) {
604     clearLastAssignment(region);
605   }
606 
607   public synchronized void clearLastAssignment(final HRegionInfo region) {
608     lastAssignments.remove(region.getEncodedName());
609   }
610 
611   /**
612    * A region is offline, won't be in transition any more.
613    */
614   public void regionOffline(final HRegionInfo hri) {
615     regionOffline(hri, null);
616   }
617 
618   /**
619    * A region is offline, won't be in transition any more. Its state
620    * should be the specified expected state, which can only be
621    * Split/Merged/Offline/null(=Offline)/SplittingNew/MergingNew.
622    */
623   public void regionOffline(
624       final HRegionInfo hri, final State expectedState) {
625     Preconditions.checkArgument(expectedState == null
626       || RegionState.isUnassignable(expectedState),
627         "Offlined region should not be " + expectedState);
628     if (isRegionInState(hri, State.SPLITTING_NEW, State.MERGING_NEW)) {
629       // Remove it from all region maps
630       deleteRegion(hri);
631       return;
632     }
633 
634     /*
635      * One tricky case, if region here is a replica region and its parent is at
636      * SPLIT state, its newState should be same as its parent, not OFFLINE.
637      */
638     State newState =
639         expectedState == null ? State.OFFLINE : expectedState;
640 
641     if ((expectedState == null) && !RegionReplicaUtil.isDefaultReplica(hri)) {
642       RegionState primateState = getRegionState(
643           RegionReplicaUtil.getRegionInfoForDefaultReplica(hri));
644       if ((primateState != null) && (primateState.getState() == State.SPLIT)) {
645         if (LOG.isDebugEnabled()) {
646           LOG.debug("Update region " + hri + "to SPLIT, from primary region " +
647               RegionReplicaUtil.getRegionInfoForDefaultReplica(hri));
648         }
649         newState = State.SPLIT;
650       }
651     }
652 
653     updateRegionState(hri, newState);
654     String encodedName = hri.getEncodedName();
655     synchronized (this) {
656       regionsInTransition.remove(encodedName);
657       ServerName oldServerName = regionAssignments.remove(hri);
658       if (oldServerName != null && serverHoldings.containsKey(oldServerName)) {
659         if (newState == State.MERGED || newState == State.SPLIT
660             || hri.isMetaRegion() || tableStateManager.isTableState(hri.getTable(),
661               ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
662           // Offline the region only if it's merged/split, or the table is disabled/disabling.
663           // Otherwise, offline it from this server only when it is online on a different server.
664           LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName);
665           removeFromServerHoldings(oldServerName, hri);
666           removeFromReplicaMapping(hri);
667         } else {
668           // Need to remember it so that we can offline it from this
669           // server when it is online on a different server.
670           oldAssignments.put(encodedName, oldServerName);
671         }
672       }
673     }
674   }
675 
  /**
   * A server is offline, all regions on it are dead.
   *
   * @param watcher the ZooKeeper watcher used to delete the znodes of
   *   splitting/merging regions
   * @param sn the server that went offline
   * @return the regions in transition on the given server (pending open,
   *   opening, failed close or offline) that are not in its holdings
   */
  public List<HRegionInfo> serverOffline(final ZooKeeperWatcher watcher, final ServerName sn) {
    // Offline all regions on this server not already in transition.
    List<HRegionInfo> rits = new ArrayList<HRegionInfo>();
    Set<HRegionInfo> regionsToClean = new HashSet<HRegionInfo>();
    // Offline regions outside the loop and synchronized block to avoid
    // ConcurrentModificationException and deadlock in case meta is unassigned
    // while RegionStates is blocked.
    Set<HRegionInfo> regionsToOffline = new HashSet<HRegionInfo>();
    synchronized (this) {
      Set<HRegionInfo> assignedRegions = serverHoldings.get(sn);
      if (assignedRegions == null) {
        // Nothing held by this server; use an empty set so the loops below still work.
        assignedRegions = new HashSet<HRegionInfo>();
      }

      for (HRegionInfo region : assignedRegions) {
        // Offline open regions, no need to offline if SPLIT/MERGED/OFFLINE
        if (isRegionOnline(region)) {
          regionsToOffline.add(region);
        } else if (isRegionInState(region, State.SPLITTING, State.MERGING)) {
          LOG.debug("Offline splitting/merging region " + getRegionState(region));
          try {
            // Delete the ZNode if exists
            ZKAssign.deleteNodeFailSilent(watcher, region);
            regionsToOffline.add(region);
          } catch (KeeperException ke) {
            server.abort("Unexpected ZK exception deleting node " + region, ke);
          }
        }
      }

      for (RegionState state : regionsInTransition.values()) {
        HRegionInfo hri = state.getRegion();
        if (assignedRegions.contains(hri)) {
          // Region is open on this region server, but in transition.
          // This region must be moving away from this server, or splitting/merging.
          // SSH will handle it, either skip assigning, or re-assign.
          LOG.info("Transitioning " + state + " will be handled by ServerCrashProcedure for " + sn);
        } else if (sn.equals(state.getServerName())) {
          // Region is in transition on this region server, and this
          // region is not open on this server. So the region must be
          // moving to this server from another one (i.e. opening or
          // pending open on this server, was open on another one).
          // Offline state is also kind of pending open if the region is in
          // transition. The region could be in failed_close state too if we have
          // tried several times to open it while this region server is not reachable.
          if (state.isPendingOpenOrOpening() || state.isFailedClose() || state.isOffline()) {
            LOG.info("Found region in " + state +
              " to be reassigned by ServerCrashProcedure for " + sn);
            rits.add(hri);
          } else if(state.isSplittingNew() || state.isMergingNew()) {
            LOG.info("Offline/Cleanup region if no meta entry exists, hri: " + hri +
                " state: " + state);
            regionsToClean.add(state.getRegion());
          } else {
            LOG.warn("THIS SHOULD NOT HAPPEN: unexpected " + state);
          }
        }
      }
      // Wake up any thread blocked in waitForUpdate.
      this.notifyAll();
    }

    // Done outside the synchronized block, see the note on regionsToOffline above.
    for (HRegionInfo hri : regionsToOffline) {
      regionOffline(hri);
    }

    cleanFailedSplitMergeRegions(regionsToClean);
    return rits;
  }
747 
748   /**
749    * This method does an RPC to hbase:meta. Do not call this method with a lock/synchronize held.
750    * In ZK mode we rollback and hence cleanup daughters/merged region. We also cleanup if
751    * meta doesn't have these regions.
752    *
753    * @param hris The hris to check if empty in hbase:meta and if so, clean them up.
754    */
755   private void cleanFailedSplitMergeRegions(Set<HRegionInfo> hris) {
756     if (hris.isEmpty()) {
757       return;
758     }
759 
760     for (HRegionInfo hri : hris) {
761       // This is RPC to meta table. It is done while we have a synchronize on
762       // regionstates. No progress will be made if meta is not available at this time.
763       // This is a cleanup task. Not critical.
764       try {
765         Pair<HRegionInfo, ServerName> regionPair =
766             MetaTableAccessor.getRegion(server.getConnection(), hri.getRegionName());
767         if (regionPair == null || useZK) {
768           regionOffline(hri);
769 
770           // If we use ZK, then we can cleanup entries from meta, since we roll back.
771           if (regionPair != null) {
772             MetaTableAccessor.deleteRegion(this.server.getConnection(), hri);
773           }
774           LOG.debug("Cleaning up HDFS since no meta entry exists, hri: " + hri);
775           FSUtils.deleteRegionDir(server.getConfiguration(), hri);
776         }
777       } catch (IOException e) {
778         LOG.warn("Got exception while cleaning up region " + hri, e);
779       }
780     }
781   }
782 
783   /**
784    * Gets the online regions of the specified table.
785    * This method looks at the in-memory state.  It does not go to <code>hbase:meta</code>.
786    * Only returns <em>online</em> regions.  If a region on this table has been
787    * closed during a disable, etc., it will be included in the returned list.
788    * So, the returned list may not necessarily be ALL regions in this table, its
789    * all the ONLINE regions in the table.
790    * @param tableName
791    * @return Online regions from <code>tableName</code>
792    */
793   public synchronized List<HRegionInfo> getRegionsOfTable(TableName tableName) {
794     List<HRegionInfo> tableRegions = new ArrayList<HRegionInfo>();
795     // boundary needs to have table's name but regionID 0 so that it is sorted
796     // before all table's regions.
797     HRegionInfo boundary = new HRegionInfo(tableName, null, null, false, 0L);
798     for (HRegionInfo hri: regionAssignments.tailMap(boundary).keySet()) {
799       if(!hri.getTable().equals(tableName)) break;
800       tableRegions.add(hri);
801     }
802     return tableRegions;
803   }
804 
805   /**
806    * Gets current state of all regions of the table.
807    * This method looks at the in-memory state.  It does not go to <code>hbase:meta</code>.
808    * Method guaranteed to return keys for all states
809    * in {@link org.apache.hadoop.hbase.master.RegionState.State}
810    *
811    * @param tableName
812    * @return Online regions from <code>tableName</code>
813    */
814   public synchronized Map<RegionState.State, List<HRegionInfo>>
815   getRegionByStateOfTable(TableName tableName) {
816     Map<RegionState.State, List<HRegionInfo>> tableRegions =
817         new HashMap<State, List<HRegionInfo>>();
818     for (State state : State.values()) {
819       tableRegions.put(state, new ArrayList<HRegionInfo>());
820     }
821     Map<String, RegionState> indexMap = regionStatesTableIndex.get(tableName);
822     if (indexMap == null)
823       return tableRegions;
824     for (RegionState regionState : indexMap.values()) {
825       tableRegions.get(regionState.getState()).add(regionState.getRegion());
826     }
827     return tableRegions;
828   }
829 
830   /**
831    * Wait on region to clear regions-in-transition.
832    * <p>
833    * If the region isn't in transition, returns immediately.  Otherwise, method
834    * blocks until the region is out of transition.
835    */
836   public synchronized void waitOnRegionToClearRegionsInTransition(
837       final HRegionInfo hri) throws InterruptedException {
838     if (!isRegionInTransition(hri)) return;
839 
840     while(!server.isStopped() && isRegionInTransition(hri)) {
841       RegionState rs = getRegionState(hri);
842       LOG.info("Waiting on " + rs + " to clear regions-in-transition");
843       waitForUpdate(100);
844     }
845 
846     if (server.isStopped()) {
847       LOG.info("Giving up wait on region in " +
848         "transition because stoppable.isStopped is set");
849     }
850   }
851 
852   /**
853    * A table is deleted. Remove its regions from all internal maps.
854    * We loop through all regions assuming we don't delete tables too much.
855    */
856   public void tableDeleted(final TableName tableName) {
857     Set<HRegionInfo> regionsToDelete = new HashSet<HRegionInfo>();
858     synchronized (this) {
859       for (RegionState state: regionStates.values()) {
860         HRegionInfo region = state.getRegion();
861         if (region.getTable().equals(tableName)) {
862           regionsToDelete.add(region);
863         }
864       }
865     }
866     for (HRegionInfo region: regionsToDelete) {
867       deleteRegion(region);
868     }
869   }
870 
871   /**
872    * Get a copy of all regions assigned to a server
873    */
874   public synchronized Set<HRegionInfo> getServerRegions(ServerName serverName) {
875     Set<HRegionInfo> regions = serverHoldings.get(serverName);
876     if (regions == null) return null;
877     return new HashSet<HRegionInfo>(regions);
878   }
879 
880   /**
881    * Remove a region from all state maps.
882    */
883   @VisibleForTesting
884   public synchronized void deleteRegion(final HRegionInfo hri) {
885     String encodedName = hri.getEncodedName();
886     regionsInTransition.remove(encodedName);
887     regionStates.remove(encodedName);
888     TableName table = hri.getTable();
889     Map<String, RegionState> indexMap = regionStatesTableIndex.get(table);
890     indexMap.remove(encodedName);
891     if (indexMap.size() == 0)
892       regionStatesTableIndex.remove(table);
893     lastAssignments.remove(encodedName);
894     ServerName sn = regionAssignments.remove(hri);
895     if (sn != null) {
896       Set<HRegionInfo> regions = serverHoldings.get(sn);
897       regions.remove(hri);
898     }
899   }
900 
901   @VisibleForTesting
902   public boolean isRegionInRegionStates(final HRegionInfo hri) {
903     return (getRegionState(hri) != null || isRegionOnline(hri)) || isRegionInTransition(hri)
904         || isRegionInState(hri, State.OFFLINE, State.CLOSED);
905      }
906 
907   /**
908    * Checking if a region was assigned to a server which is not online now.
909    * If so, we should hold re-assign this region till SSH has split its wals.
910    * Once logs are split, the last assignment of this region will be reset,
911    * which means a null last assignment server is ok for re-assigning.
912    *
913    * A region server could be dead but we don't know it yet. We may
914    * think it's online falsely. Therefore if a server is online, we still
915    * need to confirm it reachable and having the expected start code.
916    */
917   synchronized boolean wasRegionOnDeadServer(final String encodedName) {
918     ServerName server = lastAssignments.get(encodedName);
919     return isServerDeadAndNotProcessed(server);
920   }
921 
922   synchronized boolean isServerDeadAndNotProcessed(ServerName server) {
923     if (server == null) return false;
924     if (serverManager.isServerOnline(server)) {
925       String hostAndPort = server.getHostAndPort();
926       long startCode = server.getStartcode();
927       Long deadCode = deadServers.get(hostAndPort);
928       if (deadCode == null || startCode > deadCode.longValue()) {
929         if (serverManager.isServerReachable(server)) {
930           return false;
931         }
932         // The size of deadServers won't grow unbounded.
933         deadServers.put(hostAndPort, Long.valueOf(startCode));
934       }
935       // Watch out! If the server is not dead, the region could
936       // remain unassigned. That's why ServerManager#isServerReachable
937       // should use some retry.
938       //
939       // We cache this info since it is very unlikely for that
940       // instance to come back up later on. We don't want to expire
941       // the server since we prefer to let it die naturally.
942       LOG.warn("Couldn't reach online server " + server);
943     }
944     // Now, we know it's dead. Check if it's processed
945     return !processedServers.containsKey(server);
946   }
947 
948  /**
949    * Get the last region server a region was on for purpose of re-assignment,
950    * i.e. should the re-assignment be held back till log split is done?
951    */
952   synchronized ServerName getLastRegionServerOfRegion(final String encodedName) {
953     return lastAssignments.get(encodedName);
954   }
955 
956   synchronized void setLastRegionServerOfRegions(
957       final ServerName serverName, final List<HRegionInfo> regionInfos) {
958     for (HRegionInfo hri: regionInfos) {
959       setLastRegionServerOfRegion(serverName, hri.getEncodedName());
960     }
961   }
962 
963   synchronized void setLastRegionServerOfRegion(
964       final ServerName serverName, final String encodedName) {
965     lastAssignments.put(encodedName, serverName);
966   }
967 
968   void splitRegion(HRegionInfo p,
969       HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException {
970 
971     regionStateStore.splitRegion(p, a, b, sn, getRegionReplication(p));
972     synchronized (this) {
973       // After PONR, split is considered to be done.
974       // Update server holdings to be aligned with the meta.
975       Set<HRegionInfo> regions = serverHoldings.get(sn);
976       if (regions == null) {
977         throw new IllegalStateException(sn + " should host some regions");
978       }
979       regions.remove(p);
980       regions.add(a);
981       regions.add(b);
982     }
983   }
984 
985   void mergeRegions(HRegionInfo p,
986       HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException {
987     regionStateStore.mergeRegions(p, a, b, sn, getRegionReplication(a));
988     synchronized (this) {
989       // After PONR, merge is considered to be done.
990       // Update server holdings to be aligned with the meta.
991       Set<HRegionInfo> regions = serverHoldings.get(sn);
992       if (regions == null) {
993         throw new IllegalStateException(sn + " should host some regions");
994       }
995       regions.remove(a);
996       regions.remove(b);
997       regions.add(p);
998     }
999   }
1000 
1001   private int getRegionReplication(HRegionInfo r) throws IOException {
1002     if (tableStateManager != null) {
1003       HTableDescriptor htd = ((MasterServices)server).getTableDescriptors().get(r.getTable());
1004       if (htd != null) {
1005         return htd.getRegionReplication();
1006       }
1007     }
1008     return 1;
1009   }
1010 
1011   /**
1012    * At cluster clean re/start, mark all user regions closed except those of tables
1013    * that are excluded, such as disabled/disabling/enabling tables. All user regions
1014    * and their previous locations are returned.
1015    */
1016   synchronized Map<HRegionInfo, ServerName> closeAllUserRegions(Set<TableName> excludedTables) {
1017     boolean noExcludeTables = excludedTables == null || excludedTables.isEmpty();
1018     Set<HRegionInfo> toBeClosed = new HashSet<HRegionInfo>(regionStates.size());
1019     for(RegionState state: regionStates.values()) {
1020       HRegionInfo hri = state.getRegion();
1021       if (state.isSplit() || hri.isSplit()) {
1022         continue;
1023       }
1024       TableName tableName = hri.getTable();
1025       if (!TableName.META_TABLE_NAME.equals(tableName)
1026           && (noExcludeTables || !excludedTables.contains(tableName))) {
1027         toBeClosed.add(hri);
1028       }
1029     }
1030     Map<HRegionInfo, ServerName> allUserRegions =
1031       new HashMap<HRegionInfo, ServerName>(toBeClosed.size());
1032     for (HRegionInfo hri: toBeClosed) {
1033       RegionState regionState = updateRegionState(hri, State.CLOSED);
1034       allUserRegions.put(hri, regionState.getServerName());
1035     }
1036     return allUserRegions;
1037   }
1038 
1039   /**
1040    * Compute the average load across all region servers.
1041    * Currently, this uses a very naive computation - just uses the number of
1042    * regions being served, ignoring stats about number of requests.
1043    * @return the average load
1044    */
1045   protected synchronized double getAverageLoad() {
1046     int numServers = 0, totalLoad = 0;
1047     for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
1048       Set<HRegionInfo> regions = e.getValue();
1049       ServerName serverName = e.getKey();
1050       int regionCount = regions.size();
1051       if (serverManager.isServerOnline(serverName)) {
1052         totalLoad += regionCount;
1053         numServers++;
1054       }
1055     }
1056     if (numServers > 1) {
1057       // The master region server holds only a couple regions.
1058       // Don't consider this server in calculating the average load
1059       // if there are other region servers to avoid possible confusion.
1060       Set<HRegionInfo> hris = serverHoldings.get(server.getServerName());
1061       if (hris != null) {
1062         totalLoad -= hris.size();
1063         numServers--;
1064       }
1065     }
1066     return numServers == 0 ? 0.0 :
1067       (double)totalLoad / (double)numServers;
1068   }
1069 
1070   /**
1071    * This is an EXPENSIVE clone.  Cloning though is the safest thing to do.
1072    * Can't let out original since it can change and at least the load balancer
1073    * wants to iterate this exported list.  We need to synchronize on regions
1074    * since all access to this.servers is under a lock on this.regions.
1075    *
1076    * @return A clone of current assignments by table.
1077    */
1078   protected Map<TableName, Map<ServerName, List<HRegionInfo>>>
1079       getAssignmentsByTable() {
1080     Map<TableName, Map<ServerName, List<HRegionInfo>>> result =
1081       new HashMap<TableName, Map<ServerName,List<HRegionInfo>>>();
1082     synchronized (this) {
1083       if (!server.getConfiguration().getBoolean("hbase.master.loadbalance.bytable", false)) {
1084         Map<ServerName, List<HRegionInfo>> svrToRegions =
1085           new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
1086         for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
1087           svrToRegions.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
1088         }
1089         result.put(TableName.valueOf("ensemble"), svrToRegions);
1090       } else {
1091         for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
1092           for (HRegionInfo hri: e.getValue()) {
1093             if (hri.isMetaRegion()) continue;
1094             TableName tablename = hri.getTable();
1095             Map<ServerName, List<HRegionInfo>> svrToRegions = result.get(tablename);
1096             if (svrToRegions == null) {
1097               svrToRegions = new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
1098               result.put(tablename, svrToRegions);
1099             }
1100             List<HRegionInfo> regions = svrToRegions.get(e.getKey());
1101             if (regions == null) {
1102               regions = new ArrayList<HRegionInfo>();
1103               svrToRegions.put(e.getKey(), regions);
1104             }
1105             regions.add(hri);
1106           }
1107         }
1108       }
1109     }
1110 
1111     Map<ServerName, ServerLoad>
1112       onlineSvrs = serverManager.getOnlineServers();
1113     // Take care of servers w/o assignments, and remove servers in draining mode
1114     List<ServerName> drainingServers = this.serverManager.getDrainingServersList();
1115     for (Map<ServerName, List<HRegionInfo>> map: result.values()) {
1116       for (ServerName svr: onlineSvrs.keySet()) {
1117         if (!map.containsKey(svr)) {
1118           map.put(svr, new ArrayList<HRegionInfo>());
1119         }
1120       }
1121       map.keySet().removeAll(drainingServers);
1122     }
1123     return result;
1124   }
1125 
1126   protected RegionState getRegionState(final HRegionInfo hri) {
1127     return getRegionState(hri.getEncodedName());
1128   }
1129 
1130   /**
1131    * Returns a clone of region assignments per server
1132    * @return a Map of ServerName to a List of HRegionInfo's
1133    */
1134   protected synchronized Map<ServerName, List<HRegionInfo>> getRegionAssignmentsByServer() {
1135     Map<ServerName, List<HRegionInfo>> regionsByServer =
1136         new HashMap<ServerName, List<HRegionInfo>>(serverHoldings.size());
1137     for (Map.Entry<ServerName, Set<HRegionInfo>> e: serverHoldings.entrySet()) {
1138       regionsByServer.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
1139     }
1140     return regionsByServer;
1141   }
1142 
1143   protected synchronized RegionState getRegionState(final String encodedName) {
1144     return regionStates.get(encodedName);
1145   }
1146 
1147   /**
1148    * Get the HRegionInfo from cache, if not there, from the hbase:meta table
1149    * @param  regionName
1150    * @return HRegionInfo for the region
1151    */
1152   @SuppressWarnings("deprecation")
1153   protected HRegionInfo getRegionInfo(final byte [] regionName) {
1154     String encodedName = HRegionInfo.encodeRegionName(regionName);
1155     RegionState regionState = getRegionState(encodedName);
1156     if (regionState != null) {
1157       return regionState.getRegion();
1158     }
1159 
1160     try {
1161       Pair<HRegionInfo, ServerName> p =
1162         MetaTableAccessor.getRegion(server.getConnection(), regionName);
1163       HRegionInfo hri = p == null ? null : p.getFirst();
1164       if (hri != null) {
1165         createRegionState(hri);
1166       }
1167       return hri;
1168     } catch (IOException e) {
1169       server.abort("Aborting because error occoured while reading "
1170         + Bytes.toStringBinary(regionName) + " from hbase:meta", e);
1171       return null;
1172     }
1173   }
1174 
1175   static boolean isOneOfStates(RegionState regionState, State... states) {
1176     State s = regionState != null ? regionState.getState() : null;
1177     for (State state: states) {
1178       if (s == state) return true;
1179     }
1180     return false;
1181   }
1182 
1183   /**
1184    * Update a region state. It will be put in transition if not already there.
1185    */
1186   private RegionState updateRegionState(final HRegionInfo hri,
1187       final State state, final ServerName serverName, long openSeqNum) {
1188     if (state == State.FAILED_CLOSE || state == State.FAILED_OPEN) {
1189       LOG.warn("Failed to open/close " + hri.getShortNameToLog()
1190         + " on " + serverName + ", set to " + state);
1191     }
1192 
1193     String encodedName = hri.getEncodedName();
1194     RegionState regionState = new RegionState(
1195       hri, state, System.currentTimeMillis(), serverName);
1196     RegionState oldState = getRegionState(encodedName);
1197     if (!regionState.equals(oldState)) {
1198       LOG.info("Transition " + oldState + " to " + regionState);
1199       // Persist region state before updating in-memory info, if needed
1200       regionStateStore.updateRegionState(openSeqNum, regionState, oldState);
1201     }
1202 
1203     synchronized (this) {
1204       regionsInTransition.put(encodedName, regionState);
1205       putRegionState(regionState);
1206 
1207       // For these states, region should be properly closed.
1208       // There should be no log splitting issue.
1209       if ((state == State.CLOSED || state == State.MERGED
1210           || state == State.SPLIT) && lastAssignments.containsKey(encodedName)) {
1211         ServerName last = lastAssignments.get(encodedName);
1212         if (last.equals(serverName)) {
1213           lastAssignments.remove(encodedName);
1214         } else {
1215           LOG.warn(encodedName + " moved to " + state + " on "
1216             + serverName + ", expected " + last);
1217         }
1218       }
1219 
1220       // Once a region is opened, record its last assignment right away.
1221       if (serverName != null && state == State.OPEN) {
1222         ServerName last = lastAssignments.get(encodedName);
1223         if (!serverName.equals(last)) {
1224           lastAssignments.put(encodedName, serverName);
1225           if (last != null && isServerDeadAndNotProcessed(last)) {
1226             LOG.warn(encodedName + " moved to " + serverName
1227               + ", while it's previous host " + last
1228               + " is dead but not processed yet");
1229           }
1230         }
1231       }
1232 
1233       // notify the change
1234       this.notifyAll();
1235     }
1236     return regionState;
1237   }
1238 }