001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.replication;
020
021import java.util.UUID;
022
023import org.apache.hadoop.hbase.HBaseInterfaceAudience;
024import org.apache.yetus.audience.InterfaceAudience;
025import org.apache.yetus.audience.InterfaceStability;
026import org.apache.hadoop.hbase.wal.WALEdit;
027import org.apache.hadoop.hbase.wal.WALKeyImpl;
028import org.apache.hadoop.hbase.wal.WAL.Entry;
029
030
031/**
032 * Filters out entries with our peerClusterId (i.e. already replicated)
033 * and marks all other entries with our clusterID
034 */
035@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
036@InterfaceStability.Evolving
037public class ClusterMarkingEntryFilter implements WALEntryFilter {
038  private UUID clusterId;
039  private UUID peerClusterId;
040  private ReplicationEndpoint replicationEndpoint;
041
042  /**
043   * @param clusterId id of this cluster
044   * @param peerClusterId of the other cluster
045   * @param replicationEndpoint ReplicationEndpoint which will handle the actual replication
046   */
047  public ClusterMarkingEntryFilter(UUID clusterId, UUID peerClusterId, ReplicationEndpoint replicationEndpoint) {
048    this.clusterId = clusterId;
049    this.peerClusterId = peerClusterId;
050    this.replicationEndpoint = replicationEndpoint;
051  }
052  @Override
053  public Entry filter(Entry entry) {
054    // don't replicate if the log entries have already been consumed by the cluster
055    if (replicationEndpoint.canReplicateToSameCluster()
056        || !entry.getKey().getClusterIds().contains(peerClusterId)) {
057      WALEdit edit = entry.getEdit();
058      WALKeyImpl logKey = (WALKeyImpl)entry.getKey();
059
060      if (edit != null && !edit.isEmpty()) {
061        // Mark that the current cluster has the change
062        logKey.addClusterId(clusterId);
063        // We need to set the CC to null else it will be compressed when sent to the sink
064        entry.setCompressionContext(null);
065        return entry;
066      }
067    }
068    return null;
069  }
070}