/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.concurrent.PriorityBlockingQueue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.util.Threads;

/**
 * Used by a {@link RecoveredReplicationSource}.
 */
@InterfaceAudience.Private
public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper {
  private static final Logger LOG =
      LoggerFactory.getLogger(RecoveredReplicationSourceShipper.class);

  protected final RecoveredReplicationSource source;
  private final ReplicationQueues replicationQueues;

  public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
      PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
      ReplicationQueues replicationQueues) {
    super(conf, walGroupId, queue, source);
    this.source = source;
    this.replicationQueues = replicationQueues;
  }

  @Override
  public void run() {
    setWorkerState(WorkerState.RUNNING);
    // Loop until we close down
    while (isActive()) {
      int sleepMultiplier = 1;
      // Sleep until replication is enabled again
      if (!source.isPeerEnabled()) {
        if (source.sleepForRetries("Replication is disabled", sleepMultiplier)) {
          sleepMultiplier++;
        }
        continue;
      }

      while (entryReader == null) {
        if (source.sleepForRetries("Replication WAL entry reader thread not initialized",
          sleepMultiplier)) {
          sleepMultiplier++;
        }
      }

      try {
        WALEntryBatch entryBatch = entryReader.take();
        shipEdits(entryBatch);
        if (entryBatch.getWalEntries().isEmpty()) {
          LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
              + source.getPeerClusterZnode());
          source.getSourceMetrics().incrCompletedRecoveryQueue();
          setWorkerState(WorkerState.FINISHED);
          continue;
        }
      } catch (InterruptedException e) {
        LOG.trace("Interrupted while waiting for next replication entry batch", e);
        Thread.currentThread().interrupt();
      }
    }
    source.tryFinish();
    // If the worker exits the run loop without finishing its task, mark it as stopped.
    if (!isFinished()) {
      setWorkerState(WorkerState.STOPPED);
    }
  }

  @Override
  public long getStartPosition() {
    long startPosition = getRecoveredQueueStartPos();
    int numRetries = 0;
    while (numRetries <= maxRetriesMultiplier) {
      try {
        source.locateRecoveredPaths(queue);
        break;
      } catch (IOException e) {
        LOG.error("Error while locating recovered queue paths, attempt #" + numRetries);
        numRetries++;
      }
    }
    return startPosition;
  }

  // If this is a recovered queue, the queue is already full and the first log
  // normally has a position (unless the RS failed between 2 logs)
  private long getRecoveredQueueStartPos() {
    long startPosition = 0;
    String peerClusterZnode = source.getPeerClusterZnode();
    try {
      startPosition = this.replicationQueues.getLogPosition(peerClusterZnode,
        this.queue.peek().getName());
      if (LOG.isTraceEnabled()) {
        LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
            + startPosition);
      }
    } catch (ReplicationException e) {
      terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
    }
    return startPosition;
  }

  @Override
  protected void updateLogPosition(long lastReadPosition) {
    source.getSourceManager().logPositionAndCleanOldLogs(currentPath, source.getPeerClusterZnode(),
      lastReadPosition, true, false);
    lastLoggedPosition = lastReadPosition;
  }

  private void terminate(String reason, Exception cause) {
    if (cause == null) {
      LOG.info("Closing worker for wal group " + this.walGroupId + " because: " + reason);
    } else {
      LOG.error("Closing worker for wal group " + this.walGroupId
          + " because an error occurred: " + reason, cause);
    }
    entryReader.interrupt();
    Threads.shutdown(entryReader, sleepForRetries);
    this.interrupt();
    Threads.shutdown(this, sleepForRetries);
    LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
  }
}