001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.concurrent.PriorityBlockingQueue;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.fs.Path;
024import org.apache.hadoop.hbase.replication.ReplicationException;
025import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
026import org.apache.hadoop.hbase.util.Threads;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031/**
032 *  Used by a {@link RecoveredReplicationSource}.
033 */
034@InterfaceAudience.Private
035public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper {
036  private static final Logger LOG =
037      LoggerFactory.getLogger(RecoveredReplicationSourceShipper.class);
038
039  protected final RecoveredReplicationSource source;
040  private final ReplicationQueueStorage replicationQueues;
041
042  public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
043      PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
044      ReplicationQueueStorage queueStorage) {
045    super(conf, walGroupId, queue, source);
046    this.source = source;
047    this.replicationQueues = queueStorage;
048  }
049
050  @Override
051  protected void noMoreData() {
052    LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId, source.getQueueId());
053    source.getSourceMetrics().incrCompletedRecoveryQueue();
054    setWorkerState(WorkerState.FINISHED);
055  }
056
057  @Override
058  protected void postFinish() {
059    source.tryFinish();
060  }
061
062  @Override
063  public long getStartPosition() {
064    long startPosition = getRecoveredQueueStartPos();
065    int numRetries = 0;
066    while (numRetries <= maxRetriesMultiplier) {
067      try {
068        source.locateRecoveredPaths(queue);
069        break;
070      } catch (IOException e) {
071        LOG.error("Error while locating recovered queue paths, attempt #" + numRetries);
072        numRetries++;
073      }
074    }
075    return startPosition;
076  }
077
078  // If this is a recovered queue, the queue is already full and the first log
079  // normally has a position (unless the RS failed between 2 logs)
080  private long getRecoveredQueueStartPos() {
081    long startPosition = 0;
082    String peerClusterZNode = source.getQueueId();
083    try {
084      startPosition = this.replicationQueues.getWALPosition(source.getServer().getServerName(),
085        peerClusterZNode, this.queue.peek().getName());
086      LOG.trace("Recovered queue started with log {} at position {}", this.queue.peek(),
087        startPosition);
088    } catch (ReplicationException e) {
089      terminate("Couldn't get the position of this recovered queue " + peerClusterZNode, e);
090    }
091    return startPosition;
092  }
093
094  private void terminate(String reason, Exception cause) {
095    if (cause == null) {
096      LOG.info("Closing worker for wal group {} because: {}", this.walGroupId, reason);
097    } else {
098      LOG.error(
099        "Closing worker for wal group " + this.walGroupId + " because an error occurred: " + reason,
100        cause);
101    }
102    entryReader.interrupt();
103    Threads.shutdown(entryReader, sleepForRetries);
104    this.interrupt();
105    Threads.shutdown(this, sleepForRetries);
106    LOG.info("ReplicationSourceWorker {} terminated", this.getName());
107  }
108}