/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap;

/**
 * Provides information about the existing states of replication, replication peers and queues.
 *
 * Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args]
 * Arguments:
 *   --distributed  Polls each RS to dump information about the queue
 *   --hdfs         Reports HDFS usage by the replication queues (note: can be overestimated).
 */
@InterfaceAudience.Private
public class DumpReplicationQueues extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(DumpReplicationQueues.class);

  private List<String> deadRegionServers;
  private List<String> deletedQueues;
  private AtomicLongMap<String> peersQueueSize;
  private long totalSizeOfWALs;
  private long numWalsNotFound;

  public DumpReplicationQueues() {
    deadRegionServers = new ArrayList<>();
    deletedQueues = new ArrayList<>();
    peersQueueSize = AtomicLongMap.create();
    totalSizeOfWALs = 0;
    numWalsNotFound = 0;
  }

  static class DumpOptions {
    boolean hdfs = false;
    boolean distributed = false;

    public DumpOptions() {
    }

    public DumpOptions(DumpOptions that) {
      this.hdfs = that.hdfs;
      this.distributed = that.distributed;
    }

    boolean isHdfs() {
      return hdfs;
    }

    boolean isDistributed() {
      return distributed;
    }

    void setHdfs(boolean hdfs) {
      this.hdfs = hdfs;
    }

    void setDistributed(boolean distributed) {
      this.distributed = distributed;
    }
  }
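  /**
   * Parses the recognized options off the given argument queue. Recognized options are consumed;
   * a help flag (-h, --h, --help) is pushed back onto the queue so the caller can tell that
   * parsing stopped early, and an unrecognized option prints usage and exits the JVM.
   */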
  static DumpOptions parseOpts(Queue<String> args) {
    DumpOptions opts = new DumpOptions();

    String cmd = null;
    while ((cmd = args.poll()) != null) {
      if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) {
        // place item back onto queue so that caller knows parsing was incomplete
        args.add(cmd);
        break;
      }
      final String hdfs = "--hdfs";
      if (cmd.equals(hdfs)) {
        opts.setHdfs(true);
        continue;
      }
      final String distributed = "--distributed";
      if (cmd.equals(distributed)) {
        opts.setDistributed(true);
        continue;
      }
      printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
    }
    // --hdfs only makes sense when each RS queue is polled, so it requires --distributed.
    // This check must run after the loop: inside it, every iteration either continues or exits.
    if (opts.isHdfs() && !opts.isDistributed()) {
      printUsageAndExit("ERROR: --hdfs option can only be used with --distributed", -1);
    }
    return opts;
  }

  /**
   * Main entry point. Runs the tool with the default HBase configuration.
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new DumpReplicationQueues(), args);
    System.exit(ret);
  }

  @Override
  public int run(String[] args) throws Exception {
    int errCode = -1;
    LinkedList<String> argv = new LinkedList<>(Arrays.asList(args));
    DumpOptions opts = parseOpts(argv);

    // args remaining, print help and exit
    if (!argv.isEmpty()) {
      errCode = 0;
      printUsage();
      return errCode;
    }
    return dumpReplicationQueues(opts);
  }

  protected void printUsage() {
    printUsage(this.getClass().getName(), null);
  }

  protected static void printUsage(final String message) {
    printUsage(DumpReplicationQueues.class.getName(), message);
  }

  protected static void printUsage(final String className, final String message) {
    if (message != null && message.length() > 0) {
      System.err.println(message);
    }
    System.err.println("Usage: hbase " + className + " \\");
    System.err.println("  <OPTIONS> [-D<property=value>]*");
    System.err.println();
    System.err.println("General Options:");
    System.err.println(" -h|--h|--help  Show this help and exit.");
    System.err.println(" --distributed  Poll each RS and print its own replication queue. "
        + "Default only polls ZooKeeper.");
    System.err.println(" --hdfs         Use HDFS to calculate the usage of WALs by replication."
        + " It can be overestimated when replicating to multiple peers."
        + " The --distributed flag is also needed.");
  }

  protected static void printUsageAndExit(final String message, final int exitCode) {
    printUsage(message);
    System.exit(exitCode);
  }
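  /**
   * Dumps the replicated tables, the peer descriptions and, with --distributed, every region
   * server's replication queues; otherwise dumps the replication znodes straight from ZooKeeper.
   * Returns 0 on success and -1 on failure.
   */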
  private int dumpReplicationQueues(DumpOptions opts) throws Exception {
    Configuration conf = getConf();
    HBaseAdmin.available(conf);
    ClusterConnection connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
    Admin admin = connection.getAdmin();

    ZKWatcher zkw = new ZKWatcher(conf, "DumpReplicationQueues" + System.currentTimeMillis(),
        new WarnOnlyAbortable(), true);

    try {
      // Our zk watcher
      LOG.info("Our Quorum: " + zkw.getQuorum());
      List<TableCFs> replicatedTableCFs = admin.listReplicatedTableCFs();
      if (replicatedTableCFs.isEmpty()) {
        LOG.info("No tables with a configured replication peer were found.");
        return 0;
      } else {
        LOG.info("Replicated Tables: " + replicatedTableCFs);
      }

      List<ReplicationPeerDescription> peers = admin.listReplicationPeers();

      if (peers.isEmpty()) {
        LOG.info("Replication is enabled but no peer configuration was found.");
      }

      System.out.println("Dumping replication peers and configurations:");
      System.out.println(dumpPeersState(peers));

      if (opts.isDistributed()) {
        LOG.info("Found [--distributed], will poll each RegionServer.");
        Set<String> peerIds =
            peers.stream().map((peer) -> peer.getPeerId()).collect(Collectors.toSet());
        System.out.println(dumpQueues(connection, zkw, peerIds, opts.isHdfs()));
        System.out.println(dumpReplicationSummary());
      } else {
        // use ZK instead
        System.out.print("Dumping replication znodes via ZooKeeper:");
        System.out.println(ZKUtil.getReplicationZnodesDump(zkw));
      }
      return 0;
    } catch (IOException e) {
      LOG.error("Failed to dump replication queues", e);
      return -1;
    } finally {
      connection.close();
      zkw.close();
    }
  }

  public String dumpReplicationSummary() {
    StringBuilder sb = new StringBuilder();
    if (!deletedQueues.isEmpty()) {
      sb.append("Found " + deletedQueues.size() + " deleted queues"
          + ", run hbck -fixReplication in order to remove the deleted replication queues\n");
      for (String deletedQueue : deletedQueues) {
        sb.append("    " + deletedQueue + "\n");
      }
    }
    if (!deadRegionServers.isEmpty()) {
      sb.append("Found " + deadRegionServers.size() + " dead regionservers"
          + ", restart one regionserver to transfer the queues of dead regionservers\n");
      for (String deadRs : deadRegionServers) {
        sb.append("    " + deadRs + "\n");
      }
    }
    if (!peersQueueSize.isEmpty()) {
      sb.append("Dumping the number of WALs in each peer's replication queue\n");
      for (Map.Entry<String, Long> entry : peersQueueSize.asMap().entrySet()) {
        sb.append("    PeerId: " + entry.getKey() + " , sizeOfLogQueue: " + entry.getValue()
            + "\n");
      }
    }
    sb.append("    Total size of WALs on HDFS: " + StringUtils.humanSize(totalSizeOfWALs) + "\n");
    if (numWalsNotFound > 0) {
      sb.append("    ERROR: There are " + numWalsNotFound + " WALs not found!!!\n");
    }
    return sb.toString();
  }

  public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exception {
    Map<String, String> currentConf;
    StringBuilder sb = new StringBuilder();
    for (ReplicationPeerDescription peer : peers) {
      ReplicationPeerConfig peerConfig = peer.getPeerConfig();
      sb.append("Peer: " + peer.getPeerId() + "\n");
      sb.append("    " + "State: " + (peer.isEnabled() ? "ENABLED" : "DISABLED") + "\n");
      sb.append("    " + "Cluster Name: " + peerConfig.getClusterKey() + "\n");
      sb.append("    " + "Replication Endpoint: " + peerConfig.getReplicationEndpointImpl()
          + "\n");
      currentConf = peerConfig.getConfiguration();
      // Only show when we have a custom configuration for the peer
      if (currentConf.size() > 1) {
        sb.append("    " + "Peer Configuration: " + currentConf + "\n");
      }
      sb.append("    " + "Peer Table CFs: " + peerConfig.getTableCFsMap() + "\n");
      sb.append("    " + "Peer Namespaces: " + peerConfig.getNamespaces() + "\n");
    }
    return sb.toString();
  }
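  /**
   * Walks the replication queue znodes of every region server known to ZooKeeper and formats
   * each queue found there, recording region servers that are no longer alive and queues whose
   * peer no longer exists.
   */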
  public String dumpQueues(ClusterConnection connection, ZKWatcher zkw, Set<String> peerIds,
      boolean hdfs) throws Exception {
    ReplicationQueuesClient queuesClient;
    ReplicationPeers replicationPeers;
    ReplicationQueues replicationQueues;
    ReplicationTracker replicationTracker;
    ReplicationQueuesClientArguments replicationArgs =
        new ReplicationQueuesClientArguments(getConf(), new WarnOnlyAbortable(), zkw);
    StringBuilder sb = new StringBuilder();

    queuesClient = ReplicationFactory.getReplicationQueuesClient(replicationArgs);
    queuesClient.init();
    replicationQueues = ReplicationFactory.getReplicationQueues(replicationArgs);
    replicationPeers =
        ReplicationFactory.getReplicationPeers(zkw, getConf(), queuesClient, connection);
    replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(),
        new WarnOnlyAbortable(), new WarnOnlyStoppable());
    List<String> liveRegionServers = replicationTracker.getListOfRegionServers();

    // Loops each peer on each RS and dumps the queues
    try {
      List<String> regionservers = queuesClient.getListOfReplicators();
      if (regionservers == null || regionservers.isEmpty()) {
        return sb.toString();
      }
      for (String regionserver : regionservers) {
        List<String> queueIds = queuesClient.getAllQueues(regionserver);
        replicationQueues.init(regionserver);
        if (!liveRegionServers.contains(regionserver)) {
          deadRegionServers.add(regionserver);
        }
        for (String queueId : queueIds) {
          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
          List<String> wals = queuesClient.getLogsInQueue(regionserver, queueId);
          if (!peerIds.contains(queueInfo.getPeerId())) {
            deletedQueues.add(regionserver + "/" + queueId);
            sb.append(
              formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true, hdfs));
          } else {
            sb.append(
              formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false, hdfs));
          }
        }
      }
    } catch (KeeperException ke) {
      throw new IOException(ke);
    }
    return sb.toString();
  }
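  /**
   * Renders a single queue: its znode, peer id, recovery state, the dead region servers it was
   * recovered from, its WAL count and per-WAL replication positions, and (with --hdfs) the
   * total size of its WALs on HDFS.
   */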
  private String formatQueue(String regionserver, ReplicationQueues replicationQueues,
      ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted,
      boolean hdfs) throws Exception {
    StringBuilder sb = new StringBuilder();

    List<ServerName> deadServers;

    sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n");
    sb.append("    Queue znode: " + queueId + "\n");
    sb.append("    PeerID: " + queueInfo.getPeerId() + "\n");
    sb.append("    Recovered: " + queueInfo.isQueueRecovered() + "\n");
    deadServers = queueInfo.getDeadRegionServers();
    if (deadServers.isEmpty()) {
      sb.append("    No dead RegionServers found in this queue." + "\n");
    } else {
      sb.append("    Dead RegionServers: " + deadServers + "\n");
    }
    sb.append("    Was deleted: " + isDeleted + "\n");
    sb.append("    Number of WALs in replication queue: " + wals.size() + "\n");
    peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());

    for (String wal : wals) {
      long position = replicationQueues.getLogPosition(queueInfo.getPeerId(), wal);
      sb.append("    Replication position for " + wal + ": "
          + (position > 0 ? position : "0 (not started or nothing to replicate)") + "\n");
    }

    if (hdfs) {
      FileSystem fs = FileSystem.get(getConf());
      sb.append("    Total size of WALs on HDFS for this queue: "
          + StringUtils.humanSize(getTotalWALSize(fs, wals, regionserver)) + "\n");
    }
    return sb.toString();
  }

  /**
   * Returns the total size, in bytes, of the given WALs, skipping and counting any WAL that can
   * no longer be found.
   */
  private long getTotalWALSize(FileSystem fs, List<String> wals, String server)
      throws IOException {
    long size = 0;
    FileStatus fileStatus;

    for (String wal : wals) {
      try {
        fileStatus = (new WALLink(getConf(), server, wal)).getFileStatus(fs);
      } catch (FileNotFoundException e) {
        numWalsNotFound++;
        LOG.warn("WAL " + wal + " couldn't be found, skipping", e);
        continue;
      } catch (IOException e) {
        LOG.warn("Can't get file status of WAL " + wal + ", skipping", e);
        continue;
      }
      size += fileStatus.getLen();
    }

    totalSizeOfWALs += size;
    return size;
  }

  private static class WarnOnlyAbortable implements Abortable {
    @Override
    public void abort(String why, Throwable e) {
      LOG.warn("DumpReplicationQueues received abort, ignoring.  Reason: " + why);
      if (LOG.isDebugEnabled()) {
        LOG.debug(e.toString(), e);
      }
    }

    @Override
    public boolean isAborted() {
      return false;
    }
  }

  private static class WarnOnlyStoppable implements Stoppable {
    @Override
    public void stop(String why) {
      LOG.warn("DumpReplicationQueues received stop, ignoring.  Reason: " + why);
    }

    @Override
    public boolean isStopped() {
      return false;
    }
  }
}