/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class that handles the recovered source of a replication stream, which is transferred from
 * a dead region server. It will be closed once all WALs have been pushed to the peer cluster.
 */
@InterfaceAudience.Private
public class RecoveredReplicationSource extends ReplicationSource {

  private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);

  // the actual peer id, parsed from the recovered queue id
  private String actualPeerId;

  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
      ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
      String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
      MetricsSource metrics) throws IOException {
    super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode,
      clusterId, walFileLengthProvider, metrics);
    this.actualPeerId = this.replicationQueueInfo.getPeerId();
  }

  @Override
  protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId,
      PriorityBlockingQueue<Path> queue) {
    return new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this, queueStorage);
  }

  /**
   * Rebuild the queue of WAL paths for this recovered source, replacing every entry whose file
   * has moved with its current location.
   */
  public void locateRecoveredPaths(PriorityBlockingQueue<Path> queue) throws IOException {
    boolean hasPathChanged = false;
    PriorityBlockingQueue<Path> newPaths =
        new PriorityBlockingQueue<Path>(queueSizePerGroup, new LogsComparator());
    pathsLoop: for (Path path : queue) {
      if (fs.exists(path)) { // still in same location, don't need to do anything
        newPaths.add(path);
        continue;
      }
      // Path changed - try to find the right path.
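      // The dead server's WAL directory may have been renamed (for example with the -splitting
      // suffix during log splitting), so probe the possible new locations below.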
      hasPathChanged = true;
      if (server instanceof ReplicationSyncUp.DummyServer) {
        // In the case of disaster recovery, the HMaster may have been shut down or crashed
        // before flushing data from .logs to .oldlogs. Loop over the .logs folders and check
        // whether a match exists.
        Path newPath = getReplSyncUpPath(path);
        newPaths.add(newPath);
        continue;
      } else {
        // See if the path exists in the dead RS folder (there could be a chain of failures
        // to look at)
        List<ServerName> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
        LOG.info("NB dead servers : " + deadRegionServers.size());
        final Path walDir = FSUtils.getWALRootDir(conf);
        for (ServerName curDeadServerName : deadRegionServers) {
          final Path deadRsDirectory = new Path(walDir,
              AbstractFSWALProvider.getWALDirectoryName(curDeadServerName.getServerName()));
          Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()),
              new Path(deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT),
                  path.getName()) };
          for (Path possibleLogLocation : locs) {
            LOG.info("Possible location " + possibleLogLocation.toUri().toString());
            if (manager.getFs().exists(possibleLogLocation)) {
              // We found the right new location
              LOG.info("Log " + path + " still exists at " + possibleLogLocation);
              newPaths.add(possibleLogLocation);
              continue pathsLoop;
            }
          }
        }
        // didn't find a new location
        LOG.error(
          String.format("WAL Path %s doesn't exist and couldn't find its new location", path));
        newPaths.add(path);
      }
    }

    if (hasPathChanged) {
      if (newPaths.size() != queue.size()) { // this shouldn't happen
        LOG.error("Recovery queue size is incorrect");
        throw new IOException("Recovery queue size error");
      }
      // Put the correct locations in the queue. Since this is a recovered queue with no new
      // incoming logs, there shouldn't be any concurrency issues.
      queue.clear();
      for (Path path : newPaths) {
        queue.add(path);
      }
    }
  }

  // N.B. the ReplicationSyncUp tool sets manager.getLogDir to the root of the WAL area
  // rather than to the WAL area for a particular region server.
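  // getReplSyncUpPath therefore scans every region server directory under that root for a WAL
  // whose name matches the queued path; if no match is found, the original path is returned
  // unchanged.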
  private Path getReplSyncUpPath(Path path) throws IOException {
    FileStatus[] rss = fs.listStatus(manager.getLogDir());
    for (FileStatus rs : rss) {
      Path p = rs.getPath();
      FileStatus[] logs = fs.listStatus(p);
      for (FileStatus log : logs) {
        // use a separate variable so the region server directory is not clobbered between
        // iterations
        Path logPath = new Path(p, log.getPath().getName());
        if (logPath.getName().equals(path.getName())) {
          LOG.info("Log " + logPath.getName() + " found at " + logPath);
          return logPath;
        }
      }
    }
    LOG.error("Didn't find path for: " + path.getName());
    return path;
  }

  void tryFinish() {
    // synchronize to make sure only the last remaining worker thread cleans up the queue
    synchronized (workerThreads) {
      Threads.sleep(100); // wait a short while for other worker threads to fully exit
      boolean allTasksDone = workerThreads.values().stream().allMatch(w -> w.isFinished());
      if (allTasksDone) {
        this.getSourceMetrics().clear();
        manager.removeRecoveredSource(this);
        LOG.info("Finished recovering queue {} with the following stats: {}", queueId, getStats());
      }
    }
  }

  @Override
  public String getPeerId() {
    return this.actualPeerId;
  }

  @Override
  public ServerName getServerWALsBelongTo() {
    return this.replicationQueueInfo.getDeadRegionServers().get(0);
  }

  @Override
  public boolean isRecovered() {
    return true;
  }
}