/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.ArrayList;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AbstractService;

/**
 * A Base implementation for {@link ReplicationEndpoint}s. For internal use. Uses our internal
 * Guava.
 */
// This class has been made InterfaceAudience.Private in 2.0.0. It used to be
// LimitedPrivate. See HBASE-15982.
035@InterfaceAudience.Private 036public abstract class BaseReplicationEndpoint extends AbstractService 037 implements ReplicationEndpoint { 038 039 private static final Logger LOG = LoggerFactory.getLogger(BaseReplicationEndpoint.class); 040 public static final String REPLICATION_WALENTRYFILTER_CONFIG_KEY = 041 "hbase.replication.source.custom.walentryfilters"; 042 protected Context ctx; 043 044 @Override 045 public void init(Context context) throws IOException { 046 this.ctx = context; 047 048 if (this.ctx != null) { 049 ReplicationPeer peer = this.ctx.getReplicationPeer(); 050 if (peer != null) { 051 peer.registerPeerConfigListener(this); 052 } else { 053 LOG.warn("Not tracking replication peer config changes for Peer Id " + this.ctx.getPeerId() 054 + " because there's no such peer"); 055 } 056 } 057 } 058 059 @Override 060 /** 061 * No-op implementation for subclasses to override if they wish to execute logic if their config 062 * changes 063 */ 064 public void peerConfigUpdated(ReplicationPeerConfig rpc) { 065 066 } 067 068 /** Returns a default set of filters */ 069 @Override 070 public WALEntryFilter getWALEntryfilter() { 071 ArrayList<WALEntryFilter> filters = Lists.newArrayList(); 072 WALEntryFilter scopeFilter = getScopeWALEntryFilter(); 073 if (scopeFilter != null) { 074 filters.add(scopeFilter); 075 } 076 WALEntryFilter tableCfFilter = getNamespaceTableCfWALEntryFilter(); 077 if (tableCfFilter != null) { 078 filters.add(tableCfFilter); 079 } 080 if (ctx != null && ctx.getPeerConfig() != null) { 081 String filterNameCSV = 082 ctx.getPeerConfig().getConfiguration().get(REPLICATION_WALENTRYFILTER_CONFIG_KEY); 083 if (filterNameCSV != null && !filterNameCSV.isEmpty()) { 084 String[] filterNames = filterNameCSV.split(","); 085 for (String filterName : filterNames) { 086 try { 087 Class<?> clazz = Class.forName(filterName); 088 filters.add((WALEntryFilter) clazz.getDeclaredConstructor().newInstance()); 089 } catch (Exception e) { 090 LOG.error("Unable to 
create WALEntryFilter " + filterName, e); 091 } 092 } 093 } 094 } 095 return filters.isEmpty() ? null : new ChainWALEntryFilter(filters); 096 } 097 098 /** 099 * Returns a WALEntryFilter for checking the scope. Subclasses can return null if they don't want 100 * this filter 101 */ 102 protected WALEntryFilter getScopeWALEntryFilter() { 103 return new ScopeWALEntryFilter(); 104 } 105 106 /** 107 * Returns a WALEntryFilter for checking replication per table and CF. Subclasses can return null 108 * if they don't want this filter 109 */ 110 protected WALEntryFilter getNamespaceTableCfWALEntryFilter() { 111 return new NamespaceTableCfWALEntryFilter(ctx.getReplicationPeer()); 112 } 113 114 @Override 115 public boolean canReplicateToSameCluster() { 116 return false; 117 } 118 119 @Override 120 public boolean isStarting() { 121 return state() == State.STARTING; 122 } 123}