001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.visibility;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.UUID;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.TimeoutException;
026
027import org.apache.hadoop.hbase.ArrayBackedTag;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.PrivateCellUtil;
030import org.apache.hadoop.hbase.Tag;
031import org.apache.hadoop.hbase.TagType;
032import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
033import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
034import org.apache.hadoop.hbase.replication.WALEntryFilter;
035import org.apache.hadoop.hbase.wal.WAL.Entry;
036import org.apache.hadoop.hbase.wal.WALEdit;
037import org.apache.yetus.audience.InterfaceAudience;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041@InterfaceAudience.Private
042public class VisibilityReplicationEndpoint implements ReplicationEndpoint {
043
044  private static final Logger LOG = LoggerFactory.getLogger(VisibilityReplicationEndpoint.class);
045
046  private final ReplicationEndpoint delegator;
047  private final VisibilityLabelService visibilityLabelsService;
048
049  public VisibilityReplicationEndpoint(ReplicationEndpoint endpoint,
050      VisibilityLabelService visibilityLabelsService) {
051    this.delegator = endpoint;
052    this.visibilityLabelsService = visibilityLabelsService;
053  }
054
055  @Override
056  public void init(Context context) throws IOException {
057    delegator.init(context);
058  }
059
060  @Override
061  public void peerConfigUpdated(ReplicationPeerConfig rpc){
062    delegator.peerConfigUpdated(rpc);
063  }
064
065  @Override
066  public boolean replicate(ReplicateContext replicateContext) {
067    if (!delegator.canReplicateToSameCluster()) {
068      // Only when the replication is inter cluster replication we need to
069      // convert the visibility tags to
070      // string based tags. But for intra cluster replication like region
071      // replicas it is not needed.
072      List<Entry> entries = replicateContext.getEntries();
073      List<Tag> visTags = new ArrayList<>();
074      List<Tag> nonVisTags = new ArrayList<>();
075      List<Entry> newEntries = new ArrayList<>(entries.size());
076      for (Entry entry : entries) {
077        WALEdit newEdit = new WALEdit();
078        ArrayList<Cell> cells = entry.getEdit().getCells();
079        for (Cell cell : cells) {
080          if (cell.getTagsLength() > 0) {
081            visTags.clear();
082            nonVisTags.clear();
083            Byte serializationFormat = VisibilityUtils.extractAndPartitionTags(cell, visTags,
084                nonVisTags);
085            if (!visTags.isEmpty()) {
086              try {
087                byte[] modifiedVisExpression = visibilityLabelsService
088                    .encodeVisibilityForReplication(visTags, serializationFormat);
089                if (modifiedVisExpression != null) {
090                  nonVisTags
091                      .add(new ArrayBackedTag(TagType.STRING_VIS_TAG_TYPE, modifiedVisExpression));
092                }
093              } catch (Exception ioe) {
094                LOG.error(
095                    "Exception while reading the visibility labels from the cell. The replication "
096                        + "would happen as per the existing format and not as " +
097                        "string type for the cell "
098                        + cell + ".", ioe);
099                // just return the old entries as it is without applying the string type change
100                newEdit.add(cell);
101                continue;
102              }
103              // Recreate the cell with the new tags and the existing tags
104              Cell newCell = PrivateCellUtil.createCell(cell, nonVisTags);
105              newEdit.add(newCell);
106            } else {
107              newEdit.add(cell);
108            }
109          } else {
110            newEdit.add(cell);
111          }
112        }
113        newEntries.add(new Entry((entry.getKey()), newEdit));
114      }
115      replicateContext.setEntries(newEntries);
116      return delegator.replicate(replicateContext);
117    } else {
118      return delegator.replicate(replicateContext);
119    }
120  }
121
122  @Override
123  public synchronized UUID getPeerUUID() {
124    return delegator.getPeerUUID();
125  }
126
127  @Override
128  public boolean canReplicateToSameCluster() {
129    return delegator.canReplicateToSameCluster();
130  }
131
132  @Override
133  public WALEntryFilter getWALEntryfilter() {
134    return delegator.getWALEntryfilter();
135  }
136
137  @Override
138  public boolean isRunning() {
139    return this.delegator.isRunning();
140  }
141
142  @Override
143  public boolean isStarting() {return this.delegator.isStarting();}
144
145  @Override
146  public void start() {
147    this.delegator.start();
148  }
149
150  @Override
151  public void awaitRunning() {
152    this.delegator.awaitRunning();
153  }
154
155  @Override
156  public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
157    this.delegator.awaitRunning(timeout, unit);
158  }
159
160  @Override
161  public void stop() {
162    this.delegator.stop();
163  }
164
165  @Override
166  public void awaitTerminated() {
167    this.delegator.awaitTerminated();
168  }
169
170  @Override
171  public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
172    this.delegator.awaitTerminated(timeout, unit);
173  }
174
175  @Override
176  public Throwable failureCause() {
177    return this.delegator.failureCause();
178  }
179}