001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import org.apache.hadoop.conf.Configuration;
022import org.apache.hadoop.hbase.replication.ReplicationException;
023import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
024import org.apache.hadoop.hbase.util.Threads;
025import org.apache.yetus.audience.InterfaceAudience;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029/**
030 * Used by a {@link RecoveredReplicationSource}.
031 */
032@InterfaceAudience.Private
033public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper {
034  private static final Logger LOG =
035    LoggerFactory.getLogger(RecoveredReplicationSourceShipper.class);
036
037  protected final RecoveredReplicationSource source;
038  private final ReplicationQueueStorage replicationQueues;
039
040  public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
041    ReplicationSourceLogQueue logQueue, RecoveredReplicationSource source,
042    ReplicationQueueStorage queueStorage) {
043    super(conf, walGroupId, logQueue, source);
044    this.source = source;
045    this.replicationQueues = queueStorage;
046  }
047
048  @Override
049  protected void postFinish() {
050    source.tryFinish();
051  }
052
053  @Override
054  public long getStartPosition() {
055    long startPosition = getRecoveredQueueStartPos();
056    int numRetries = 0;
057    while (numRetries <= maxRetriesMultiplier) {
058      try {
059        source.locateRecoveredPaths(walGroupId);
060        break;
061      } catch (IOException e) {
062        LOG.error("Error while locating recovered queue paths, attempt #" + numRetries, e);
063        numRetries++;
064      }
065    }
066    return startPosition;
067  }
068
069  // If this is a recovered queue, the queue is already full and the first log
070  // normally has a position (unless the RS failed between 2 logs)
071  private long getRecoveredQueueStartPos() {
072    long startPosition = 0;
073    String peerClusterZNode = source.getQueueId();
074    try {
075      startPosition = this.replicationQueues.getWALPosition(source.getServer().getServerName(),
076        peerClusterZNode, this.logQueue.getQueue(walGroupId).peek().getName());
077      LOG.trace("Recovered queue started with log {} at position {}",
078        this.logQueue.getQueue(walGroupId).peek(), startPosition);
079    } catch (ReplicationException e) {
080      terminate("Couldn't get the position of this recovered queue " + peerClusterZNode, e);
081    }
082    return startPosition;
083  }
084
085  private void terminate(String reason, Exception cause) {
086    if (cause == null) {
087      LOG.info("Closing worker for wal group {} because: {}", this.walGroupId, reason);
088    } else {
089      LOG.error(
090        "Closing worker for wal group " + this.walGroupId + " because an error occurred: " + reason,
091        cause);
092    }
093    entryReader.interrupt();
094    Threads.shutdown(entryReader, sleepForRetries);
095    this.interrupt();
096    Threads.shutdown(this, sleepForRetries);
097    LOG.info("ReplicationSourceWorker {} terminated", this.getName());
098  }
099}