/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A tool for copying replication peer data across different replication peer storages.
 * <p/>
 * Notice that we will not delete the replication peer data from the source storage, as this tool
 * can also be used by online migration. See HBASE-27110 for the whole design.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
public class CopyReplicationPeers extends Configured implements Tool {

  private static final Logger LOG = LoggerFactory.getLogger(CopyReplicationPeers.class);

  /** Sub-command name shown in the usage message ({@code bin/hbase copyreppeers ...}). */
  public static final String NAME = "copyreppeers";

  /**
   * Creates the tool with the given configuration.
   * @param conf configuration handed to {@link Configured}; later read back through
   *             {@link #getConf()}
   */
  public CopyReplicationPeers(Configuration conf) {
    super(conf);
  }

  /**
   * Builds a replication peer storage of the requested type.
   * <p/>
   * The storage implementation is selected by setting
   * {@link ReplicationStorageFactory#REPLICATION_PEER_STORAGE_IMPL} on a <em>copy</em> of the
   * tool configuration, so the configuration returned by {@link #getConf()} is not modified and
   * the source and destination storages can be created with different types.
   * @param type value to store under the storage implementation configuration key
   * @param fs   file system passed through to the storage factory
   * @param zk   ZooKeeper watcher passed through to the storage factory
   * @return the storage returned by the factory for the given type
   */
  private ReplicationPeerStorage create(String type, FileSystem fs, ZKWatcher zk) {
    Configuration conf = new Configuration(getConf());
    conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL, type);
    return ReplicationStorageFactory.getReplicationPeerStorage(fs, zk, conf);
  }

  /**
   * Creates a {@link ZKWatcher} identified by this class's simple name.
   * <p/>
   * The supplied {@link Abortable} records the abort, logs the reason and then terminates the JVM
   * with exit status 1.
   * @return a new watcher; the caller is responsible for closing it
   * @throws IOException if the watcher can not be created
   */
  private ZKWatcher createZKWatcher() throws IOException {
    return new ZKWatcher(getConf(), getClass().getSimpleName(), new Abortable() {

      private volatile boolean aborted;

      @Override
      public boolean isAborted() {
        return aborted;
      }

      @Override
      public void abort(String why, Throwable e) {
        aborted = true;
        LOG.error(why, e);
        System.exit(1);
      }
    });
  }

  /**
   * Copies every peer listed by {@code src} into {@code dst}.
   * <p/>
   * For each peer the config, enabled flag and sync replication state are read from the source
   * and written to the destination with a single {@code addPeer} call. Nothing is removed from
   * the source. Peers are processed one at a time, so if a peer is rejected the peers handled
   * before it have already been added to {@code dst}.
   * @param src storage to read peers from
   * @param dst storage to add peers to
   * @throws ReplicationException  if reading from {@code src} or writing to {@code dst} fails
   * @throws IllegalStateException if a peer's new sync replication state is not
   *                               {@link SyncReplicationState#NONE}
   */
  private void migrate(ReplicationPeerStorage src, ReplicationPeerStorage dst)
    throws ReplicationException {
    LOG.info("Start migrating from {} to {}", src.getClass().getSimpleName(),
      dst.getClass().getSimpleName());
    for (String peerId : src.listPeerIds()) {
      LOG.info("Going to migrate {}", peerId);
      ReplicationPeerConfig peerConfig = src.getPeerConfig(peerId);
      boolean enabled = src.isPeerEnabled(peerId);
      SyncReplicationState syncState = src.getPeerSyncReplicationState(peerId);
      SyncReplicationState newSyncState = src.getPeerNewSyncReplicationState(peerId);
      // Only the current sync state is passed to addPeer, so refuse peers that also carry a
      // pending new state rather than silently dropping it.
      if (newSyncState != SyncReplicationState.NONE) {
        throw new IllegalStateException("Can not migrate peer " + peerId
          + " as it is in an intermediate state, syncReplicationState is " + syncState
          + " while newSyncReplicationState is " + newSyncState);
      }
      dst.addPeer(peerId, peerConfig, enabled, syncState);
      LOG.info("Migrated peer {}, peerConfig = '{}', enabled = {}, syncReplicationState = {}",
        peerId, peerConfig, enabled, syncState);
    }
  }

  /**
   * Entry point invoked by {@link ToolRunner}.
   * @param args exactly two values: the source storage type followed by the destination storage
   *             type
   * @return {@code 0} after a completed migration, {@code -1} if the argument count is wrong (in
   *         which case usage help is printed to standard error and nothing is migrated)
   * @throws Exception if creating the file system, the ZooKeeper watcher or the storages fails,
   *                   or if the migration itself fails
   */
  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: bin/hbase " + NAME
        + " <SRC_REPLICATION_PEER_STORAGE> <DST_REPLICATION_PEER_STORAGE>");
      System.err.println("The possible values for replication storage type:");
      for (ReplicationPeerStorageType type : ReplicationPeerStorageType.values()) {
        // Lower-case with Locale.ROOT: the default-locale variant would turn 'I' into a dotless
        // 'ı' under e.g. a Turkish locale and print a type name the user can not pass back in.
        System.err.println(" " + type.name().toLowerCase(Locale.ROOT));
      }
      return -1;
    }
    FileSystem fs = FileSystem.get(getConf());
    // The watcher is closed when the block exits, whether or not the migration succeeded.
    try (ZKWatcher zk = createZKWatcher()) {
      ReplicationPeerStorage src = create(args[0], fs, zk);
      ReplicationPeerStorage dst = create(args[1], fs, zk);
      migrate(src, dst);
    }
    return 0;
  }

  /**
   * Command line entry point: runs the tool through {@link ToolRunner} with a fresh HBase
   * configuration and exits the JVM with the tool's return code.
   * @param args command line arguments, forwarded to {@link ToolRunner}
   * @throws Exception if the tool fails
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int ret = ToolRunner.run(conf, new CopyReplicationPeers(conf), args);
    System.exit(ret);
  }
}