/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class that handles the recovered source of a replication stream, which is transferred from
 * another dead region server. This will be closed when all logs are pushed to the peer cluster.
 */
@InterfaceAudience.Private
public class RecoveredReplicationSource extends ReplicationSource {

  private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);

  private String actualPeerId;

  @Override
  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
    ReplicationQueueStorage queueStorage, ReplicationPeer replicationPeer, Server server,
    String peerClusterZnode, UUID clusterId, WALFileLengthProvider walFileLengthProvider,
    MetricsSource metrics) throws IOException {
    super.init(conf, fs, manager, queueStorage, replicationPeer, server, peerClusterZnode,
      clusterId, walFileLengthProvider, metrics);
    this.actualPeerId = this.replicationQueueInfo.getPeerId();
  }

  @Override
  protected RecoveredReplicationSourceShipper createNewShipper(String walGroupId) {
    return new RecoveredReplicationSourceShipper(conf, walGroupId, logQueue, this, queueStorage);
  }

  /**
   * Rebuild the queue for the given WAL group so that every entry points at the WAL's current
   * location, following the chain of dead region servers recorded in the recovered queue.
   */
  public void locateRecoveredPaths(String walGroupId) throws IOException {
    boolean hasPathChanged = false;
    PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
    PriorityBlockingQueue<Path> newPaths = new PriorityBlockingQueue<>(queueSizePerGroup,
      new AbstractFSWALProvider.WALStartTimeComparator());
    pathsLoop: for (Path path : queue) {
      if (fs.exists(path)) { // still in the same location, nothing to do
        newPaths.add(path);
        continue;
      }
      // Path changed - try to find the right path.
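      // The WAL may now sit under a dead region server's WAL directory (possibly with the
      // "-splitting" suffix added by log splitting), or, when running the ReplicationSyncUp
      // tool, under any server directory in the WAL root; both cases are handled below.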
      hasPathChanged = true;
      if (server instanceof ReplicationSyncUp.DummyServer) {
        // In a disaster/recovery scenario, the HMaster may have been shut down or may have
        // crashed before the data was flushed from .logs to .oldlogs. Loop through the .logs
        // folders and check whether a match exists.
        Path newPath = getReplSyncUpPath(path);
        newPaths.add(newPath);
        continue;
      } else {
        // See if the path exists in a dead RS folder (there could be a chain of failures
        // to look at)
        List<ServerName> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
        LOG.info("Number of dead region servers: " + deadRegionServers.size());
        final Path walDir = CommonFSUtils.getWALRootDir(conf);
        for (ServerName curDeadServerName : deadRegionServers) {
          final Path deadRsDirectory = new Path(walDir,
            AbstractFSWALProvider.getWALDirectoryName(curDeadServerName.getServerName()));
          Path[] locs = new Path[] { new Path(deadRsDirectory, path.getName()),
            new Path(deadRsDirectory.suffix(AbstractFSWALProvider.SPLITTING_EXT), path.getName()) };
          for (Path possibleLogLocation : locs) {
            LOG.info("Possible location " + possibleLogLocation.toUri());
            if (manager.getFs().exists(possibleLogLocation)) {
              // We found the right new location
              LOG.info("Log " + path + " still exists at " + possibleLogLocation);
              newPaths.add(possibleLogLocation);
              continue pathsLoop;
            }
          }
        }
        // didn't find a new location
        LOG.error(
          String.format("WAL Path %s doesn't exist and couldn't find its new location", path));
        newPaths.add(path);
      }
    }

    if (hasPathChanged) {
      if (newPaths.size() != queue.size()) { // this shouldn't happen
        LOG.error("Recovery queue size is incorrect");
        throw new IOException("Recovery queue size error");
      }
      // Put the correct locations in the queue. Since this is a recovered queue with no new
      // incoming logs, there shouldn't be any concurrency issues.
      logQueue.clear(walGroupId);
      for (Path path : newPaths) {
        logQueue.enqueueLog(path, walGroupId);
      }
    }
  }
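
  // Illustrative example of the lookup above (all names hypothetical): a queue recovered as
  // "1-rs1.example.com,16020,1600000000000" records peer "1" and dead server rs1. For a WAL
  // file named "rs1.example.com%2C16020%2C1600000000000.1600000000123" that no longer exists
  // at its recorded path, the loop probes, for each dead server in the chain:
  //   <walRootDir>/WALs/rs1.example.com,16020,1600000000000/<wal-file-name>
  //   <walRootDir>/WALs/rs1.example.com,16020,1600000000000-splitting/<wal-file-name>
  // and keeps the first location that exists.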

  /**
   * N.B. the ReplicationSyncUp tool sets the directory returned by manager.getLogDir() to the
   * root of the WAL area rather than to the WAL area of a particular region server, so search
   * every server directory under it for the given WAL.
   */
  private Path getReplSyncUpPath(Path path) throws IOException {
    FileStatus[] rss = fs.listStatus(manager.getLogDir());
    for (FileStatus rs : rss) {
      Path rsDir = rs.getPath();
      for (FileStatus log : fs.listStatus(rsDir)) {
        // Use the log file's own path; reusing the directory variable here would nest paths
        // incorrectly after the first iteration.
        Path p = log.getPath();
        if (p.getName().equals(path.getName())) {
          LOG.info("Log " + p.getName() + " found at " + p);
          return p;
        }
      }
    }
    LOG.error("Didn't find path for: " + path.getName());
    return path;
  }

  void tryFinish() {
    // Synchronize so that exactly one last worker thread cleans up the queue.
    synchronized (workerThreads) {
      Threads.sleep(100); // wait a short while for the other worker threads to fully exit
      boolean allTasksDone = workerThreads.values().stream().allMatch(w -> w.isFinished());
      if (allTasksDone) {
        this.getSourceMetrics().clear();
        manager.removeRecoveredSource(this);
        LOG.info("Finished recovering queue {} with the following stats: {}", queueId, getStats());
      }
    }
  }

  @Override
  public String getPeerId() {
    return this.actualPeerId;
  }

  @Override
  public ServerName getServerWALsBelongTo() {
    return this.replicationQueueInfo.getDeadRegionServers().get(0);
  }

  @Override
  public boolean isRecovered() {
    return true;
  }
}
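
// Usage sketch (hypothetical; the manager-side wiring is assumed, not taken from this class):
// a recovered source is created when a dead region server's replication queue is claimed,
// initialized with the recovered queue id, and started. Each shipper calls tryFinish() once
// its WAL group is drained; the last one to finish removes the source from the manager.
//
//   RecoveredReplicationSource src = new RecoveredReplicationSource();
//   src.init(conf, fs, manager, queueStorage, peer, server,
//     "1-rs1.example.com,16020,1600000000000", clusterId, walFileLengthProvider, metrics);
//   src.startup(); // inherited from ReplicationSource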