001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.master;
020
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.Comparator;
024import java.util.Date;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.Iterator;
028import java.util.List;
029import java.util.Map;
030import java.util.Set;
031import org.apache.hadoop.hbase.ServerName;
032import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
033import org.apache.hadoop.hbase.util.Pair;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
039
040
041/**
042 * Class to hold dead servers list and utility querying dead server list.
043 * On znode expiration, servers are added here.
044 */
045@InterfaceAudience.Private
046public class DeadServer {
047  private static final Logger LOG = LoggerFactory.getLogger(DeadServer.class);
048
049  /**
050   * Set of known dead servers.  On znode expiration, servers are added here.
051   * This is needed in case of a network partitioning where the server's lease
052   * expires, but the server is still running. After the network is healed,
053   * and it's server logs are recovered, it will be told to call server startup
054   * because by then, its regions have probably been reassigned.
055   */
056  private final Map<ServerName, Long> deadServers = new HashMap<>();
057
058  /**
059   * Set of dead servers currently being processed
060   */
061  private final Set<ServerName> processingServers = new HashSet<ServerName>();
062
063  /**
064   * A dead server that comes back alive has a different start code. The new start code should be
065   *  greater than the old one, but we don't take this into account in this method.
066   *
067   * @param newServerName Servername as either <code>host:port</code> or
068   *                      <code>host,port,startcode</code>.
069   * @return true if this server was dead before and coming back alive again
070   */
071  public synchronized boolean cleanPreviousInstance(final ServerName newServerName) {
072    Iterator<ServerName> it = deadServers.keySet().iterator();
073    while (it.hasNext()) {
074      ServerName sn = it.next();
075      if (ServerName.isSameAddress(sn, newServerName)) {
076        // remove from deadServers
077        it.remove();
078        // remove from processingServers
079        boolean removed = processingServers.remove(sn);
080        if (removed) {
081          LOG.debug("Removed " + sn + " ; numProcessing=" + processingServers.size());
082        }
083        return true;
084      }
085    }
086
087    return false;
088  }
089
090  /**
091   * @param serverName server name.
092   * @return true if this server is on the dead servers list false otherwise
093   */
094  public synchronized boolean isDeadServer(final ServerName serverName) {
095    return deadServers.containsKey(serverName);
096  }
097
098  /**
099   * @param serverName server name.
100   * @return true if this server is on the processing servers list false otherwise
101   */
102  public synchronized boolean isProcessingServer(final ServerName serverName) {
103    return processingServers.contains(serverName);
104  }
105
106  /**
107   * Checks if there are currently any dead servers being processed by the
108   * master.  Returns true if at least one region server is currently being
109   * processed as dead.
110   *
111   * @return true if any RS are being processed as dead
112   */
113  public synchronized boolean areDeadServersInProgress() {
114    return !processingServers.isEmpty();
115  }
116
117  public synchronized Set<ServerName> copyServerNames() {
118    Set<ServerName> clone = new HashSet<>(deadServers.size());
119    clone.addAll(deadServers.keySet());
120    return clone;
121  }
122
123
124  /**
125   * Adds the server to the dead server list if it's not there already.
126   * @param sn the server name
127   */
128  public synchronized void add(ServerName sn) {
129    add(sn, true);
130  }
131
132  /**
133   * Adds the server to the dead server list if it's not there already.
134   * @param sn the server name
135   * @param processing whether there is an active SCP associated with the server
136   */
137  public synchronized void add(ServerName sn, boolean processing) {
138    if (!deadServers.containsKey(sn)){
139      deadServers.put(sn, EnvironmentEdgeManager.currentTime());
140    }
141    if (processing && processingServers.add(sn)) {
142      LOG.debug("Added {}; numProcessing={}", sn, processingServers.size());
143    }
144  }
145
146  /**
147   * Notify that we started processing this dead server.
148   * @param sn ServerName for the dead server.
149   */
150  public synchronized void notifyServer(ServerName sn) {
151    boolean added = processingServers.add(sn);
152    if (LOG.isDebugEnabled()) {
153      if (added) {
154        LOG.debug("Added " + sn + "; numProcessing=" + processingServers.size());
155      }
156      LOG.debug("Started processing " + sn + "; numProcessing=" + processingServers.size());
157    }
158  }
159
160  /**
161   * Complete processing for this dead server.
162   * @param sn ServerName for the dead server.
163   */
164  public synchronized void finish(ServerName sn) {
165    boolean removed = processingServers.remove(sn);
166    if (LOG.isDebugEnabled()) {
167      LOG.debug("Finished processing " + sn + "; numProcessing=" + processingServers.size());
168      if (removed) {
169        LOG.debug("Removed " + sn + " ; numProcessing=" + processingServers.size());
170      }
171    }
172  }
173
174  public synchronized int size() {
175    return deadServers.size();
176  }
177
178  public synchronized boolean isEmpty() {
179    return deadServers.isEmpty();
180  }
181
182  public synchronized void cleanAllPreviousInstances(final ServerName newServerName) {
183    Iterator<ServerName> it = deadServers.keySet().iterator();
184    while (it.hasNext()) {
185      ServerName sn = it.next();
186      if (ServerName.isSameAddress(sn, newServerName)) {
187        // remove from deadServers
188        it.remove();
189        // remove from processingServers
190        boolean removed = processingServers.remove(sn);
191        if (removed) {
192          LOG.debug("Removed " + sn + " ; numProcessing=" + processingServers.size());
193        }
194      }
195    }
196  }
197
198  @Override
199  public synchronized String toString() {
200    // Display unified set of servers from both maps
201    Set<ServerName> servers = new HashSet<ServerName>();
202    servers.addAll(deadServers.keySet());
203    servers.addAll(processingServers);
204    StringBuilder sb = new StringBuilder();
205    for (ServerName sn : servers) {
206      if (sb.length() > 0) {
207        sb.append(", ");
208      }
209      sb.append(sn.toString());
210      // Star entries that are being processed
211      if (processingServers.contains(sn)) {
212        sb.append("*");
213      }
214    }
215    return sb.toString();
216  }
217
218  /**
219   * Extract all the servers dead since a given time, and sort them.
220   * @param ts the time, 0 for all
221   * @return a sorted array list, by death time, lowest values first.
222   */
223  public synchronized List<Pair<ServerName, Long>> copyDeadServersSince(long ts){
224    List<Pair<ServerName, Long>> res =  new ArrayList<>(size());
225
226    for (Map.Entry<ServerName, Long> entry:deadServers.entrySet()){
227      if (entry.getValue() >= ts){
228        res.add(new Pair<>(entry.getKey(), entry.getValue()));
229      }
230    }
231
232    Collections.sort(res, ServerNameDeathDateComparator);
233    return res;
234  }
235  
236  /**
237   * Get the time when a server died
238   * @param deadServerName the dead server name
239   * @return the date when the server died 
240   */
241  public synchronized Date getTimeOfDeath(final ServerName deadServerName){
242    Long time = deadServers.get(deadServerName);
243    return time == null ? null : new Date(time);
244  }
245
246  private static Comparator<Pair<ServerName, Long>> ServerNameDeathDateComparator =
247      new Comparator<Pair<ServerName, Long>>(){
248
249    @Override
250    public int compare(Pair<ServerName, Long> o1, Pair<ServerName, Long> o2) {
251      return o1.getSecond().compareTo(o2.getSecond());
252    }
253  };
254
255  /**
256   * remove the specified dead server
257   * @param deadServerName the dead server name
258   * @return true if this server was removed
259   */
260
261  public synchronized boolean removeDeadServer(final ServerName deadServerName) {
262    Preconditions.checkState(!processingServers.contains(deadServerName),
263      "Asked to remove server still in processingServers set " + deadServerName +
264          " (numProcessing=" + processingServers.size() + ")");
265    if (deadServers.remove(deadServerName) == null) {
266      return false;
267    }
268    return true;
269  }
270}