/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util.hbck;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.util.HbckErrorReporter;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Check and fix undeleted replication queues for removed peerId.
 * <p>
 * Typical usage is to call {@link #checkUnDeletedQueues()} first, which records what it finds and
 * reports it to the {@link HbckErrorReporter}, and then {@link #fixUnDeletedQueues()}, which
 * removes exactly what the most recent check recorded.
 */
@InterfaceAudience.Private
public class ReplicationChecker {

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationChecker.class);

  private final HbckErrorReporter errorReporter;
  // replicator with its queueIds for removed peers; replaced on every checkUnDeletedQueues() call
  private Map<ServerName, List<ReplicationQueueId>> undeletedQueueIds = new HashMap<>();
  // ids of removed peers that still show up in the hfile-refs queue; replaced on every
  // checkUnDeletedQueues() call
  private Set<String> undeletedHFileRefsPeerIds = new HashSet<>();

  private final ReplicationPeerStorage peerStorage;
  private final ReplicationQueueStorage queueStorage;

  /**
   * Creates a checker whose peer storage and queue storage are obtained from
   * {@link ReplicationStorageFactory}.
   * @param conf          configuration used to obtain the file system and both storages
   * @param zkw           ZooKeeper watcher handed to the peer storage factory method
   * @param conn          connection handed to the queue storage factory method
   * @param errorReporter reporter that receives one error per undeleted queue found
   * @throws IOException propagated from the calls that obtain the file system and the storages
   */
  public ReplicationChecker(Configuration conf, ZKWatcher zkw, Connection conn,
    HbckErrorReporter errorReporter) throws IOException {
    this.peerStorage =
      ReplicationStorageFactory.getReplicationPeerStorage(FileSystem.get(conf), zkw, conf);
    this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(conn, conf);
    this.errorReporter = errorReporter;
  }

  /**
   * Tells whether the error reporter's error list contains
   * {@link HbckErrorReporter.ERROR_CODE#UNDELETED_REPLICATION_QUEUE}.
   * @return {@code true} if that error code is present in the reporter's error list
   */
  public boolean hasUnDeletedQueues() {
    return errorReporter.getErrorList()
      .contains(HbckErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
  }

  /**
   * Collects every replication queue whose peer id is not among the ids listed by the peer
   * storage, grouped by the queue's server name.
   * @return map from server name to the queue ids found for peers that are no longer listed
   * @throws ReplicationException if listing the peers or the queues fails
   */
  private Map<ServerName, List<ReplicationQueueId>> getUnDeletedQueues()
    throws ReplicationException {
    Map<ServerName, List<ReplicationQueueId>> undeletedQueues = new HashMap<>();
    Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
    for (ReplicationQueueData queueData : queueStorage.listAllQueues()) {
      ReplicationQueueId queueId = queueData.getId();
      if (!peerIds.contains(queueId.getPeerId())) {
        undeletedQueues.computeIfAbsent(queueId.getServerName(), key -> new ArrayList<>())
          .add(queueId);
        LOG.debug(
          "Undeleted replication queue for removed peer found: "
            + "[removedPeerId={}, replicator={}, queueId={}]",
          queueId.getPeerId(), queueId.getServerName(), queueId);
      }
    }
    return undeletedQueues;
  }

  /**
   * Collects the peer ids present in the hfile-refs queue that are not among the ids listed by the
   * peer storage.
   * @return peer ids found in the hfile-refs queue for peers that are no longer listed
   * @throws ReplicationException if reading the hfile-refs peers or listing the peers fails
   */
  private Set<String> getUndeletedHFileRefsPeers() throws ReplicationException {
    // Named differently from the field on purpose so the local does not shadow it.
    Set<String> removedPeerIds = new HashSet<>(queueStorage.getAllPeersFromHFileRefsQueue());
    Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
    removedPeerIds.removeAll(peerIds);
    if (LOG.isDebugEnabled()) {
      for (String peerId : removedPeerIds) {
        LOG.debug("Undeleted replication hfile-refs queue for removed peer {} found", peerId);
      }
    }
    return removedPeerIds;
  }

  /**
   * Looks up undeleted replication queues and undeleted hfile-refs peers, stores both results in
   * this checker, and reports one
   * {@link HbckErrorReporter.ERROR_CODE#UNDELETED_REPLICATION_QUEUE} error per entry found.
   * @throws ReplicationException if reading from the peer storage or the queue storage fails
   */
  public void checkUnDeletedQueues() throws ReplicationException {
    undeletedQueueIds = getUnDeletedQueues();
    for (Map.Entry<ServerName, List<ReplicationQueueId>> entry : undeletedQueueIds.entrySet()) {
      ServerName replicator = entry.getKey();
      for (ReplicationQueueId queueId : entry.getValue()) {
        String msg = String.format(
          "Undeleted replication queue for removed peer found: "
            + "[removedPeerId=%s, replicator=%s, queueId=%s]",
          queueId.getPeerId(), replicator, queueId);
        errorReporter.reportError(HbckErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
      }
    }
    undeletedHFileRefsPeerIds = getUndeletedHFileRefsPeers();
    for (String peerId : undeletedHFileRefsPeerIds) {
      String msg = "Undeleted replication hfile-refs queue for removed peer " + peerId + " found";
      errorReporter.reportError(HbckErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
    }
  }

  /**
   * Removes every queue and every hfile-refs peer recorded by the most recent
   * {@link #checkUnDeletedQueues()} call. If no check has been run, both recorded collections are
   * still empty and nothing is removed.
   * @throws ReplicationException if a removal in the queue storage fails
   */
  public void fixUnDeletedQueues() throws ReplicationException {
    // Only the queue ids are needed for removal, so iterate the values directly.
    for (List<ReplicationQueueId> queueIds : undeletedQueueIds.values()) {
      for (ReplicationQueueId queueId : queueIds) {
        queueStorage.removeQueue(queueId);
      }
    }
    for (String peerId : undeletedHFileRefsPeerIds) {
      queueStorage.removePeerFromHFileRefs(peerId);
    }
  }

  /**
   * Delegates to {@link ReplicationQueueStorage#hasData()}.
   * @return whatever the queue storage reports for {@code hasData()}
   * @throws ReplicationException if the queue storage call fails
   */
  public boolean checkHasDataInQueues() throws ReplicationException {
    return queueStorage.hasData();
  }
}