/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Worker thread that ships WAL edits for a single WAL group of a
 * {@link RecoveredReplicationSource}, i.e. a replication queue claimed from a
 * dead region server. The shipper drains the fixed set of recovered WALs and
 * marks itself {@code FINISHED} once the queue is empty.
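 * <p>A minimal construction sketch, illustrative only; in practice the owning
 * {@link RecoveredReplicationSource} creates and starts its shippers itself:
 * <pre>{@code
 * RecoveredReplicationSourceShipper shipper = new RecoveredReplicationSourceShipper(
 *     conf, walGroupId, walQueue, recoveredSource, replicationQueues);
 * shipper.start(); // starts the run() loop (the real source has its own startup path)
 * }</pre>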
 */
@InterfaceAudience.Private
public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper {
  private static final Logger LOG =
      LoggerFactory.getLogger(RecoveredReplicationSourceShipper.class);

  protected final RecoveredReplicationSource source;
  private final ReplicationQueues replicationQueues;
  public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
      PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
      ReplicationQueues replicationQueues) {
    super(conf, walGroupId, queue, source);
    this.source = source;
    this.replicationQueues = replicationQueues;
  }

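  /**
   * Main shipping loop: wait for the peer to be enabled and for the WAL entry
   * reader to be attached, then repeatedly take a batch from the reader and
   * ship it. An empty batch signals that the recovered queue has been fully
   * drained, at which point the worker is marked {@code FINISHED}.
   */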
  @Override
  public void run() {
    setWorkerState(WorkerState.RUNNING);
    // Loop until we close down
    while (isActive()) {
      int sleepMultiplier = 1;
      // Sleep until replication is enabled again
      if (!source.isPeerEnabled()) {
        if (source.sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      while (entryReader == null && isActive()) {
        if (source.sleepForRetries("Replication WAL entry reader thread not initialized",
          sleepMultiplier)) {
          sleepMultiplier++;
        }
      }
      if (entryReader == null) {
        // The shipper was asked to stop before a reader was ever attached.
        continue;
      }

      try {
        WALEntryBatch entryBatch = entryReader.take();
        shipEdits(entryBatch);
        if (entryBatch.getWalEntries().isEmpty()) {
          LOG.debug("Finished recovering queue for group {} of peer {}", walGroupId,
            source.getPeerClusterZnode());
          source.getSourceMetrics().incrCompletedRecoveryQueue();
          setWorkerState(WorkerState.FINISHED);
          continue;
        }
      } catch (InterruptedException e) {
        LOG.trace("Interrupted while waiting for next replication entry batch", e);
        Thread.currentThread().interrupt();
      }
    }
    source.tryFinish();
    // If the worker exits the run loop without finishing its task, mark it as stopped.
    if (!isFinished()) {
      setWorkerState(WorkerState.STOPPED);
    }
  }

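  /**
   * Returns the position to resume shipping from, as recorded for the first WAL
   * of the recovered queue, after asking the source to locate the recovered WAL
   * paths (retrying on IO errors up to the configured maximum).
   */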
  @Override
  public long getStartPosition() {
    long startPosition = getRecoveredQueueStartPos();
    int numRetries = 0;
    while (numRetries <= maxRetriesMultiplier) {
      try {
        source.locateRecoveredPaths(queue);
        break;
      } catch (IOException e) {
        LOG.error("Error while locating recovered queue paths, attempt #{}", numRetries, e);
        numRetries++;
      }
    }
    return startPosition;
  }

  // For a recovered queue the set of WALs is already fixed, and the first log
  // normally has a stored position (unless the region server failed between two logs).
  private long getRecoveredQueueStartPos() {
    long startPosition = 0;
    String peerClusterZnode = source.getPeerClusterZnode();
    try {
      startPosition = this.replicationQueues.getLogPosition(peerClusterZnode,
        this.queue.peek().getName());
      LOG.trace("Recovered queue started with log {} at position {}", this.queue.peek(),
        startPosition);
    } catch (ReplicationException e) {
      terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
    }
    return startPosition;
  }

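  /**
   * Persists the last shipped position for the current WAL via the source
   * manager, which also lets fully replicated logs of this recovered queue be
   * cleaned up, then caches the position locally.
   */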
  @Override
  protected void updateLogPosition(long lastReadPosition) {
    source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
      lastReadPosition, true, false);
    lastLoggedPosition = lastReadPosition;
  }

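  /**
   * Shuts down the WAL entry reader and this shipper thread, logging the
   * reason. A null cause indicates a normal close rather than an error.
   */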
  private void terminate(String reason, Exception cause) {
    if (cause == null) {
      LOG.info("Closing worker for wal group {} because: {}", this.walGroupId, reason);
    } else {
      LOG.error("Closing worker for wal group {} because an error occurred: {}",
        this.walGroupId, reason, cause);
    }
    if (entryReader != null) {
      // The reader may not have been attached yet if startup failed early.
      entryReader.interrupt();
      Threads.shutdown(entryReader, sleepForRetries);
    }
    this.interrupt();
    Threads.shutdown(this, sleepForRetries);
    LOG.info("ReplicationSourceWorker {} terminated", this.getName());
  }
}