001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util.hbck; 019 020import java.util.ArrayList; 021import java.util.HashMap; 022import java.util.HashSet; 023import java.util.List; 024import java.util.Map; 025import java.util.Set; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.ServerName; 028import org.apache.hadoop.hbase.replication.ReplicationException; 029import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; 030import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; 031import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; 032import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 033import org.apache.hadoop.hbase.util.HbckErrorReporter; 034import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 035import org.apache.yetus.audience.InterfaceAudience; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039/** 040 * Check and fix undeleted replication queues for removed peerId. 041 */ 042@InterfaceAudience.Private 043public class ReplicationChecker { 044 045 private static final Logger LOG = LoggerFactory.getLogger(ReplicationChecker.class); 046 047 private final HbckErrorReporter errorReporter; 048 // replicator with its queueIds for removed peers 049 private Map<ServerName, List<String>> undeletedQueueIds = new HashMap<>(); 050 // replicator with its undeleted queueIds for removed peers in hfile-refs queue 051 private Set<String> undeletedHFileRefsPeerIds = new HashSet<>(); 052 053 private final ReplicationPeerStorage peerStorage; 054 private final ReplicationQueueStorage queueStorage; 055 056 public ReplicationChecker(Configuration conf, ZKWatcher zkw, HbckErrorReporter errorReporter) { 057 this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkw, conf); 058 this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); 059 this.errorReporter = errorReporter; 060 } 061 062 public boolean hasUnDeletedQueues() { 063 return errorReporter.getErrorList() 064 .contains(HbckErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE); 065 } 066 067 private Map<ServerName, List<String>> getUnDeletedQueues() throws ReplicationException { 068 Map<ServerName, List<String>> undeletedQueues = new HashMap<>(); 069 Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds()); 070 for (ServerName replicator : queueStorage.getListOfReplicators()) { 071 for (String queueId : queueStorage.getAllQueues(replicator)) { 072 ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); 073 if (!peerIds.contains(queueInfo.getPeerId())) { 074 undeletedQueues.computeIfAbsent(replicator, key -> new ArrayList<>()).add(queueId); 075 LOG.debug( 076 "Undeleted replication queue for removed peer found: " 077 + "[removedPeerId={}, replicator={}, queueId={}]", 078 queueInfo.getPeerId(), replicator, queueId); 079 } 080 } 081 } 082 return undeletedQueues; 083 } 084 085 private Set<String> getUndeletedHFileRefsPeers() throws ReplicationException { 086 Set<String> undeletedHFileRefsPeerIds = 087 new HashSet<>(queueStorage.getAllPeersFromHFileRefsQueue()); 088 Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds()); 089 undeletedHFileRefsPeerIds.removeAll(peerIds); 090 if (LOG.isDebugEnabled()) { 091 for (String peerId : undeletedHFileRefsPeerIds) { 092 LOG.debug("Undeleted replication hfile-refs queue for removed peer {} found", peerId); 093 } 094 } 095 return undeletedHFileRefsPeerIds; 096 } 097 098 public void checkUnDeletedQueues() throws ReplicationException { 099 undeletedQueueIds = getUnDeletedQueues(); 100 undeletedQueueIds.forEach((replicator, queueIds) -> { 101 queueIds.forEach(queueId -> { 102 ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); 103 String msg = "Undeleted replication queue for removed peer found: " 104 + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", queueInfo.getPeerId(), 105 replicator, queueId); 106 errorReporter.reportError(HbckErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg); 107 }); 108 }); 109 undeletedHFileRefsPeerIds = getUndeletedHFileRefsPeers(); 110 undeletedHFileRefsPeerIds.stream() 111 .map(peerId -> "Undeleted replication hfile-refs queue for removed peer " + peerId + " found") 112 .forEach(msg -> errorReporter 113 .reportError(HbckErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg)); 114 } 115 116 public void fixUnDeletedQueues() throws ReplicationException { 117 for (Map.Entry<ServerName, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) { 118 ServerName replicator = replicatorAndQueueIds.getKey(); 119 for (String queueId : replicatorAndQueueIds.getValue()) { 120 queueStorage.removeQueue(replicator, queueId); 121 } 122 queueStorage.removeReplicatorIfQueueIsEmpty(replicator); 123 } 124 for (String peerId : undeletedHFileRefsPeerIds) { 125 queueStorage.removePeerFromHFileRefs(peerId); 126 } 127 } 128}