/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Handles the recovered source of a replication stream, which is transferred from another dead
 * region server. This source will be closed once all of its logs have been pushed to the peer
 * cluster.
 */
@InterfaceAudience.Private
public class RecoveredReplicationSource extends ReplicationSource {

  private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);

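  /**
   * The id of the actual peer this recovered queue replicates to, parsed out of the recovered
   * queue id (which also encodes the dead region servers the queue was claimed from).
   */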
  private String actualPeerId;

  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
      String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
      MetricsSource metrics) throws IOException {
    super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode,
      clusterId, walFileLengthProvider, metrics);
    this.actualPeerId = this.replicationQueueInfo.getPeerId();
  }

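  // Recovered sources use a dedicated shipper type so that the recovered queue can be cleaned up
  // (see tryFinish()) once it has been fully drained.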
  @Override
  protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
      PriorityBlockingQueue<Path> queue) {
    return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage);
  }

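  /**
   * Walk the recovered queue and, for every WAL that is no longer at its original location, try
   * to find where it lives now: the WAL (or -splitting) directories of the dead region servers in
   * the queue's failure chain, or anywhere under the WAL root when running inside
   * ReplicationSyncUp. The queue is then rebuilt with the updated paths.
   */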
  public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
    boolean hasPathChanged = false;
    PriorityBlockingQueue<Path> newPaths =
        new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
    pathsLoop: for (Path path : queue) {
      if (fs.exists(path)) { // still in the same location, nothing to do
        newPaths.add(path);
        continue;
      }
      // Path changed - try to find the right path.
      hasPathChanged = true;
      if (server instanceof ReplicationSyncUp.DummyServer) {
        // In a disaster recovery scenario the HMaster may have been shut down or crashed before
        // the WALs were archived to the old WALs directory. Walk the live WAL directories and
        // check whether a match exists there.
        Path newPath = getReplSyncUpPath(path);
        newPaths.add(newPath);
        continue;
      } else {
        // See if the path exists in the dead RS directories (there could be a chain of failures
        // to look at).
        List<ServerName> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
        LOG.info("Number of dead region servers: " + deadRegionServers.size());
        final Path walDir = FSUtils.getWALRootDir(conf);
        for (ServerName curDeadServerName : deadRegionServers) {
          final Path deadRsDirectory =
              new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName
                  .getServerName()));
          Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), new Path(
              deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) };
          for (Path possibleLogLocation : locs) {
            LOG.info("Possible location " + possibleLogLocation.toUri().toString());
            if (manager.getFs().exists(possibleLogLocation)) {
              // We found the right new location
              LOG.info("Log " + path + " still exists at " + possibleLogLocation);
              newPaths.add(possibleLogLocation);
              continue pathsLoop;
            }
          }
        }
        // We didn't find a new location; keep the original path.
        LOG.error(
          String.format("WAL Path %s doesn't exist and couldn't find its new location", path));
        newPaths.add(path);
      }
    }

    if (hasPathChanged) {
      if (newPaths.size() != queue.size()) { // this shouldn't happen
        LOG.error("Recovery queue size is incorrect");
        throw new IOException("Recovery queue size error");
      }
      // Put the correct locations in the queue. Since this is a recovered queue with no new
      // incoming logs, there shouldn't be any concurrency issues.
      queue.clear();
      for (Path path : newPaths) {
        queue.add(path);
      }
    }
  }

  // N.B. the ReplicationSyncUp tool sets manager.getLogDir() to the root of the WAL area rather
  // than to the WAL directory of a particular region server, so search every region server
  // directory under it.
  private Path getReplSyncUpPath(Path path) throws IOException {
    FileStatus[] rss = fs.listStatus(manager.getLogDir());
    for (FileStatus rs : rss) {
      Path p = rs.getPath();
      FileStatus[] logs = fs.listStatus(p);
      for (FileStatus log : logs) {
        // Build each candidate from the region server directory itself instead of reusing and
        // growing the same path across iterations.
        Path logPath = new Path(p, log.getPath().getName());
        if (logPath.getName().equals(path.getName())) {
          LOG.info("Log " + logPath.getName() + " found at " + logPath);
          return logPath;
        }
      }
    }
    LOG.error("Didn't find path for: " + path.getName());
    return path;
  }

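  /**
   * Attempt to finish this recovered source: if every worker thread has finished, clear the
   * source metrics and ask the manager to remove this source.
   */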
  void tryFinish() {
    // Synchronize so that only the last remaining worker thread cleans up the queue.
    synchronized (workerThreads) {
      Threads.sleep(100); // wait a short while for the other worker threads to fully exit
      boolean allTasksDone = workerThreads.values().stream().allMatch(w -> w.isFinished());
      if (allTasksDone) {
        this.getSourceMetrics().clear();
        manager.removeRecoveredSource(this);
        LOG.info("Finished recovering queue {} with the following stats: {}", queueId, getStats());
      }
    }
  }

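  /**
   * The queue id of a recovered source also encodes the dead region servers it was claimed from,
   * so return the actual peer id parsed out of it instead.
   */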
  @Override
  public String getPeerId() {
    return this.actualPeerId;
  }

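  /**
   * The WALs in a recovered queue were written by the first dead region server recorded in the
   * queue id.
   */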
  @Override
  public ServerName getServerWALsBelongTo() {
    return this.replicationQueueInfo.getDeadRegionServers().get(0);
  }

  @Override
  public boolean isRecovered() {
    return true;
  }
}