001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Arrays; 024import java.util.Collections; 025import java.util.LinkedList; 026import java.util.List; 027import java.util.Map; 028import java.util.Queue; 029import java.util.Set; 030import java.util.stream.Collectors; 031import org.apache.hadoop.conf.Configuration; 032import org.apache.hadoop.conf.Configured; 033import org.apache.hadoop.fs.FileStatus; 034import org.apache.hadoop.fs.FileSystem; 035import org.apache.hadoop.hbase.Abortable; 036import org.apache.hadoop.hbase.HBaseConfiguration; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.client.Admin; 039import org.apache.hadoop.hbase.client.ClusterConnection; 040import org.apache.hadoop.hbase.client.ConnectionFactory; 041import org.apache.hadoop.hbase.client.HBaseAdmin; 042import org.apache.hadoop.hbase.client.replication.TableCFs; 043import org.apache.hadoop.hbase.io.WALLink; 044import org.apache.hadoop.hbase.procedure2.util.StringUtils; 045import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 046import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; 047import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; 048import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 049import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.hadoop.hbase.zookeeper.ZKDump; 052import org.apache.hadoop.hbase.zookeeper.ZKUtil; 053import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 054import org.apache.hadoop.util.Tool; 055import org.apache.hadoop.util.ToolRunner; 056import org.apache.yetus.audience.InterfaceAudience; 057import org.slf4j.Logger; 058import org.slf4j.LoggerFactory; 059 060import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap; 061 062/** 063 * Provides information about the existing states of replication, replication peers and queues. 064 * Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args] 065 * Arguments: --distributed Polls each RS to dump information about the queue --hdfs Reports HDFS 066 * usage by the replication queues (note: can be overestimated). 067 */ 068@InterfaceAudience.Private 069public class DumpReplicationQueues extends Configured implements Tool { 070 071 private static final Logger LOG = LoggerFactory.getLogger(DumpReplicationQueues.class.getName()); 072 073 private List<String> deadRegionServers; 074 private List<String> deletedQueues; 075 private AtomicLongMap<String> peersQueueSize; 076 private long totalSizeOfWALs; 077 private long numWalsNotFound; 078 079 public DumpReplicationQueues() { 080 deadRegionServers = new ArrayList<>(); 081 deletedQueues = new ArrayList<>(); 082 peersQueueSize = AtomicLongMap.create(); 083 totalSizeOfWALs = 0; 084 numWalsNotFound = 0; 085 } 086 087 static class DumpOptions { 088 boolean hdfs = false; 089 boolean distributed = false; 090 091 public DumpOptions() { 092 } 093 094 public DumpOptions(DumpOptions that) { 095 this.hdfs = that.hdfs; 096 this.distributed = that.distributed; 097 } 098 099 boolean isHdfs() { 100 return hdfs; 101 } 102 103 boolean isDistributed() { 104 return distributed; 105 } 106 107 void setHdfs(boolean hdfs) { 108 this.hdfs = hdfs; 109 } 110 111 void setDistributed(boolean distributed) { 112 this.distributed = distributed; 113 } 114 } 115 116 static DumpOptions parseOpts(Queue<String> args) { 117 DumpOptions opts = new DumpOptions(); 118 119 String cmd = null; 120 while ((cmd = args.poll()) != null) { 121 if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) { 122 // place item back onto queue so that caller knows parsing was incomplete 123 args.add(cmd); 124 break; 125 } 126 final String hdfs = "--hdfs"; 127 if (cmd.equals(hdfs)) { 128 opts.setHdfs(true); 129 continue; 130 } 131 final String distributed = "--distributed"; 132 if (cmd.equals(distributed)) { 133 opts.setDistributed(true); 134 continue; 135 } else { 136 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); 137 } 138 // check that --distributed is present when --hdfs is in the arguments 139 if (!opts.isDistributed() && opts.isHdfs()) { 140 printUsageAndExit("ERROR: --hdfs option can only be used with --distributed: " + cmd, -1); 141 } 142 } 143 return opts; 144 } 145 146 /** 147 * Main 148 */ 149 public static void main(String[] args) throws Exception { 150 Configuration conf = HBaseConfiguration.create(); 151 int ret = ToolRunner.run(conf, new DumpReplicationQueues(), args); 152 System.exit(ret); 153 } 154 155 @Override 156 public int run(String[] args) throws Exception { 157 158 int errCode = -1; 159 LinkedList<String> argv = new LinkedList<>(); 160 argv.addAll(Arrays.asList(args)); 161 DumpOptions opts = parseOpts(argv); 162 163 // args remaining, print help and exit 164 if (!argv.isEmpty()) { 165 errCode = 0; 166 printUsage(); 167 return errCode; 168 } 169 return dumpReplicationQueues(opts); 170 } 171 172 protected void printUsage() { 173 printUsage(this.getClass().getName(), null); 174 } 175 176 protected static void printUsage(final String message) { 177 printUsage(DumpReplicationQueues.class.getName(), message); 178 } 179 180 protected static void printUsage(final String className, final String message) { 181 if (message != null && message.length() > 0) { 182 System.err.println(message); 183 } 184 System.err.println("Usage: hbase " + className + " \\"); 185 System.err.println(" <OPTIONS> [-D<property=value>]*"); 186 System.err.println(); 187 System.err.println("General Options:"); 188 System.err.println(" -h|--h|--help Show this help and exit."); 189 System.err.println(" --distributed Poll each RS and print its own replication queue. " 190 + "Default only polls ZooKeeper"); 191 System.err.println(" --hdfs Use HDFS to calculate usage of WALs by replication." 192 + " It could be overestimated if replicating to multiple peers." 193 + " --distributed flag is also needed."); 194 } 195 196 protected static void printUsageAndExit(final String message, final int exitCode) { 197 printUsage(message); 198 System.exit(exitCode); 199 } 200 201 private int dumpReplicationQueues(DumpOptions opts) throws Exception { 202 203 Configuration conf = getConf(); 204 HBaseAdmin.available(conf); 205 ClusterConnection connection = (ClusterConnection) ConnectionFactory.createConnection(conf); 206 Admin admin = connection.getAdmin(); 207 208 ZKWatcher zkw = 209 new ZKWatcher(conf, "DumpReplicationQueues" + EnvironmentEdgeManager.currentTime(), 210 new WarnOnlyAbortable(), true); 211 212 try { 213 // Our zk watcher 214 LOG.info("Our Quorum: " + zkw.getQuorum()); 215 List<TableCFs> replicatedTableCFs = admin.listReplicatedTableCFs(); 216 if (replicatedTableCFs.isEmpty()) { 217 LOG.info("No tables with a configured replication peer were found."); 218 return (0); 219 } else { 220 LOG.info("Replicated Tables: " + replicatedTableCFs); 221 } 222 223 List<ReplicationPeerDescription> peers = admin.listReplicationPeers(); 224 225 if (peers.isEmpty()) { 226 LOG.info("Replication is enabled but no peer configuration was found."); 227 } 228 229 System.out.println("Dumping replication peers and configurations:"); 230 System.out.println(dumpPeersState(peers)); 231 232 if (opts.isDistributed()) { 233 LOG.info("Found [--distributed], will poll each RegionServer."); 234 Set<String> peerIds = 235 peers.stream().map((peer) -> peer.getPeerId()).collect(Collectors.toSet()); 236 System.out.println(dumpQueues(zkw, peerIds, opts.isHdfs())); 237 System.out.println(dumpReplicationSummary()); 238 } else { 239 // use ZK instead 240 System.out.print("Dumping replication znodes via ZooKeeper:"); 241 System.out.println(ZKDump.getReplicationZnodesDump(zkw)); 242 } 243 return (0); 244 } catch (IOException e) { 245 return (-1); 246 } finally { 247 zkw.close(); 248 } 249 } 250 251 public String dumpReplicationSummary() { 252 StringBuilder sb = new StringBuilder(); 253 if (!deletedQueues.isEmpty()) { 254 sb.append("Found " + deletedQueues.size() + " deleted queues" 255 + ", run hbck -fixReplication in order to remove the deleted replication queues\n"); 256 for (String deletedQueue : deletedQueues) { 257 sb.append(" " + deletedQueue + "\n"); 258 } 259 } 260 if (!deadRegionServers.isEmpty()) { 261 sb.append("Found " + deadRegionServers.size() + " dead regionservers" 262 + ", restart one regionserver to transfer the queues of dead regionservers\n"); 263 for (String deadRs : deadRegionServers) { 264 sb.append(" " + deadRs + "\n"); 265 } 266 } 267 if (!peersQueueSize.isEmpty()) { 268 sb.append("Dumping all peers's number of WALs in replication queue\n"); 269 for (Map.Entry<String, Long> entry : peersQueueSize.asMap().entrySet()) { 270 sb.append( 271 " PeerId: " + entry.getKey() + " , sizeOfLogQueue: " + entry.getValue() + "\n"); 272 } 273 } 274 sb.append(" Total size of WALs on HDFS: " + StringUtils.humanSize(totalSizeOfWALs) + "\n"); 275 if (numWalsNotFound > 0) { 276 sb.append(" ERROR: There are " + numWalsNotFound + " WALs not found!!!\n"); 277 } 278 return sb.toString(); 279 } 280 281 public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exception { 282 Map<String, String> currentConf; 283 StringBuilder sb = new StringBuilder(); 284 for (ReplicationPeerDescription peer : peers) { 285 ReplicationPeerConfig peerConfig = peer.getPeerConfig(); 286 sb.append("Peer: " + peer.getPeerId() + "\n"); 287 sb.append(" " + "State: " + (peer.isEnabled() ? "ENABLED" : "DISABLED") + "\n"); 288 sb.append(" " + "Cluster Name: " + peerConfig.getClusterKey() + "\n"); 289 sb.append(" " + "Replication Endpoint: " + peerConfig.getReplicationEndpointImpl() + "\n"); 290 currentConf = peerConfig.getConfiguration(); 291 // Only show when we have a custom configuration for the peer 292 if (currentConf.size() > 1) { 293 sb.append(" " + "Peer Configuration: " + currentConf + "\n"); 294 } 295 sb.append(" " + "Peer Table CFs: " + peerConfig.getTableCFsMap() + "\n"); 296 sb.append(" " + "Peer Namespaces: " + peerConfig.getNamespaces() + "\n"); 297 } 298 return sb.toString(); 299 } 300 301 public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) throws Exception { 302 ReplicationQueueStorage queueStorage; 303 StringBuilder sb = new StringBuilder(); 304 305 queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); 306 Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode) 307 .stream().map(ServerName::parseServerName).collect(Collectors.toSet()); 308 309 // Loops each peer on each RS and dumps the queues 310 List<ServerName> regionservers = queueStorage.getListOfReplicators(); 311 if (regionservers == null || regionservers.isEmpty()) { 312 return sb.toString(); 313 } 314 for (ServerName regionserver : regionservers) { 315 List<String> queueIds = queueStorage.getAllQueues(regionserver); 316 if (!liveRegionServers.contains(regionserver)) { 317 deadRegionServers.add(regionserver.getServerName()); 318 } 319 for (String queueId : queueIds) { 320 ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); 321 List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId); 322 Collections.sort(wals); 323 if (!peerIds.contains(queueInfo.getPeerId())) { 324 deletedQueues.add(regionserver + "/" + queueId); 325 sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs)); 326 } else { 327 sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs)); 328 } 329 } 330 } 331 return sb.toString(); 332 } 333 334 private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage, 335 ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted, 336 boolean hdfs) throws Exception { 337 StringBuilder sb = new StringBuilder(); 338 339 List<ServerName> deadServers; 340 341 sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n"); 342 sb.append(" Queue znode: " + queueId + "\n"); 343 sb.append(" PeerID: " + queueInfo.getPeerId() + "\n"); 344 sb.append(" Recovered: " + queueInfo.isQueueRecovered() + "\n"); 345 deadServers = queueInfo.getDeadRegionServers(); 346 if (deadServers.isEmpty()) { 347 sb.append(" No dead RegionServers found in this queue." + "\n"); 348 } else { 349 sb.append(" Dead RegionServers: " + deadServers + "\n"); 350 } 351 sb.append(" Was deleted: " + isDeleted + "\n"); 352 sb.append(" Number of WALs in replication queue: " + wals.size() + "\n"); 353 peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size()); 354 355 for (String wal : wals) { 356 long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal); 357 sb.append(" Replication position for " + wal + ": " 358 + (position > 0 ? position : "0" + " (not started or nothing to replicate)") + "\n"); 359 } 360 361 if (hdfs) { 362 FileSystem fs = FileSystem.get(getConf()); 363 sb.append(" Total size of WALs on HDFS for this queue: " 364 + StringUtils.humanSize(getTotalWALSize(fs, wals, regionserver)) + "\n"); 365 } 366 return sb.toString(); 367 } 368 369 /** 370 * return total size in bytes from a list of WALs 371 */ 372 private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server) 373 throws IOException { 374 long size = 0; 375 FileStatus fileStatus; 376 377 for (String wal : wals) { 378 try { 379 fileStatus = (new WALLink(getConf(), server.getServerName(), wal)).getFileStatus(fs); 380 } catch (IOException e) { 381 if (e instanceof FileNotFoundException) { 382 numWalsNotFound++; 383 LOG.warn("WAL " + wal + " couldn't be found, skipping", e); 384 } else { 385 LOG.warn("Can't get file status of WAL " + wal + ", skipping", e); 386 } 387 continue; 388 } 389 size += fileStatus.getLen(); 390 } 391 392 totalSizeOfWALs += size; 393 return size; 394 } 395 396 private static class WarnOnlyAbortable implements Abortable { 397 @Override 398 public void abort(String why, Throwable e) { 399 LOG.warn("DumpReplicationQueue received abort, ignoring. Reason: " + why); 400 if (LOG.isDebugEnabled()) { 401 LOG.debug(e.toString(), e); 402 } 403 } 404 405 @Override 406 public boolean isAborted() { 407 return false; 408 } 409 } 410}