001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.FileNotFoundException; 021import java.io.IOException; 022import java.util.Arrays; 023import java.util.Collections; 024import java.util.HashSet; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Set; 029import org.apache.hadoop.conf.Configuration; 030import org.apache.hadoop.conf.Configured; 031import org.apache.hadoop.fs.FSDataOutputStream; 032import org.apache.hadoop.fs.FileStatus; 033import org.apache.hadoop.fs.FileSystem; 034import org.apache.hadoop.fs.Path; 035import org.apache.hadoop.hbase.Abortable; 036import org.apache.hadoop.hbase.ChoreService; 037import org.apache.hadoop.hbase.CoordinatedStateManager; 038import org.apache.hadoop.hbase.HBaseConfiguration; 039import org.apache.hadoop.hbase.HConstants; 040import org.apache.hadoop.hbase.Server; 041import org.apache.hadoop.hbase.ServerName; 042import org.apache.hadoop.hbase.client.AsyncClusterConnection; 043import org.apache.hadoop.hbase.client.Connection; 044import org.apache.hadoop.hbase.keymeta.KeyManagementService; 045import org.apache.hadoop.hbase.master.replication.OfflineTableReplicationQueueStorage; 046import org.apache.hadoop.hbase.replication.ReplicationException; 047import org.apache.hadoop.hbase.replication.ReplicationGroupOffset; 048import org.apache.hadoop.hbase.replication.ReplicationQueueId; 049import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 050import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 051import org.apache.hadoop.hbase.util.Bytes; 052import org.apache.hadoop.hbase.util.CommonFSUtils; 053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 054import org.apache.hadoop.hbase.util.JsonMapper; 055import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 056import org.apache.hadoop.hbase.wal.WALFactory; 057import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 058import org.apache.hadoop.util.Tool; 059import org.apache.hadoop.util.ToolRunner; 060import org.apache.yetus.audience.InterfaceAudience; 061import org.apache.zookeeper.KeeperException; 062 063/** 064 * In a scenario of Replication based Disaster/Recovery, when hbase Master-Cluster crashes, this 065 * tool is used to sync-up the delta from Master to Slave using the info from ZooKeeper. The tool 066 * will run on Master-Cluster, and assume ZK, Filesystem and NetWork still available after hbase 067 * crashes 068 * 069 * <pre> 070 * hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp 071 * </pre> 072 */ 073@InterfaceAudience.Private 074public class ReplicationSyncUp extends Configured implements Tool { 075 076 public static class ReplicationSyncUpToolInfo { 077 078 private long startTimeMs; 079 080 public ReplicationSyncUpToolInfo() { 081 } 082 083 public ReplicationSyncUpToolInfo(long startTimeMs) { 084 this.startTimeMs = startTimeMs; 085 } 086 087 public long getStartTimeMs() { 088 return startTimeMs; 089 } 090 091 public void setStartTimeMs(long startTimeMs) { 092 this.startTimeMs = startTimeMs; 093 } 094 } 095 096 // For storing the information used to skip replicating some wals after the cluster is back online 097 public static final String INFO_DIR = "ReplicationSyncUp"; 098 099 public static final String INFO_FILE = "info"; 100 101 private static final long SLEEP_TIME = 10000; 102 103 /** 104 * Main program 105 */ 106 public static void main(String[] args) throws Exception { 107 int ret = ToolRunner.run(HBaseConfiguration.create(), new ReplicationSyncUp(), args); 108 System.exit(ret); 109 } 110 111 // Find region servers under wal directory 112 // Here we only care about the region servers which may still be alive, as we need to add 113 // replications for them if missing. The dead region servers which have already been processed 114 // fully do not need to add their replication queues again, as the operation has already been done 115 // in SCP. 116 private Set<ServerName> listRegionServers(FileSystem walFs, Path walDir) throws IOException { 117 FileStatus[] statuses; 118 try { 119 statuses = walFs.listStatus(walDir); 120 } catch (FileNotFoundException e) { 121 System.out.println("WAL directory " + walDir + " does not exists, ignore"); 122 return Collections.emptySet(); 123 } 124 Set<ServerName> regionServers = new HashSet<>(); 125 for (FileStatus status : statuses) { 126 // All wal files under the walDir is within its region server's directory 127 if (!status.isDirectory()) { 128 continue; 129 } 130 ServerName sn = AbstractFSWALProvider.getServerNameFromWALDirectoryName(status.getPath()); 131 if (sn != null) { 132 regionServers.add(sn); 133 } 134 } 135 return regionServers; 136 } 137 138 private void addMissingReplicationQueues(ReplicationQueueStorage storage, ServerName regionServer, 139 Set<String> peerIds) throws ReplicationException { 140 Set<String> existingQueuePeerIds = new HashSet<>(); 141 List<ReplicationQueueId> queueIds = storage.listAllQueueIds(regionServer); 142 for (Iterator<ReplicationQueueId> iter = queueIds.iterator(); iter.hasNext();) { 143 ReplicationQueueId queueId = iter.next(); 144 if (!queueId.isRecovered()) { 145 existingQueuePeerIds.add(queueId.getPeerId()); 146 } 147 } 148 149 for (String peerId : peerIds) { 150 if (!existingQueuePeerIds.contains(peerId)) { 151 ReplicationQueueId queueId = new ReplicationQueueId(regionServer, peerId); 152 System.out.println("Add replication queue " + queueId + " for claiming"); 153 storage.setOffset(queueId, regionServer.toString(), ReplicationGroupOffset.BEGIN, 154 Collections.emptyMap()); 155 } 156 } 157 } 158 159 private void addMissingReplicationQueues(ReplicationQueueStorage storage, 160 Set<ServerName> regionServers, Set<String> peerIds) throws ReplicationException { 161 for (ServerName regionServer : regionServers) { 162 addMissingReplicationQueues(storage, regionServer, peerIds); 163 } 164 } 165 166 // When using this tool, usually the source cluster is unhealthy, so we should try to claim the 167 // replication queues for the dead region servers first and then replicate the data out. 168 private void claimReplicationQueues(ReplicationSourceManager mgr, Set<ServerName> regionServers) 169 throws ReplicationException, KeeperException, IOException { 170 // union the region servers from both places, i.e, from the wal directory, and the records in 171 // replication queue storage. 172 Set<ServerName> replicators = new HashSet<>(regionServers); 173 ReplicationQueueStorage queueStorage = mgr.getQueueStorage(); 174 replicators.addAll(queueStorage.listAllReplicators()); 175 FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf()); 176 Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR); 177 for (ServerName sn : replicators) { 178 List<ReplicationQueueId> replicationQueues = queueStorage.listAllQueueIds(sn); 179 System.out.println(sn + " is dead, claim its replication queues: " + replicationQueues); 180 // record the rs name, so when master restarting, we will skip claiming its replication queue 181 fs.createNewFile(new Path(infoDir, sn.getServerName())); 182 for (ReplicationQueueId queueId : replicationQueues) { 183 mgr.claimQueue(queueId, true); 184 } 185 } 186 } 187 188 private void writeInfoFile(FileSystem fs, boolean isForce) throws IOException { 189 // Record the info of this run. Currently only record the time we run the job. We will use this 190 // timestamp to clean up the data for last sequence ids and hfile refs in replication queue 191 // storage. See ReplicationQueueStorage.removeLastSequenceIdsAndHFileRefsBefore. 192 ReplicationSyncUpToolInfo info = 193 new ReplicationSyncUpToolInfo(EnvironmentEdgeManager.currentTime()); 194 String json = JsonMapper.writeObjectAsString(info); 195 Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR); 196 try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), isForce)) { 197 out.write(Bytes.toBytes(json)); 198 } 199 } 200 201 private static boolean parseOpts(String args[]) { 202 LinkedList<String> argv = new LinkedList<>(); 203 argv.addAll(Arrays.asList(args)); 204 String cmd = null; 205 while ((cmd = argv.poll()) != null) { 206 if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) { 207 printUsageAndExit(null, 0); 208 } 209 if (cmd.equals("-f")) { 210 return true; 211 } 212 if (!argv.isEmpty()) { 213 printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1); 214 } 215 } 216 return false; 217 } 218 219 private static void printUsageAndExit(final String message, final int exitCode) { 220 printUsage(message); 221 System.exit(exitCode); 222 } 223 224 private static void printUsage(final String message) { 225 if (message != null && message.length() > 0) { 226 System.err.println(message); 227 } 228 System.err.println("Usage: hbase " + ReplicationSyncUp.class.getName() + " \\"); 229 System.err.println(" <OPTIONS> [-D<property=value>]*"); 230 System.err.println(); 231 System.err.println("General Options:"); 232 System.err.println(" -h|--h|--help Show this help and exit."); 233 System.err 234 .println(" -f Start a new ReplicationSyncUp after the previous ReplicationSyncUp failed. " 235 + "See HBASE-27623 for details."); 236 } 237 238 @Override 239 public int run(String[] args) throws Exception { 240 Abortable abortable = new Abortable() { 241 242 private volatile boolean abort = false; 243 244 @Override 245 public void abort(String why, Throwable e) { 246 if (isAborted()) { 247 return; 248 } 249 abort = true; 250 System.err.println("Aborting because of " + why); 251 e.printStackTrace(); 252 System.exit(1); 253 } 254 255 @Override 256 public boolean isAborted() { 257 return abort; 258 } 259 }; 260 boolean isForce = parseOpts(args); 261 Configuration conf = getConf(); 262 try (ZKWatcher zkw = new ZKWatcher(conf, 263 "syncupReplication" + EnvironmentEdgeManager.currentTime(), abortable, true)) { 264 Path walRootDir = CommonFSUtils.getWALRootDir(conf); 265 FileSystem fs = CommonFSUtils.getWALFileSystem(conf); 266 Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 267 Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME); 268 269 System.out.println("Start Replication Server"); 270 writeInfoFile(fs, isForce); 271 Replication replication = new Replication(); 272 // use offline table replication queue storage 273 getConf().setClass(ReplicationStorageFactory.REPLICATION_QUEUE_IMPL, 274 OfflineTableReplicationQueueStorage.class, ReplicationQueueStorage.class); 275 DummyServer server = new DummyServer(getConf(), zkw); 276 replication 277 .initialize(server, fs, new Path(logDir, server.toString()), oldLogDir, 278 new WALFactory(conf, 279 ServerName.valueOf( 280 getClass().getSimpleName() + ",16010," + EnvironmentEdgeManager.currentTime()), 281 null)); 282 ReplicationSourceManager manager = replication.getReplicationManager(); 283 manager.init(); 284 Set<ServerName> regionServers = listRegionServers(fs, logDir); 285 addMissingReplicationQueues(manager.getQueueStorage(), regionServers, 286 manager.getReplicationPeers().getAllPeerIds()); 287 claimReplicationQueues(manager, regionServers); 288 while (manager.activeFailoverTaskCount() > 0) { 289 Thread.sleep(SLEEP_TIME); 290 } 291 while (manager.getOldSources().size() > 0) { 292 Thread.sleep(SLEEP_TIME); 293 } 294 manager.join(); 295 } catch (InterruptedException e) { 296 System.err.println("didn't wait long enough:" + e); 297 return -1; 298 } 299 return 0; 300 } 301 302 private static final class DummyServer implements Server { 303 private final Configuration conf; 304 private final String hostname; 305 private final ZKWatcher zkw; 306 private volatile boolean abort = false; 307 308 DummyServer(Configuration conf, ZKWatcher zkw) { 309 // a unique name in case the first run fails 310 hostname = EnvironmentEdgeManager.currentTime() + ".SyncUpTool.replication.org"; 311 this.conf = conf; 312 this.zkw = zkw; 313 } 314 315 @Override 316 public Configuration getConfiguration() { 317 return conf; 318 } 319 320 @Override 321 public ZKWatcher getZooKeeper() { 322 return zkw; 323 } 324 325 @Override 326 public CoordinatedStateManager getCoordinatedStateManager() { 327 return null; 328 } 329 330 @Override 331 public ServerName getServerName() { 332 return ServerName.valueOf(hostname, 1234, 1L); 333 } 334 335 @Override 336 public void abort(String why, Throwable e) { 337 if (isAborted()) { 338 return; 339 } 340 abort = true; 341 System.err.println("Aborting because of " + why); 342 e.printStackTrace(); 343 System.exit(1); 344 } 345 346 @Override 347 public boolean isAborted() { 348 return abort; 349 } 350 351 @Override 352 public void stop(String why) { 353 } 354 355 @Override 356 public boolean isStopped() { 357 return false; 358 } 359 360 @Override 361 public Connection getConnection() { 362 return null; 363 } 364 365 @Override 366 public ChoreService getChoreService() { 367 return null; 368 } 369 370 @Override 371 public FileSystem getFileSystem() { 372 return null; 373 } 374 375 @Override 376 public boolean isStopping() { 377 return false; 378 } 379 380 @Override 381 public Connection createConnection(Configuration conf) throws IOException { 382 return null; 383 } 384 385 @Override 386 public AsyncClusterConnection getAsyncClusterConnection() { 387 return null; 388 } 389 390 @Override 391 public KeyManagementService getKeyManagementService() { 392 return null; 393 } 394 } 395}