001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.conf.Configured;
023import org.apache.hadoop.fs.FileSystem;
024import org.apache.hadoop.fs.Path;
025import org.apache.hadoop.hbase.Abortable;
026import org.apache.hadoop.hbase.ChoreService;
027import org.apache.hadoop.hbase.CoordinatedStateManager;
028import org.apache.hadoop.hbase.HBaseConfiguration;
029import org.apache.hadoop.hbase.HConstants;
030import org.apache.hadoop.hbase.Server;
031import org.apache.hadoop.hbase.ServerName;
032import org.apache.hadoop.hbase.client.ClusterConnection;
033import org.apache.hadoop.hbase.client.Connection;
034import org.apache.hadoop.hbase.util.CommonFSUtils;
035import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
036import org.apache.hadoop.util.Tool;
037import org.apache.hadoop.util.ToolRunner;
038import org.apache.yetus.audience.InterfaceAudience;
039
040/**
041 * In a scenario of Replication based Disaster/Recovery, when hbase Master-Cluster crashes, this
042 * tool is used to sync-up the delta from Master to Slave using the info from ZooKeeper. The tool
043 * will run on Master-Cluser, and assume ZK, Filesystem and NetWork still available after hbase
044 * crashes
045 *
046 * <pre>
047 * hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp
048 * </pre>
049 */
050@InterfaceAudience.Private
051public class ReplicationSyncUp extends Configured implements Tool {
052
053  private static final long SLEEP_TIME = 10000;
054
055  /**
056   * Main program
057   */
058  public static void main(String[] args) throws Exception {
059    int ret = ToolRunner.run(HBaseConfiguration.create(), new ReplicationSyncUp(), args);
060    System.exit(ret);
061  }
062
063  @Override
064  public int run(String[] args) throws Exception {
065    Abortable abortable = new Abortable() {
066      @Override
067      public void abort(String why, Throwable e) {
068      }
069
070      @Override
071      public boolean isAborted() {
072        return false;
073      }
074    };
075    Configuration conf = getConf();
076    try (ZKWatcher zkw =
077      new ZKWatcher(conf, "syncupReplication" + System.currentTimeMillis(), abortable, true)) {
078      Path walRootDir = CommonFSUtils.getWALRootDir(conf);
079      FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
080      Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
081      Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
082
083      System.out.println("Start Replication Server start");
084      Replication replication = new Replication();
085      replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, null);
086      ReplicationSourceManager manager = replication.getReplicationManager();
087      manager.init().get();
088      while (manager.activeFailoverTaskCount() > 0) {
089        Thread.sleep(SLEEP_TIME);
090      }
091      while (manager.getOldSources().size() > 0) {
092        Thread.sleep(SLEEP_TIME);
093      }
094      manager.join();
095    } catch (InterruptedException e) {
096      System.err.println("didn't wait long enough:" + e);
097      return -1;
098    }
099    return 0;
100  }
101
102  class DummyServer implements Server {
103    String hostname;
104    ZKWatcher zkw;
105
106    DummyServer(ZKWatcher zkw) {
107      // a unique name in case the first run fails
108      hostname = System.currentTimeMillis() + ".SyncUpTool.replication.org";
109      this.zkw = zkw;
110    }
111
112    DummyServer(String hostname) {
113      this.hostname = hostname;
114    }
115
116    @Override
117    public Configuration getConfiguration() {
118      return getConf();
119    }
120
121    @Override
122    public ZKWatcher getZooKeeper() {
123      return zkw;
124    }
125
126    @Override
127    public CoordinatedStateManager getCoordinatedStateManager() {
128      return null;
129    }
130
131    @Override
132    public ServerName getServerName() {
133      return ServerName.valueOf(hostname, 1234, 1L);
134    }
135
136    @Override
137    public void abort(String why, Throwable e) {
138    }
139
140    @Override
141    public boolean isAborted() {
142      return false;
143    }
144
145    @Override
146    public void stop(String why) {
147    }
148
149    @Override
150    public boolean isStopped() {
151      return false;
152    }
153
154    @Override
155    public ClusterConnection getConnection() {
156      return null;
157    }
158
159    @Override
160    public ChoreService getChoreService() {
161      return null;
162    }
163
164    @Override
165    public ClusterConnection getClusterConnection() {
166      return null;
167    }
168
169    @Override
170    public FileSystem getFileSystem() {
171      return null;
172    }
173
174    @Override
175    public boolean isStopping() {
176      return false;
177    }
178
179    @Override
180    public Connection createConnection(Configuration conf) throws IOException {
181      return null;
182    }
183  }
184}