001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.List;
023import java.util.Set;
024import java.util.stream.Collectors;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.conf.Configured;
027import org.apache.hadoop.fs.FileSystem;
028import org.apache.hadoop.fs.Path;
029import org.apache.hadoop.hbase.Abortable;
030import org.apache.hadoop.hbase.ChoreService;
031import org.apache.hadoop.hbase.CoordinatedStateManager;
032import org.apache.hadoop.hbase.HBaseConfiguration;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.Server;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.client.ClusterConnection;
037import org.apache.hadoop.hbase.client.Connection;
038import org.apache.hadoop.hbase.replication.ReplicationException;
039import org.apache.hadoop.hbase.util.CommonFSUtils;
040import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
041import org.apache.hadoop.hbase.wal.WALFactory;
042import org.apache.hadoop.hbase.zookeeper.ZKUtil;
043import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
044import org.apache.hadoop.util.Tool;
045import org.apache.hadoop.util.ToolRunner;
046import org.apache.yetus.audience.InterfaceAudience;
047import org.apache.zookeeper.KeeperException;
048
049/**
050 * In a scenario of Replication based Disaster/Recovery, when hbase Master-Cluster crashes, this
051 * tool is used to sync-up the delta from Master to Slave using the info from ZooKeeper. The tool
052 * will run on Master-Cluster, and assume ZK, Filesystem and NetWork still available after hbase
053 * crashes
054 *
055 * <pre>
056 * hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp
057 * </pre>
058 */
059@InterfaceAudience.Private
060public class ReplicationSyncUp extends Configured implements Tool {
061
062  private static final long SLEEP_TIME = 10000;
063
064  /**
065   * Main program
066   */
067  public static void main(String[] args) throws Exception {
068    int ret = ToolRunner.run(HBaseConfiguration.create(), new ReplicationSyncUp(), args);
069    System.exit(ret);
070  }
071
072  private Set<ServerName> getLiveRegionServers(ZKWatcher zkw) throws KeeperException {
073    List<String> rsZNodes = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
074    return rsZNodes == null
075      ? Collections.emptySet()
076      : rsZNodes.stream().map(ServerName::parseServerName).collect(Collectors.toSet());
077  }
078
079  // When using this tool, usually the source cluster is unhealthy, so we should try to claim the
080  // replication queues for the dead region servers first and then replicate the data out.
081  private void claimReplicationQueues(ZKWatcher zkw, ReplicationSourceManager mgr)
082    throws ReplicationException, KeeperException {
083    List<ServerName> replicators = mgr.getQueueStorage().getListOfReplicators();
084    Set<ServerName> liveRegionServers = getLiveRegionServers(zkw);
085    for (ServerName sn : replicators) {
086      if (!liveRegionServers.contains(sn)) {
087        List<String> replicationQueues = mgr.getQueueStorage().getAllQueues(sn);
088        System.out.println(sn + " is dead, claim its replication queues: " + replicationQueues);
089        for (String queue : replicationQueues) {
090          mgr.claimQueue(sn, queue);
091        }
092      }
093    }
094  }
095
096  @Override
097  public int run(String[] args) throws Exception {
098    Abortable abortable = new Abortable() {
099      @Override
100      public void abort(String why, Throwable e) {
101      }
102
103      @Override
104      public boolean isAborted() {
105        return false;
106      }
107    };
108    Configuration conf = getConf();
109    try (ZKWatcher zkw = new ZKWatcher(conf,
110      "syncupReplication" + EnvironmentEdgeManager.currentTime(), abortable, true)) {
111      Path walRootDir = CommonFSUtils.getWALRootDir(conf);
112      FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
113      Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
114      Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
115
116      System.out.println("Start Replication Server start");
117      Replication replication = new Replication();
118      replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir,
119        new WALFactory(conf, "test", null));
120      ReplicationSourceManager manager = replication.getReplicationManager();
121      manager.init();
122      claimReplicationQueues(zkw, manager);
123      while (manager.activeFailoverTaskCount() > 0) {
124        Thread.sleep(SLEEP_TIME);
125      }
126      while (manager.getOldSources().size() > 0) {
127        Thread.sleep(SLEEP_TIME);
128      }
129      manager.join();
130    } catch (InterruptedException e) {
131      System.err.println("didn't wait long enough:" + e);
132      return -1;
133    }
134    return 0;
135  }
136
137  class DummyServer implements Server {
138    String hostname;
139    ZKWatcher zkw;
140
141    DummyServer(ZKWatcher zkw) {
142      // a unique name in case the first run fails
143      hostname = EnvironmentEdgeManager.currentTime() + ".SyncUpTool.replication.org";
144      this.zkw = zkw;
145    }
146
147    DummyServer(String hostname) {
148      this.hostname = hostname;
149    }
150
151    @Override
152    public Configuration getConfiguration() {
153      return getConf();
154    }
155
156    @Override
157    public ZKWatcher getZooKeeper() {
158      return zkw;
159    }
160
161    @Override
162    public CoordinatedStateManager getCoordinatedStateManager() {
163      return null;
164    }
165
166    @Override
167    public ServerName getServerName() {
168      return ServerName.valueOf(hostname, 1234, 1L);
169    }
170
171    @Override
172    public void abort(String why, Throwable e) {
173    }
174
175    @Override
176    public boolean isAborted() {
177      return false;
178    }
179
180    @Override
181    public void stop(String why) {
182    }
183
184    @Override
185    public boolean isStopped() {
186      return false;
187    }
188
189    @Override
190    public ClusterConnection getConnection() {
191      return null;
192    }
193
194    @Override
195    public ChoreService getChoreService() {
196      return null;
197    }
198
199    @Override
200    public ClusterConnection getClusterConnection() {
201      return null;
202    }
203
204    @Override
205    public FileSystem getFileSystem() {
206      return null;
207    }
208
209    @Override
210    public boolean isStopping() {
211      return false;
212    }
213
214    @Override
215    public Connection createConnection(Configuration conf) throws IOException {
216      return null;
217    }
218  }
219}