001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.IOException; 021import java.util.Collections; 022import java.util.List; 023import java.util.Set; 024import java.util.stream.Collectors; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.conf.Configured; 027import org.apache.hadoop.fs.FileSystem; 028import org.apache.hadoop.fs.Path; 029import org.apache.hadoop.hbase.Abortable; 030import org.apache.hadoop.hbase.ChoreService; 031import org.apache.hadoop.hbase.CoordinatedStateManager; 032import org.apache.hadoop.hbase.HBaseConfiguration; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.Server; 035import org.apache.hadoop.hbase.ServerName; 036import org.apache.hadoop.hbase.client.ClusterConnection; 037import org.apache.hadoop.hbase.client.Connection; 038import org.apache.hadoop.hbase.replication.ReplicationException; 039import org.apache.hadoop.hbase.util.CommonFSUtils; 040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 041import org.apache.hadoop.hbase.wal.WALFactory; 042import org.apache.hadoop.hbase.zookeeper.ZKUtil; 043import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 044import org.apache.hadoop.util.Tool; 045import org.apache.hadoop.util.ToolRunner; 046import org.apache.yetus.audience.InterfaceAudience; 047import org.apache.zookeeper.KeeperException; 048 049/** 050 * In a scenario of Replication based Disaster/Recovery, when hbase Master-Cluster crashes, this 051 * tool is used to sync-up the delta from Master to Slave using the info from ZooKeeper. The tool 052 * will run on Master-Cluster, and assume ZK, Filesystem and NetWork still available after hbase 053 * crashes 054 * 055 * <pre> 056 * hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp 057 * </pre> 058 */ 059@InterfaceAudience.Private 060public class ReplicationSyncUp extends Configured implements Tool { 061 062 private static final long SLEEP_TIME = 10000; 063 064 /** 065 * Main program 066 */ 067 public static void main(String[] args) throws Exception { 068 int ret = ToolRunner.run(HBaseConfiguration.create(), new ReplicationSyncUp(), args); 069 System.exit(ret); 070 } 071 072 private Set<ServerName> getLiveRegionServers(ZKWatcher zkw) throws KeeperException { 073 List<String> rsZNodes = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode); 074 return rsZNodes == null 075 ? Collections.emptySet() 076 : rsZNodes.stream().map(ServerName::parseServerName).collect(Collectors.toSet()); 077 } 078 079 // When using this tool, usually the source cluster is unhealthy, so we should try to claim the 080 // replication queues for the dead region servers first and then replicate the data out. 081 private void claimReplicationQueues(ZKWatcher zkw, ReplicationSourceManager mgr) 082 throws ReplicationException, KeeperException { 083 List<ServerName> replicators = mgr.getQueueStorage().getListOfReplicators(); 084 Set<ServerName> liveRegionServers = getLiveRegionServers(zkw); 085 for (ServerName sn : replicators) { 086 if (!liveRegionServers.contains(sn)) { 087 List<String> replicationQueues = mgr.getQueueStorage().getAllQueues(sn); 088 System.out.println(sn + " is dead, claim its replication queues: " + replicationQueues); 089 for (String queue : replicationQueues) { 090 mgr.claimQueue(sn, queue); 091 } 092 } 093 } 094 } 095 096 @Override 097 public int run(String[] args) throws Exception { 098 Abortable abortable = new Abortable() { 099 @Override 100 public void abort(String why, Throwable e) { 101 } 102 103 @Override 104 public boolean isAborted() { 105 return false; 106 } 107 }; 108 Configuration conf = getConf(); 109 try (ZKWatcher zkw = new ZKWatcher(conf, 110 "syncupReplication" + EnvironmentEdgeManager.currentTime(), abortable, true)) { 111 Path walRootDir = CommonFSUtils.getWALRootDir(conf); 112 FileSystem fs = CommonFSUtils.getWALFileSystem(conf); 113 Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); 114 Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME); 115 116 System.out.println("Start Replication Server start"); 117 Replication replication = new Replication(); 118 replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, 119 new WALFactory(conf, "test", null)); 120 ReplicationSourceManager manager = replication.getReplicationManager(); 121 manager.init(); 122 claimReplicationQueues(zkw, manager); 123 while (manager.activeFailoverTaskCount() > 0) { 124 Thread.sleep(SLEEP_TIME); 125 } 126 while (manager.getOldSources().size() > 0) { 127 Thread.sleep(SLEEP_TIME); 128 } 129 manager.join(); 130 } catch (InterruptedException e) { 131 System.err.println("didn't wait long enough:" + e); 132 return -1; 133 } 134 return 0; 135 } 136 137 class DummyServer implements Server { 138 String hostname; 139 ZKWatcher zkw; 140 141 DummyServer(ZKWatcher zkw) { 142 // a unique name in case the first run fails 143 hostname = EnvironmentEdgeManager.currentTime() + ".SyncUpTool.replication.org"; 144 this.zkw = zkw; 145 } 146 147 DummyServer(String hostname) { 148 this.hostname = hostname; 149 } 150 151 @Override 152 public Configuration getConfiguration() { 153 return getConf(); 154 } 155 156 @Override 157 public ZKWatcher getZooKeeper() { 158 return zkw; 159 } 160 161 @Override 162 public CoordinatedStateManager getCoordinatedStateManager() { 163 return null; 164 } 165 166 @Override 167 public ServerName getServerName() { 168 return ServerName.valueOf(hostname, 1234, 1L); 169 } 170 171 @Override 172 public void abort(String why, Throwable e) { 173 } 174 175 @Override 176 public boolean isAborted() { 177 return false; 178 } 179 180 @Override 181 public void stop(String why) { 182 } 183 184 @Override 185 public boolean isStopped() { 186 return false; 187 } 188 189 @Override 190 public ClusterConnection getConnection() { 191 return null; 192 } 193 194 @Override 195 public ChoreService getChoreService() { 196 return null; 197 } 198 199 @Override 200 public ClusterConnection getClusterConnection() { 201 return null; 202 } 203 204 @Override 205 public FileSystem getFileSystem() { 206 return null; 207 } 208 209 @Override 210 public boolean isStopping() { 211 return false; 212 } 213 214 @Override 215 public Connection createConnection(Configuration conf) throws IOException { 216 return null; 217 } 218 } 219}