001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.security.visibility;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.List;
023import java.util.UUID;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.TimeoutException;
026import org.apache.hadoop.hbase.ArrayBackedTag;
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.PrivateCellUtil;
029import org.apache.hadoop.hbase.Tag;
030import org.apache.hadoop.hbase.TagType;
031import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
032import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
033import org.apache.hadoop.hbase.replication.WALEntryFilter;
034import org.apache.hadoop.hbase.wal.WAL.Entry;
035import org.apache.hadoop.hbase.wal.WALEdit;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040@InterfaceAudience.Private
041public class VisibilityReplicationEndpoint implements ReplicationEndpoint {
042
043  private static final Logger LOG = LoggerFactory.getLogger(VisibilityReplicationEndpoint.class);
044
045  private final ReplicationEndpoint delegator;
046  private final VisibilityLabelService visibilityLabelsService;
047
048  public VisibilityReplicationEndpoint(ReplicationEndpoint endpoint,
049    VisibilityLabelService visibilityLabelsService) {
050    this.delegator = endpoint;
051    this.visibilityLabelsService = visibilityLabelsService;
052  }
053
054  @Override
055  public void init(Context context) throws IOException {
056    delegator.init(context);
057  }
058
059  @Override
060  public void peerConfigUpdated(ReplicationPeerConfig rpc) {
061    delegator.peerConfigUpdated(rpc);
062  }
063
064  @Override
065  public boolean replicate(ReplicateContext replicateContext) {
066    if (!delegator.canReplicateToSameCluster()) {
067      // Only when the replication is inter cluster replication we need to
068      // convert the visibility tags to
069      // string based tags. But for intra cluster replication like region
070      // replicas it is not needed.
071      List<Entry> entries = replicateContext.getEntries();
072      List<Tag> visTags = new ArrayList<>();
073      List<Tag> nonVisTags = new ArrayList<>();
074      List<Entry> newEntries = new ArrayList<>(entries.size());
075      for (Entry entry : entries) {
076        WALEdit newEdit = new WALEdit();
077        ArrayList<Cell> cells = entry.getEdit().getCells();
078        for (Cell cell : cells) {
079          if (cell.getTagsLength() > 0) {
080            visTags.clear();
081            nonVisTags.clear();
082            Byte serializationFormat =
083              VisibilityUtils.extractAndPartitionTags(cell, visTags, nonVisTags);
084            if (!visTags.isEmpty()) {
085              try {
086                byte[] modifiedVisExpression = visibilityLabelsService
087                  .encodeVisibilityForReplication(visTags, serializationFormat);
088                if (modifiedVisExpression != null) {
089                  nonVisTags
090                    .add(new ArrayBackedTag(TagType.STRING_VIS_TAG_TYPE, modifiedVisExpression));
091                }
092              } catch (Exception ioe) {
093                LOG.error(
094                  "Exception while reading the visibility labels from the cell. The replication "
095                    + "would happen as per the existing format and not as "
096                    + "string type for the cell " + cell + ".",
097                  ioe);
098                // just return the old entries as it is without applying the string type change
099                newEdit.add(cell);
100                continue;
101              }
102              // Recreate the cell with the new tags and the existing tags
103              Cell newCell = PrivateCellUtil.createCell(cell, nonVisTags);
104              newEdit.add(newCell);
105            } else {
106              newEdit.add(cell);
107            }
108          } else {
109            newEdit.add(cell);
110          }
111        }
112        newEntries.add(new Entry((entry.getKey()), newEdit));
113      }
114      replicateContext.setEntries(newEntries);
115      return delegator.replicate(replicateContext);
116    } else {
117      return delegator.replicate(replicateContext);
118    }
119  }
120
121  @Override
122  public synchronized UUID getPeerUUID() {
123    return delegator.getPeerUUID();
124  }
125
126  @Override
127  public boolean canReplicateToSameCluster() {
128    return delegator.canReplicateToSameCluster();
129  }
130
131  @Override
132  public WALEntryFilter getWALEntryfilter() {
133    return delegator.getWALEntryfilter();
134  }
135
136  @Override
137  public boolean isRunning() {
138    return this.delegator.isRunning();
139  }
140
141  @Override
142  public boolean isStarting() {
143    return this.delegator.isStarting();
144  }
145
146  @Override
147  public void start() {
148    this.delegator.start();
149  }
150
151  @Override
152  public void awaitRunning() {
153    this.delegator.awaitRunning();
154  }
155
156  @Override
157  public void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
158    this.delegator.awaitRunning(timeout, unit);
159  }
160
161  @Override
162  public void stop() {
163    this.delegator.stop();
164  }
165
166  @Override
167  public void awaitTerminated() {
168    this.delegator.awaitTerminated();
169  }
170
171  @Override
172  public void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
173    this.delegator.awaitTerminated(timeout, unit);
174  }
175
176  @Override
177  public Throwable failureCause() {
178    return this.delegator.failureCause();
179  }
180}