001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.FileNotFoundException;
021import java.io.IOException;
022import java.util.Arrays;
023import java.util.Collections;
024import java.util.HashSet;
025import java.util.Iterator;
026import java.util.LinkedList;
027import java.util.List;
028import java.util.Set;
029import org.apache.hadoop.conf.Configuration;
030import org.apache.hadoop.conf.Configured;
031import org.apache.hadoop.fs.FSDataOutputStream;
032import org.apache.hadoop.fs.FileStatus;
033import org.apache.hadoop.fs.FileSystem;
034import org.apache.hadoop.fs.Path;
035import org.apache.hadoop.hbase.Abortable;
036import org.apache.hadoop.hbase.ChoreService;
037import org.apache.hadoop.hbase.CoordinatedStateManager;
038import org.apache.hadoop.hbase.HBaseConfiguration;
039import org.apache.hadoop.hbase.HConstants;
040import org.apache.hadoop.hbase.Server;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.client.AsyncClusterConnection;
043import org.apache.hadoop.hbase.client.Connection;
044import org.apache.hadoop.hbase.master.replication.OfflineTableReplicationQueueStorage;
045import org.apache.hadoop.hbase.replication.ReplicationException;
046import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
047import org.apache.hadoop.hbase.replication.ReplicationQueueId;
048import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
049import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.util.CommonFSUtils;
052import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
053import org.apache.hadoop.hbase.util.JsonMapper;
054import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
055import org.apache.hadoop.hbase.wal.WALFactory;
056import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
057import org.apache.hadoop.util.Tool;
058import org.apache.hadoop.util.ToolRunner;
059import org.apache.yetus.audience.InterfaceAudience;
060import org.apache.zookeeper.KeeperException;
061
062/**
 * In a scenario of replication based disaster recovery, when the HBase master cluster crashes,
 * this tool is used to sync up the delta from the master cluster to the slave cluster using the
 * info from ZooKeeper. The tool runs on the master cluster, and assumes that ZooKeeper, the
 * filesystem and the network are still available after HBase crashes.
067 *
068 * <pre>
069 * hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp
070 * </pre>
071 */
072@InterfaceAudience.Private
073public class ReplicationSyncUp extends Configured implements Tool {
074
075  public static class ReplicationSyncUpToolInfo {
076
077    private long startTimeMs;
078
079    public ReplicationSyncUpToolInfo() {
080    }
081
082    public ReplicationSyncUpToolInfo(long startTimeMs) {
083      this.startTimeMs = startTimeMs;
084    }
085
086    public long getStartTimeMs() {
087      return startTimeMs;
088    }
089
090    public void setStartTimeMs(long startTimeMs) {
091      this.startTimeMs = startTimeMs;
092    }
093  }
094
  // Name of the directory, under the HBase root directory, where this tool records information
  // about its runs. It is used for storing the information used to skip replicating some wals
  // after the cluster is back online.
  public static final String INFO_DIR = "ReplicationSyncUp";

  // Name of the file, inside INFO_DIR, which holds the JSON written by writeInfoFile.
  public static final String INFO_FILE = "info";

  // Time to sleep, in milliseconds, between two checks while run() waits for replication to finish.
  private static final long SLEEP_TIME = 10000;
101
102  /**
103   * Main program
104   */
105  public static void main(String[] args) throws Exception {
106    int ret = ToolRunner.run(HBaseConfiguration.create(), new ReplicationSyncUp(), args);
107    System.exit(ret);
108  }
109
110  // Find region servers under wal directory
111  // Here we only care about the region servers which may still be alive, as we need to add
112  // replications for them if missing. The dead region servers which have already been processed
113  // fully do not need to add their replication queues again, as the operation has already been done
114  // in SCP.
115  private Set<ServerName> listRegionServers(FileSystem walFs, Path walDir) throws IOException {
116    FileStatus[] statuses;
117    try {
118      statuses = walFs.listStatus(walDir);
119    } catch (FileNotFoundException e) {
120      System.out.println("WAL directory " + walDir + " does not exists, ignore");
121      return Collections.emptySet();
122    }
123    Set<ServerName> regionServers = new HashSet<>();
124    for (FileStatus status : statuses) {
125      // All wal files under the walDir is within its region server's directory
126      if (!status.isDirectory()) {
127        continue;
128      }
129      ServerName sn = AbstractFSWALProvider.getServerNameFromWALDirectoryName(status.getPath());
130      if (sn != null) {
131        regionServers.add(sn);
132      }
133    }
134    return regionServers;
135  }
136
137  private void addMissingReplicationQueues(ReplicationQueueStorage storage, ServerName regionServer,
138    Set<String> peerIds) throws ReplicationException {
139    Set<String> existingQueuePeerIds = new HashSet<>();
140    List<ReplicationQueueId> queueIds = storage.listAllQueueIds(regionServer);
141    for (Iterator<ReplicationQueueId> iter = queueIds.iterator(); iter.hasNext();) {
142      ReplicationQueueId queueId = iter.next();
143      if (!queueId.isRecovered()) {
144        existingQueuePeerIds.add(queueId.getPeerId());
145      }
146    }
147
148    for (String peerId : peerIds) {
149      if (!existingQueuePeerIds.contains(peerId)) {
150        ReplicationQueueId queueId = new ReplicationQueueId(regionServer, peerId);
151        System.out.println("Add replication queue " + queueId + " for claiming");
152        storage.setOffset(queueId, regionServer.toString(), ReplicationGroupOffset.BEGIN,
153          Collections.emptyMap());
154      }
155    }
156  }
157
158  private void addMissingReplicationQueues(ReplicationQueueStorage storage,
159    Set<ServerName> regionServers, Set<String> peerIds) throws ReplicationException {
160    for (ServerName regionServer : regionServers) {
161      addMissingReplicationQueues(storage, regionServer, peerIds);
162    }
163  }
164
165  // When using this tool, usually the source cluster is unhealthy, so we should try to claim the
166  // replication queues for the dead region servers first and then replicate the data out.
167  private void claimReplicationQueues(ReplicationSourceManager mgr, Set<ServerName> regionServers)
168    throws ReplicationException, KeeperException, IOException {
169    // union the region servers from both places, i.e, from the wal directory, and the records in
170    // replication queue storage.
171    Set<ServerName> replicators = new HashSet<>(regionServers);
172    ReplicationQueueStorage queueStorage = mgr.getQueueStorage();
173    replicators.addAll(queueStorage.listAllReplicators());
174    FileSystem fs = CommonFSUtils.getCurrentFileSystem(getConf());
175    Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR);
176    for (ServerName sn : replicators) {
177      List<ReplicationQueueId> replicationQueues = queueStorage.listAllQueueIds(sn);
178      System.out.println(sn + " is dead, claim its replication queues: " + replicationQueues);
179      // record the rs name, so when master restarting, we will skip claiming its replication queue
180      fs.createNewFile(new Path(infoDir, sn.getServerName()));
181      for (ReplicationQueueId queueId : replicationQueues) {
182        mgr.claimQueue(queueId, true);
183      }
184    }
185  }
186
187  private void writeInfoFile(FileSystem fs, boolean isForce) throws IOException {
188    // Record the info of this run. Currently only record the time we run the job. We will use this
189    // timestamp to clean up the data for last sequence ids and hfile refs in replication queue
190    // storage. See ReplicationQueueStorage.removeLastSequenceIdsAndHFileRefsBefore.
191    ReplicationSyncUpToolInfo info =
192      new ReplicationSyncUpToolInfo(EnvironmentEdgeManager.currentTime());
193    String json = JsonMapper.writeObjectAsString(info);
194    Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR);
195    try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), isForce)) {
196      out.write(Bytes.toBytes(json));
197    }
198  }
199
200  private static boolean parseOpts(String args[]) {
201    LinkedList<String> argv = new LinkedList<>();
202    argv.addAll(Arrays.asList(args));
203    String cmd = null;
204    while ((cmd = argv.poll()) != null) {
205      if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) {
206        printUsageAndExit(null, 0);
207      }
208      if (cmd.equals("-f")) {
209        return true;
210      }
211      if (!argv.isEmpty()) {
212        printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
213      }
214    }
215    return false;
216  }
217
218  private static void printUsageAndExit(final String message, final int exitCode) {
219    printUsage(message);
220    System.exit(exitCode);
221  }
222
223  private static void printUsage(final String message) {
224    if (message != null && message.length() > 0) {
225      System.err.println(message);
226    }
227    System.err.println("Usage: hbase " + ReplicationSyncUp.class.getName() + " \\");
228    System.err.println("  <OPTIONS> [-D<property=value>]*");
229    System.err.println();
230    System.err.println("General Options:");
231    System.err.println(" -h|--h|--help  Show this help and exit.");
232    System.err
233      .println(" -f Start a new ReplicationSyncUp after the previous ReplicationSyncUp failed. "
234        + "See HBASE-27623 for details.");
235  }
236
237  @Override
238  public int run(String[] args) throws Exception {
239    Abortable abortable = new Abortable() {
240
241      private volatile boolean abort = false;
242
243      @Override
244      public void abort(String why, Throwable e) {
245        if (isAborted()) {
246          return;
247        }
248        abort = true;
249        System.err.println("Aborting because of " + why);
250        e.printStackTrace();
251        System.exit(1);
252      }
253
254      @Override
255      public boolean isAborted() {
256        return abort;
257      }
258    };
259    boolean isForce = parseOpts(args);
260    Configuration conf = getConf();
261    try (ZKWatcher zkw = new ZKWatcher(conf,
262      "syncupReplication" + EnvironmentEdgeManager.currentTime(), abortable, true)) {
263      Path walRootDir = CommonFSUtils.getWALRootDir(conf);
264      FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
265      Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME);
266      Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
267
268      System.out.println("Start Replication Server");
269      writeInfoFile(fs, isForce);
270      Replication replication = new Replication();
271      // use offline table replication queue storage
272      getConf().setClass(ReplicationStorageFactory.REPLICATION_QUEUE_IMPL,
273        OfflineTableReplicationQueueStorage.class, ReplicationQueueStorage.class);
274      DummyServer server = new DummyServer(getConf(), zkw);
275      replication
276        .initialize(server, fs, new Path(logDir, server.toString()), oldLogDir,
277          new WALFactory(conf,
278            ServerName.valueOf(
279              getClass().getSimpleName() + ",16010," + EnvironmentEdgeManager.currentTime()),
280            null));
281      ReplicationSourceManager manager = replication.getReplicationManager();
282      manager.init();
283      Set<ServerName> regionServers = listRegionServers(fs, logDir);
284      addMissingReplicationQueues(manager.getQueueStorage(), regionServers,
285        manager.getReplicationPeers().getAllPeerIds());
286      claimReplicationQueues(manager, regionServers);
287      while (manager.activeFailoverTaskCount() > 0) {
288        Thread.sleep(SLEEP_TIME);
289      }
290      while (manager.getOldSources().size() > 0) {
291        Thread.sleep(SLEEP_TIME);
292      }
293      manager.join();
294    } catch (InterruptedException e) {
295      System.err.println("didn't wait long enough:" + e);
296      return -1;
297    }
298    return 0;
299  }
300
301  private static final class DummyServer implements Server {
302    private final Configuration conf;
303    private final String hostname;
304    private final ZKWatcher zkw;
305    private volatile boolean abort = false;
306
307    DummyServer(Configuration conf, ZKWatcher zkw) {
308      // a unique name in case the first run fails
309      hostname = EnvironmentEdgeManager.currentTime() + ".SyncUpTool.replication.org";
310      this.conf = conf;
311      this.zkw = zkw;
312    }
313
314    @Override
315    public Configuration getConfiguration() {
316      return conf;
317    }
318
319    @Override
320    public ZKWatcher getZooKeeper() {
321      return zkw;
322    }
323
324    @Override
325    public CoordinatedStateManager getCoordinatedStateManager() {
326      return null;
327    }
328
329    @Override
330    public ServerName getServerName() {
331      return ServerName.valueOf(hostname, 1234, 1L);
332    }
333
334    @Override
335    public void abort(String why, Throwable e) {
336      if (isAborted()) {
337        return;
338      }
339      abort = true;
340      System.err.println("Aborting because of " + why);
341      e.printStackTrace();
342      System.exit(1);
343    }
344
345    @Override
346    public boolean isAborted() {
347      return abort;
348    }
349
350    @Override
351    public void stop(String why) {
352    }
353
354    @Override
355    public boolean isStopped() {
356      return false;
357    }
358
359    @Override
360    public Connection getConnection() {
361      return null;
362    }
363
364    @Override
365    public ChoreService getChoreService() {
366      return null;
367    }
368
369    @Override
370    public FileSystem getFileSystem() {
371      return null;
372    }
373
374    @Override
375    public boolean isStopping() {
376      return false;
377    }
378
379    @Override
380    public Connection createConnection(Configuration conf) throws IOException {
381      return null;
382    }
383
384    @Override
385    public AsyncClusterConnection getAsyncClusterConnection() {
386      return null;
387    }
388  }
389}