/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKDump;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap;

/**
 * Provides information about the existing states of replication, replication peers and queues.
 * <p>
 * Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args]
 * <p>
 * Arguments:
 * <ul>
 * <li>--distributed: polls each RS to dump information about its queues</li>
 * <li>--hdfs: reports HDFS usage by the replication queues (note: can be overestimated)</li>
 * </ul>
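 * <p>
 * Example invocation, polling each RegionServer and reporting HDFS usage (--hdfs requires
 * --distributed):
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues \
 *   --distributed --hdfs
 * </pre>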
 */
@InterfaceAudience.Private
public class DumpReplicationQueues extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(DumpReplicationQueues.class.getName());

  private List<String> deadRegionServers;
  private List<String> deletedQueues;
  private AtomicLongMap<String> peersQueueSize;
  private long totalSizeOfWALs;
  private long numWalsNotFound;

  public DumpReplicationQueues() {
    deadRegionServers = new ArrayList<>();
    deletedQueues = new ArrayList<>();
    peersQueueSize = AtomicLongMap.create();
    totalSizeOfWALs = 0;
    numWalsNotFound = 0;
  }

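  /** Holder for the command-line flags accepted by this tool. */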
  static class DumpOptions {
    boolean hdfs = false;
    boolean distributed = false;

    public DumpOptions() {
    }

    public DumpOptions(DumpOptions that) {
      this.hdfs = that.hdfs;
      this.distributed = that.distributed;
    }

    boolean isHdfs() {
      return hdfs;
    }

    boolean isDistributed() {
      return distributed;
    }

    void setHdfs(boolean hdfs) {
      this.hdfs = hdfs;
    }

    void setDistributed(boolean distributed) {
      this.distributed = distributed;
    }
  }

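  /**
   * Parses the command-line options into a {@link DumpOptions}. A help flag is pushed back onto
   * the queue so that the caller can detect it and print usage.
   */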
  static DumpOptions parseOpts(Queue<String> args) {
    DumpOptions opts = new DumpOptions();

    String cmd = null;
    while ((cmd = args.poll()) != null) {
      if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) {
        // place item back onto queue so that caller knows parsing was incomplete
        args.add(cmd);
        break;
      }
      final String hdfs = "--hdfs";
      if (cmd.equals(hdfs)) {
        opts.setHdfs(true);
        continue;
      }
      final String distributed = "--distributed";
      if (cmd.equals(distributed)) {
        opts.setDistributed(true);
        continue;
      }
      printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
    }
    // --hdfs reports per-queue HDFS usage, which requires polling each RS, so it is only valid
    // together with --distributed. The check runs after all arguments have been consumed.
    if (!opts.isDistributed() && opts.isHdfs()) {
      printUsageAndExit("ERROR: --hdfs option can only be used with --distributed", -1);
    }
    return opts;
  }

  /**
   * Main entry point.
   * @param args command-line arguments
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new DumpReplicationQueues(), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {

    int errCode = -1;
    LinkedList<String> argv = new LinkedList<>(Arrays.asList(args));
    DumpOptions opts = parseOpts(argv);

    // args remaining, print help and exit
    if (!argv.isEmpty()) {
      errCode = 0;
      printUsage();
      return errCode;
    }
    return dumpReplicationQueues(opts);
  }

  protected void printUsage() {
    printUsage(this.getClass().getName(), null);
  }

  protected static void printUsage(final String message) {
    printUsage(DumpReplicationQueues.class.getName(), message);
  }

  protected static void printUsage(final String className, final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.println("Usage: hbase " + className + " \\");
    System.err.println("  <OPTIONS> [-D<property=value>]*");
    System.err.println();
    System.err.println("General Options:");
    System.err.println(" -h|--h|--help  Show this help and exit.");
    System.err.println(" --distributed  Poll each RS and print its own replication queue. "
      + "Default only polls ZooKeeper");
    System.err.println(" --hdfs         Use HDFS to calculate usage of WALs by replication."
      + " It could be overestimated if replicating to multiple peers."
      + " --distributed flag is also needed.");
  }

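  /** Prints the given message followed by the usage text, then exits with the given code. */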
  protected static void printUsageAndExit(final String message, final int exitCode) {
    printUsage(message);
    System.exit(exitCode);
  }

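  /**
   * Connects to the cluster and dumps the peer configurations plus, depending on the options,
   * either the per-RegionServer queues or the raw replication znodes from ZooKeeper.
   * @return 0 on success, -1 on error
   */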
  private int dumpReplicationQueues(DumpOptions opts) throws Exception {

    Configuration conf = getConf();
    HBaseAdmin.available(conf);
    ClusterConnection connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
    Admin admin = connection.getAdmin();

    ZKWatcher zkw =
      new ZKWatcher(conf, "DumpReplicationQueues" + EnvironmentEdgeManager.currentTime(),
        new WarnOnlyAbortable(), true);

    try {
      LOG.info("Our Quorum: " + zkw.getQuorum());
      List<TableCFs> replicatedTableCFs = admin.listReplicatedTableCFs();
      if (replicatedTableCFs.isEmpty()) {
        LOG.info("No tables with a configured replication peer were found.");
        return 0;
      } else {
        LOG.info("Replicated Tables: " + replicatedTableCFs);
      }

      List<ReplicationPeerDescription> peers = admin.listReplicationPeers();

      if (peers.isEmpty()) {
        LOG.info("Replication is enabled but no peer configuration was found.");
      }

      System.out.println("Dumping replication peers and configurations:");
      System.out.println(dumpPeersState(peers));

      if (opts.isDistributed()) {
        LOG.info("Found [--distributed], will poll each RegionServer.");
        Set<String> peerIds =
          peers.stream().map((peer) -> peer.getPeerId()).collect(Collectors.toSet());
        System.out.println(dumpQueues(zkw, peerIds, opts.isHdfs()));
        System.out.println(dumpReplicationSummary());
      } else {
        // fall back to dumping the raw replication znodes from ZooKeeper
        System.out.print("Dumping replication znodes via ZooKeeper:");
        System.out.println(ZKDump.getReplicationZnodesDump(zkw));
      }
      return 0;
    } catch (IOException e) {
      LOG.error("Failed to dump the replication queues", e);
      return -1;
    } finally {
      zkw.close();
    }
  }

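  /**
   * Summarizes what the queue dump found: deleted queues, dead RegionServers, per-peer queue
   * sizes, and the total WAL size on HDFS.
   */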
  public String dumpReplicationSummary() {
    StringBuilder sb = new StringBuilder();
    if (!deletedQueues.isEmpty()) {
      sb.append("Found " + deletedQueues.size() + " deleted queues"
        + ", run hbck -fixReplication in order to remove the deleted replication queues\n");
      for (String deletedQueue : deletedQueues) {
        sb.append("    " + deletedQueue + "\n");
      }
    }
    if (!deadRegionServers.isEmpty()) {
      sb.append("Found " + deadRegionServers.size() + " dead regionservers"
        + ", restart one regionserver to transfer the queues of dead regionservers\n");
      for (String deadRs : deadRegionServers) {
        sb.append("    " + deadRs + "\n");
      }
    }
    if (!peersQueueSize.isEmpty()) {
      sb.append("Dumping the number of WALs in the replication queue of each peer\n");
      for (Map.Entry<String, Long> entry : peersQueueSize.asMap().entrySet()) {
        sb.append(
          "    PeerId: " + entry.getKey() + " , sizeOfLogQueue: " + entry.getValue() + "\n");
      }
    }
    sb.append("    Total size of WALs on HDFS: " + StringUtils.humanSize(totalSizeOfWALs) + "\n");
    if (numWalsNotFound > 0) {
      sb.append("    ERROR: " + numWalsNotFound + " WALs could not be found!\n");
    }
    return sb.toString();
  }

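  /**
   * Renders the state and configuration of each replication peer: enabled/disabled state, cluster
   * key, endpoint implementation, custom configuration, table CFs and namespaces.
   */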
  public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exception {
    Map<String, String> currentConf;
    StringBuilder sb = new StringBuilder();
    for (ReplicationPeerDescription peer : peers) {
      ReplicationPeerConfig peerConfig = peer.getPeerConfig();
      sb.append("Peer: " + peer.getPeerId() + "\n");
      sb.append("    " + "State: " + (peer.isEnabled() ? "ENABLED" : "DISABLED") + "\n");
      sb.append("    " + "Cluster Name: " + peerConfig.getClusterKey() + "\n");
      sb.append("    " + "Replication Endpoint: " + peerConfig.getReplicationEndpointImpl() + "\n");
      currentConf = peerConfig.getConfiguration();
      // Only show when we have a custom configuration for the peer
      if (currentConf.size() > 1) {
        sb.append("    " + "Peer Configuration: " + currentConf + "\n");
      }
      sb.append("    " + "Peer Table CFs: " + peerConfig.getTableCFsMap() + "\n");
      sb.append("    " + "Peer Namespaces: " + peerConfig.getNamespaces() + "\n");
    }
    return sb.toString();
  }

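  /**
   * Iterates over every replicator RegionServer known to the queue storage and dumps each of its
   * queues, tracking dead RegionServers and queues whose peer no longer exists.
   */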
  public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) throws Exception {
    ReplicationQueueStorage queueStorage;
    StringBuilder sb = new StringBuilder();

    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
    // listChildrenNoWatch returns null when the rs znode does not exist, so guard against it
    List<String> rsZnodes = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
    Set<ServerName> liveRegionServers = rsZnodes == null
      ? Collections.<ServerName>emptySet()
      : rsZnodes.stream().map(ServerName::parseServerName).collect(Collectors.toSet());

    // Loop over each peer on each RS and dump the queues
    List<ServerName> regionservers = queueStorage.getListOfReplicators();
    if (regionservers == null || regionservers.isEmpty()) {
      return sb.toString();
    }
    for (ServerName regionserver : regionservers) {
      List<String> queueIds = queueStorage.getAllQueues(regionserver);
      if (!liveRegionServers.contains(regionserver)) {
        deadRegionServers.add(regionserver.getServerName());
      }
      for (String queueId : queueIds) {
        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
        List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
        Collections.sort(wals);
        if (!peerIds.contains(queueInfo.getPeerId())) {
          deletedQueues.add(regionserver + "/" + queueId);
          sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs));
        } else {
          sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs));
        }
      }
    }
    return sb.toString();
  }

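  /**
   * Formats a single replication queue: its znode, peer id, recovery status, the WALs it holds
   * with their replication positions and, optionally, the total WAL size on HDFS.
   */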
  private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage,
    ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted,
    boolean hdfs) throws Exception {
    StringBuilder sb = new StringBuilder();

    List<ServerName> deadServers;

    sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n");
    sb.append("    Queue znode: " + queueId + "\n");
    sb.append("    PeerID: " + queueInfo.getPeerId() + "\n");
    sb.append("    Recovered: " + queueInfo.isQueueRecovered() + "\n");
    deadServers = queueInfo.getDeadRegionServers();
    if (deadServers.isEmpty()) {
      sb.append("    No dead RegionServers found in this queue." + "\n");
    } else {
      sb.append("    Dead RegionServers: " + deadServers + "\n");
    }
    sb.append("    Was deleted: " + isDeleted + "\n");
    sb.append("    Number of WALs in replication queue: " + wals.size() + "\n");
    peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());

    for (String wal : wals) {
      long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal);
      sb.append("    Replication position for " + wal + ": "
        + (position > 0 ? String.valueOf(position) : "0 (not started or nothing to replicate)")
        + "\n");
    }

    if (hdfs) {
      FileSystem fs = FileSystem.get(getConf());
      sb.append("    Total size of WALs on HDFS for this queue: "
        + StringUtils.humanSize(getTotalWALSize(fs, wals, regionserver)) + "\n");
    }
    return sb.toString();
  }

  /**
   * Returns the total size in bytes of the given WALs, skipping any WAL whose size cannot be
   * determined. Also accumulates the size into {@link #totalSizeOfWALs} and counts missing WALs
   * in {@link #numWalsNotFound}.
   */
  private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server)
    throws IOException {
    long size = 0;
    FileStatus fileStatus;

    for (String wal : wals) {
      try {
        fileStatus = (new WALLink(getConf(), server.getServerName(), wal)).getFileStatus(fs);
      } catch (FileNotFoundException e) {
        numWalsNotFound++;
        LOG.warn("WAL " + wal + " couldn't be found, skipping", e);
        continue;
      } catch (IOException e) {
        LOG.warn("Can't get file status of WAL " + wal + ", skipping", e);
        continue;
      }
      size += fileStatus.getLen();
    }

    totalSizeOfWALs += size;
    return size;
  }

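  /** An Abortable that only logs abort requests instead of shutting anything down. */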
  private static class WarnOnlyAbortable implements Abortable {
    @Override
    public void abort(String why, Throwable e) {
      LOG.warn("DumpReplicationQueues received abort, ignoring.  Reason: " + why);
      if (LOG.isDebugEnabled()) {
        LOG.debug(e.toString(), e);
      }
    }

    @Override
    public boolean isAborted() {
      return false;
    }
  }
}