/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.master;

import static org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage.PEERS_ZNODE;
import static org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage.PEERS_ZNODE_DEFAULT;
import static org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.REPLICATION_ZNODE;
import static org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;

/**
 * This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to HBase 1.4 or 2.x. It will
 * be removed in HBase 3.x. See HBASE-11393
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class ReplicationPeerConfigUpgrader {

  /** Configuration key naming the per-peer child znode that holds the legacy tableCFs data. */
  private static final String TABLE_CFS_ZNODE = "zookeeper.znode.replication.peers.tableCFs";
  /** Child znode name used when {@link #TABLE_CFS_ZNODE} is not configured. */
  private static final String TABLE_CFS_ZNODE_DEFAULT = "tableCFs";

  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUpgrader.class);
  private final Configuration conf;
  private final ZKWatcher zookeeper;
  private final ReplicationPeerStorage peerStorage;

  /**
   * Creates an upgrader bound to the given ZooKeeper watcher and configuration.
   * @param zookeeper watcher used for all znode reads performed by this class
   * @param conf      configuration used to resolve znode names and to open client connections
   */
  public ReplicationPeerConfigUpgrader(ZKWatcher zookeeper, Configuration conf) {
    this.zookeeper = zookeeper;
    this.conf = conf;
    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf);
  }

  /**
   * For every replication peer whose config lists namespaces or table-CFs, turns off the
   * "replicate all user tables" flag and pushes the updated config through {@link Admin}.
   * <p>
   * A failure to update one peer is logged and does not stop processing of the remaining peers.
   * @throws Exception if the connection or admin cannot be obtained, or listing the peers fails
   */
  public void upgrade() throws Exception {
    // Admin is closeable as well, so it is managed alongside the connection.
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
      admin.listReplicationPeers().forEach((peerDesc) -> {
        String peerId = peerDesc.getPeerId();
        ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig();
        boolean hasNamespaces =
          peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty();
        boolean hasTableCFs =
          peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty();
        if (hasNamespaces || hasTableCFs) {
          peerConfig.setReplicateAllUserTables(false);
          try {
            admin.updateReplicationPeerConfig(peerId, peerConfig);
          } catch (Exception e) {
            // Best effort: keep going with the other peers.
            LOG.error("Failed to upgrade replication peer config for peerId={}", peerId, e);
          }
        }
      });
    }
  }

  /**
   * Runs {@link #copyTableCFs(String)} for every peer id known to the peer storage, logging an
   * error for each peer whose copy reports failure.
   * @throws ReplicationException if the peer ids cannot be listed or a peer config cannot be
   *                              read or written
   */
  public void copyTableCFs() throws ReplicationException {
    for (String peerId : peerStorage.listPeerIds()) {
      if (!copyTableCFs(peerId)) {
        LOG.error("upgrade tableCFs failed for peerId={}", peerId);
      }
    }
  }

  /**
   * Builds the path of the legacy tableCFs znode for a peer:
   * {@code <base>/<replication>/<peers>/<peerId>/<tableCFs>}, where each segment name other than
   * the peer id comes from configuration with a built-in default.
   * @param peerId id of the replication peer
   * @return the full znode path
   */
  protected String getTableCFsNode(String peerId) {
    String replicationZNode = ZNodePaths.joinZNode(zookeeper.getZNodePaths().baseZNode,
      conf.get(REPLICATION_ZNODE, REPLICATION_ZNODE_DEFAULT));
    String peersZNode =
      ZNodePaths.joinZNode(replicationZNode, conf.get(PEERS_ZNODE, PEERS_ZNODE_DEFAULT));
    return ZNodePaths.joinZNode(peersZNode,
      ZNodePaths.joinZNode(peerId, conf.get(TABLE_CFS_ZNODE, TABLE_CFS_ZNODE_DEFAULT)));
  }

  /**
   * Copies the table-CFs stored in the peer's legacy tableCFs znode into the peer config, but
   * only when that znode exists and the peer config does not already carry a table-CFs map.
   * @param peerId id of the replication peer
   * @return {@code true} if no ZooKeeper/IO error occurred (including the cases where there was
   *         nothing to copy); {@code false} if a {@link KeeperException}, {@link IOException} or
   *         {@link InterruptedException} was caught
   * @throws ReplicationException if reading or updating the peer config fails
   */
  public boolean copyTableCFs(String peerId) throws ReplicationException {
    String tableCFsNode = getTableCFsNode(peerId);
    try {
      // -1 is treated as "znode absent"; in that case there is nothing to migrate.
      if (ZKUtil.checkExists(zookeeper, tableCFsNode) != -1) {
        ReplicationPeerConfig rpc = peerStorage.getPeerConfig(peerId);
        // We only need to copy data from tableCFs node to rpc Node the first time hmaster start.
        if (rpc.getTableCFsMap() == null || rpc.getTableCFsMap().isEmpty()) {
          // we copy TableCFs node into PeerNode
          LOG.info("Copy table ColumnFamilies into peer={}", peerId);
          ReplicationProtos.TableCF[] tableCFs =
            ReplicationPeerConfigUtil.parseTableCFs(ZKUtil.getData(this.zookeeper, tableCFsNode));
          if (tableCFs != null && tableCFs.length > 0) {
            rpc.setTableCFsMap(ReplicationPeerConfigUtil.convert2Map(tableCFs));
            peerStorage.updatePeerConfig(peerId, rpc);
          }
        } else {
          // NOTE(review): this branch runs when the peer config already has tableCFs, so the
          // wording below looks misleading; the message is kept unchanged for log compatibility.
          LOG.info("No tableCFs in peerNode:{}", peerId);
        }
      }
    } catch (KeeperException | IOException e) {
      LOG.warn("NOTICE!! Update peerId failed, peerId={}", peerId, e);
      return false;
    } catch (InterruptedException e) {
      LOG.warn("NOTICE!! Update peerId failed, peerId={}", peerId, e);
      // Restore the interrupt status so callers further up can observe the interruption.
      Thread.currentThread().interrupt();
      return false;
    }
    return true;
  }

  /** Prints command-line usage to stderr and terminates the JVM with exit status 1. */
  private static void printUsageAndExit() {
    System.err.printf(
      "Usage: hbase org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader"
        + " [options]");
    System.err.println(" where [options] are:");
    System.err.println(" -h|-help Show this help and exit.");
    System.err.println(" copyTableCFs Copy table-cfs to replication peer config");
    System.err.println(" upgrade Upgrade replication peer config to new format");
    System.err.println();
    System.exit(1);
  }

  /**
   * Command-line entry point. Expects exactly one argument: {@code copyTableCFs},
   * {@code upgrade}, or {@code -h}/{@code -help}. Anything else prints usage and exits.
   * @param args command-line arguments
   * @throws Exception if the selected operation fails
   */
  public static void main(String[] args) throws Exception {
    if (args.length != 1) {
      printUsageAndExit();
    }
    if (args[0].equals("-help") || args[0].equals("-h")) {
      printUsageAndExit();
    } else if (args[0].equals("copyTableCFs")) {
      Configuration conf = HBaseConfiguration.create();
      try (ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null)) {
        ReplicationPeerConfigUpgrader tableCFsUpdater =
          new ReplicationPeerConfigUpgrader(zkw, conf);
        tableCFsUpdater.copyTableCFs();
      }
    } else if (args[0].equals("upgrade")) {
      Configuration conf = HBaseConfiguration.create();
      try (ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null)) {
        ReplicationPeerConfigUpgrader upgrader = new ReplicationPeerConfigUpgrader(zkw, conf);
        upgrader.upgrade();
      }
    } else {
      printUsageAndExit();
    }
  }
}