001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.replication; 020 021import java.io.IOException; 022import java.util.ArrayList; 023 024import org.apache.yetus.audience.InterfaceAudience; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 028import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AbstractService; 029 030/** 031 * A Base implementation for {@link ReplicationEndpoint}s. For internal use. Uses our internal 032 * Guava. 033 */ 034// This class has been made InterfaceAudience.Private in 2.0.0. It used to be 035// LimitedPrivate. See HBASE-15982. 036@InterfaceAudience.Private 037public abstract class BaseReplicationEndpoint extends AbstractService 038 implements ReplicationEndpoint { 039 040 private static final Logger LOG = LoggerFactory.getLogger(BaseReplicationEndpoint.class); 041 public static final String REPLICATION_WALENTRYFILTER_CONFIG_KEY 042 = "hbase.replication.source.custom.walentryfilters"; 043 protected Context ctx; 044 045 @Override 046 public void init(Context context) throws IOException { 047 this.ctx = context; 048 049 if (this.ctx != null){ 050 ReplicationPeer peer = this.ctx.getReplicationPeer(); 051 if (peer != null){ 052 peer.trackPeerConfigChanges(this); 053 } else { 054 LOG.warn("Not tracking replication peer config changes for Peer Id " + this.ctx.getPeerId() + 055 " because there's no such peer"); 056 } 057 } 058 } 059 060 @Override 061 /** 062 * No-op implementation for subclasses to override if they wish to execute logic if their config changes 063 */ 064 public void peerConfigUpdated(ReplicationPeerConfig rpc){ 065 066 } 067 068 /** Returns a default set of filters */ 069 @Override 070 public WALEntryFilter getWALEntryfilter() { 071 ArrayList<WALEntryFilter> filters = Lists.newArrayList(); 072 WALEntryFilter scopeFilter = getScopeWALEntryFilter(); 073 if (scopeFilter != null) { 074 filters.add(scopeFilter); 075 } 076 WALEntryFilter tableCfFilter = getNamespaceTableCfWALEntryFilter(); 077 if (tableCfFilter != null) { 078 filters.add(tableCfFilter); 079 } 080 if (ctx != null && ctx.getPeerConfig() != null) { 081 String filterNameCSV = ctx.getPeerConfig().getConfiguration().get(REPLICATION_WALENTRYFILTER_CONFIG_KEY); 082 if (filterNameCSV != null && !filterNameCSV.isEmpty()) { 083 String[] filterNames = filterNameCSV.split(","); 084 for (String filterName : filterNames) { 085 try { 086 Class<?> clazz = Class.forName(filterName); 087 filters.add((WALEntryFilter) clazz.getDeclaredConstructor().newInstance()); 088 } catch (Exception e) { 089 LOG.error("Unable to create WALEntryFilter " + filterName, e); 090 } 091 } 092 } 093 } 094 return filters.isEmpty() ? null : new ChainWALEntryFilter(filters); 095 } 096 097 /** Returns a WALEntryFilter for checking the scope. Subclasses can 098 * return null if they don't want this filter */ 099 protected WALEntryFilter getScopeWALEntryFilter() { 100 return new ScopeWALEntryFilter(); 101 } 102 103 /** Returns a WALEntryFilter for checking replication per table and CF. Subclasses can 104 * return null if they don't want this filter */ 105 protected WALEntryFilter getNamespaceTableCfWALEntryFilter() { 106 return new NamespaceTableCfWALEntryFilter(ctx.getReplicationPeer()); 107 } 108 109 @Override 110 public boolean canReplicateToSameCluster() { 111 return false; 112 } 113 114 @Override 115 public boolean isStarting() { 116 return state() == State.STARTING; 117 } 118}