001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import java.io.IOException; 021import java.util.List; 022import java.util.UUID; 023import java.util.concurrent.PriorityBlockingQueue; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.fs.FileStatus; 026import org.apache.hadoop.fs.FileSystem; 027import org.apache.hadoop.fs.Path; 028import org.apache.hadoop.hbase.Server; 029import org.apache.hadoop.hbase.ServerName; 030import org.apache.hadoop.hbase.replication.ReplicationPeer; 031import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 032import org.apache.hadoop.hbase.util.CommonFSUtils; 033import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * Class that handles the recovered source of a replication stream, which is transfered from another 040 * dead region server. This will be closed when all logs are pushed to peer cluster. 041 */ 042@InterfaceAudience.Private 043public class RecoveredReplicationSource extends ReplicationSource { 044 045 private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class); 046 047 private String actualPeerId; 048 049 @Override 050 public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, 051 ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server, 052 String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider, 053 MetricsSource metrics) throws IOException { 054 super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode, 055 clusterId, walFileLengthProvider, metrics); 056 this.actualPeerId = this.replicationQueueInfo.getPeerId(); 057 } 058 059 @Override 060 protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId) { 061 return new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, this, queueStorage); 062 } 063 064 public void locateRecoveredPaths(String walGroupId) throws IOException { 065 boolean hasPathChanged = false; 066 PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId); 067 PriorityBlockingQueue<Path> newPaths = new PriorityBlockingQueue<Path>(queueSizePerGroup, 068 new AbstractFSWALProvider.WALStartTimeComparator()); 069 pathsLoop: for (Path path : queue) { 070 if (fs.exists(path)) { // still in same location, don't need to do anything 071 newPaths.add(path); 072 continue; 073 } 074 // Path changed - try to find the right path. 075 hasPathChanged = true; 076 if (server instanceof ReplicationSyncUp.DummyServer) { 077 // In the case of disaster/recovery, HMaster may be shutdown/crashed before flush data 078 // from .logs to .oldlogs. Loop into .logs folders and check whether a match exists 079 Path newPath = getReplSyncUpPath(path); 080 newPaths.add(newPath); 081 continue; 082 } else { 083 // See if Path exists in the dead RS folder (there could be a chain of failures 084 // to look at) 085 List<ServerName> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers(); 086 LOG.info("NB dead servers : " + deadRegionServers.size()); 087 final Path walDir = CommonFSUtils.getWALRootDir(conf); 088 for (ServerName curDeadServerName : deadRegionServers) { 089 final Path deadRsDirectory = new Path(walDir, 090 AbstractFSWALProvider.getWALDirectoryName(curDeadServerName.getServerName())); 091 Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()), 092 new Path(deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) }; 093 for (Path possibleLogLocation : locs) { 094 LOG.info("Possible location " + possibleLogLocation.toUri().toString()); 095 if (manager.getFs().exists(possibleLogLocation)) { 096 // We found the right new location 097 LOG.info("Log " + path + " still exists at " + possibleLogLocation); 098 newPaths.add(possibleLogLocation); 099 continue pathsLoop; 100 } 101 } 102 } 103 // didn't find a new location 104 LOG.error( 105 String.format("WAL Path %s doesn't exist and couldn't find its new location", path)); 106 newPaths.add(path); 107 } 108 } 109 110 if (hasPathChanged) { 111 if (newPaths.size() != queue.size()) { // this shouldn't happen 112 LOG.error("Recovery queue size is incorrect"); 113 throw new IOException("Recovery queue size error"); 114 } 115 // put the correct locations in the queue 116 // since this is a recovered queue with no new incoming logs, 117 // there shouldn't be any concurrency issues 118 logQueue.clear(walGroupId); 119 for (Path path : newPaths) { 120 logQueue.enqueueLog(path, walGroupId); 121 } 122 } 123 } 124 125 // N.B. the ReplicationSyncUp tool sets the manager.getWALDir to the root of the wal 126 // area rather than to the wal area for a particular region server. 127 private Path getReplSyncUpPath(Path path) throws IOException { 128 FileStatus[] rss = fs.listStatus(manager.getLogDir()); 129 for (FileStatus rs : rss) { 130 Path p = rs.getPath(); 131 FileStatus[] logs = fs.listStatus(p); 132 for (FileStatus log : logs) { 133 p = new Path(p, log.getPath().getName()); 134 if (p.getName().equals(path.getName())) { 135 LOG.info("Log " + p.getName() + " found at " + p); 136 return p; 137 } 138 } 139 } 140 LOG.error("Didn't find path for: " + path.getName()); 141 return path; 142 } 143 144 void tryFinish() { 145 if (workerThreads.isEmpty()) { 146 this.getSourceMetrics().clear(); 147 manager.finishRecoveredSource(this); 148 } 149 } 150 151 @Override 152 public String getPeerId() { 153 return this.actualPeerId; 154 } 155 156 @Override 157 public ServerName getServerWALsBelongTo() { 158 return this.replicationQueueInfo.getDeadRegionServers().get(0); 159 } 160 161 @Override 162 public boolean isRecovered() { 163 return true; 164 } 165}