/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap;

/**
 * Provides information about the existing states of replication, replication peers and queues.
 * <p>
 * Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args]
 * <p>
 * Arguments:
 * <ul>
 * <li>--distributed Polls each RS to dump information about its queues</li>
 * <li>--hdfs Reports HDFS usage by the replication queues (note: can be overestimated)</li>
 * </ul>
 * In the new version the tool has been reimplemented so that, without {@code --distributed}, it
 * obtains its information directly from the replication table.
 */
@InterfaceAudience.Private
public class DumpReplicationQueues extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(DumpReplicationQueues.class.getName());

  private List<String> deadRegionServers;
  private List<String> deletedQueues;
  private AtomicLongMap<String> peersQueueSize;
  private long totalSizeOfWALs;
  private long numWalsNotFound;

  public DumpReplicationQueues() {
    deadRegionServers = new ArrayList<>();
    deletedQueues = new ArrayList<>();
    peersQueueSize = AtomicLongMap.create();
    totalSizeOfWALs = 0;
    numWalsNotFound = 0;
  }

  static class DumpOptions {
    boolean hdfs = false;
    boolean distributed = false;

    public DumpOptions() {
    }

    public DumpOptions(DumpOptions that) {
      this.hdfs = that.hdfs;
      this.distributed = that.distributed;
    }

    boolean isHdfs() {
      return hdfs;
    }

    boolean isDistributed() {
      return distributed;
    }

    void setHdfs(boolean hdfs) {
      this.hdfs = hdfs;
    }

    void setDistributed(boolean distributed) {
      this.distributed = distributed;
    }
  }

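  /**
   * Parses the command line options. A help flag is pushed back onto the queue and parsing stops,
   * so the caller can detect leftover arguments and print the usage text. Unrecognized options
   * terminate the tool, and {@code --hdfs} is rejected unless {@code --distributed} is also given.
   */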
  static DumpOptions parseOpts(Queue<String> args) {
    DumpOptions opts = new DumpOptions();

    String cmd = null;
    while ((cmd = args.poll()) != null) {
      if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) {
        // place item back onto queue so that caller knows parsing was incomplete
        args.add(cmd);
        break;
      }
      final String hdfs = "--hdfs";
      if (cmd.equals(hdfs)) {
        opts.setHdfs(true);
        continue;
      }
      final String distributed = "--distributed";
      if (cmd.equals(distributed)) {
        opts.setDistributed(true);
      } else {
        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
      }
    }
    // --hdfs only makes sense when every RS is polled, so it requires --distributed
    if (opts.isHdfs() && !opts.isDistributed()) {
      printUsageAndExit("ERROR: --hdfs option can only be used with --distributed", -1);
    }
    return opts;
  }

  /**
   * Main
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new DumpReplicationQueues(), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {

    int errCode = -1;
    LinkedList<String> argv = new LinkedList<>(Arrays.asList(args));
    DumpOptions opts = parseOpts(argv);

    // arguments remain after parsing (e.g. a help flag), print help and exit
    if (!argv.isEmpty()) {
      errCode = 0;
      printUsage();
      return errCode;
    }
    return dumpReplicationQueues(opts);
  }

  protected void printUsage() {
    printUsage(this.getClass().getName(), null);
  }

  protected static void printUsage(final String message) {
    printUsage(DumpReplicationQueues.class.getName(), message);
  }

  protected static void printUsage(final String className, final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.println("Usage: hbase " + className + " \\");
    System.err.println("  <OPTIONS> [-D<property=value>]*");
    System.err.println();
    System.err.println("General Options:");
    System.err.println(" -h|--h|--help  Show this help and exit.");
    System.err.println(" --distributed  Poll each RS and print its own replication queues. "
      + "By default only the replication table is read.");
    System.err.println(" --hdfs         Use HDFS to calculate the usage of WALs by replication."
      + " May be overestimated when replicating to multiple peers."
      + " Requires the --distributed flag.");
  }

  protected static void printUsageAndExit(final String message, final int exitCode) {
    printUsage(message);
    System.exit(exitCode);
  }

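  /**
   * Connects to the cluster and dumps the replicated tables and peer configurations, then either
   * polls every RegionServer queue ({@code --distributed}) or dumps the replication table.
   * @return 0 on success, -1 on an IO error
   */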
  private int dumpReplicationQueues(DumpOptions opts) throws Exception {
    Configuration conf = getConf();
    Connection connection = ConnectionFactory.createConnection(conf);
    Admin admin = connection.getAdmin();

    try {
      List<TableCFs> replicatedTableCFs = admin.listReplicatedTableCFs();
      if (replicatedTableCFs.isEmpty()) {
        LOG.info("No tables with a configured replication peer were found.");
        return (0);
      } else {
        LOG.info("Replicated Tables: " + replicatedTableCFs);
      }

      List<ReplicationPeerDescription> peers = admin.listReplicationPeers();

      if (peers.isEmpty()) {
        LOG.info("Replication is enabled but no peer configuration was found.");
      }

      System.out.println("Dumping replication peers and configurations:");
      System.out.println(dumpPeersState(peers));

      if (opts.isDistributed()) {
        LOG.info("Found [--distributed], will poll each RegionServer.");
        Set<String> peerIds =
          peers.stream().map((peer) -> peer.getPeerId()).collect(Collectors.toSet());
        System.out.println(dumpQueues(connection, peerIds, opts.isHdfs(), conf));
        System.out.println(dumpReplicationSummary());
      } else {
        // use the replication table instead
        System.out.println("Dumping replication info via replication table.");
        System.out.println(dumpReplicationViaTable(connection, conf));
      }
      return (0);
    } catch (IOException e) {
      LOG.error("Failed to dump replication queues", e);
      return (-1);
    } finally {
      connection.close();
    }
  }

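  /**
   * Dumps the replication state kept in the replication queue storage (the replication table):
   * the peers and their states, the per-RegionServer WAL offsets and the queued hfile references.
   */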
  public String dumpReplicationViaTable(Connection connection, Configuration conf)
    throws ReplicationException, IOException {
    StringBuilder sb = new StringBuilder();
    ReplicationQueueStorage queueStorage =
      ReplicationStorageFactory.getReplicationQueueStorage(connection, conf);

    // The dump info format is as follows:
    // peers:
    // peers/1: zk1:2181:/hbase
    // peers/1/peer-state: ENABLED
    // rs:
    // rs/rs1,16020,1664092120094/1/rs1%2C16020%2C1664092120094.1664096778778: 123
    // rs/rs2,16020,1664092120094/2/rs1%2C16020%2C1664092120094.1664096778778: 321
    // hfile-refs:
    // hfile-refs/1/hfile1,hfile2
    // hfile-refs/2/hfile3,hfile4
    String peersKey = "peers";
    sb.append(peersKey).append(": ").append("\n");
    List<ReplicationPeerDescription> repPeerDescs = connection.getAdmin().listReplicationPeers();
    for (ReplicationPeerDescription repPeerDesc : repPeerDescs) {
      sb.append(peersKey).append("/").append(repPeerDesc.getPeerId()).append(": ")
        .append(repPeerDesc.getPeerConfig().getClusterKey()).append("\n");
      sb.append(peersKey).append("/").append(repPeerDesc.getPeerId()).append("/peer-state: ")
        .append(repPeerDesc.isEnabled() ? "ENABLED" : "DISABLED").append("\n");
    }

    List<ReplicationQueueData> repQueueDataList = queueStorage.listAllQueues();
    String rsKey = "rs";
    sb.append(rsKey).append(": ").append("\n");
    for (ReplicationQueueData repQueueData : repQueueDataList) {
      String peerId = repQueueData.getId().getPeerId();
      for (ImmutableMap.Entry<String, ReplicationGroupOffset> entry : repQueueData.getOffsets()
        .entrySet()) {
        sb.append(rsKey).append("/").append(entry.getKey()).append("/").append(peerId).append("/")
          .append(entry.getValue().getWal()).append(": ").append(entry.getValue().getOffset())
          .append("\n");
      }
    }

    List<String> peerIds = queueStorage.getAllPeersFromHFileRefsQueue();
    String hfileKey = "hfile-refs";
    sb.append(hfileKey).append(": ").append("\n");
    for (String peerId : peerIds) {
      List<String> hfiles = queueStorage.getReplicableHFiles(peerId);
      sb.append(hfileKey).append("/").append(peerId).append("/").append(String.join(",", hfiles))
        .append("\n");
    }

    return sb.toString();
  }

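  /**
   * Summarizes what was found while polling the distributed queues: deleted queues, dead
   * RegionServers, each peer's queue size and the total size of WALs on HDFS.
   */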
  public String dumpReplicationSummary() {
    StringBuilder sb = new StringBuilder();
    if (!deletedQueues.isEmpty()) {
      sb.append("Found " + deletedQueues.size() + " deleted queues"
        + ", run hbck -fixReplication in order to remove the deleted replication queues\n");
      for (String deletedQueue : deletedQueues) {
        sb.append("    " + deletedQueue + "\n");
      }
    }
    if (!deadRegionServers.isEmpty()) {
      sb.append("Found " + deadRegionServers.size() + " dead regionservers"
        + ", restart one regionserver to transfer the queues of dead regionservers\n");
      for (String deadRs : deadRegionServers) {
        sb.append("    " + deadRs + "\n");
      }
    }
    if (!peersQueueSize.isEmpty()) {
      sb.append("Dumping the number of WALs in each peer's replication queue\n");
      for (Map.Entry<String, Long> entry : peersQueueSize.asMap().entrySet()) {
        sb.append(
          "    PeerId: " + entry.getKey() + " , sizeOfLogQueue: " + entry.getValue() + "\n");
      }
    }
    sb.append("    Total size of WALs on HDFS: " + StringUtils.humanSize(totalSizeOfWALs) + "\n");
    if (numWalsNotFound > 0) {
      sb.append("    ERROR: " + numWalsNotFound + " WALs could not be found!\n");
    }
    return sb.toString();
  }

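  /**
   * Dumps the id, state and configuration of each replication peer.
   */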
  public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exception {
    Map<String, String> currentConf;
    StringBuilder sb = new StringBuilder();
    for (ReplicationPeerDescription peer : peers) {
      ReplicationPeerConfig peerConfig = peer.getPeerConfig();
      sb.append("Peer: " + peer.getPeerId() + "\n");
      sb.append("    " + "State: " + (peer.isEnabled() ? "ENABLED" : "DISABLED") + "\n");
      sb.append("    " + "Cluster Name: " + peerConfig.getClusterKey() + "\n");
      sb.append("    " + "Replication Endpoint: " + peerConfig.getReplicationEndpointImpl() + "\n");
      currentConf = peerConfig.getConfiguration();
      // Only show when we have a custom configuration for the peer
      if (currentConf.size() > 1) {
        sb.append("    " + "Peer Configuration: " + currentConf + "\n");
      }
      sb.append("    " + "Peer Table CFs: " + peerConfig.getTableCFsMap() + "\n");
      sb.append("    " + "Peer Namespaces: " + peerConfig.getNamespaces() + "\n");
    }
    return sb.toString();
  }

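  /**
   * Polls the replication queue storage for every replicator, resolves the WAL files each queue
   * still has to replicate and formats the result. Queues whose peer no longer exists are recorded
   * as deleted, and replicators that are not live are recorded as dead RegionServers.
   */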
  public String dumpQueues(Connection connection, Set<String> peerIds, boolean hdfs,
    Configuration conf) throws Exception {
    StringBuilder sb = new StringBuilder();
    ReplicationQueueStorage queueStorage =
      ReplicationStorageFactory.getReplicationQueueStorage(connection, conf);

    Set<ServerName> liveRegionServers =
      connection.getAdmin().getClusterMetrics().getLiveServerMetrics().keySet();

    List<ServerName> regionServers = queueStorage.listAllReplicators();
    if (regionServers == null || regionServers.isEmpty()) {
      return sb.toString();
    }
    for (ServerName regionServer : regionServers) {
      List<ReplicationQueueId> queueIds = queueStorage.listAllQueueIds(regionServer);

      if (!liveRegionServers.contains(regionServer)) {
        deadRegionServers.add(regionServer.getServerName());
      }
      for (ReplicationQueueId queueId : queueIds) {
        List<String> tmpWals = new ArrayList<>();
        // current wals
        AbstractFSWALProvider
          .getWALFiles(connection.getConfiguration(), queueId.getServerWALsBelongTo()).stream()
          .map(Path::toString).forEach(tmpWals::add);

        // old (archived) wals
        AbstractFSWALProvider.getArchivedWALFiles(connection.getConfiguration(),
          queueId.getServerWALsBelongTo(), URLEncoder
            .encode(queueId.getServerWALsBelongTo().toString(), StandardCharsets.UTF_8.name()))
          .stream().map(Path::toString).forEach(tmpWals::add);

        Map<String, ReplicationGroupOffset> offsets = queueStorage.getOffsets(queueId);
        // keep only the wal files that should still be replicated
        List<String> wals = new ArrayList<>();
        for (Map.Entry<String, ReplicationGroupOffset> entry : offsets.entrySet()) {
          ReplicationGroupOffset offset = entry.getValue();
          for (String wal : tmpWals) {
            if (ReplicationOffsetUtil.shouldReplicate(offset, wal)) {
              wals.add(wal);
            }
          }
        }
        Collections.sort(wals, Comparator.comparingLong(AbstractFSWALProvider::getTimestamp));
        if (!peerIds.contains(queueId.getPeerId())) {
          deletedQueues.add(regionServer + "/" + queueId);
          sb.append(formatQueue(regionServer, offsets, wals, queueId, true, hdfs));
        } else {
          sb.append(formatQueue(regionServer, offsets, wals, queueId, false, hdfs));
        }
      }
    }
    return sb.toString();
  }

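  /**
   * Formats a single replication queue: its id, peer, recovery state, the WALs it still has to
   * replicate with their replication positions, and optionally the total WAL size on HDFS.
   */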
  private String formatQueue(ServerName regionServer, Map<String, ReplicationGroupOffset> offsets,
    List<String> wals, ReplicationQueueId queueId, boolean isDeleted, boolean hdfs)
    throws Exception {
    StringBuilder sb = new StringBuilder();

    sb.append("Dumping replication queue info for RegionServer: [" + regionServer + "]" + "\n");
    sb.append("    Queue id: " + queueId + "\n");
    sb.append("    PeerID: " + queueId.getPeerId() + "\n");
    sb.append("    Recovered: " + queueId.isRecovered() + "\n");
    // In the new version, only the first dead RegionServer is recorded in the queueId.
    if (queueId.getSourceServerName().isPresent()) {
      sb.append("    Dead RegionServer: " + queueId.getSourceServerName().get() + "\n");
    } else {
      sb.append("    No dead RegionServer found in this queue." + "\n");
    }
    sb.append("    Was deleted: " + isDeleted + "\n");
    sb.append("    Number of WALs in replication queue: " + wals.size() + "\n");
    peersQueueSize.addAndGet(queueId.getPeerId(), wals.size());

    for (Map.Entry<String, ReplicationGroupOffset> entry : offsets.entrySet()) {
      String walGroup = entry.getKey();
      ReplicationGroupOffset offset = entry.getValue();
      for (String wal : wals) {
        long position = 0;
        if (offset.getWal().equals(wal)) {
          position = offset.getOffset();
        }
        sb.append(
          " Replication position for " + (walGroup != null ? walGroup + "/" + wal : wal) + ": ");
        if (position == 0) {
          sb.append("0 (not started or nothing to replicate)");
        } else if (position > 0) {
          sb.append(position);
        }
        sb.append("\n");
      }
    }

    if (hdfs) {
      FileSystem fs = FileSystem.get(getConf());
      sb.append("    Total size of WALs on HDFS for this queue: "
        + StringUtils.humanSize(getTotalWALSize(fs, wals, regionServer)) + "\n");
    }
    return sb.toString();
  }

  /**
   * Returns the total size in bytes of the given list of WALs.
   */
  private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server) {
    long size = 0;
    FileStatus fileStatus;

    for (String wal : wals) {
      try {
        fileStatus = (new WALLink(getConf(), server.getServerName(), wal)).getFileStatus(fs);
      } catch (IOException e) {
        if (e instanceof FileNotFoundException) {
          numWalsNotFound++;
          LOG.warn("WAL " + wal + " couldn't be found, skipping", e);
        } else {
          LOG.warn("Can't get file status of WAL " + wal + ", skipping", e);
        }
        continue;
      }
      size += fileStatus.getLen();
    }

    totalSizeOfWALs += size;
    return size;
  }
}