/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap;

/**
 * Provides information about the current state of replication, including the configured
 * replication peers and their queues.
 *
 * Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args]
 * Arguments: --distributed    Polls each RS to dump information about the queue
 *            --hdfs           Reports HDFS usage by the replication queues (note: can be overestimated).
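 *
 * For example, to poll every RegionServer and also report the HDFS space used by its queues:
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues --distributed --hdfs
 * </pre>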
 */
@InterfaceAudience.Private
public class DumpReplicationQueues extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(DumpReplicationQueues.class);

  private List<String> deadRegionServers;
  private List<String> deletedQueues;
  private AtomicLongMap<String> peersQueueSize;
  private long totalSizeOfWALs;
  private long numWalsNotFound;

  public DumpReplicationQueues() {
    deadRegionServers = new ArrayList<>();
    deletedQueues = new ArrayList<>();
    peersQueueSize = AtomicLongMap.create();
    totalSizeOfWALs = 0;
    numWalsNotFound = 0;
  }

  static class DumpOptions {
    boolean hdfs = false;
    boolean distributed = false;

    public DumpOptions() {
    }

    public DumpOptions(DumpOptions that) {
      this.hdfs = that.hdfs;
      this.distributed = that.distributed;
    }

    boolean isHdfs() {
      return hdfs;
    }

    boolean isDistributed() {
      return distributed;
    }

    void setHdfs(boolean hdfs) {
      this.hdfs = hdfs;
    }

    void setDistributed(boolean distributed) {
      this.distributed = distributed;
    }
  }

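  /**
   * Parses the command-line options, consuming recognized arguments from the given queue. A help
   * flag is pushed back onto the queue so that the caller can detect incomplete parsing and print
   * usage. A minimal sketch of the expected behavior:
   *
   * <pre>
   * Queue&lt;String&gt; args = new LinkedList&lt;&gt;(Arrays.asList("--distributed", "--hdfs"));
   * DumpOptions opts = parseOpts(args);
   * // opts.isDistributed() and opts.isHdfs() are now both true, and args is empty
   * </pre>
   */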
  static DumpOptions parseOpts(Queue<String> args) {
    DumpOptions opts = new DumpOptions();

    String cmd = null;
    while ((cmd = args.poll()) != null) {
      if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) {
        // place item back onto queue so that caller knows parsing was incomplete
        args.add(cmd);
        break;
      }
      final String hdfs = "--hdfs";
      if (cmd.equals(hdfs)) {
        opts.setHdfs(true);
        continue;
      }
      final String distributed = "--distributed";
      if (cmd.equals(distributed)) {
        opts.setDistributed(true);
        continue;
      }
      printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
    }
    // --hdfs only makes sense when each RegionServer queue is polled, so require --distributed.
    // This check must run after the loop (inside it, every branch continues, breaks, or exits);
    // skip it when a help flag was pushed back and parsing stopped early.
    if (args.isEmpty() && opts.isHdfs() && !opts.isDistributed()) {
      printUsageAndExit("ERROR: --hdfs option can only be used with --distributed", -1);
    }
    return opts;
  }

  /**
   * Main entry point.
   * @param args command-line arguments
   * @throws Exception if the dump fails
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new DumpReplicationQueues(), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    LinkedList<String> argv = new LinkedList<>(Arrays.asList(args));
    DumpOptions opts = parseOpts(argv);

    // any arguments left over (including a pushed-back help flag) mean parsing was incomplete,
    // so print usage and exit
    if (!argv.isEmpty()) {
      printUsage();
      return 0;
    }
    return dumpReplicationQueues(opts);
  }

  protected void printUsage() {
    printUsage(this.getClass().getName(), null);
  }

  protected static void printUsage(final String message) {
    printUsage(DumpReplicationQueues.class.getName(), message);
  }

  protected static void printUsage(final String className, final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.println("Usage: hbase " + className + " \\");
    System.err.println("  <OPTIONS> [-D<property=value>]*");
    System.err.println();
    System.err.println("General Options:");
    System.err.println(" -h|--h|--help  Show this help and exit.");
    System.err.println(" --distributed  Poll each RS and print its replication queue. "
        + "By default, only ZooKeeper is polled.");
    System.err.println(" --hdfs         Use HDFS to calculate the space used by replication WALs."
        + " The size can be overestimated when replicating to multiple peers."
        + " Requires the --distributed flag.");
  }

  protected static void printUsageAndExit(final String message, final int exitCode) {
    printUsage(message);
    System.exit(exitCode);
  }

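  /**
   * Connects to the cluster and dumps the replication state: the replicated tables, the peer
   * configurations, and either the per-RegionServer queues (with --distributed) or the raw
   * replication znodes from ZooKeeper.
   */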
  private int dumpReplicationQueues(DumpOptions opts) throws Exception {

    Configuration conf = getConf();
    HBaseAdmin.available(conf);
    ClusterConnection connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
    Admin admin = connection.getAdmin();

    ZKWatcher zkw = new ZKWatcher(conf, "DumpReplicationQueues" + System.currentTimeMillis(),
        new WarnOnlyAbortable(), true);

    try {
      // Log the ZooKeeper quorum this tool is connected to
      LOG.info("Our Quorum: " + zkw.getQuorum());
      List<TableCFs> replicatedTableCFs = admin.listReplicatedTableCFs();
      if (replicatedTableCFs.isEmpty()) {
        LOG.info("No tables with a configured replication peer were found.");
        return 0;
      } else {
        LOG.info("Replicated Tables: " + replicatedTableCFs);
      }

      List<ReplicationPeerDescription> peers = admin.listReplicationPeers();

      if (peers.isEmpty()) {
        LOG.info("Replication is enabled but no peer configuration was found.");
      }

      System.out.println("Dumping replication peers and configurations:");
      System.out.println(dumpPeersState(peers));

      if (opts.isDistributed()) {
        LOG.info("Found [--distributed], will poll each RegionServer.");
        Set<String> peerIds =
            peers.stream().map(ReplicationPeerDescription::getPeerId).collect(Collectors.toSet());
        System.out.println(dumpQueues(zkw, peerIds, opts.isHdfs()));
        System.out.println(dumpReplicationSummary());
      } else {
        // use ZooKeeper instead of polling each RegionServer
        System.out.print("Dumping replication znodes via ZooKeeper:");
        System.out.println(ZKUtil.getReplicationZnodesDump(zkw));
      }
      return 0;
    } catch (IOException e) {
      LOG.error("Failed to dump replication queues", e);
      return -1;
    } finally {
      // close everything we opened, not just the ZooKeeper watcher
      zkw.close();
      admin.close();
      connection.close();
    }
  }

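  /**
   * Summarizes what was seen while dumping the queues: deleted queues (with a pointer to
   * hbck -fixReplication), dead RegionServers, the per-peer queue sizes, the total size of WALs
   * on HDFS, and any WALs that could not be found.
   */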
  public String dumpReplicationSummary() {
    StringBuilder sb = new StringBuilder();
    if (!deletedQueues.isEmpty()) {
      sb.append("Found " + deletedQueues.size() + " deleted queues"
          + ", run hbck -fixReplication in order to remove the deleted replication queues\n");
      for (String deletedQueue : deletedQueues) {
        sb.append("    " + deletedQueue + "\n");
      }
    }
    if (!deadRegionServers.isEmpty()) {
      sb.append("Found " + deadRegionServers.size() + " dead regionservers"
          + ", restart one regionserver to transfer the queues of dead regionservers\n");
      for (String deadRs : deadRegionServers) {
        sb.append("    " + deadRs + "\n");
      }
    }
    if (!peersQueueSize.isEmpty()) {
      sb.append("Dumping the number of WALs in the replication queue of each peer\n");
      for (Map.Entry<String, Long> entry : peersQueueSize.asMap().entrySet()) {
        sb.append(
          "    PeerId: " + entry.getKey() + ", sizeOfLogQueue: " + entry.getValue() + "\n");
      }
    }
    sb.append("    Total size of WALs on HDFS: " + StringUtils.humanSize(totalSizeOfWALs) + "\n");
    if (numWalsNotFound > 0) {
      sb.append("    ERROR: There are " + numWalsNotFound + " WALs not found!!!\n");
    }
    return sb.toString();
  }

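  /**
   * Formats each peer's id, state, cluster key, replication endpoint, custom configuration (only
   * when one is set), table CFs, and namespaces, one block per peer.
   */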
  public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exception {
    Map<String, String> currentConf;
    StringBuilder sb = new StringBuilder();
    for (ReplicationPeerDescription peer : peers) {
      ReplicationPeerConfig peerConfig = peer.getPeerConfig();
      sb.append("Peer: " + peer.getPeerId() + "\n");
      sb.append("    " + "State: " + (peer.isEnabled() ? "ENABLED" : "DISABLED") + "\n");
      sb.append("    " + "Cluster Name: " + peerConfig.getClusterKey() + "\n");
      sb.append("    " + "Replication Endpoint: " + peerConfig.getReplicationEndpointImpl() + "\n");
      currentConf = peerConfig.getConfiguration();
      // Only show when we have a custom configuration for the peer
      if (currentConf.size() > 1) {
        sb.append("    " + "Peer Configuration: " + currentConf + "\n");
      }
      sb.append("    " + "Peer Table CFs: " + peerConfig.getTableCFsMap() + "\n");
      sb.append("    " + "Peer Namespaces: " + peerConfig.getNamespaces() + "\n");
    }
    return sb.toString();
  }

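  /**
   * Walks every replicator znode and dumps each of its queues, recording dead RegionServers and
   * queues whose peer no longer exists for the final summary.
   * @param zkw ZooKeeper watcher for the cluster
   * @param peerIds ids of the currently configured peers; queues for any other id are reported
   *          as deleted
   * @param hdfs whether to also report the size of the WALs on HDFS
   */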
  public String dumpQueues(ZKWatcher zkw, Set<String> peerIds,
      boolean hdfs) throws Exception {
    ReplicationQueueStorage queueStorage;
    ReplicationTracker replicationTracker;
    StringBuilder sb = new StringBuilder();

    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
    replicationTracker = ReplicationFactory.getReplicationTracker(zkw, new WarnOnlyAbortable(),
      new WarnOnlyStoppable());
    Set<String> liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers());

    // Loop over each queue of each RegionServer and dump it
    List<ServerName> regionservers = queueStorage.getListOfReplicators();
    if (regionservers == null || regionservers.isEmpty()) {
      return sb.toString();
    }
    for (ServerName regionserver : regionservers) {
      List<String> queueIds = queueStorage.getAllQueues(regionserver);
      if (!liveRegionServers.contains(regionserver.getServerName())) {
        deadRegionServers.add(regionserver.getServerName());
      }
      for (String queueId : queueIds) {
        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
        List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
        Collections.sort(wals);
        if (!peerIds.contains(queueInfo.getPeerId())) {
          deletedQueues.add(regionserver + "/" + queueId);
          sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs));
        } else {
          sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs));
        }
      }
    }
    return sb.toString();
  }

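  /**
   * Formats a single queue: its znode, peer id, recovery state, any dead RegionServers, the
   * number of WALs, and the replication position of each WAL. Also accumulates the per-peer
   * queue size reported by {@link #dumpReplicationSummary()}.
   */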
  private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage,
      ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted,
      boolean hdfs) throws Exception {
    StringBuilder sb = new StringBuilder();

    List<ServerName> deadServers;

    sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n");
    sb.append("    Queue znode: " + queueId + "\n");
    sb.append("    PeerID: " + queueInfo.getPeerId() + "\n");
    sb.append("    Recovered: " + queueInfo.isQueueRecovered() + "\n");
    deadServers = queueInfo.getDeadRegionServers();
    if (deadServers.isEmpty()) {
      sb.append("    No dead RegionServers found in this queue." + "\n");
    } else {
      sb.append("    Dead RegionServers: " + deadServers + "\n");
    }
    sb.append("    Was deleted: " + isDeleted + "\n");
    sb.append("    Number of WALs in replication queue: " + wals.size() + "\n");
    peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());

    for (String wal : wals) {
      long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal);
      String positionString =
          position > 0 ? String.valueOf(position) : "0 (not started or nothing to replicate)";
      sb.append("    Replication position for " + wal + ": " + positionString + "\n");
    }

    if (hdfs) {
      FileSystem fs = FileSystem.get(getConf());
      sb.append("    Total size of WALs on HDFS for this queue: "
          + StringUtils.humanSize(getTotalWALSize(fs, wals, regionserver)) + "\n");
    }
    return sb.toString();
  }

  /**
   * @return total size in bytes of the given WALs, skipping any WAL that cannot be found
   */
  private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server)
      throws IOException {
    long size = 0;
    FileStatus fileStatus;

    for (String wal : wals) {
      try {
        fileStatus = (new WALLink(getConf(), server.getServerName(), wal)).getFileStatus(fs);
      } catch (FileNotFoundException e) {
        numWalsNotFound++;
        LOG.warn("WAL " + wal + " couldn't be found, skipping", e);
        continue;
      } catch (IOException e) {
        LOG.warn("Can't get file status of WAL " + wal + ", skipping", e);
        continue;
      }
      size += fileStatus.getLen();
    }

    totalSizeOfWALs += size;
    return size;
  }

  private static class WarnOnlyAbortable implements Abortable {
    @Override
    public void abort(String why, Throwable e) {
      LOG.warn("DumpReplicationQueues received abort, ignoring. Reason: " + why);
      if (LOG.isDebugEnabled()) {
        LOG.debug(e.toString(), e);
      }
    }

    @Override
    public boolean isAborted() {
      return false;
    }
  }

  private static class WarnOnlyStoppable implements Stoppable {
    @Override
    public void stop(String why) {
      LOG.warn("DumpReplicationQueues received stop, ignoring. Reason: " + why);
    }

    @Override
    public boolean isStopped() {
      return false;
    }
  }
}