001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Collections; 025import java.util.LinkedList; 026import java.util.List; 027import java.util.Map; 028import java.util.Queue; 029import java.util.Set; 030import java.util.stream.Collectors; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.conf.Configured; 033import org.apache.hadoop.fs.FileStatus; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.hbase.Abortable; 036import org.apache.hadoop.hbase.HBaseConfiguration; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.client.Admin; 039import org.apache.hadoop.hbase.client.Connection; 040import org.apache.hadoop.hbase.client.ConnectionFactory; 041import org.apache.hadoop.hbase.client.replication.TableCFs; 042import org.apache.hadoop.hbase.io.WALLink; 043import org.apache.hadoop.hbase.procedure2.util.StringUtils; 044import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 045import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 046import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; 047import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 048import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 049import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 050import org.apache.hadoop.hbase.zookeeper.ZKDump; 051import org.apache.hadoop.hbase.zookeeper.ZKUtil; 052import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 053import org.apache.hadoop.util.Tool; 054import org.apache.hadoop.util.ToolRunner; 055import org.apache.yetus.audience.InterfaceAudience; 056import org.slf4j.Logger; 057import org.slf4j.LoggerFactory; 058 059import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap; 060 061/** 062 * Provides information about the existing states of replication, replication peers and queues. 063 * Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args] 064 * Arguments: --distributed Polls each RS to dump information about the queue --hdfs Reports HDFS 065 * usage by the replication queues (note: can be overestimated). 066 */ 067@InterfaceAudience.Private 068public class DumpReplicationQueues extends Configured implements Tool { 069 070 private static final Logger LOG = LoggerFactory.getLogger(DumpReplicationQueues.class.getName()); 071 072 private List<String> deadRegionServers; 073 private List<String> deletedQueues; 074 private AtomicLongMap<String> peersQueueSize; 075 private long totalSizeOfWALs; 076 private long numWalsNotFound; 077 078 public DumpReplicationQueues() { 079 deadRegionServers = new ArrayList<>(); 080 deletedQueues = new ArrayList<>(); 081 peersQueueSize = AtomicLongMap.create(); 082 totalSizeOfWALs = 0; 083 numWalsNotFound = 0; 084 } 085 086 static class DumpOptions { 087 boolean hdfs = false; 088 boolean distributed = false; 089 090 public DumpOptions() { 091 } 092 093 public DumpOptions(DumpOptions that) { 094 this.hdfs = that.hdfs; 095 this.distributed = that.distributed; 096 } 097 098 boolean isHdfs() { 099 return hdfs; 100 } 101 102 boolean isDistributed() { 103 return distributed; 104 } 105 106 void setHdfs(boolean hdfs) { 107 this.hdfs = hdfs; 108 } 109 110 void setDistributed(boolean distributed) { 111 this.distributed = distributed; 112 } 113 } 114 115 static DumpOptions parseOpts(Queue<String> args) { 116 DumpOptions opts = new DumpOptions(); 117 118 String cmd = null; 119 while ((cmd = args.poll()) != null) { 120 if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) { 121 // place item back onto queue so that caller knows parsing was incomplete 122 args.add(cmd); 123 break; 124 } 125 final String hdfs = "--hdfs"; 126 if (cmd.equals(hdfs)) { 127 opts.setHdfs(true); 128 continue; 129 } 130 final String distributed = "--distributed"; 131 if (cmd.equals(distributed)) { 132 opts.setDistributed(true); 133 continue; 134 } else { 135 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); 136 } 137 // check that --distributed is present when --hdfs is in the arguments 138 if (!opts.isDistributed() && opts.isHdfs()) { 139 printUsageAndExit("ERROR: --hdfs option can only be used with --distributed: " + cmd, -1); 140 } 141 } 142 return opts; 143 } 144 145 /** 146 * Main 147 */ 148 public static void main(String[] args) throws Exception { 149 Configuration conf = HBaseConfiguration.create(); 150 int ret = ToolRunner.run(conf, new DumpReplicationQueues(), args); 151 System.exit(ret); 152 } 153 154 @Override 155 public int run(String[] args) throws Exception { 156 157 int errCode = -1; 158 LinkedList<String> argv = new LinkedList<>(); 159 argv.addAll(Arrays.asList(args)); 160 DumpOptions opts = parseOpts(argv); 161 162 // args remaining, print help and exit 163 if (!argv.isEmpty()) { 164 errCode = 0; 165 printUsage(); 166 return errCode; 167 } 168 return dumpReplicationQueues(opts); 169 } 170 171 protected void printUsage() { 172 printUsage(this.getClass().getName(), null); 173 } 174 175 protected static void printUsage(final String message) { 176 printUsage(DumpReplicationQueues.class.getName(), message); 177 } 178 179 protected static void printUsage(final String className, final String message) { 180 if (message != null && message.length() > 0) { 181 System.err.println(message); 182 } 183 System.err.println("Usage: hbase " + className + " \\"); 184 System.err.println(" <OPTIONS> [-D<property=value>]*"); 185 System.err.println(); 186 System.err.println("General Options:"); 187 System.err.println(" -h|--h|--help Show this help and exit."); 188 System.err.println(" --distributed Poll each RS and print its own replication queue. " 189 + "Default only polls ZooKeeper"); 190 System.err.println(" --hdfs Use HDFS to calculate usage of WALs by replication." 191 + " It could be overestimated if replicating to multiple peers." 192 + " --distributed flag is also needed."); 193 } 194 195 protected static void printUsageAndExit(final String message, final int exitCode) { 196 printUsage(message); 197 System.exit(exitCode); 198 } 199 200 private int dumpReplicationQueues(DumpOptions opts) throws Exception { 201 Configuration conf = getConf(); 202 Connection connection = ConnectionFactory.createConnection(conf); 203 Admin admin = connection.getAdmin(); 204 205 ZKWatcher zkw = 206 new ZKWatcher(conf, "DumpReplicationQueues" + EnvironmentEdgeManager.currentTime(), 207 new WarnOnlyAbortable(), true); 208 209 try { 210 // Our zk watcher 211 LOG.info("Our Quorum: " + zkw.getQuorum()); 212 List<TableCFs> replicatedTableCFs = admin.listReplicatedTableCFs(); 213 if (replicatedTableCFs.isEmpty()) { 214 LOG.info("No tables with a configured replication peer were found."); 215 return (0); 216 } else { 217 LOG.info("Replicated Tables: " + replicatedTableCFs); 218 } 219 220 List<ReplicationPeerDescription> peers = admin.listReplicationPeers(); 221 222 if (peers.isEmpty()) { 223 LOG.info("Replication is enabled but no peer configuration was found."); 224 } 225 226 System.out.println("Dumping replication peers and configurations:"); 227 System.out.println(dumpPeersState(peers)); 228 229 if (opts.isDistributed()) { 230 LOG.info("Found [--distributed], will poll each RegionServer."); 231 Set<String> peerIds = 232 peers.stream().map((peer) -> peer.getPeerId()).collect(Collectors.toSet()); 233 System.out.println(dumpQueues(zkw, peerIds, opts.isHdfs())); 234 System.out.println(dumpReplicationSummary()); 235 } else { 236 // use ZK instead 237 System.out.print("Dumping replication znodes via ZooKeeper:"); 238 System.out.println(ZKDump.getReplicationZnodesDump(zkw)); 239 } 240 return (0); 241 } catch (IOException e) { 242 return (-1); 243 } finally { 244 zkw.close(); 245 } 246 } 247 248 public String dumpReplicationSummary() { 249 StringBuilder sb = new StringBuilder(); 250 if (!deletedQueues.isEmpty()) { 251 sb.append("Found " + deletedQueues.size() + " deleted queues" 252 + ", run hbck -fixReplication in order to remove the deleted replication queues\n"); 253 for (String deletedQueue : deletedQueues) { 254 sb.append(" " + deletedQueue + "\n"); 255 } 256 } 257 if (!deadRegionServers.isEmpty()) { 258 sb.append("Found " + deadRegionServers.size() + " dead regionservers" 259 + ", restart one regionserver to transfer the queues of dead regionservers\n"); 260 for (String deadRs : deadRegionServers) { 261 sb.append(" " + deadRs + "\n"); 262 } 263 } 264 if (!peersQueueSize.isEmpty()) { 265 sb.append("Dumping all peers's number of WALs in replication queue\n"); 266 for (Map.Entry<String, Long> entry : peersQueueSize.asMap().entrySet()) { 267 sb.append( 268 " PeerId: " + entry.getKey() + " , sizeOfLogQueue: " + entry.getValue() + "\n"); 269 } 270 } 271 sb.append(" Total size of WALs on HDFS: " + StringUtils.humanSize(totalSizeOfWALs) + "\n"); 272 if (numWalsNotFound > 0) { 273 sb.append(" ERROR: There are " + numWalsNotFound + " WALs not found!!!\n"); 274 } 275 return sb.toString(); 276 } 277 278 public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exception { 279 Map<String, String> currentConf; 280 StringBuilder sb = new StringBuilder(); 281 for (ReplicationPeerDescription peer : peers) { 282 ReplicationPeerConfig peerConfig = peer.getPeerConfig(); 283 sb.append("Peer: " + peer.getPeerId() + "\n"); 284 sb.append(" " + "State: " + (peer.isEnabled() ? "ENABLED" : "DISABLED") + "\n"); 285 sb.append(" " + "Cluster Name: " + peerConfig.getClusterKey() + "\n"); 286 sb.append(" " + "Replication Endpoint: " + peerConfig.getReplicationEndpointImpl() + "\n"); 287 currentConf = peerConfig.getConfiguration(); 288 // Only show when we have a custom configuration for the peer 289 if (currentConf.size() > 1) { 290 sb.append(" " + "Peer Configuration: " + currentConf + "\n"); 291 } 292 sb.append(" " + "Peer Table CFs: " + peerConfig.getTableCFsMap() + "\n"); 293 sb.append(" " + "Peer Namespaces: " + peerConfig.getNamespaces() + "\n"); 294 } 295 return sb.toString(); 296 } 297 298 public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) throws Exception { 299 ReplicationQueueStorage queueStorage; 300 StringBuilder sb = new StringBuilder(); 301 302 queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); 303 Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode) 304 .stream().map(ServerName::parseServerName).collect(Collectors.toSet()); 305 306 // Loops each peer on each RS and dumps the queues 307 List<ServerName> regionservers = queueStorage.getListOfReplicators(); 308 if (regionservers == null || regionservers.isEmpty()) { 309 return sb.toString(); 310 } 311 for (ServerName regionserver : regionservers) { 312 List<String> queueIds = queueStorage.getAllQueues(regionserver); 313 if (!liveRegionServers.contains(regionserver)) { 314 deadRegionServers.add(regionserver.getServerName()); 315 } 316 for (String queueId : queueIds) { 317 ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); 318 List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId); 319 Collections.sort(wals); 320 if (!peerIds.contains(queueInfo.getPeerId())) { 321 deletedQueues.add(regionserver + "/" + queueId); 322 sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs)); 323 } else { 324 sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs)); 325 } 326 } 327 } 328 return sb.toString(); 329 } 330 331 private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage, 332 ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted, 333 boolean hdfs) throws Exception { 334 StringBuilder sb = new StringBuilder(); 335 336 List<ServerName> deadServers; 337 338 sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n"); 339 sb.append(" Queue znode: " + queueId + "\n"); 340 sb.append(" PeerID: " + queueInfo.getPeerId() + "\n"); 341 sb.append(" Recovered: " + queueInfo.isQueueRecovered() + "\n"); 342 deadServers = queueInfo.getDeadRegionServers(); 343 if (deadServers.isEmpty()) { 344 sb.append(" No dead RegionServers found in this queue." + "\n"); 345 } else { 346 sb.append(" Dead RegionServers: " + deadServers + "\n"); 347 } 348 sb.append(" Was deleted: " + isDeleted + "\n"); 349 sb.append(" Number of WALs in replication queue: " + wals.size() + "\n"); 350 peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size()); 351 352 for (String wal : wals) { 353 long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal); 354 sb.append(" Replication position for " + wal + ": " 355 + (position > 0 ? position : "0" + " (not started or nothing to replicate)") + "\n"); 356 } 357 358 if (hdfs) { 359 FileSystem fs = FileSystem.get(getConf()); 360 sb.append(" Total size of WALs on HDFS for this queue: " 361 + StringUtils.humanSize(getTotalWALSize(fs, wals, regionserver)) + "\n"); 362 } 363 return sb.toString(); 364 } 365 366 /** 367 * return total size in bytes from a list of WALs 368 */ 369 private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server) 370 throws IOException { 371 long size = 0; 372 FileStatus fileStatus; 373 374 for (String wal : wals) { 375 try { 376 fileStatus = (new WALLink(getConf(), server.getServerName(), wal)).getFileStatus(fs); 377 } catch (IOException e) { 378 if (e instanceof FileNotFoundException) { 379 numWalsNotFound++; 380 LOG.warn("WAL " + wal + " couldn't be found, skipping", e); 381 } else { 382 LOG.warn("Can't get file status of WAL " + wal + ", skipping", e); 383 } 384 continue; 385 } 386 size += fileStatus.getLen(); 387 } 388 389 totalSizeOfWALs += size; 390 return size; 391 } 392 393 private static class WarnOnlyAbortable implements Abortable { 394 @Override 395 public void abort(String why, Throwable e) { 396 LOG.warn("DumpReplicationQueue received abort, ignoring. Reason: " + why); 397 if (LOG.isDebugEnabled()) { 398 LOG.debug(e.toString(), e); 399 } 400 } 401 402 @Override 403 public boolean isAborted() { 404 return false; 405 } 406 } 407}