/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class that handles the recovered source of a replication stream, which is transferred from
 * another dead region server. This will be closed when all logs are pushed to the peer cluster.
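 * <p>
 * Unlike a normal replication source, a recovered source works on a fixed set of WALs that the
 * dead server had already written; no new logs are enqueued for it (see
 * {@link #locateRecoveredPaths(PriorityBlockingQueue)}), and once every shipper thread has
 * drained its wal group the queue is closed through {@link #tryFinish()}.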
 */
@InterfaceAudience.Private
public class RecoveredReplicationSource extends ReplicationSource {

  private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);

  private String actualPeerId;

  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
      ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
      String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
      WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
    super.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerClusterZnode,
      clusterId, replicationEndpoint, walFileLengthProvider, metrics);
    this.actualPeerId = this.replicationQueueInfo.getPeerId();
  }

  @Override
  protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
    final RecoveredReplicationSourceShipper worker =
        new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this,
            this.replicationQueues);
    ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
    if (extant != null) {
      LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
    } else {
      LOG.debug("Starting up worker for wal group " + walGroupId);
      worker.startup(getUncaughtExceptionHandler());
      worker.setWALReader(
        startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition()));
      workerThreads.put(walGroupId, worker);
    }
  }

  @Override
  protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId,
      PriorityBlockingQueue<Path> queue, long startPosition) {
    ReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(fs, conf,
        queue, startPosition, walEntryFilter, this);
    Threads.setDaemonThreadRunning(walReader, threadName
        + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
        getUncaughtExceptionHandler());
    return walReader;
  }

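  /**
   * Rewrites every entry of the given queue so that it points at the WAL's current location.
   * A WAL of a dead server may have moved to the directory of another dead server in the
   * failure chain (or to its .splitting variant); when running under the ReplicationSyncUp tool
   * the whole WAL root is searched instead. Paths that cannot be relocated are left unchanged.
   */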
  public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
    boolean hasPathChanged = false;
    PriorityBlockingQueue<Path> newPaths =
        new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
    pathsLoop: for (Path path : queue) {
      if (fs.exists(path)) { // still in same location, don't need to do anything
        newPaths.add(path);
        continue;
      }
      // Path changed - try to find the right path.
      hasPathChanged = true;
      if (server instanceof ReplicationSyncUp.DummyServer) {
        // In the case of disaster/recovery, HMaster may be shutdown/crashed before flushing data
        // from .logs to .oldlogs. Loop through the .logs folders and check whether a match exists.
        Path newPath = getReplSyncUpPath(path);
        newPaths.add(newPath);
        continue;
      } else {
        // See if Path exists in the dead RS folder (there could be a chain of failures
        // to look at)
        List<ServerName> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
        LOG.info("NB dead servers : " + deadRegionServers.size());
        final Path walDir = FSUtils.getWALRootDir(conf);
        for (ServerName curDeadServerName : deadRegionServers) {
          final Path deadRsDirectory = new Path(walDir,
              AbstractFSWALProvider.getWALDirectoryName(curDeadServerName.getServerName()));
          Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()),
              new Path(deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT),
                  path.getName()) };
          for (Path possibleLogLocation : locs) {
            LOG.info("Possible location " + possibleLogLocation.toUri().toString());
            if (manager.getFs().exists(possibleLogLocation)) {
              // We found the right new location
              LOG.info("Log " + path + " still exists at " + possibleLogLocation);
              newPaths.add(possibleLogLocation);
              continue pathsLoop;
            }
          }
        }
        // didn't find a new location
        LOG.error(
          String.format("WAL Path %s doesn't exist and couldn't find its new location", path));
        newPaths.add(path);
      }
    }

    if (hasPathChanged) {
      if (newPaths.size() != queue.size()) { // this shouldn't happen
        LOG.error("Recovery queue size is incorrect");
        throw new IOException("Recovery queue size error");
      }
      // put the correct locations in the queue
      // since this is a recovered queue with no new incoming logs,
      // there shouldn't be any concurrency issues
      queue.clear();
      for (Path path : newPaths) {
        queue.add(path);
      }
    }
  }

  // N.B. the ReplicationSyncUp tool sets manager.getLogDir to the root of the wal
  // area rather than to the wal area of a particular region server.
  private Path getReplSyncUpPath(Path path) throws IOException {
    FileStatus[] rss = fs.listStatus(manager.getLogDir());
    for (FileStatus rs : rss) {
      Path p = rs.getPath();
      FileStatus[] logs = fs.listStatus(p);
      for (FileStatus log : logs) {
        // build the candidate from the server directory each time so that a previous
        // non-matching file name does not end up embedded in the path
        Path logPath = new Path(p, log.getPath().getName());
        if (logPath.getName().equals(path.getName())) {
          LOG.info("Log " + logPath.getName() + " found at " + logPath);
          return logPath;
        }
      }
    }
    LOG.error("Didn't find path for: " + path.getName());
    return path;
  }

  public void tryFinish() {
    // use synchronized to make sure only the last worker thread cleans up the queue
    synchronized (workerThreads) {
      Threads.sleep(100); // wait a short while for other worker threads to fully exit
      boolean allTasksDone = true;
      for (ReplicationSourceShipper worker : workerThreads.values()) {
        if (!worker.isFinished()) {
          allTasksDone = false;
          break;
        }
      }
      if (allTasksDone) {
        manager.closeRecoveredQueue(this);
        LOG.info("Finished recovering queue " + peerClusterZnode + " with the following stats: "
            + getStats());
      }
    }
  }

  @Override
  public String getPeerId() {
    return this.actualPeerId;
  }

  @Override
  public ServerName getServerWALsBelongTo() {
    return this.replicationQueueInfo.getDeadRegionServers().get(0);
  }
}