/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class that handles the recovered source of a replication stream, which is transferred from
 * another dead region server. This will be closed when all logs are pushed to the peer cluster.
 */
@InterfaceAudience.Private
public class RecoveredReplicationSource extends ReplicationSource {

  private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);

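  /**
   * The id of the peer cluster this source replicates to. The recovered queue's znode name also
   * encodes the dead region server(s) it was claimed from, so the actual peer id is taken from the
   * parsed queue info rather than from the znode name directly.
   */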
  private String actualPeerId;

  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
      String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
    super.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerClusterZnode,
      clusterId, replicationEndpoint, walFileLengthProvider, metrics);
    this.actualPeerId = this.replicationQueueInfo.getPeerId();
  }

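  /**
   * Starts a {@link RecoveredReplicationSourceShipper} and its WAL reader for the given WAL group,
   * unless another thread has already registered a worker for that group.
   */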
  @Override
  protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
    final RecoveredReplicationSourceShipper worker =
        new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this,
            this.replicationQueues);
    ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
    if (extant != null) {
      LOG.debug("Someone has beaten us to start a worker thread for wal group " + walGroupId);
    } else {
      LOG.debug("Starting up worker for wal group " + walGroupId);
      worker.startup(getUncaughtExceptionHandler());
      worker.setWALReader(
        startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition()));
      // putIfAbsent above already registered this worker in workerThreads; no second put needed.
    }
  }

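  /**
   * Creates a {@link RecoveredReplicationSourceWALReader} for the given WAL group and starts it as
   * a daemon thread.
   */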
  @Override
  protected ReplicationSourceWALReader startNewWALReader(String threadName,
      String walGroupId, PriorityBlockingQueue<Path> queue, long startPosition) {
    ReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(fs,
        conf, queue, startPosition, walEntryFilter, this);
    Threads.setDaemonThreadRunning(walReader, threadName
        + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
      getUncaughtExceptionHandler());
    return walReader;
  }

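  /**
   * Rebuilds the given queue so each entry points at the current location of its WAL. After a
   * region server dies, its WALs may have been moved into another dead server's directory or a
   * -splitting directory; any path that no longer exists at its original location is looked up
   * again. When running under the ReplicationSyncUp tool, the whole WAL root is scanned instead
   * (see getReplSyncUpPath).
   */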
  public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
    boolean hasPathChanged = false;
    PriorityBlockingQueue<Path> newPaths =
        new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
    pathsLoop: for (Path path : queue) {
      if (fs.exists(path)) { // still in same location, don't need to do anything
        newPaths.add(path);
        continue;
      }
      // Path changed - try to find the right path.
      hasPathChanged = true;
      if (server instanceof ReplicationSyncUp.DummyServer) {
        // In a disaster/recovery scenario, the HMaster may have been shut down or crashed before
        // moving the data from .logs to .oldlogs. Loop over the .logs folders and check whether a
        // match exists.
        Path newPath = getReplSyncUpPath(path);
        newPaths.add(newPath);
        continue;
      } else {
        // See if the path exists in the dead RS folder (there could be a chain of failures
        // to look at)
        List<ServerName> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
        LOG.info("Number of dead servers: " + deadRegionServers.size());
        final Path walDir = FSUtils.getWALRootDir(conf);
        for (ServerName curDeadServerName : deadRegionServers) {
          final Path deadRsDirectory =
              new Path(walDir, AbstractFSWALProvider.getWALDirectoryName(curDeadServerName
                  .getServerName()));
          Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), new Path(
              deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) };
          for (Path possibleLogLocation : locs) {
            LOG.info("Possible location " + possibleLogLocation.toUri().toString());
            if (manager.getFs().exists(possibleLogLocation)) {
              // We found the right new location
              LOG.info("Log " + path + " still exists at " + possibleLogLocation);
              newPaths.add(possibleLogLocation);
              continue pathsLoop;
            }
          }
        }
        // didn't find a new location
        LOG.error(
          String.format("WAL Path %s doesn't exist and couldn't find its new location", path));
        newPaths.add(path);
      }
    }

    if (hasPathChanged) {
      if (newPaths.size() != queue.size()) { // this shouldn't happen
        LOG.error("Recovery queue size is incorrect");
        throw new IOException("Recovery queue size error");
      }
      // Put the correct locations in the queue. Since this is a recovered queue with no new
      // incoming logs, there shouldn't be any concurrency issues.
      queue.clear();
      for (Path path : newPaths) {
        queue.add(path);
      }
    }
  }

  /**
   * N.B. the ReplicationSyncUp tool sets manager.getLogDir() to the root of the WAL area rather
   * than to the WAL directory of a particular region server, so scan every server directory under
   * it for a log with the same name as the missing one.
   */
  private Path getReplSyncUpPath(Path path) throws IOException {
    FileStatus[] rss = fs.listStatus(manager.getLogDir());
    for (FileStatus rs : rss) {
      Path p = rs.getPath();
      FileStatus[] logs = fs.listStatus(p);
      for (FileStatus log : logs) {
        Path logPath = new Path(p, log.getPath().getName());
        if (logPath.getName().equals(path.getName())) {
          LOG.info("Log " + logPath.getName() + " found at " + logPath);
          return logPath;
        }
      }
    }
    LOG.error("Didn't find path for: " + path.getName());
    return path;
  }

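  /**
   * Attempts to close this recovered source: if every shipper has finished, the recovered queue is
   * closed through the source manager and the final stats are logged.
   */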
  public void tryFinish() {
    // synchronize so that only the last worker thread to finish cleans up the queue
    synchronized (workerThreads) {
      Threads.sleep(100); // wait a short while for the other worker threads to fully exit
      boolean allTasksDone = true;
      for (ReplicationSourceShipper worker : workerThreads.values()) {
        if (!worker.isFinished()) {
          allTasksDone = false;
          break;
        }
      }
      if (allTasksDone) {
        manager.closeRecoveredQueue(this);
        LOG.info("Finished recovering queue " + peerClusterZnode + " with the following stats: "
            + getStats());
      }
    }
  }

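  /**
   * Returns the actual peer id; for a recovered queue this is parsed from the queue info rather
   * than derived from the queue's znode name.
   */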
  @Override
  public String getPeerId() {
    return this.actualPeerId;
  }

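  /**
   * The WALs of a recovered queue belong to the first dead region server in the recorded chain of
   * failures.
   */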
  @Override
  public ServerName getServerWALsBelongTo() {
    return this.replicationQueueInfo.getDeadRegionServers().get(0);
  }
}