/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap;

/**
 * Provides information about the existing states of replication, replication peers and queues.
 * <p>
 * Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args]
 * <p>
 * Arguments:
 * <ul>
 *   <li>--distributed: polls each RS to dump information about its queues</li>
 *   <li>--hdfs: reports HDFS usage by the replication queues (note: can be overestimated)</li>
 * </ul>
 */
@InterfaceAudience.Private
public class DumpReplicationQueues extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(DumpReplicationQueues.class);

  private List<String> deadRegionServers;
  private List<String> deletedQueues;
  private AtomicLongMap<String> peersQueueSize;
  private long totalSizeOfWALs;
  private long numWalsNotFound;

  public DumpReplicationQueues() {
    deadRegionServers = new ArrayList<>();
    deletedQueues = new ArrayList<>();
    peersQueueSize = AtomicLongMap.create();
    totalSizeOfWALs = 0;
    numWalsNotFound = 0;
  }

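  /**
   * Holder for the options accepted on the command line.
   */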
  static class DumpOptions {
    boolean hdfs = false;
    boolean distributed = false;

    public DumpOptions() {
    }

    public DumpOptions(DumpOptions that) {
      this.hdfs = that.hdfs;
      this.distributed = that.distributed;
    }

    boolean isHdfs() {
      return hdfs;
    }

    boolean isDistributed() {
      return distributed;
    }

    void setHdfs(boolean hdfs) {
      this.hdfs = hdfs;
    }

    void setDistributed(boolean distributed) {
      this.distributed = distributed;
    }
  }

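  /**
   * Parses the command-line arguments, consuming recognized options from the queue. A help flag
   * is pushed back onto the queue so the caller can tell that parsing stopped early.
   */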
  static DumpOptions parseOpts(Queue<String> args) {
    DumpOptions opts = new DumpOptions();

    String cmd = null;
    while ((cmd = args.poll()) != null) {
      if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) {
        // place item back onto queue so that caller knows parsing was incomplete
        args.add(cmd);
        break;
      }
      final String hdfs = "--hdfs";
      if (cmd.equals(hdfs)) {
        opts.setHdfs(true);
        continue;
      }
      final String distributed = "--distributed";
      if (cmd.equals(distributed)) {
        opts.setDistributed(true);
        continue;
      }
      printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
    }
    // --hdfs is only meaningful when each RS is polled, so it requires --distributed; this check
    // must run after the loop, since every loop path either continues or exits
    if (!opts.isDistributed() && opts.isHdfs()) {
      printUsageAndExit("ERROR: --hdfs option can only be used with --distributed", -1);
    }
    return opts;
  }

  /**
   * Main entry point.
   * @param args command-line arguments, passed through to {@link ToolRunner}
   * @throws Exception if the tool fails to run
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new DumpReplicationQueues(), args);
    System.exit(ret);
  }

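  /**
   * Tool entry point: parses the options and dumps the replication queues accordingly.
   */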
  @Override
  public int run(String[] args) throws Exception {
    int errCode = -1;
    LinkedList<String> argv = new LinkedList<>(Arrays.asList(args));
    DumpOptions opts = parseOpts(argv);

    // leftover arguments mean a help flag was pushed back by the parser: print usage and exit
    if (!argv.isEmpty()) {
      errCode = 0;
      printUsage();
      return errCode;
    }
    return dumpReplicationQueues(opts);
  }

  protected void printUsage() {
    printUsage(this.getClass().getName(), null);
  }

  protected static void printUsage(final String message) {
    printUsage(DumpReplicationQueues.class.getName(), message);
  }

  protected static void printUsage(final String className, final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.println("Usage: hbase " + className + " \\");
    System.err.println("  <OPTIONS> [-D<property=value>]*");
    System.err.println();
    System.err.println("General Options:");
    System.err.println(" -h|--h|--help  Show this help and exit.");
    System.err.println(" --distributed  Poll each RS and print its own replication queue. "
        + "Default only polls ZooKeeper");
    System.err.println(" --hdfs         Use HDFS to calculate usage of WALs by replication."
        + " It could be overestimated if replicating to multiple peers."
        + " --distributed flag is also needed.");
  }

  protected static void printUsageAndExit(final String message, final int exitCode) {
    printUsage(message);
    System.exit(exitCode);
  }

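  /**
   * Connects to the cluster, prints the replication peers and their configurations, then dumps
   * the queues either per RegionServer via the replication queue storage (--distributed) or as a
   * raw dump of the replication znodes in ZooKeeper.
   */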
  private int dumpReplicationQueues(DumpOptions opts) throws Exception {
    Configuration conf = getConf();
    Connection connection = ConnectionFactory.createConnection(conf);
    Admin admin = connection.getAdmin();

    ZKWatcher zkw = new ZKWatcher(conf, "DumpReplicationQueues" + System.currentTimeMillis(),
        new WarnOnlyAbortable(), true);

    try {
      // Our zk watcher
      LOG.info("Our Quorum: " + zkw.getQuorum());
      List<TableCFs> replicatedTableCFs = admin.listReplicatedTableCFs();
      if (replicatedTableCFs.isEmpty()) {
        LOG.info("No tables with a configured replication peer were found.");
        return 0;
      } else {
        LOG.info("Replicated Tables: " + replicatedTableCFs);
      }

      List<ReplicationPeerDescription> peers = admin.listReplicationPeers();

      if (peers.isEmpty()) {
        LOG.info("Replication is enabled but no peer configuration was found.");
      }

      System.out.println("Dumping replication peers and configurations:");
      System.out.println(dumpPeersState(peers));

      if (opts.isDistributed()) {
        LOG.info("Found [--distributed], will poll each RegionServer.");
        Set<String> peerIds =
            peers.stream().map(ReplicationPeerDescription::getPeerId).collect(Collectors.toSet());
        System.out.println(dumpQueues(zkw, peerIds, opts.isHdfs()));
        System.out.println(dumpReplicationSummary());
      } else {
        // use ZK instead
        System.out.print("Dumping replication znodes via ZooKeeper:");
        System.out.println(ZKUtil.getReplicationZnodesDump(zkw));
      }
      return 0;
    } catch (IOException e) {
      LOG.error("Failed to dump the replication queues", e);
      return -1;
    } finally {
      zkw.close();
      admin.close();
      connection.close();
    }
  }

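  /**
   * Summarizes what the dump found: deleted queues, dead RegionServers, the number of queued WALs
   * per peer, and the total size of WALs on HDFS.
   */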
  public String dumpReplicationSummary() {
    StringBuilder sb = new StringBuilder();
    if (!deletedQueues.isEmpty()) {
      sb.append("Found " + deletedQueues.size() + " deleted queues"
          + ", run hbck -fixReplication in order to remove the deleted replication queues\n");
      for (String deletedQueue : deletedQueues) {
        sb.append("    " + deletedQueue + "\n");
      }
    }
    if (!deadRegionServers.isEmpty()) {
      sb.append("Found " + deadRegionServers.size() + " dead regionservers"
          + ", restart one regionserver to transfer the queues of dead regionservers\n");
      for (String deadRs : deadRegionServers) {
        sb.append("    " + deadRs + "\n");
      }
    }
    if (!peersQueueSize.isEmpty()) {
      sb.append("Dumping the number of WALs in each peer's replication queue\n");
      for (Map.Entry<String, Long> entry : peersQueueSize.asMap().entrySet()) {
        sb.append("    PeerId: " + entry.getKey() + " , sizeOfLogQueue: " + entry.getValue() + "\n");
      }
    }
    sb.append("    Total size of WALs on HDFS: " + StringUtils.humanSize(totalSizeOfWALs) + "\n");
    if (numWalsNotFound > 0) {
      sb.append("    ERROR: There are " + numWalsNotFound + " WALs not found!!!\n");
    }
    return sb.toString();
  }

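  /**
   * Formats the state of each replication peer: enabled/disabled, cluster key, endpoint, any
   * custom configuration, and the replicated table CFs and namespaces.
   */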
  public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exception {
    Map<String, String> currentConf;
    StringBuilder sb = new StringBuilder();
    for (ReplicationPeerDescription peer : peers) {
      ReplicationPeerConfig peerConfig = peer.getPeerConfig();
      sb.append("Peer: " + peer.getPeerId() + "\n");
      sb.append("    " + "State: " + (peer.isEnabled() ? "ENABLED" : "DISABLED") + "\n");
      sb.append("    " + "Cluster Name: " + peerConfig.getClusterKey() + "\n");
      sb.append("    " + "Replication Endpoint: " + peerConfig.getReplicationEndpointImpl() + "\n");
      currentConf = peerConfig.getConfiguration();
      // Only show when we have a custom configuration for the peer
      if (currentConf.size() > 1) {
        sb.append("    " + "Peer Configuration: " + currentConf + "\n");
      }
      sb.append("    " + "Peer Table CFs: " + peerConfig.getTableCFsMap() + "\n");
      sb.append("    " + "Peer Namespaces: " + peerConfig.getNamespaces() + "\n");
    }
    return sb.toString();
  }

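  /**
   * Iterates over every replicator known to the replication queue storage and formats each of its
   * queues, recording dead RegionServers and queues whose peer no longer exists along the way.
   */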
  public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) throws Exception {
    ReplicationQueueStorage queueStorage;
    ReplicationTracker replicationTracker;
    StringBuilder sb = new StringBuilder();

    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
    replicationTracker = ReplicationFactory.getReplicationTracker(zkw, new WarnOnlyAbortable(),
      new WarnOnlyStoppable());
    Set<String> liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers());

    // Loops each peer on each RS and dumps the queues
    List<ServerName> regionservers = queueStorage.getListOfReplicators();
    if (regionservers == null || regionservers.isEmpty()) {
      return sb.toString();
    }
    for (ServerName regionserver : regionservers) {
      List<String> queueIds = queueStorage.getAllQueues(regionserver);
      if (!liveRegionServers.contains(regionserver.getServerName())) {
        deadRegionServers.add(regionserver.getServerName());
      }
      for (String queueId : queueIds) {
        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
        List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
        Collections.sort(wals);
        if (!peerIds.contains(queueInfo.getPeerId())) {
          deletedQueues.add(regionserver + "/" + queueId);
          sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs));
        } else {
          sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs));
        }
      }
    }
    return sb.toString();
  }

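  /**
   * Formats a single replication queue: its znode, peer, recovery state, WAL count, and the
   * current replication position of every WAL in the queue.
   */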
  private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage,
      ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted,
      boolean hdfs) throws Exception {
    StringBuilder sb = new StringBuilder();

    List<ServerName> deadServers;

    sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n");
    sb.append("    Queue znode: " + queueId + "\n");
    sb.append("    PeerID: " + queueInfo.getPeerId() + "\n");
    sb.append("    Recovered: " + queueInfo.isQueueRecovered() + "\n");
    deadServers = queueInfo.getDeadRegionServers();
    if (deadServers.isEmpty()) {
      sb.append("    No dead RegionServers found in this queue." + "\n");
    } else {
      sb.append("    Dead RegionServers: " + deadServers + "\n");
    }
    sb.append("    Was deleted: " + isDeleted + "\n");
    sb.append("    Number of WALs in replication queue: " + wals.size() + "\n");
    peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());

    for (String wal : wals) {
      long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal);
      sb.append("    Replication position for " + wal + ": "
          + (position > 0 ? position : "0 (not started or nothing to replicate)") + "\n");
    }

    if (hdfs) {
      FileSystem fs = FileSystem.get(getConf());
      sb.append("    Total size of WALs on HDFS for this queue: "
          + StringUtils.humanSize(getTotalWALSize(fs, wals, regionserver)) + "\n");
    }
    return sb.toString();
  }


  /**
   * Returns the total size in bytes of the given WALs, skipping any WAL that cannot be found.
   */
  private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server)
      throws IOException {
    long size = 0;
    FileStatus fileStatus;

    for (String wal : wals) {
      try {
        fileStatus = (new WALLink(getConf(), server.getServerName(), wal)).getFileStatus(fs);
      } catch (FileNotFoundException e) {
        numWalsNotFound++;
        LOG.warn("WAL " + wal + " couldn't be found, skipping", e);
        continue;
      } catch (IOException e) {
        LOG.warn("Can't get file status of WAL " + wal + ", skipping", e);
        continue;
      }
      size += fileStatus.getLen();
    }

    totalSizeOfWALs += size;
    return size;
  }

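  /**
   * An {@link Abortable} that only logs abort requests so the tool can keep running and finish
   * its dump.
   */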
  private static class WarnOnlyAbortable implements Abortable {
    @Override
    public void abort(String why, Throwable e) {
      LOG.warn("DumpReplicationQueue received abort, ignoring.  Reason: " + why);
      if (LOG.isDebugEnabled()) {
        LOG.debug(e.toString(), e);
      }
    }

    @Override
    public boolean isAborted() {
      return false;
    }
  }

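  /**
   * A {@link Stoppable} that only logs stop requests, for the same reason as
   * {@link WarnOnlyAbortable}.
   */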
  private static class WarnOnlyStoppable implements Stoppable {
    @Override
    public void stop(String why) {
      LOG.warn("DumpReplicationQueue received stop, ignoring.  Reason: " + why);
    }

    @Override
    public boolean isStopped() {
      return false;
    }
  }
}