001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Collections; 025import java.util.HashSet; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Queue; 030import java.util.Set; 031import java.util.stream.Collectors; 032import org.apache.hadoop.conf.Configuration; 033import org.apache.hadoop.conf.Configured; 034import org.apache.hadoop.fs.FileStatus; 035import org.apache.hadoop.fs.FileSystem; 036import org.apache.hadoop.hbase.Abortable; 037import org.apache.hadoop.hbase.HBaseConfiguration; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.Stoppable; 040import org.apache.hadoop.hbase.client.Admin; 041import org.apache.hadoop.hbase.client.ClusterConnection; 042import org.apache.hadoop.hbase.client.ConnectionFactory; 043import org.apache.hadoop.hbase.client.HBaseAdmin; 044import org.apache.hadoop.hbase.client.replication.TableCFs; 045import org.apache.hadoop.hbase.io.WALLink; 046import org.apache.hadoop.hbase.procedure2.util.StringUtils; 047import org.apache.hadoop.hbase.replication.ReplicationFactory; 048import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 049import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 050import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; 051import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 052import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 053import org.apache.hadoop.hbase.replication.ReplicationTracker; 054import org.apache.hadoop.hbase.zookeeper.ZKUtil; 055import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 056import org.apache.hadoop.util.Tool; 057import org.apache.hadoop.util.ToolRunner; 058import org.apache.yetus.audience.InterfaceAudience; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap; 063 064/** 065 * Provides information about the existing states of replication, replication peers and queues. 066 * 067 * Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args] 068 * Arguments: --distributed Polls each RS to dump information about the queue 069 * --hdfs Reports HDFS usage by the replication queues (note: can be overestimated). 070 */ 071@InterfaceAudience.Private 072public class DumpReplicationQueues extends Configured implements Tool { 073 074 private static final Logger LOG = LoggerFactory.getLogger(DumpReplicationQueues.class.getName()); 075 076 private List<String> deadRegionServers; 077 private List<String> deletedQueues; 078 private AtomicLongMap<String> peersQueueSize; 079 private long totalSizeOfWALs; 080 private long numWalsNotFound; 081 082 public DumpReplicationQueues() { 083 deadRegionServers = new ArrayList<>(); 084 deletedQueues = new ArrayList<>(); 085 peersQueueSize = AtomicLongMap.create(); 086 totalSizeOfWALs = 0; 087 numWalsNotFound = 0; 088 } 089 090 static class DumpOptions { 091 boolean hdfs = false; 092 boolean distributed = false; 093 094 public DumpOptions() { 095 } 096 097 public DumpOptions(DumpOptions that) { 098 this.hdfs = that.hdfs; 099 this.distributed = that.distributed; 100 } 101 102 boolean isHdfs () { 103 return hdfs; 104 } 105 106 boolean isDistributed() { 107 return distributed; 108 } 109 110 void setHdfs (boolean hdfs) { 111 this.hdfs = hdfs; 112 } 113 114 void setDistributed(boolean distributed) { 115 this.distributed = distributed; 116 } 117 } 118 119 static DumpOptions parseOpts(Queue<String> args) { 120 DumpOptions opts = new DumpOptions(); 121 122 String cmd = null; 123 while ((cmd = args.poll()) != null) { 124 if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) { 125 // place item back onto queue so that caller knows parsing was incomplete 126 args.add(cmd); 127 break; 128 } 129 final String hdfs = "--hdfs"; 130 if (cmd.equals(hdfs)) { 131 opts.setHdfs(true); 132 continue; 133 } 134 final String distributed = "--distributed"; 135 if (cmd.equals(distributed)) { 136 opts.setDistributed(true); 137 continue; 138 } else { 139 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); 140 } 141 // check that --distributed is present when --hdfs is in the arguments 142 if (!opts.isDistributed() && opts.isHdfs()) { 143 printUsageAndExit("ERROR: --hdfs option can only be used with --distributed: " + cmd, -1); 144 } 145 } 146 return opts; 147 } 148 149 /** 150 * Main 151 * 152 * @param args 153 * @throws Exception 154 */ 155 public static void main(String[] args) throws Exception { 156 Configuration conf = HBaseConfiguration.create(); 157 int ret = ToolRunner.run(conf, new DumpReplicationQueues(), args); 158 System.exit(ret); 159 } 160 161 @Override 162 public int run(String[] args) throws Exception { 163 164 int errCode = -1; 165 LinkedList<String> argv = new LinkedList<>(); 166 argv.addAll(Arrays.asList(args)); 167 DumpOptions opts = parseOpts(argv); 168 169 // args remaining, print help and exit 170 if (!argv.isEmpty()) { 171 errCode = 0; 172 printUsage(); 173 return errCode; 174 } 175 return dumpReplicationQueues(opts); 176 } 177 178 protected void printUsage() { 179 printUsage(this.getClass().getName(), null); 180 } 181 182 protected static void printUsage(final String message) { 183 printUsage(DumpReplicationQueues.class.getName(), message); 184 } 185 186 protected static void printUsage(final String className, final String message) { 187 if (message != null && message.length() > 0) { 188 System.err.println(message); 189 } 190 System.err.println("Usage: hbase " + className + " \\"); 191 System.err.println(" <OPTIONS> [-D<property=value>]*"); 192 System.err.println(); 193 System.err.println("General Options:"); 194 System.err.println(" -h|--h|--help Show this help and exit."); 195 System.err.println(" --distributed Poll each RS and print its own replication queue. " 196 + "Default only polls ZooKeeper"); 197 System.err.println(" --hdfs Use HDFS to calculate usage of WALs by replication." 198 + " It could be overestimated if replicating to multiple peers." 199 + " --distributed flag is also needed."); 200 } 201 202 protected static void printUsageAndExit(final String message, final int exitCode) { 203 printUsage(message); 204 System.exit(exitCode); 205 } 206 207 private int dumpReplicationQueues(DumpOptions opts) throws Exception { 208 209 Configuration conf = getConf(); 210 HBaseAdmin.available(conf); 211 ClusterConnection connection = (ClusterConnection) ConnectionFactory.createConnection(conf); 212 Admin admin = connection.getAdmin(); 213 214 ZKWatcher zkw = new ZKWatcher(conf, "DumpReplicationQueues" + System.currentTimeMillis(), 215 new WarnOnlyAbortable(), true); 216 217 try { 218 // Our zk watcher 219 LOG.info("Our Quorum: " + zkw.getQuorum()); 220 List<TableCFs> replicatedTableCFs = admin.listReplicatedTableCFs(); 221 if (replicatedTableCFs.isEmpty()) { 222 LOG.info("No tables with a configured replication peer were found."); 223 return(0); 224 } else { 225 LOG.info("Replicated Tables: " + replicatedTableCFs); 226 } 227 228 List<ReplicationPeerDescription> peers = admin.listReplicationPeers(); 229 230 if (peers.isEmpty()) { 231 LOG.info("Replication is enabled but no peer configuration was found."); 232 } 233 234 System.out.println("Dumping replication peers and configurations:"); 235 System.out.println(dumpPeersState(peers)); 236 237 if (opts.isDistributed()) { 238 LOG.info("Found [--distributed], will poll each RegionServer."); 239 Set<String> peerIds = peers.stream().map((peer) -> peer.getPeerId()) 240 .collect(Collectors.toSet()); 241 System.out.println(dumpQueues(zkw, peerIds, opts.isHdfs())); 242 System.out.println(dumpReplicationSummary()); 243 } else { 244 // use ZK instead 245 System.out.print("Dumping replication znodes via ZooKeeper:"); 246 System.out.println(ZKUtil.getReplicationZnodesDump(zkw)); 247 } 248 return (0); 249 } catch (IOException e) { 250 return (-1); 251 } finally { 252 zkw.close(); 253 } 254 } 255 256 public String dumpReplicationSummary() { 257 StringBuilder sb = new StringBuilder(); 258 if (!deletedQueues.isEmpty()) { 259 sb.append("Found " + deletedQueues.size() + " deleted queues" 260 + ", run hbck -fixReplication in order to remove the deleted replication queues\n"); 261 for (String deletedQueue : deletedQueues) { 262 sb.append(" " + deletedQueue + "\n"); 263 } 264 } 265 if (!deadRegionServers.isEmpty()) { 266 sb.append("Found " + deadRegionServers.size() + " dead regionservers" 267 + ", restart one regionserver to transfer the queues of dead regionservers\n"); 268 for (String deadRs : deadRegionServers) { 269 sb.append(" " + deadRs + "\n"); 270 } 271 } 272 if (!peersQueueSize.isEmpty()) { 273 sb.append("Dumping all peers's number of WALs in replication queue\n"); 274 for (Map.Entry<String, Long> entry : peersQueueSize.asMap().entrySet()) { 275 sb.append(" PeerId: " + entry.getKey() + " , sizeOfLogQueue: " + entry.getValue() + "\n"); 276 } 277 } 278 sb.append(" Total size of WALs on HDFS: " + StringUtils.humanSize(totalSizeOfWALs) + "\n"); 279 if (numWalsNotFound > 0) { 280 sb.append(" ERROR: There are " + numWalsNotFound + " WALs not found!!!\n"); 281 } 282 return sb.toString(); 283 } 284 285 public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exception { 286 Map<String, String> currentConf; 287 StringBuilder sb = new StringBuilder(); 288 for (ReplicationPeerDescription peer : peers) { 289 ReplicationPeerConfig peerConfig = peer.getPeerConfig(); 290 sb.append("Peer: " + peer.getPeerId() + "\n"); 291 sb.append(" " + "State: " + (peer.isEnabled() ? "ENABLED" : "DISABLED") + "\n"); 292 sb.append(" " + "Cluster Name: " + peerConfig.getClusterKey() + "\n"); 293 sb.append(" " + "Replication Endpoint: " + peerConfig.getReplicationEndpointImpl() + "\n"); 294 currentConf = peerConfig.getConfiguration(); 295 // Only show when we have a custom configuration for the peer 296 if (currentConf.size() > 1) { 297 sb.append(" " + "Peer Configuration: " + currentConf + "\n"); 298 } 299 sb.append(" " + "Peer Table CFs: " + peerConfig.getTableCFsMap() + "\n"); 300 sb.append(" " + "Peer Namespaces: " + peerConfig.getNamespaces() + "\n"); 301 } 302 return sb.toString(); 303 } 304 305 public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, 306 boolean hdfs) throws Exception { 307 ReplicationQueueStorage queueStorage; 308 ReplicationTracker replicationTracker; 309 StringBuilder sb = new StringBuilder(); 310 311 queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); 312 replicationTracker = ReplicationFactory.getReplicationTracker(zkw, new WarnOnlyAbortable(), 313 new WarnOnlyStoppable()); 314 Set<String> liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers()); 315 316 // Loops each peer on each RS and dumps the queues 317 List<ServerName> regionservers = queueStorage.getListOfReplicators(); 318 if (regionservers == null || regionservers.isEmpty()) { 319 return sb.toString(); 320 } 321 for (ServerName regionserver : regionservers) { 322 List<String> queueIds = queueStorage.getAllQueues(regionserver); 323 if (!liveRegionServers.contains(regionserver.getServerName())) { 324 deadRegionServers.add(regionserver.getServerName()); 325 } 326 for (String queueId : queueIds) { 327 ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); 328 List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId); 329 Collections.sort(wals); 330 if (!peerIds.contains(queueInfo.getPeerId())) { 331 deletedQueues.add(regionserver + "/" + queueId); 332 sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs)); 333 } else { 334 sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs)); 335 } 336 } 337 } 338 return sb.toString(); 339 } 340 341 private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage, 342 ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted, 343 boolean hdfs) throws Exception { 344 StringBuilder sb = new StringBuilder(); 345 346 List<ServerName> deadServers; 347 348 sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n"); 349 sb.append(" Queue znode: " + queueId + "\n"); 350 sb.append(" PeerID: " + queueInfo.getPeerId() + "\n"); 351 sb.append(" Recovered: " + queueInfo.isQueueRecovered() + "\n"); 352 deadServers = queueInfo.getDeadRegionServers(); 353 if (deadServers.isEmpty()) { 354 sb.append(" No dead RegionServers found in this queue." + "\n"); 355 } else { 356 sb.append(" Dead RegionServers: " + deadServers + "\n"); 357 } 358 sb.append(" Was deleted: " + isDeleted + "\n"); 359 sb.append(" Number of WALs in replication queue: " + wals.size() + "\n"); 360 peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size()); 361 362 for (String wal : wals) { 363 long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal); 364 sb.append(" Replication position for " + wal + ": " + (position > 0 ? position : "0" 365 + " (not started or nothing to replicate)") + "\n"); 366 } 367 368 if (hdfs) { 369 FileSystem fs = FileSystem.get(getConf()); 370 sb.append(" Total size of WALs on HDFS for this queue: " 371 + StringUtils.humanSize(getTotalWALSize(fs, wals, regionserver)) + "\n"); 372 } 373 return sb.toString(); 374 } 375 376 /** 377 * return total size in bytes from a list of WALs 378 */ 379 private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server) 380 throws IOException { 381 long size = 0; 382 FileStatus fileStatus; 383 384 for (String wal : wals) { 385 try { 386 fileStatus = (new WALLink(getConf(), server.getServerName(), wal)).getFileStatus(fs); 387 } catch (IOException e) { 388 if (e instanceof FileNotFoundException) { 389 numWalsNotFound++; 390 LOG.warn("WAL " + wal + " couldn't be found, skipping", e); 391 } else { 392 LOG.warn("Can't get file status of WAL " + wal + ", skipping", e); 393 } 394 continue; 395 } 396 size += fileStatus.getLen(); 397 } 398 399 totalSizeOfWALs += size; 400 return size; 401 } 402 403 private static class WarnOnlyAbortable implements Abortable { 404 @Override 405 public void abort(String why, Throwable e) { 406 LOG.warn("DumpReplicationQueue received abort, ignoring. Reason: " + why); 407 if (LOG.isDebugEnabled()) { 408 LOG.debug(e.toString(), e); 409 } 410 } 411 412 @Override 413 public boolean isAborted() { 414 return false; 415 } 416 } 417 418 private static class WarnOnlyStoppable implements Stoppable { 419 @Override 420 public void stop(String why) { 421 LOG.warn("DumpReplicationQueue received stop, ignoring. Reason: " + why); 422 } 423 424 @Override 425 public boolean isStopped() { 426 return false; 427 } 428 } 429}