/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKDump;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap;

/**
 * Provides information about the existing states of replication, replication peers and queues.
 * <p>
 * Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args]
 * <p>
 * Arguments:
 * <ul>
 * <li>--distributed Polls each RS to dump information about the queue</li>
 * <li>--hdfs Reports HDFS usage by the replication queues (note: can be overestimated)</li>
 * </ul>
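 * <p>
 * Example invocation, polling each RegionServer and reporting HDFS usage:
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues --distributed --hdfs
 * </pre>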
 */
@InterfaceAudience.Private
public class DumpReplicationQueues extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(DumpReplicationQueues.class.getName());

  private List<String> deadRegionServers;
  private List<String> deletedQueues;
  private AtomicLongMap<String> peersQueueSize;
  private long totalSizeOfWALs;
  private long numWalsNotFound;

  public DumpReplicationQueues() {
    deadRegionServers = new ArrayList<>();
    deletedQueues = new ArrayList<>();
    peersQueueSize = AtomicLongMap.create();
    totalSizeOfWALs = 0;
    numWalsNotFound = 0;
  }

  static class DumpOptions {
    boolean hdfs = false;
    boolean distributed = false;

    public DumpOptions() {
    }

    public DumpOptions(DumpOptions that) {
      this.hdfs = that.hdfs;
      this.distributed = that.distributed;
    }

    boolean isHdfs() {
      return hdfs;
    }

    boolean isDistributed() {
      return distributed;
    }

    void setHdfs(boolean hdfs) {
      this.hdfs = hdfs;
    }

    void setDistributed(boolean distributed) {
      this.distributed = distributed;
    }
  }

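  /**
   * Parses the remaining command-line arguments into a {@link DumpOptions}. A help flag is pushed
   * back onto the queue so the caller can tell that parsing stopped early.
   */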
  static DumpOptions parseOpts(Queue<String> args) {
    DumpOptions opts = new DumpOptions();

    String cmd = null;
    while ((cmd = args.poll()) != null) {
      if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) {
        // place item back onto queue so that caller knows parsing was incomplete
        args.add(cmd);
        break;
      }
      final String hdfs = "--hdfs";
      if (cmd.equals(hdfs)) {
        opts.setHdfs(true);
        continue;
      }
      final String distributed = "--distributed";
      if (cmd.equals(distributed)) {
        opts.setDistributed(true);
        continue;
      } else {
        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
      }
    }
    // check that --distributed is present when --hdfs is in the arguments
    if (!opts.isDistributed() && opts.isHdfs()) {
      printUsageAndExit("ERROR: --hdfs option can only be used with --distributed", -1);
    }
    return opts;
  }

  /**
   * Main entry point: runs the tool through {@link ToolRunner} and exits with its return code.
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new DumpReplicationQueues(), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {

    int errCode = -1;
    LinkedList<String> argv = new LinkedList<>(Arrays.asList(args));
    DumpOptions opts = parseOpts(argv);

    // args remain (e.g. a help flag pushed back by parseOpts): print usage and exit
    if (!argv.isEmpty()) {
      errCode = 0;
      printUsage();
      return errCode;
    }
    return dumpReplicationQueues(opts);
  }

  protected void printUsage() {
    printUsage(this.getClass().getName(), null);
  }

  protected static void printUsage(final String message) {
    printUsage(DumpReplicationQueues.class.getName(), message);
  }

  protected static void printUsage(final String className, final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.println("Usage: hbase " + className + " \\");
    System.err.println("  <OPTIONS> [-D<property=value>]*");
    System.err.println();
    System.err.println("General Options:");
    System.err.println(" -h|--h|--help  Show this help and exit.");
    System.err.println(" --distributed  Poll each RS and print its own replication queue. "
      + "Default only polls ZooKeeper.");
    System.err.println(" --hdfs         Use HDFS to calculate usage of WALs by replication."
      + " It can be overestimated if replicating to multiple peers."
      + " The --distributed flag is also needed.");
  }

  protected static void printUsageAndExit(final String message, final int exitCode) {
    printUsage(message);
    System.exit(exitCode);
  }

  private int dumpReplicationQueues(DumpOptions opts) throws Exception {
    Configuration conf = getConf();
    Connection connection = ConnectionFactory.createConnection(conf);
    Admin admin = connection.getAdmin();

    ZKWatcher zkw =
      new ZKWatcher(conf, "DumpReplicationQueues" + EnvironmentEdgeManager.currentTime(),
        new WarnOnlyAbortable(), true);

    try {
      // Log the ZooKeeper quorum our watcher is connected to
      LOG.info("Our Quorum: " + zkw.getQuorum());
      List<TableCFs> replicatedTableCFs = admin.listReplicatedTableCFs();
      if (replicatedTableCFs.isEmpty()) {
        LOG.info("No tables with a configured replication peer were found.");
        return 0;
      } else {
        LOG.info("Replicated Tables: " + replicatedTableCFs);
      }

      List<ReplicationPeerDescription> peers = admin.listReplicationPeers();

      if (peers.isEmpty()) {
        LOG.info("Replication is enabled but no peer configuration was found.");
      }

      System.out.println("Dumping replication peers and configurations:");
      System.out.println(dumpPeersState(peers));

      if (opts.isDistributed()) {
        LOG.info("Found [--distributed], will poll each RegionServer.");
        Set<String> peerIds =
          peers.stream().map((peer) -> peer.getPeerId()).collect(Collectors.toSet());
        System.out.println(dumpQueues(zkw, peerIds, opts.isHdfs()));
        System.out.println(dumpReplicationSummary());
      } else {
        // without --distributed, dump the replication znodes straight from ZooKeeper
        System.out.print("Dumping replication znodes via ZooKeeper:");
        System.out.println(ZKDump.getReplicationZnodesDump(zkw));
      }
      return 0;
    } catch (IOException e) {
      LOG.error("Error while dumping replication queues, aborting", e);
      return -1;
    } finally {
      zkw.close();
    }
  }
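  /**
   * Returns a human-readable summary of what was collected while dumping the queues: deleted
   * queues, dead RegionServers, per-peer queue sizes, and the total size of WALs found on HDFS.
   */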
  public String dumpReplicationSummary() {
    StringBuilder sb = new StringBuilder();
    if (!deletedQueues.isEmpty()) {
      sb.append("Found " + deletedQueues.size() + " deleted queues"
        + ", run hbck -fixReplication in order to remove the deleted replication queues\n");
      for (String deletedQueue : deletedQueues) {
        sb.append("    " + deletedQueue + "\n");
      }
    }
    if (!deadRegionServers.isEmpty()) {
      sb.append("Found " + deadRegionServers.size() + " dead regionservers"
        + ", restart one regionserver to transfer the queues of dead regionservers\n");
      for (String deadRs : deadRegionServers) {
        sb.append("    " + deadRs + "\n");
      }
    }
    if (!peersQueueSize.isEmpty()) {
      sb.append("Dumping the number of WALs in the replication queue of each peer\n");
      for (Map.Entry<String, Long> entry : peersQueueSize.asMap().entrySet()) {
        sb.append(
          "    PeerId: " + entry.getKey() + ", sizeOfLogQueue: " + entry.getValue() + "\n");
      }
    }
    sb.append("    Total size of WALs on HDFS: " + StringUtils.humanSize(totalSizeOfWALs) + "\n");
    if (numWalsNotFound > 0) {
      sb.append("    ERROR: There are " + numWalsNotFound + " WALs not found!!!\n");
    }
    return sb.toString();
  }

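  /**
   * Returns a textual dump of the given replication peers: state, cluster key, replication
   * endpoint implementation, any custom configuration, table CFs and namespaces.
   */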
  public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exception {
    Map<String, String> currentConf;
    StringBuilder sb = new StringBuilder();
    for (ReplicationPeerDescription peer : peers) {
      ReplicationPeerConfig peerConfig = peer.getPeerConfig();
      sb.append("Peer: " + peer.getPeerId() + "\n");
      sb.append("    State: " + (peer.isEnabled() ? "ENABLED" : "DISABLED") + "\n");
      sb.append("    Cluster Name: " + peerConfig.getClusterKey() + "\n");
      sb.append("    Replication Endpoint: " + peerConfig.getReplicationEndpointImpl() + "\n");
      currentConf = peerConfig.getConfiguration();
      // Only show when we have a custom configuration for the peer
      if (currentConf.size() > 1) {
        sb.append("    Peer Configuration: " + currentConf + "\n");
      }
      sb.append("    Peer Table CFs: " + peerConfig.getTableCFsMap() + "\n");
      sb.append("    Peer Namespaces: " + peerConfig.getNamespaces() + "\n");
    }
    return sb.toString();
  }

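  /**
   * Walks the replication queue storage and returns a textual dump of every queue on every
   * replicator, recording dead RegionServers and queues whose peer no longer exists on the way.
   */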
  public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) throws Exception {
    ReplicationQueueStorage queueStorage;
    StringBuilder sb = new StringBuilder();

    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
    Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode)
      .stream().map(ServerName::parseServerName).collect(Collectors.toSet());

    // Loop through each RegionServer and dump every replication queue it holds
    List<ServerName> regionservers = queueStorage.getListOfReplicators();
    if (regionservers == null || regionservers.isEmpty()) {
      return sb.toString();
    }
    for (ServerName regionserver : regionservers) {
      List<String> queueIds = queueStorage.getAllQueues(regionserver);
      if (!liveRegionServers.contains(regionserver)) {
        deadRegionServers.add(regionserver.getServerName());
      }
      for (String queueId : queueIds) {
        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
        List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
        Collections.sort(wals);
        if (!peerIds.contains(queueInfo.getPeerId())) {
          deletedQueues.add(regionserver + "/" + queueId);
          sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs));
        } else {
          sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs));
        }
      }
    }
    return sb.toString();
  }

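  /**
   * Formats a single replication queue: its znode, peer, recovery state, WAL count and, per WAL,
   * the current replication position (optionally with the queue's total size on HDFS).
   */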
  private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage,
    ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted,
    boolean hdfs) throws Exception {
    StringBuilder sb = new StringBuilder();

    List<ServerName> deadServers;

    sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]\n");
    sb.append("    Queue znode: " + queueId + "\n");
    sb.append("    PeerID: " + queueInfo.getPeerId() + "\n");
    sb.append("    Recovered: " + queueInfo.isQueueRecovered() + "\n");
    deadServers = queueInfo.getDeadRegionServers();
    if (deadServers.isEmpty()) {
      sb.append("    No dead RegionServers found in this queue.\n");
    } else {
      sb.append("    Dead RegionServers: " + deadServers + "\n");
    }
    sb.append("    Was deleted: " + isDeleted + "\n");
    sb.append("    Number of WALs in replication queue: " + wals.size() + "\n");
    peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());

    for (String wal : wals) {
      long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal);
      sb.append("    Replication position for " + wal + ": "
        + (position > 0 ? position : "0 (not started or nothing to replicate)") + "\n");
    }

    if (hdfs) {
      FileSystem fs = FileSystem.get(getConf());
      sb.append("    Total size of WALs on HDFS for this queue: "
        + StringUtils.humanSize(getTotalWALSize(fs, wals, regionserver)) + "\n");
    }
    return sb.toString();
  }

  /**
   * Returns the total size in bytes of the given WALs, as reported by the FileSystem.
   */
  private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server)
    throws IOException {
    long size = 0;
    FileStatus fileStatus;

    for (String wal : wals) {
      try {
        fileStatus = (new WALLink(getConf(), server.getServerName(), wal)).getFileStatus(fs);
      } catch (FileNotFoundException e) {
        numWalsNotFound++;
        LOG.warn("WAL " + wal + " couldn't be found, skipping", e);
        continue;
      } catch (IOException e) {
        LOG.warn("Can't get file status of WAL " + wal + ", skipping", e);
        continue;
      }
      size += fileStatus.getLen();
    }

    totalSizeOfWALs += size;
    return size;
  }

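  /**
   * An {@link Abortable} that only logs abort requests. This tool only reads replication state,
   * so an abort from the ZooKeeper watcher should not kill the JVM.
   */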
  private static class WarnOnlyAbortable implements Abortable {
    @Override
    public void abort(String why, Throwable e) {
      LOG.warn("DumpReplicationQueues received abort, ignoring.  Reason: " + why);
      if (LOG.isDebugEnabled()) {
        LOG.debug(e.toString(), e);
      }
    }

    @Override
    public boolean isAborted() {
      return false;
    }
  }
}