/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap;

/**
 * Provides information about the existing states of replication, replication peers and queues.
 *
 * Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args]
 * Arguments: --distributed    Polls each RS to dump information about the queue
 *            --hdfs           Reports HDFS usage by the replication queues (note: can be overestimated).
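 *
 * The same dump can also be triggered programmatically through
 * {@link org.apache.hadoop.util.ToolRunner}, which is what {@code main} does; a minimal sketch
 * (the flag values are illustrative):
 * <pre>
 * int exitCode = ToolRunner.run(HBaseConfiguration.create(), new DumpReplicationQueues(),
 *     new String[] { "--distributed", "--hdfs" });
 * </pre>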
 */
@InterfaceAudience.Private
public class DumpReplicationQueues extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(DumpReplicationQueues.class.getName());

  private List<String> deadRegionServers;
  private List<String> deletedQueues;
  private AtomicLongMap<String> peersQueueSize;
  private long totalSizeOfWALs;
  private long numWalsNotFound;

  public DumpReplicationQueues() {
    deadRegionServers = new ArrayList<>();
    deletedQueues = new ArrayList<>();
    peersQueueSize = AtomicLongMap.create();
    totalSizeOfWALs = 0;
    numWalsNotFound = 0;
  }

  static class DumpOptions {
    boolean hdfs = false;
    boolean distributed = false;

    public DumpOptions() {
    }

    public DumpOptions(DumpOptions that) {
      this.hdfs = that.hdfs;
      this.distributed = that.distributed;
    }
    boolean isHdfs() {
      return hdfs;
    }

    boolean isDistributed() {
      return distributed;
    }

    void setHdfs(boolean hdfs) {
      this.hdfs = hdfs;
    }

    void setDistributed(boolean distributed) {
      this.distributed = distributed;
    }
  }

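  /**
   * Parses the command line flags from the argument queue. Recognized flags are consumed from
   * the queue; a help flag is pushed back so that the caller can see parsing stopped early and
   * print usage.
   */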
  static DumpOptions parseOpts(Queue<String> args) {
    DumpOptions opts = new DumpOptions();

    String cmd = null;
    while ((cmd = args.poll()) != null) {
      if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) {
        // place item back onto queue so that caller knows parsing was incomplete
        args.add(cmd);
        break;
      }
      final String hdfs = "--hdfs";
      if (cmd.equals(hdfs)) {
        opts.setHdfs(true);
        continue;
      }
      final String distributed = "--distributed";
      if (cmd.equals(distributed)) {
        opts.setDistributed(true);
        continue;
      }
      printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
    }
    // --hdfs only makes sense when each RS is polled, so it requires --distributed. This check
    // must run after the loop: every branch above either continues or exits, so it would be
    // unreachable inside the loop body.
    if (opts.isHdfs() && !opts.isDistributed()) {
      printUsageAndExit("ERROR: --hdfs option can only be used with --distributed", -1);
    }
    return opts;
  }

  /**
   * Main entry point.
   * @param args command line arguments
   * @throws Exception on any failure
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new DumpReplicationQueues(), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {

    int errCode = -1;
    LinkedList<String> argv = new LinkedList<>(Arrays.asList(args));
    DumpOptions opts = parseOpts(argv);

    // args remaining (a help flag was pushed back), print help and exit
    if (!argv.isEmpty()) {
      errCode = 0;
      printUsage();
      return errCode;
    }
    return dumpReplicationQueues(opts);
  }

  protected void printUsage() {
    printUsage(this.getClass().getName(), null);
  }

  protected static void printUsage(final String message) {
    printUsage(DumpReplicationQueues.class.getName(), message);
  }

  protected static void printUsage(final String className, final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.println("Usage: hbase " + className + " \\");
    System.err.println("  <OPTIONS> [-D<property=value>]*");
    System.err.println();
    System.err.println("General Options:");
    System.err.println(" -h|--h|--help  Show this help and exit.");
    System.err.println(" --distributed  Poll each RS and print its own replication queue. "
        + "Default only polls ZooKeeper");
    System.err.println(" --hdfs         Use HDFS to calculate usage of WALs by replication."
        + " It could be overestimated if replicating to multiple peers."
        + " --distributed flag is also needed.");
  }

  protected static void printUsageAndExit(final String message, final int exitCode) {
    printUsage(message);
    System.exit(exitCode);
  }

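  /**
   * Connects to the cluster and to ZooKeeper, then dumps the peer configurations followed by
   * either per-RegionServer queue details (--distributed) or the raw replication znodes.
   */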
  private int dumpReplicationQueues(DumpOptions opts) throws Exception {

    Configuration conf = getConf();
    HBaseAdmin.available(conf);
    ClusterConnection connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
    Admin admin = connection.getAdmin();

    ZKWatcher zkw = new ZKWatcher(conf, "DumpReplicationQueues" + System.currentTimeMillis(),
        new WarnOnlyAbortable(), true);

    try {
      // Our zk watcher
      LOG.info("Our Quorum: " + zkw.getQuorum());
      List<TableCFs> replicatedTableCFs = admin.listReplicatedTableCFs();
      if (replicatedTableCFs.isEmpty()) {
        LOG.info("No tables with a configured replication peer were found.");
        return 0;
      } else {
        LOG.info("Replicated Tables: " + replicatedTableCFs);
      }

      List<ReplicationPeerDescription> peers = admin.listReplicationPeers();

      if (peers.isEmpty()) {
        LOG.info("Replication is enabled but no peer configuration was found.");
      }

      System.out.println("Dumping replication peers and configurations:");
      System.out.println(dumpPeersState(peers));

      if (opts.isDistributed()) {
        LOG.info("Found [--distributed], will poll each RegionServer.");
        Set<String> peerIds = peers.stream().map((peer) -> peer.getPeerId())
            .collect(Collectors.toSet());
        System.out.println(dumpQueues(connection, zkw, peerIds, opts.isHdfs()));
        System.out.println(dumpReplicationSummary());
      } else {
        // use ZK instead
        System.out.print("Dumping replication znodes via ZooKeeper:");
        System.out.println(ZKUtil.getReplicationZnodesDump(zkw));
      }
      return 0;
    } catch (IOException e) {
      // log the failure rather than silently returning an error code
      LOG.error("Failed to dump replication queues", e);
      return -1;
    } finally {
      zkw.close();
    }
  }

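  /**
   * Summarizes what the queue dump found: deleted queues, dead RegionServers, the number of
   * WALs queued per peer, and the total WAL size on HDFS.
   */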
  public String dumpReplicationSummary() {
    StringBuilder sb = new StringBuilder();
    if (!deletedQueues.isEmpty()) {
      sb.append("Found " + deletedQueues.size() + " deleted queues"
          + ", run hbck -fixReplication in order to remove the deleted replication queues\n");
      for (String deletedQueue : deletedQueues) {
        sb.append("    " + deletedQueue + "\n");
      }
    }
    if (!deadRegionServers.isEmpty()) {
      sb.append("Found " + deadRegionServers.size() + " dead regionservers"
          + ", restart one regionserver to transfer the queues of dead regionservers\n");
      for (String deadRs : deadRegionServers) {
        sb.append("    " + deadRs + "\n");
      }
    }
    if (!peersQueueSize.isEmpty()) {
      sb.append("Dumping the number of WALs in the replication queue of each peer\n");
      for (Map.Entry<String, Long> entry : peersQueueSize.asMap().entrySet()) {
        sb.append("    PeerId: " + entry.getKey() + " , sizeOfLogQueue: " + entry.getValue()
            + "\n");
      }
    }
    sb.append("    Total size of WALs on HDFS: " + StringUtils.humanSize(totalSizeOfWALs) + "\n");
    if (numWalsNotFound > 0) {
      sb.append("    ERROR: There are " + numWalsNotFound + " WALs not found!!!\n");
    }
    return sb.toString();
  }

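  /**
   * Renders each peer's id, state, cluster key, replication endpoint, custom configuration
   * (when present) and table/namespace scopes as an indented text block.
   */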
  public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exception {
    Map<String, String> currentConf;
    StringBuilder sb = new StringBuilder();
    for (ReplicationPeerDescription peer : peers) {
      ReplicationPeerConfig peerConfig = peer.getPeerConfig();
      sb.append("Peer: " + peer.getPeerId() + "\n");
      sb.append("    " + "State: " + (peer.isEnabled() ? "ENABLED" : "DISABLED") + "\n");
      sb.append("    " + "Cluster Name: " + peerConfig.getClusterKey() + "\n");
      sb.append("    " + "Replication Endpoint: " + peerConfig.getReplicationEndpointImpl()
          + "\n");
      currentConf = peerConfig.getConfiguration();
      // Only show when we have a custom configuration for the peer
      if (currentConf.size() > 1) {
        sb.append("    " + "Peer Configuration: " + currentConf + "\n");
      }
      sb.append("    " + "Peer Table CFs: " + peerConfig.getTableCFsMap() + "\n");
      sb.append("    " + "Peer Namespaces: " + peerConfig.getNamespaces() + "\n");
    }
    return sb.toString();
  }

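  /**
   * Walks every replicator znode and dumps each queue found there. RegionServers that are no
   * longer alive are recorded as dead, and queues whose peer id is unknown are recorded as
   * deleted.
   */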
  public String dumpQueues(ClusterConnection connection, ZKWatcher zkw, Set<String> peerIds,
      boolean hdfs) throws Exception {
    ReplicationQueuesClient queuesClient;
    ReplicationPeers replicationPeers;
    ReplicationQueues replicationQueues;
    ReplicationTracker replicationTracker;
    ReplicationQueuesClientArguments replicationArgs =
        new ReplicationQueuesClientArguments(getConf(), new WarnOnlyAbortable(), zkw);
    StringBuilder sb = new StringBuilder();

    queuesClient = ReplicationFactory.getReplicationQueuesClient(replicationArgs);
    queuesClient.init();
    replicationQueues = ReplicationFactory.getReplicationQueues(replicationArgs);
    replicationPeers =
        ReplicationFactory.getReplicationPeers(zkw, getConf(), queuesClient, connection);
    replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(),
      new WarnOnlyAbortable(), new WarnOnlyStoppable());
    List<String> liveRegionServers = replicationTracker.getListOfRegionServers();

    // Loops each peer on each RS and dumps the queues
    try {
      List<String> regionservers = queuesClient.getListOfReplicators();
      if (regionservers == null || regionservers.isEmpty()) {
        return sb.toString();
      }
      for (String regionserver : regionservers) {
        List<String> queueIds = queuesClient.getAllQueues(regionserver);
        replicationQueues.init(regionserver);
        if (!liveRegionServers.contains(regionserver)) {
          deadRegionServers.add(regionserver);
        }
        for (String queueId : queueIds) {
          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
          List<String> wals = queuesClient.getLogsInQueue(regionserver, queueId);
          if (!peerIds.contains(queueInfo.getPeerId())) {
            deletedQueues.add(regionserver + "/" + queueId);
            sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true,
              hdfs));
          } else {
            sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false,
              hdfs));
          }
        }
      }
    } catch (KeeperException ke) {
      throw new IOException(ke);
    }
    return sb.toString();
  }

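  /**
   * Formats a single queue: its znode, peer id, recovery state, any dead RegionServers it was
   * recovered from, the replication position of every WAL and, optionally, the HDFS usage.
   */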
  private String formatQueue(String regionserver, ReplicationQueues replicationQueues,
      ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted,
      boolean hdfs) throws Exception {

    StringBuilder sb = new StringBuilder();

    List<ServerName> deadServers;

    sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n");
    sb.append("    Queue znode: " + queueId + "\n");
    sb.append("    PeerID: " + queueInfo.getPeerId() + "\n");
    sb.append("    Recovered: " + queueInfo.isQueueRecovered() + "\n");
    deadServers = queueInfo.getDeadRegionServers();
    if (deadServers.isEmpty()) {
      sb.append("    No dead RegionServers found in this queue." + "\n");
    } else {
      sb.append("    Dead RegionServers: " + deadServers + "\n");
    }
    sb.append("    Was deleted: " + isDeleted + "\n");
    sb.append("    Number of WALs in replication queue: " + wals.size() + "\n");
    peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());

    for (String wal : wals) {
      long position = replicationQueues.getLogPosition(queueInfo.getPeerId(), wal);
      sb.append("    Replication position for " + wal + ": "
          + (position > 0 ? position : "0 (not started or nothing to replicate)") + "\n");
    }

    if (hdfs) {
      FileSystem fs = FileSystem.get(getConf());
      sb.append("    Total size of WALs on HDFS for this queue: "
          + StringUtils.humanSize(getTotalWALSize(fs, wals, regionserver)) + "\n");
    }
    return sb.toString();
  }

  /**
   * Returns the total size in bytes of the given WALs, skipping any that cannot be resolved.
   */
  private long getTotalWALSize(FileSystem fs, List<String> wals, String server) throws IOException {
    long size = 0;
    FileStatus fileStatus;

    for (String wal : wals) {
      try {
        fileStatus = (new WALLink(getConf(), server, wal)).getFileStatus(fs);
      } catch (FileNotFoundException e) {
        numWalsNotFound++;
        LOG.warn("WAL " + wal + " couldn't be found, skipping", e);
        continue;
      } catch (IOException e) {
        LOG.warn("Can't get file status of WAL " + wal + ", skipping", e);
        continue;
      }
      size += fileStatus.getLen();
    }

    totalSizeOfWALs += size;
    return size;
  }

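  /**
   * Abortable that only logs the abort request. The tool is read-only, so an abort signalled
   * by the ZooKeeper watcher should not terminate the dump.
   */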
  private static class WarnOnlyAbortable implements Abortable {
    @Override
    public void abort(String why, Throwable e) {
      LOG.warn("DumpReplicationQueues received abort, ignoring.  Reason: " + why);
      if (LOG.isDebugEnabled()) {
        LOG.debug(e.toString(), e);
      }
    }

    @Override
    public boolean isAborted() {
      return false;
    }
  }

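  /** Stoppable counterpart of {@link WarnOnlyAbortable}: logs the stop request and keeps going. */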
  private static class WarnOnlyStoppable implements Stoppable {
    @Override
    public void stop(String why) {
      LOG.warn("DumpReplicationQueues received stop, ignoring.  Reason: " + why);
    }

    @Override
    public boolean isStopped() {
      return false;
    }
  }
}