001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import java.util.ArrayList;
021import java.util.Collections;
022import java.util.Date;
023import java.util.HashMap;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028import java.util.Set;
029import org.apache.hadoop.hbase.ServerName;
030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
031import org.apache.hadoop.hbase.util.Pair;
032import org.apache.yetus.audience.InterfaceAudience;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
037
038
039/**
040 * Class to hold dead servers list and utility querying dead server list.
041 * Servers are added when they expire or when we find them in filesystem on startup.
042 * When a server crash procedure is queued, it will populate the processing list and
043 * then remove the server from processing list when done. Servers are removed from
044 * dead server list when a new instance is started over the old on same hostname and
045 * port or when new Master comes online tidying up after all initialization. Processing
046 * list and deadserver list are not tied together (you don't have to be in deadservers
047 * list to be processing and vice versa).
048 */
049@InterfaceAudience.Private
050public class DeadServer {
051  private static final Logger LOG = LoggerFactory.getLogger(DeadServer.class);
052
053  /**
054   * Set of known dead servers.  On znode expiration, servers are added here.
055   * This is needed in case of a network partitioning where the server's lease
056   * expires, but the server is still running. After the network is healed,
057   * and it's server logs are recovered, it will be told to call server startup
058   * because by then, its regions have probably been reassigned.
059   */
060  private final Map<ServerName, Long> deadServers = new HashMap<>();
061
062  /**
063   * Set of dead servers currently being processed by a SCP.
064   * Added to this list at the start of SCP and removed after it is done
065   * processing the crash.
066   */
067  private final Set<ServerName> processingServers = new HashSet<>();
068
069  /**
070   * @param serverName server name.
071   * @return true if this server is on the dead servers list false otherwise
072   */
073  public synchronized boolean isDeadServer(final ServerName serverName) {
074    return deadServers.containsKey(serverName);
075  }
076
077  /**
078   * Checks if there are currently any dead servers being processed by the
079   * master.  Returns true if at least one region server is currently being
080   * processed as dead.
081   *
082   * @return true if any RS are being processed as dead
083   */
084  synchronized boolean areDeadServersInProgress() {
085    return !processingServers.isEmpty();
086  }
087
088  public synchronized Set<ServerName> copyServerNames() {
089    Set<ServerName> clone = new HashSet<>(deadServers.size());
090    clone.addAll(deadServers.keySet());
091    return clone;
092  }
093
094  /**
095   * Adds the server to the dead server list if it's not there already.
096   */
097  synchronized void putIfAbsent(ServerName sn) {
098    this.deadServers.putIfAbsent(sn, EnvironmentEdgeManager.currentTime());
099    processing(sn);
100  }
101
102  /**
103   * Add <code>sn<</code> to set of processing deadservers.
104   * @see #finish(ServerName)
105   */
106  public synchronized void processing(ServerName sn) {
107    if (processingServers.add(sn)) {
108      // Only log on add.
109      LOG.debug("Processing {}; numProcessing={}", sn, processingServers.size());
110    }
111  }
112
113  /**
114   * Complete processing for this dead server.
115   * @param sn ServerName for the dead server.
116   * @see #processing(ServerName)
117   */
118  public synchronized void finish(ServerName sn) {
119    if (processingServers.remove(sn)) {
120      LOG.debug("Removed {} from processing; numProcessing={}", sn, processingServers.size());
121    }
122  }
123
124  public synchronized int size() {
125    return deadServers.size();
126  }
127
128  synchronized boolean isEmpty() {
129    return deadServers.isEmpty();
130  }
131
132  /**
133   * Handles restart of a server. The new server instance has a different start code.
134   * The new start code should be greater than the old one. We don't check that here.
135   * Removes the old server from deadserver list.
136   *
137   * @param newServerName Servername as either <code>host:port</code> or
138   *                      <code>host,port,startcode</code>.
139   * @return true if this server was dead before and coming back alive again
140   */
141  synchronized boolean cleanPreviousInstance(final ServerName newServerName) {
142    Iterator<ServerName> it = deadServers.keySet().iterator();
143    while (it.hasNext()) {
144      if (cleanOldServerName(newServerName, it)) {
145        return true;
146      }
147    }
148    return false;
149  }
150
151  synchronized void cleanAllPreviousInstances(final ServerName newServerName) {
152    Iterator<ServerName> it = deadServers.keySet().iterator();
153    while (it.hasNext()) {
154      cleanOldServerName(newServerName, it);
155    }
156  }
157
158  /**
159   * @param newServerName Server to match port and hostname against.
160   * @param deadServerIterator Iterator primed so can call 'next' on it.
161   * @return True if <code>newServerName</code> and current primed
162   *   iterator ServerName have same host and port and we removed old server
163   *   from iterator and from processing list.
164   */
165  private boolean cleanOldServerName(ServerName newServerName,
166      Iterator<ServerName> deadServerIterator) {
167    ServerName sn = deadServerIterator.next();
168    if (ServerName.isSameAddress(sn, newServerName)) {
169      // Remove from dead servers list. Don't remove from the processing list --
170      // let the SCP do it when it is done.
171      deadServerIterator.remove();
172      return true;
173    }
174    return false;
175  }
176
177  @Override
178  public synchronized String toString() {
179    // Display unified set of servers from both maps
180    Set<ServerName> servers = new HashSet<>();
181    servers.addAll(deadServers.keySet());
182    servers.addAll(processingServers);
183    StringBuilder sb = new StringBuilder();
184    for (ServerName sn : servers) {
185      if (sb.length() > 0) {
186        sb.append(", ");
187      }
188      sb.append(sn.toString());
189      // Star entries that are being processed
190      if (processingServers.contains(sn)) {
191        sb.append("*");
192      }
193    }
194    return sb.toString();
195  }
196
197  /**
198   * Extract all the servers dead since a given time, and sort them.
199   * @param ts the time, 0 for all
200   * @return a sorted array list, by death time, lowest values first.
201   */
202  synchronized List<Pair<ServerName, Long>> copyDeadServersSince(long ts) {
203    List<Pair<ServerName, Long>> res =  new ArrayList<>(size());
204
205    for (Map.Entry<ServerName, Long> entry:deadServers.entrySet()){
206      if (entry.getValue() >= ts){
207        res.add(new Pair<>(entry.getKey(), entry.getValue()));
208      }
209    }
210
211    Collections.sort(res, (o1, o2) -> o1.getSecond().compareTo(o2.getSecond()));
212    return res;
213  }
214  
215  /**
216   * Get the time when a server died
217   * @param deadServerName the dead server name
218   * @return the date when the server died 
219   */
220  public synchronized Date getTimeOfDeath(final ServerName deadServerName){
221    Long time = deadServers.get(deadServerName);
222    return time == null ? null : new Date(time);
223  }
224
225  /**
226   * Called from rpc by operator cleaning up deadserver list.
227   * @param deadServerName the dead server name
228   * @return true if this server was removed
229   */
230  public synchronized boolean removeDeadServer(final ServerName deadServerName) {
231    Preconditions.checkState(!processingServers.contains(deadServerName),
232      "Asked to remove server still in processingServers set " + deadServerName +
233          " (numProcessing=" + processingServers.size() + ")");
234    return this.deadServers.remove(deadServerName) != null;
235  }
236}