001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.Arrays;
023import java.util.Collections;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Set;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.conf.Configured;
031import org.apache.hadoop.fs.FSDataOutputStream;
032import org.apache.hadoop.fs.FileStatus;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.Abortable;
036import org.apache.hadoop.hbase.ChoreService;
037import org.apache.hadoop.hbase.CoordinatedStateManager;
038import org.apache.hadoop.hbase.HBaseConfiguration;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.Server;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.client.AsyncClusterConnection;
043import org.apache.hadoop.hbase.client.Connection;
044import org.apache.hadoop.hbase.keymeta.KeyManagementService;
045import org.apache.hadoop.hbase.master.replication.OfflineTableReplicationQueueStorage;
046import org.apache.hadoop.hbase.replication.ReplicationException;
047import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
048import org.apache.hadoop.hbase.replication.ReplicationQueueId;
049import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
050import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
051import org.apache.hadoop.hbase.util.Bytes;
052import org.apache.hadoop.hbase.util.CommonFSUtils;
053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
054import org.apache.hadoop.hbase.util.JsonMapper;
055import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
056import org.apache.hadoop.hbase.wal.WALFactory;
057import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
058import org.apache.hadoop.util.Tool;
059import org.apache.hadoop.util.ToolRunner;
060import org.apache.yetus.audience.InterfaceAudience;
061import org.apache.zookeeper.KeeperException;
062
/**
 * In a scenario of replication based disaster recovery, when the HBase master cluster crashes,
 * this tool is used to sync up the delta from the master cluster to the slave cluster using the
 * info from ZooKeeper. The tool runs on the master cluster, and assumes that ZooKeeper, the file
 * system and the network are still available after HBase crashes.
 *
 * <pre>
 * hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp
 * </pre>
 */
073@InterfaceAudience.Private
074public class ReplicationSyncUp extends Configured implements Tool {
075
076  public static class ReplicationSyncUpToolInfo {
077
078    private long startTimeMs;
079
080    public ReplicationSyncUpToolInfo() {
081    }
082
083    public ReplicationSyncUpToolInfo(long startTimeMs) {
084      this.startTimeMs = startTimeMs;
085    }
086
087    public long getStartTimeMs() {
088      return startTimeMs;
089    }
090
091    public void setStartTimeMs(long startTimeMs) {
092      this.startTimeMs = startTimeMs;
093    }
094  }
095
  // Name of the directory, created under the HBase root directory, for storing the information
  // used to skip replicating some wals after the cluster is back online
  public static final String INFO_DIR = "ReplicationSyncUp";

  // Name of the file inside INFO_DIR which holds the JSON form of ReplicationSyncUpToolInfo
  public static final String INFO_FILE = "info";

  // Pause, in milliseconds, between two checks while run() waits for the replication to finish
  private static final long SLEEP_TIME = 10000;
102
103  /**
104   * Main program
105   */
106  public static void main(String[] args) throws Exception {
107    int ret = ToolRunner.run(HBaseConfiguration.create(), new ReplicationSyncUp(), args);
108    System.exit(ret);
109  }
110
111  // Find region servers under wal directory
112  // Here we only care about the region servers which may still be alive, as we need to add
113  // replications for them if missing. The dead region servers which have already been processed
114  // fully do not need to add their replication queues again, as the operation has already been done
115  // in SCP.
116  private Set<ServerName> listRegionServers(FileSystem walFs, Path walDir) throws IOException {
117    FileStatus[] statuses;
118    try {
119      statuses = walFs.listStatus(walDir);
120    } catch (FileNotFoundException e) {
121      System.out.println("WAL directory " + walDir + " does not exists, ignore");
122      return Collections.emptySet();
123    }
124    Set<ServerName> regionServers = new HashSet<>();
125    for (FileStatus status : statuses) {
126      // All wal files under the walDir is within its region server's directory
127      if (!status.isDirectory()) {
128        continue;
129      }
130      ServerName sn = AbstractFSWALProvider.getServerNameFromWALDirectoryName(status.getPath());
131      if (sn != null) {
132        regionServers.add(sn);
133      }
134    }
135    return regionServers;
136  }
137
138  private void addMissingReplicationQueues(ReplicationQueueStorage storage, ServerName regionServer,
139    Set<String> peerIds) throws ReplicationException {
140    Set<String> existingQueuePeerIds = new HashSet<>();
141    List<ReplicationQueueId> queueIds = storage.listAllQueueIds(regionServer);
142    for (Iterator<ReplicationQueueId> iter = queueIds.iterator(); iter.hasNext();) {
143      ReplicationQueueId queueId = iter.next();
144      if (!queueId.isRecovered()) {
145        existingQueuePeerIds.add(queueId.getPeerId());
146      }
147    }
148
149    for (String peerId : peerIds) {
150      if (!existingQueuePeerIds.contains(peerId)) {
151        ReplicationQueueId queueId = new ReplicationQueueId(regionServer, peerId);
152        System.out.println("Add replication queue " + queueId + " for claiming");
153        storage.setOffset(queueId, regionServer.toString(), ReplicationGroupOffset.BEGIN,
154          Collections.emptyMap());
155      }
156    }
157  }
158
159  private void addMissingReplicationQueues(ReplicationQueueStorage storage,
160    Set<ServerName> regionServers, Set<String> peerIds) throws ReplicationException {
161    for (ServerName regionServer : regionServers) {
162      addMissingReplicationQueues(storage, regionServer, peerIds);
163    }
164  }
165
166  // When using this tool, usually the source cluster is unhealthy, so we should try to claim the
167  // replication queues for the dead region servers first and then replicate the data out.
168  private void claimReplicationQueues(ReplicationSourceManager mgr, Set<ServerName> regionServers)
169    throws ReplicationException, KeeperException, IOException {
170    // union the region servers from both places, i.e, from the wal directory, and the records in
171    // replication queue storage.
172    Set<ServerName> replicators = new HashSet<>(regionServers);
173    ReplicationQueueStorage queueStorage = mgr.getQueueStorage();
174    replicators.addAll(queueStorage.listAllReplicators());
175    FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
176    Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR);
177    for (ServerName sn : replicators) {
178      List<ReplicationQueueId> replicationQueues = queueStorage.listAllQueueIds(sn);
179      System.out.println(sn + " is dead, claim its replication queues: " + replicationQueues);
180      // record the rs name, so when master restarting, we will skip claiming its replication queue
181      fs.createNewFile(new Path(infoDir, sn.getServerName()));
182      for (ReplicationQueueId queueId : replicationQueues) {
183        mgr.claimQueue(queueId, true);
184      }
185    }
186  }
187
188  private void writeInfoFile(FileSystem fs, boolean isForce) throws IOException {
189    // Record the info of this run. Currently only record the time we run the job. We will use this
190    // timestamp to clean up the data for last sequence ids and hfile refs in replication queue
191    // storage. See ReplicationQueueStorage.removeLastSequenceIdsAndHFileRefsBefore.
192    ReplicationSyncUpToolInfo info =
193      new ReplicationSyncUpToolInfo(EnvironmentEdgeManager.currentTime());
194    String json = JsonMapper.writeObjectAsString(info);
195    Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR);
196    try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), isForce)) {
197      out.write(Bytes.toBytes(json));
198    }
199  }
200
201  private static boolean parseOpts(String args[]) {
202    LinkedList<String> argv = new LinkedList<>();
203    argv.addAll(Arrays.asList(args));
204    String cmd = null;
205    while ((cmd = argv.poll()) != null) {
206      if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) {
207        printUsageAndExit(null, 0);
208      }
209      if (cmd.equals("-f")) {
210        return true;
211      }
212      if (!argv.isEmpty()) {
213        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
214      }
215    }
216    return false;
217  }
218
  /**
   * Prints the usage through {@code printUsage} and then terminates the JVM.
   * @param message  optional message printed before the usage, may be null
   * @param exitCode the status passed to {@link System#exit(int)}
   */
  private static void printUsageAndExit(final String message, final int exitCode) {
    printUsage(message);
    System.exit(exitCode);
  }
223
224  private static void printUsage(final String message) {
225    if (message != null && message.length() > 0) {
226      System.err.println(message);
227    }
228    System.err.println("Usage: hbase " + ReplicationSyncUp.class.getName() + " \\");
229    System.err.println("  <OPTIONS> [-D<property=value>]*");
230    System.err.println();
231    System.err.println("General Options:");
232    System.err.println(" -h|--h|--help  Show this help and exit.");
233    System.err
234      .println(" -f Start a new ReplicationSyncUp after the previous ReplicationSyncUp failed. "
235        + "See HBASE-27623 for details.");
236  }
237
238  @Override
239  public int run(String[] args) throws Exception {
240    Abortable abortable = new Abortable() {
241
242      private volatile boolean abort = false;
243
244      @Override
245      public void abort(String why, Throwable e) {
246        if (isAborted()) {
247          return;
248        }
249        abort = true;
250        System.err.println("Aborting because of " + why);
251        e.printStackTrace();
252        System.exit(1);
253      }
254
255      @Override
256      public boolean isAborted() {
257        return abort;
258      }
259    };
260    boolean isForce = parseOpts(args);
261    Configuration conf = getConf();
262    try (ZKWatcher zkw = new ZKWatcher(conf,
263      "syncupReplication" + EnvironmentEdgeManager.currentTime(), abortable, true)) {
264      Path walRootDir = CommonFSUtils.getWALRootDir(conf);
265      FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
266      Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
267      Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
268
269      System.out.println("Start Replication Server");
270      writeInfoFile(fs, isForce);
271      Replication replication = new Replication();
272      // use offline table replication queue storage
273      getConf().setClass(ReplicationStorageFactory.REPLICATION_QUEUE_IMPL,
274        OfflineTableReplicationQueueStorage.class, ReplicationQueueStorage.class);
275      DummyServer server = new DummyServer(getConf(), zkw);
276      replication
277        .initialize(server, fs, new Path(logDir, server.toString()), oldLogDir,
278          new WALFactory(conf,
279            ServerName.valueOf(
280              getClass().getSimpleName() + ",16010," + EnvironmentEdgeManager.currentTime()),
281            null));
282      ReplicationSourceManager manager = replication.getReplicationManager();
283      manager.init();
284      Set<ServerName> regionServers = listRegionServers(fs, logDir);
285      addMissingReplicationQueues(manager.getQueueStorage(), regionServers,
286        manager.getReplicationPeers().getAllPeerIds());
287      claimReplicationQueues(manager, regionServers);
288      while (manager.activeFailoverTaskCount() > 0) {
289        Thread.sleep(SLEEP_TIME);
290      }
291      while (manager.getOldSources().size() > 0) {
292        Thread.sleep(SLEEP_TIME);
293      }
294      manager.join();
295    } catch (InterruptedException e) {
296      System.err.println("didn't wait long enough:" + e);
297      return -1;
298    }
299    return 0;
300  }
301
302  private static final class DummyServer implements Server {
303    private final Configuration conf;
304    private final String hostname;
305    private final ZKWatcher zkw;
306    private volatile boolean abort = false;
307
308    DummyServer(Configuration conf, ZKWatcher zkw) {
309      // a unique name in case the first run fails
310      hostname = EnvironmentEdgeManager.currentTime() + ".SyncUpTool.replication.org";
311      this.conf = conf;
312      this.zkw = zkw;
313    }
314
315    @Override
316    public Configuration getConfiguration() {
317      return conf;
318    }
319
320    @Override
321    public ZKWatcher getZooKeeper() {
322      return zkw;
323    }
324
325    @Override
326    public CoordinatedStateManager getCoordinatedStateManager() {
327      return null;
328    }
329
330    @Override
331    public ServerName getServerName() {
332      return ServerName.valueOf(hostname, 1234, 1L);
333    }
334
335    @Override
336    public void abort(String why, Throwable e) {
337      if (isAborted()) {
338        return;
339      }
340      abort = true;
341      System.err.println("Aborting because of " + why);
342      e.printStackTrace();
343      System.exit(1);
344    }
345
346    @Override
347    public boolean isAborted() {
348      return abort;
349    }
350
351    @Override
352    public void stop(String why) {
353    }
354
355    @Override
356    public boolean isStopped() {
357      return false;
358    }
359
360    @Override
361    public Connection getConnection() {
362      return null;
363    }
364
365    @Override
366    public ChoreService getChoreService() {
367      return null;
368    }
369
370    @Override
371    public FileSystem getFileSystem() {
372      return null;
373    }
374
375    @Override
376    public boolean isStopping() {
377      return false;
378    }
379
380    @Override
381    public Connection createConnection(Configuration conf) throws IOException {
382      return null;
383    }
384
385    @Override
386    public AsyncClusterConnection getAsyncClusterConnection() {
387      return null;
388    }
389
390    @Override
391    public KeyManagementService getKeyManagementService() {
392      return null;
393    }
394  }
395}