001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.security.visibility; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.List; 023import java.util.UUID; 024import java.util.concurrent.TimeUnit; 025import java.util.concurrent.TimeoutException; 026 027import org.apache.hadoop.hbase.ArrayBackedTag; 028import org.apache.hadoop.hbase.Cell; 029import org.apache.hadoop.hbase.PrivateCellUtil; 030import org.apache.hadoop.hbase.Tag; 031import org.apache.hadoop.hbase.TagType; 032import org.apache.hadoop.hbase.replication.ReplicationEndpoint; 033import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 034import org.apache.hadoop.hbase.replication.WALEntryFilter; 035import org.apache.hadoop.hbase.wal.WAL.Entry; 036import org.apache.hadoop.hbase.wal.WALEdit; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041@InterfaceAudience.Private 042public class VisibilityReplicationEndpoint implements ReplicationEndpoint { 043 044 private static final Logger LOG = LoggerFactory.getLogger(VisibilityReplicationEndpoint.class); 045 046 private final ReplicationEndpoint delegator; 047 private final VisibilityLabelService visibilityLabelsService; 048 049 public VisibilityReplicationEndpoint(ReplicationEndpoint endpoint, 050 VisibilityLabelService visibilityLabelsService) { 051 this.delegator = endpoint; 052 this.visibilityLabelsService = visibilityLabelsService; 053 } 054 055 @Override 056 public void init(Context context) throws IOException { 057 delegator.init(context); 058 } 059 060 @Override 061 public void peerConfigUpdated(ReplicationPeerConfig rpc){ 062 delegator.peerConfigUpdated(rpc); 063 } 064 065 @Override 066 public boolean replicate(ReplicateContext replicateContext) { 067 if (!delegator.canReplicateToSameCluster()) { 068 // Only when the replication is inter cluster replication we need to 069 // convert the visibility tags to 070 // string based tags. But for intra cluster replication like region 071 // replicas it is not needed. 072 List<Entry> entries = replicateContext.getEntries(); 073 List<Tag> visTags = new ArrayList<>(); 074 List<Tag> nonVisTags = new ArrayList<>(); 075 List<Entry> newEntries = new ArrayList<>(entries.size()); 076 for (Entry entry : entries) { 077 WALEdit newEdit = new WALEdit(); 078 ArrayList<Cell> cells = entry.getEdit().getCells(); 079 for (Cell cell : cells) { 080 if (cell.getTagsLength() > 0) { 081 visTags.clear(); 082 nonVisTags.clear(); 083 Byte serializationFormat = VisibilityUtils.extractAndPartitionTags(cell, visTags, 084 nonVisTags); 085 if (!visTags.isEmpty()) { 086 try { 087 byte[] modifiedVisExpression = visibilityLabelsService 088 .encodeVisibilityForReplication(visTags, serializationFormat); 089 if (modifiedVisExpression != null) { 090 nonVisTags 091 .add(new ArrayBackedTag(TagType.STRING_VIS_TAG_TYPE, modifiedVisExpression)); 092 } 093 } catch (Exception ioe) { 094 LOG.error( 095 "Exception while reading the visibility labels from the cell. The replication " 096 + "would happen as per the existing format and not as " + 097 "string type for the cell " 098 + cell + ".", ioe); 099 // just return the old entries as it is without applying the string type change 100 newEdit.add(cell); 101 continue; 102 } 103 // Recreate the cell with the new tags and the existing tags 104 Cell newCell = PrivateCellUtil.createCell(cell, nonVisTags); 105 newEdit.add(newCell); 106 } else { 107 newEdit.add(cell); 108 } 109 } else { 110 newEdit.add(cell); 111 } 112 } 113 newEntries.add(new Entry((entry.getKey()), newEdit)); 114 } 115 replicateContext.setEntries(newEntries); 116 return delegator.replicate(replicateContext); 117 } else { 118 return delegator.replicate(replicateContext); 119 } 120 } 121 122 @Override 123 public synchronized UUID getPeerUUID() { 124 return delegator.getPeerUUID(); 125 } 126 127 @Override 128 public boolean canReplicateToSameCluster() { 129 return delegator.canReplicateToSameCluster(); 130 } 131 132 @Override 133 public WALEntryFilter getWALEntryfilter() { 134 return delegator.getWALEntryfilter(); 135 } 136 137 @Override 138 public boolean isRunning() { 139 return this.delegator.isRunning(); 140 } 141 142 @Override 143 public boolean isStarting() {return this.delegator.isStarting();} 144 145 @Override 146 public void start() { 147 this.delegator.start(); 148 } 149 150 @Override 151 public void awaitRunning() { 152 this.delegator.awaitRunning(); 153 } 154 155 @Override 156 public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 157 this.delegator.awaitRunning(timeout, unit); 158 } 159 160 @Override 161 public void stop() { 162 this.delegator.stop(); 163 } 164 165 @Override 166 public void awaitTerminated() { 167 this.delegator.awaitTerminated(); 168 } 169 170 @Override 171 public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 172 this.delegator.awaitTerminated(timeout, unit); 173 } 174 175 @Override 176 public Throwable failureCause() { 177 return this.delegator.failureCause(); 178 } 179}