001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import com.google.errorprone.annotations.RestrictedApi; 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.HashMap; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027import org.apache.hadoop.conf.Configuration; 028import org.apache.hadoop.hbase.ServerName; 029import org.apache.hadoop.hbase.util.Pair; 030import org.apache.hadoop.hbase.zookeeper.ZKUtil; 031import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 032import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.apache.zookeeper.KeeperException; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038import org.apache.hbase.thirdparty.com.google.common.base.Splitter; 039import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 040 041/** 042 * Just retain a small set of the methods for the old zookeeper based replication queue storage, for 043 * migrating. 044 */ 045@InterfaceAudience.Private 046public class ZKReplicationQueueStorageForMigration extends ZKReplicationStorageBase { 047 048 private static final Logger LOG = 049 LoggerFactory.getLogger(ZKReplicationQueueStorageForMigration.class); 050 051 public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY = 052 "zookeeper.znode.replication.hfile.refs"; 053 public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs"; 054 055 public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY = 056 "zookeeper.znode.replication.regions"; 057 public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT = "regions"; 058 059 /** 060 * The name of the znode that contains all replication queues 061 */ 062 private final String queuesZNode; 063 064 /** 065 * The name of the znode that contains queues of hfile references to be replicated 066 */ 067 private final String hfileRefsZNode; 068 069 private final String regionsZNode; 070 071 public ZKReplicationQueueStorageForMigration(ZKWatcher zookeeper, Configuration conf) { 072 super(zookeeper, conf); 073 String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs"); 074 String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY, 075 ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT); 076 this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName); 077 this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName); 078 this.regionsZNode = ZNodePaths.joinZNode(replicationZNode, conf 079 .get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT)); 080 } 081 082 public interface MigrationIterator<T> { 083 084 T next() throws Exception; 085 } 086 087 @SuppressWarnings("rawtypes") 088 private static final MigrationIterator EMPTY_ITER = new MigrationIterator() { 089 090 @Override 091 public Object next() { 092 return null; 093 } 094 }; 095 096 public static final class ZkReplicationQueueData { 097 098 private final ReplicationQueueId queueId; 099 100 private final Map<String, Long> walOffsets; 101 102 public ZkReplicationQueueData(ReplicationQueueId queueId, Map<String, Long> walOffsets) { 103 this.queueId = queueId; 104 this.walOffsets = walOffsets; 105 } 106 107 public ReplicationQueueId getQueueId() { 108 return queueId; 109 } 110 111 public Map<String, Long> getWalOffsets() { 112 return walOffsets; 113 } 114 } 115 116 private String getRsNode(ServerName serverName) { 117 return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName()); 118 } 119 120 private String getQueueNode(ServerName serverName, String queueId) { 121 return ZNodePaths.joinZNode(getRsNode(serverName), queueId); 122 } 123 124 private String getFileNode(String queueNode, String fileName) { 125 return ZNodePaths.joinZNode(queueNode, fileName); 126 } 127 128 private String getFileNode(ServerName serverName, String queueId, String fileName) { 129 return getFileNode(getQueueNode(serverName, queueId), fileName); 130 } 131 132 static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication"; 133 134 @SuppressWarnings("unchecked") 135 public MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>> listAllQueues() 136 throws KeeperException { 137 List<String> replicators = ZKUtil.listChildrenNoWatch(zookeeper, queuesZNode); 138 if (replicators == null || replicators.isEmpty()) { 139 ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode); 140 return EMPTY_ITER; 141 } 142 Iterator<String> iter = replicators.iterator(); 143 return new MigrationIterator<Pair<ServerName, List<ZkReplicationQueueData>>>() { 144 145 private ServerName previousServerName; 146 147 private boolean hasRegionReplicaReplicationQueue; 148 149 private void cleanupQueuesWithoutRegionReplicaReplication(ServerName serverName) 150 throws Exception { 151 String rsZNode = getRsNode(serverName); 152 List<String> queueIdList = ZKUtil.listChildrenNoWatch(zookeeper, rsZNode); 153 if (CollectionUtils.isEmpty(queueIdList)) { 154 return; 155 } 156 for (String queueId : queueIdList) { 157 ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); 158 if (!queueInfo.getPeerId().equals(REGION_REPLICA_REPLICATION_PEER)) { 159 ZKUtil.deleteNodeRecursively(zookeeper, getQueueNode(serverName, queueId)); 160 } 161 } 162 } 163 164 @Override 165 public Pair<ServerName, List<ZkReplicationQueueData>> next() throws Exception { 166 if (previousServerName != null) { 167 if (hasRegionReplicaReplicationQueue) { 168 // if there are region_replica_replication queues, we can not delete it, just delete 169 // other queues, see HBASE-29169. 170 cleanupQueuesWithoutRegionReplicaReplication(previousServerName); 171 } else { 172 ZKUtil.deleteNodeRecursively(zookeeper, getRsNode(previousServerName)); 173 } 174 } 175 if (!iter.hasNext()) { 176 // If there are region_replica_replication queues then we can not delete the data right 177 // now, otherwise we may crash the old region servers, see HBASE-29169. 178 // The migration procedure has a special step to cleanup everything 179 return null; 180 } 181 hasRegionReplicaReplicationQueue = false; 182 String replicator = iter.next(); 183 ServerName serverName = ServerName.parseServerName(replicator); 184 previousServerName = serverName; 185 List<String> queueIdList = ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName)); 186 if (CollectionUtils.isEmpty(queueIdList)) { 187 return Pair.newPair(serverName, Collections.emptyList()); 188 } 189 List<ZkReplicationQueueData> queueDataList = new ArrayList<>(queueIdList.size()); 190 for (String queueIdStr : queueIdList) { 191 ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueIdStr); 192 if (queueInfo.getPeerId().equals(REGION_REPLICA_REPLICATION_PEER)) { 193 // we do not need to migrate the data for this queue, skip 194 LOG.debug("Found region replica replication queue {}, skip", queueInfo); 195 hasRegionReplicaReplicationQueue = true; 196 continue; 197 } 198 ReplicationQueueId queueId; 199 if (queueInfo.getDeadRegionServers().isEmpty()) { 200 queueId = new ReplicationQueueId(serverName, queueInfo.getPeerId()); 201 } else { 202 queueId = new ReplicationQueueId(serverName, queueInfo.getPeerId(), 203 queueInfo.getDeadRegionServers().get(0)); 204 } 205 List<String> wals = 206 ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName, queueIdStr)); 207 ZkReplicationQueueData queueData; 208 if (wals == null || wals.isEmpty()) { 209 queueData = new ZkReplicationQueueData(queueId, Collections.emptyMap()); 210 } else { 211 Map<String, Long> walOffsets = new HashMap<>(); 212 for (String wal : wals) { 213 byte[] data = ZKUtil.getData(zookeeper, getFileNode(serverName, queueIdStr, wal)); 214 if (data == null || data.length == 0) { 215 walOffsets.put(wal, 0L); 216 } else { 217 walOffsets.put(wal, ZKUtil.parseWALPositionFrom(data)); 218 } 219 } 220 queueData = new ZkReplicationQueueData(queueId, walOffsets); 221 } 222 queueDataList.add(queueData); 223 } 224 return Pair.newPair(serverName, queueDataList); 225 } 226 }; 227 } 228 229 public static final class ZkLastPushedSeqId { 230 231 private final String encodedRegionName; 232 233 private final String peerId; 234 235 private final long lastPushedSeqId; 236 237 ZkLastPushedSeqId(String encodedRegionName, String peerId, long lastPushedSeqId) { 238 this.encodedRegionName = encodedRegionName; 239 this.peerId = peerId; 240 this.lastPushedSeqId = lastPushedSeqId; 241 } 242 243 public String getEncodedRegionName() { 244 return encodedRegionName; 245 } 246 247 public String getPeerId() { 248 return peerId; 249 } 250 251 public long getLastPushedSeqId() { 252 return lastPushedSeqId; 253 } 254 255 } 256 257 @SuppressWarnings("unchecked") 258 public MigrationIterator<List<ZkLastPushedSeqId>> listAllLastPushedSeqIds() 259 throws KeeperException { 260 List<String> level1Prefixs = ZKUtil.listChildrenNoWatch(zookeeper, regionsZNode); 261 if (level1Prefixs == null || level1Prefixs.isEmpty()) { 262 ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode); 263 return EMPTY_ITER; 264 } 265 Iterator<String> level1Iter = level1Prefixs.iterator(); 266 return new MigrationIterator<List<ZkLastPushedSeqId>>() { 267 268 private String level1Prefix; 269 270 private Iterator<String> level2Iter; 271 272 private String level2Prefix; 273 274 @Override 275 public List<ZkLastPushedSeqId> next() throws Exception { 276 for (;;) { 277 if (level2Iter == null || !level2Iter.hasNext()) { 278 if (!level1Iter.hasNext()) { 279 ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode); 280 return null; 281 } 282 if (level1Prefix != null) { 283 // this will also delete the previous level2Prefix which is under this level1Prefix 284 ZKUtil.deleteNodeRecursively(zookeeper, 285 ZNodePaths.joinZNode(regionsZNode, level1Prefix)); 286 } 287 level1Prefix = level1Iter.next(); 288 List<String> level2Prefixes = ZKUtil.listChildrenNoWatch(zookeeper, 289 ZNodePaths.joinZNode(regionsZNode, level1Prefix)); 290 if (level2Prefixes != null) { 291 level2Iter = level2Prefixes.iterator(); 292 // reset level2Prefix as we have switched level1Prefix, otherwise the below delete 293 // level2Prefix section will delete the znode with this level2Prefix under the new 294 // level1Prefix 295 level2Prefix = null; 296 } 297 } else { 298 if (level2Prefix != null) { 299 ZKUtil.deleteNodeRecursively(zookeeper, 300 ZNodePaths.joinZNode(regionsZNode, level1Prefix, level2Prefix)); 301 } 302 level2Prefix = level2Iter.next(); 303 List<String> encodedRegionNameAndPeerIds = ZKUtil.listChildrenNoWatch(zookeeper, 304 ZNodePaths.joinZNode(regionsZNode, level1Prefix, level2Prefix)); 305 if (encodedRegionNameAndPeerIds == null || encodedRegionNameAndPeerIds.isEmpty()) { 306 return Collections.emptyList(); 307 } 308 List<ZkLastPushedSeqId> lastPushedSeqIds = new ArrayList<>(); 309 for (String encodedRegionNameAndPeerId : encodedRegionNameAndPeerIds) { 310 byte[] data = ZKUtil.getData(zookeeper, ZNodePaths.joinZNode(regionsZNode, 311 level1Prefix, level2Prefix, encodedRegionNameAndPeerId)); 312 long lastPushedSeqId = ZKUtil.parseWALPositionFrom(data); 313 Iterator<String> iter = Splitter.on('-').split(encodedRegionNameAndPeerId).iterator(); 314 String encodedRegionName = level1Prefix + level2Prefix + iter.next(); 315 String peerId = iter.next(); 316 lastPushedSeqIds 317 .add(new ZkLastPushedSeqId(encodedRegionName, peerId, lastPushedSeqId)); 318 } 319 return Collections.unmodifiableList(lastPushedSeqIds); 320 } 321 } 322 } 323 }; 324 } 325 326 private String getHFileRefsPeerNode(String peerId) { 327 return ZNodePaths.joinZNode(hfileRefsZNode, peerId); 328 } 329 330 /** 331 * Pair<PeerId, List<HFileRefs>> 332 */ 333 @SuppressWarnings("unchecked") 334 public MigrationIterator<Pair<String, List<String>>> listAllHFileRefs() throws KeeperException { 335 List<String> peerIds = ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode); 336 if (peerIds == null || peerIds.isEmpty()) { 337 ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode); 338 return EMPTY_ITER; 339 } 340 Iterator<String> iter = peerIds.iterator(); 341 return new MigrationIterator<Pair<String, List<String>>>() { 342 343 private String previousPeerId; 344 345 @Override 346 public Pair<String, List<String>> next() throws KeeperException { 347 if (previousPeerId != null) { 348 ZKUtil.deleteNodeRecursively(zookeeper, getHFileRefsPeerNode(previousPeerId)); 349 } 350 if (!iter.hasNext()) { 351 ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode); 352 return null; 353 } 354 String peerId = iter.next(); 355 List<String> refs = ZKUtil.listChildrenNoWatch(zookeeper, getHFileRefsPeerNode(peerId)); 356 previousPeerId = peerId; 357 return Pair.newPair(peerId, refs != null ? refs : Collections.emptyList()); 358 } 359 }; 360 } 361 362 public boolean hasData() throws KeeperException { 363 return ZKUtil.checkExists(zookeeper, queuesZNode) != -1 364 || ZKUtil.checkExists(zookeeper, regionsZNode) != -1 365 || ZKUtil.checkExists(zookeeper, hfileRefsZNode) != -1; 366 } 367 368 public void deleteAllData() throws KeeperException { 369 ZKUtil.deleteNodeRecursively(zookeeper, queuesZNode); 370 ZKUtil.deleteNodeRecursively(zookeeper, regionsZNode); 371 ZKUtil.deleteNodeRecursively(zookeeper, hfileRefsZNode); 372 } 373 374 @RestrictedApi(explanation = "Should only be called in tests", link = "", 375 allowedOnPath = ".*/src/test/.*") 376 String getQueuesZNode() { 377 return queuesZNode; 378 } 379 380 @RestrictedApi(explanation = "Should only be called in tests", link = "", 381 allowedOnPath = ".*/src/test/.*") 382 String getHfileRefsZNode() { 383 return hfileRefsZNode; 384 } 385 386 @RestrictedApi(explanation = "Should only be called in tests", link = "", 387 allowedOnPath = ".*/src/test/.*") 388 String getRegionsZNode() { 389 return regionsZNode; 390 } 391}