001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.hbase.replication.ReplicationException;
023import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
024import org.apache.hadoop.hbase.util.Threads;
025import org.apache.yetus.audience.InterfaceAudience;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029/**
030 * Used by a {@link RecoveredReplicationSource}.
031 */
032@InterfaceAudience.Private
033public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper {
034  private static final Logger LOG =
035    LoggerFactory.getLogger(RecoveredReplicationSourceShipper.class);
036
037  protected final RecoveredReplicationSource source;
038  private final ReplicationQueueStorage replicationQueues;
039
040  public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
041    ReplicationSourceLogQueue logQueue, RecoveredReplicationSource source,
042    ReplicationQueueStorage queueStorage) {
043    super(conf, walGroupId, logQueue, source);
044    this.source = source;
045    this.replicationQueues = queueStorage;
046  }
047
048  @Override
049  protected void noMoreData() {
050    LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId, source.getQueueId());
051    source.getSourceMetrics().incrCompletedRecoveryQueue();
052    setWorkerState(WorkerState.FINISHED);
053  }
054
055  @Override
056  protected void postFinish() {
057    source.tryFinish();
058  }
059
060  @Override
061  public long getStartPosition() {
062    long startPosition = getRecoveredQueueStartPos();
063    int numRetries = 0;
064    while (numRetries <= maxRetriesMultiplier) {
065      try {
066        source.locateRecoveredPaths(walGroupId);
067        break;
068      } catch (IOException e) {
069        LOG.error("Error while locating recovered queue paths, attempt #" + numRetries, e);
070        numRetries++;
071      }
072    }
073    return startPosition;
074  }
075
076  // If this is a recovered queue, the queue is already full and the first log
077  // normally has a position (unless the RS failed between 2 logs)
078  private long getRecoveredQueueStartPos() {
079    long startPosition = 0;
080    String peerClusterZNode = source.getQueueId();
081    try {
082      startPosition = this.replicationQueues.getWALPosition(source.getServer().getServerName(),
083        peerClusterZNode, this.logQueue.getQueue(walGroupId).peek().getName());
084      LOG.trace("Recovered queue started with log {} at position {}",
085        this.logQueue.getQueue(walGroupId).peek(), startPosition);
086    } catch (ReplicationException e) {
087      terminate("Couldn't get the position of this recovered queue " + peerClusterZNode, e);
088    }
089    return startPosition;
090  }
091
092  private void terminate(String reason, Exception cause) {
093    if (cause == null) {
094      LOG.info("Closing worker for wal group {} because: {}", this.walGroupId, reason);
095    } else {
096      LOG.error(
097        "Closing worker for wal group " + this.walGroupId + " because an error occurred: " + reason,
098        cause);
099    }
100    entryReader.interrupt();
101    Threads.shutdown(entryReader, sleepForRetries);
102    this.interrupt();
103    Threads.shutdown(this, sleepForRetries);
104    LOG.info("ReplicationSourceWorker {} terminated", this.getName());
105  }
106}