001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.List;
022import java.util.UUID;
023import java.util.concurrent.PriorityBlockingQueue;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.fs.FileStatus;
026import org.apache.hadoop.fs.FileSystem;
027import org.apache.hadoop.fs.Path;
028import org.apache.hadoop.hbase.Server;
029import org.apache.hadoop.hbase.ServerName;
030import org.apache.hadoop.hbase.replication.ReplicationPeer;
031import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
032import org.apache.hadoop.hbase.util.CommonFSUtils;
033import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
034import org.apache.yetus.audience.InterfaceAudience;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * Class that handles the recovered source of a replication stream, which is transfered from another
040 * dead region server. This will be closed when all logs are pushed to peer cluster.
041 */
042@InterfaceAudience.Private
043public class RecoveredReplicationSource extends ReplicationSource {
044
045  private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);
046
047  private String actualPeerId;
048
049  @Override
050  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
051    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
052    String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
053    MetricsSource metrics) throws IOException {
054    super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode,
055      clusterId, walFileLengthProvider, metrics);
056    this.actualPeerId = this.replicationQueueInfo.getPeerId();
057  }
058
059  @Override
060  protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId) {
061    return new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, this, queueStorage);
062  }
063
064  public void locateRecoveredPaths(String walGroupId) throws IOException {
065    boolean hasPathChanged = false;
066    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
067    PriorityBlockingQueue<Path> newPaths = new PriorityBlockingQueue<Path>(queueSizePerGroup,
068      new AbstractFSWALProvider.WALStartTimeComparator());
069    pathsLoop: for (Path path : queue) {
070      if (fs.exists(path)) { // still in same location, don't need to do anything
071        newPaths.add(path);
072        continue;
073      }
074      // Path changed - try to find the right path.
075      hasPathChanged = true;
076      if (server instanceof ReplicationSyncUp.DummyServer) {
077        // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data
078        // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists
079        Path newPath = getReplSyncUpPath(path);
080        newPaths.add(newPath);
081        continue;
082      } else {
083        // See if Path exists in the dead RS folder (there could be a chain of failures
084        // to look at)
085        List<ServerName> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
086        LOG.info("NB dead servers : " + deadRegionServers.size());
087        final Path walDir = CommonFSUtils.getWALRootDir(conf);
088        for (ServerName curDeadServerName : deadRegionServers) {
089          final Path deadRsDirectory = new Path(walDir,
090            AbstractFSWALProvider.getWALDirectoryName(curDeadServerName.getServerName()));
091          Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()),
092            new Path(deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) };
093          for (Path possibleLogLocation : locs) {
094            LOG.info("Possible location " + possibleLogLocation.toUri().toString());
095            if (manager.getFs().exists(possibleLogLocation)) {
096              // We found the right new location
097              LOG.info("Log " + path + " still exists at " + possibleLogLocation);
098              newPaths.add(possibleLogLocation);
099              continue pathsLoop;
100            }
101          }
102        }
103        // didn't find a new location
104        LOG.error(
105          String.format("WAL Path %s doesn't exist and couldn't find its new location", path));
106        newPaths.add(path);
107      }
108    }
109
110    if (hasPathChanged) {
111      if (newPaths.size() != queue.size()) { // this shouldn't happen
112        LOG.error("Recovery queue size is incorrect");
113        throw new IOException("Recovery queue size error");
114      }
115      // put the correct locations in the queue
116      // since this is a recovered queue with no new incoming logs,
117      // there shouldn't be any concurrency issues
118      logQueue.clear(walGroupId);
119      for (Path path : newPaths) {
120        logQueue.enqueueLog(path, walGroupId);
121      }
122    }
123  }
124
125  // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal
126  // area rather than to the wal area for a particular region server.
127  private Path getReplSyncUpPath(Path path) throws IOException {
128    FileStatus[] rss = fs.listStatus(manager.getLogDir());
129    for (FileStatus rs : rss) {
130      Path p = rs.getPath();
131      FileStatus[] logs = fs.listStatus(p);
132      for (FileStatus log : logs) {
133        p = new Path(p, log.getPath().getName());
134        if (p.getName().equals(path.getName())) {
135          LOG.info("Log " + p.getName() + " found at " + p);
136          return p;
137        }
138      }
139    }
140    LOG.error("Didn't find path for: " + path.getName());
141    return path;
142  }
143
144  void tryFinish() {
145    if (workerThreads.isEmpty()) {
146      this.getSourceMetrics().clear();
147      manager.finishRecoveredSource(this);
148    }
149  }
150
151  @Override
152  public String getPeerId() {
153    return this.actualPeerId;
154  }
155
156  @Override
157  public ServerName getServerWALsBelongTo() {
158    return this.replicationQueueInfo.getDeadRegionServers().get(0);
159  }
160
161  @Override
162  public boolean isRecovered() {
163    return true;
164  }
165}