001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.io.IOException; 021import java.lang.reflect.Constructor; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.fs.FileSystem; 024import org.apache.hadoop.hbase.Coprocessor; 025import org.apache.hadoop.hbase.NamespaceDescriptor; 026import org.apache.hadoop.hbase.TableName; 027import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 028import org.apache.hadoop.hbase.client.Connection; 029import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder; 030import org.apache.hadoop.hbase.client.TableDescriptor; 031import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 032import org.apache.hadoop.hbase.util.ReflectionUtils; 033import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 034import org.apache.yetus.audience.InterfaceAudience; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * Used to create replication storage(peer, queue) classes. 040 */ 041@InterfaceAudience.Private 042public final class ReplicationStorageFactory { 043 044 private static final Logger LOG = LoggerFactory.getLogger(ReplicationStorageFactory.class); 045 046 public static final String REPLICATION_PEER_STORAGE_IMPL = "hbase.replication.peer.storage.impl"; 047 048 // must use zookeeper here, otherwise when user upgrading from an old version without changing the 049 // config file, they will loss all the replication peer data. 050 public static final ReplicationPeerStorageType DEFAULT_REPLICATION_PEER_STORAGE_IMPL = 051 ReplicationPeerStorageType.ZOOKEEPER; 052 053 public static final String REPLICATION_QUEUE_TABLE_NAME = "hbase.replication.queue.table.name"; 054 055 public static final TableName REPLICATION_QUEUE_TABLE_NAME_DEFAULT = 056 TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"); 057 058 public static final String REPLICATION_QUEUE_IMPL = "hbase.replication.queue.storage.impl"; 059 060 public static TableDescriptor createReplicationQueueTableDescriptor(TableName tableName) 061 throws IOException { 062 return TableDescriptorBuilder.newBuilder(tableName) 063 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(TableReplicationQueueStorage.QUEUE_FAMILY)) 064 .setColumnFamily( 065 ColumnFamilyDescriptorBuilder.of(TableReplicationQueueStorage.LAST_SEQUENCE_ID_FAMILY)) 066 .setColumnFamily( 067 ColumnFamilyDescriptorBuilder.of(TableReplicationQueueStorage.HFILE_REF_FAMILY)) 068 .setValue("hbase.regionserver.region.split_restriction.type", "DelimitedKeyPrefix") 069 .setValue("hbase.regionserver.region.split_restriction.delimiter", "-") 070 .setCoprocessor(CoprocessorDescriptorBuilder 071 .newBuilder("org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint") 072 .setPriority(Coprocessor.PRIORITY_SYSTEM).build()) 073 .build(); 074 } 075 076 private ReplicationStorageFactory() { 077 } 078 079 private static Class<? extends ReplicationPeerStorage> 080 getReplicationPeerStorageClass(Configuration conf) { 081 try { 082 ReplicationPeerStorageType type = ReplicationPeerStorageType.valueOf( 083 conf.get(REPLICATION_PEER_STORAGE_IMPL, DEFAULT_REPLICATION_PEER_STORAGE_IMPL.name()) 084 .toUpperCase()); 085 return type.getClazz(); 086 } catch (IllegalArgumentException e) { 087 return conf.getClass(REPLICATION_PEER_STORAGE_IMPL, 088 DEFAULT_REPLICATION_PEER_STORAGE_IMPL.getClazz(), ReplicationPeerStorage.class); 089 } 090 } 091 092 /** 093 * Create a new {@link ReplicationPeerStorage}. 094 */ 095 public static ReplicationPeerStorage getReplicationPeerStorage(FileSystem fs, ZKWatcher zk, 096 Configuration conf) { 097 Class<? extends ReplicationPeerStorage> clazz = getReplicationPeerStorageClass(conf); 098 for (Constructor<?> c : clazz.getConstructors()) { 099 if (c.getParameterCount() != 2) { 100 continue; 101 } 102 if (c.getParameterTypes()[0].isAssignableFrom(FileSystem.class)) { 103 return ReflectionUtils.newInstance(clazz, fs, conf); 104 } else if (c.getParameterTypes()[0].isAssignableFrom(ZKWatcher.class)) { 105 return ReflectionUtils.newInstance(clazz, zk, conf); 106 } 107 } 108 throw new IllegalArgumentException( 109 "Can not create replication peer storage with type " + clazz); 110 } 111 112 /** 113 * Create a new {@link ReplicationQueueStorage}. 114 */ 115 public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn, 116 Configuration conf) { 117 return getReplicationQueueStorage(conn, conf, TableName.valueOf(conf 118 .get(REPLICATION_QUEUE_TABLE_NAME, REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()))); 119 } 120 121 /** 122 * Create a new {@link ReplicationQueueStorage}. 123 */ 124 public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn, 125 Configuration conf, TableName tableName) { 126 Class<? extends ReplicationQueueStorage> clazz = conf.getClass(REPLICATION_QUEUE_IMPL, 127 TableReplicationQueueStorage.class, ReplicationQueueStorage.class); 128 try { 129 Constructor<? extends ReplicationQueueStorage> c = 130 clazz.getConstructor(Connection.class, TableName.class); 131 return c.newInstance(conn, tableName); 132 } catch (Exception e) { 133 LOG.debug( 134 "failed to create ReplicationQueueStorage with Connection, try creating with Configuration", 135 e); 136 return ReflectionUtils.newInstance(clazz, conf, tableName); 137 } 138 } 139 140 public static boolean isReplicationQueueTable(Configuration conf, TableName tableName) { 141 TableName replicationQueueTableName = TableName.valueOf(conf.get(REPLICATION_QUEUE_TABLE_NAME, 142 REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString())); 143 return replicationQueueTableName.equals(tableName); 144 } 145}