View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   * http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.security.visibility;
20  
21  import static org.apache.hadoop.hbase.HConstants.OperationStatusCode.SANITY_CHECK_FAILURE;
22  import static org.apache.hadoop.hbase.HConstants.OperationStatusCode.SUCCESS;
23  import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_FAMILY;
24  import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME;
25  
26  import java.io.IOException;
27  import java.net.InetAddress;
28  import java.util.ArrayList;
29  import java.util.HashMap;
30  import java.util.Iterator;
31  import java.util.List;
32  import java.util.Map;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.hbase.AuthUtil;
38  import org.apache.hadoop.hbase.Cell;
39  import org.apache.hadoop.hbase.CellScanner;
40  import org.apache.hadoop.hbase.CellUtil;
41  import org.apache.hadoop.hbase.CoprocessorEnvironment;
42  import org.apache.hadoop.hbase.DoNotRetryIOException;
43  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
44  import org.apache.hadoop.hbase.HColumnDescriptor;
45  import org.apache.hadoop.hbase.HConstants;
46  import org.apache.hadoop.hbase.HTableDescriptor;
47  import org.apache.hadoop.hbase.MetaTableAccessor;
48  import org.apache.hadoop.hbase.TableName;
49  import org.apache.hadoop.hbase.Tag;
50  import org.apache.hadoop.hbase.TagRewriteCell;
51  import org.apache.hadoop.hbase.TagType;
52  import org.apache.hadoop.hbase.classification.InterfaceAudience;
53  import org.apache.hadoop.hbase.client.Append;
54  import org.apache.hadoop.hbase.client.Delete;
55  import org.apache.hadoop.hbase.client.Get;
56  import org.apache.hadoop.hbase.client.Increment;
57  import org.apache.hadoop.hbase.client.Mutation;
58  import org.apache.hadoop.hbase.client.Put;
59  import org.apache.hadoop.hbase.client.Result;
60  import org.apache.hadoop.hbase.client.Scan;
61  import org.apache.hadoop.hbase.constraint.ConstraintException;
62  import org.apache.hadoop.hbase.coprocessor.BaseMasterAndRegionObserver;
63  import org.apache.hadoop.hbase.coprocessor.BaseRegionServerObserver;
64  import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
65  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
66  import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
67  import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
68  import org.apache.hadoop.hbase.coprocessor.ObserverContext;
69  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
70  import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
71  import org.apache.hadoop.hbase.exceptions.DeserializationException;
72  import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
73  import org.apache.hadoop.hbase.filter.Filter;
74  import org.apache.hadoop.hbase.filter.FilterBase;
75  import org.apache.hadoop.hbase.filter.FilterList;
76  import org.apache.hadoop.hbase.io.hfile.HFile;
77  import org.apache.hadoop.hbase.ipc.RpcServer;
78  import org.apache.hadoop.hbase.master.MasterServices;
79  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
80  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
81  import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos;
82  import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.GetAuthsRequest;
83  import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.GetAuthsResponse;
84  import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.ListLabelsRequest;
85  import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.ListLabelsResponse;
86  import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.SetAuthsRequest;
87  import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabel;
88  import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsRequest;
89  import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsResponse;
90  import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos.VisibilityLabelsService;
91  import org.apache.hadoop.hbase.regionserver.BloomType;
92  import org.apache.hadoop.hbase.regionserver.DeleteTracker;
93  import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
94  import org.apache.hadoop.hbase.regionserver.InternalScanner;
95  import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
96  import org.apache.hadoop.hbase.regionserver.OperationStatus;
97  import org.apache.hadoop.hbase.regionserver.Region;
98  import org.apache.hadoop.hbase.regionserver.RegionScanner;
99  import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
100 import org.apache.hadoop.hbase.security.AccessDeniedException;
101 import org.apache.hadoop.hbase.security.Superusers;
102 import org.apache.hadoop.hbase.security.User;
103 import org.apache.hadoop.hbase.security.access.AccessController;
104 import org.apache.hadoop.hbase.util.ByteStringer;
105 import org.apache.hadoop.hbase.util.Bytes;
106 import org.apache.hadoop.hbase.util.Pair;
107 
108 import com.google.common.collect.Lists;
109 import com.google.common.collect.MapMaker;
110 import com.google.protobuf.ByteString;
111 import com.google.protobuf.RpcCallback;
112 import com.google.protobuf.RpcController;
113 import com.google.protobuf.Service;
114 
/**
 * Coprocessor that implements both the MasterObserver and the RegionObserver hooks in order to
 * support visibility labels.
 */
119 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
120 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MT_CORRECTNESS",
121   justification="FIX visibilityLabelService; Make Synchronized!!!")
122 public class VisibilityController extends BaseMasterAndRegionObserver implements
123     VisibilityLabelsService.Interface, CoprocessorService {
124 
125   private static final Log LOG = LogFactory.getLog(VisibilityController.class);
126   private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger."
127       + VisibilityController.class.getName());
128   // flags if we are running on a region of the 'labels' table
129   private boolean labelsRegion = false;
130   // Flag denoting whether AcessController is available or not.
131   private boolean accessControllerAvailable = false;
132   private Configuration conf;
133   private volatile boolean initialized = false;
134   private boolean checkAuths = false;
135   /** Mapping of scanner instances to the user who created them */
136   private Map<InternalScanner,String> scannerOwners =
137       new MapMaker().weakKeys().makeMap();
138 
139   private VisibilityLabelService visibilityLabelService; // FindBugs: MT_CORRECTNESS FIX!!!
140 
141   /** if we are active, usually true, only not true if "hbase.security.authorization"
142     has been set to false in site configuration */
143   boolean authorizationEnabled;
144 
145   // Add to this list if there are any reserved tag types
146   private static ArrayList<Byte> RESERVED_VIS_TAG_TYPES = new ArrayList<Byte>();
147   static {
148     RESERVED_VIS_TAG_TYPES.add(TagType.VISIBILITY_TAG_TYPE);
149     RESERVED_VIS_TAG_TYPES.add(TagType.VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE);
150     RESERVED_VIS_TAG_TYPES.add(TagType.STRING_VIS_TAG_TYPE);
151   }
152 
153   public static boolean isAuthorizationSupported(Configuration conf) {
154     return conf.getBoolean(User.HBASE_SECURITY_AUTHORIZATION_CONF_KEY, true);
155   }
156 
157   public static boolean isCellAuthorizationSupported(Configuration conf) {
158     return isAuthorizationSupported(conf);
159   }
160 
161   @Override
162   public void start(CoprocessorEnvironment env) throws IOException {
163     this.conf = env.getConfiguration();
164 
165     authorizationEnabled = isAuthorizationSupported(conf);
166     if (!authorizationEnabled) {
167       LOG.warn("The VisibilityController has been loaded with authorization checks disabled.");
168     }
169 
170     if (HFile.getFormatVersion(conf) < HFile.MIN_FORMAT_VERSION_WITH_TAGS) {
171       throw new RuntimeException("A minimum HFile version of " + HFile.MIN_FORMAT_VERSION_WITH_TAGS
172         + " is required to persist visibility labels. Consider setting " + HFile.FORMAT_VERSION_KEY
173         + " accordingly.");
174     }
175 
176     if (env instanceof RegionServerCoprocessorEnvironment) {
177       throw new RuntimeException("Visibility controller should not be configured as "
178           + "'hbase.coprocessor.regionserver.classes'.");
179     }
180     // Do not create for master CPs
181     if (!(env instanceof MasterCoprocessorEnvironment)) {
182       visibilityLabelService = VisibilityLabelServiceManager.getInstance()
183           .getVisibilityLabelService(this.conf);
184     }
185   }
186 
187   @Override
188   public void stop(CoprocessorEnvironment env) throws IOException {
189 
190   }
191 
192   /********************************* Master related hooks **********************************/
193 
194   @Override
195   public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
196     // Need to create the new system table for labels here
197     MasterServices master = ctx.getEnvironment().getMasterServices();
198     if (!MetaTableAccessor.tableExists(master.getConnection(), LABELS_TABLE_NAME)) {
199       HTableDescriptor labelsTable = new HTableDescriptor(LABELS_TABLE_NAME);
200       HColumnDescriptor labelsColumn = new HColumnDescriptor(LABELS_TABLE_FAMILY);
201       labelsColumn.setBloomFilterType(BloomType.NONE);
202       labelsColumn.setBlockCacheEnabled(false); // We will cache all the labels. No need of normal
203                                                  // table block cache.
204       labelsTable.addFamily(labelsColumn);
205       // Let the "labels" table having only one region always. We are not expecting too many labels in
206       // the system.
207       labelsTable.setValue(HTableDescriptor.SPLIT_POLICY,
208           DisabledRegionSplitPolicy.class.getName());
209       labelsTable.setValue(Bytes.toBytes(HConstants.DISALLOW_WRITES_IN_RECOVERING),
210           Bytes.toBytes(true));
211       master.createTable(labelsTable, null, HConstants.NO_NONCE, HConstants.NO_NONCE);
212     }
213   }
214 
215   @Override
216   public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
217       TableName tableName, HTableDescriptor htd) throws IOException {
218     if (!authorizationEnabled) {
219       return;
220     }
221     if (LABELS_TABLE_NAME.equals(tableName)) {
222       throw new ConstraintException("Cannot alter " + LABELS_TABLE_NAME);
223     }
224   }
225 
226   @Override
227   public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> ctx, TableName tableName,
228       HColumnDescriptor column) throws IOException {
229     if (!authorizationEnabled) {
230       return;
231     }
232     if (LABELS_TABLE_NAME.equals(tableName)) {
233       throw new ConstraintException("Cannot alter " + LABELS_TABLE_NAME);
234     }
235   }
236 
237   @Override
238   public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment> ctx,
239       TableName tableName, HColumnDescriptor descriptor) throws IOException {
240     if (!authorizationEnabled) {
241       return;
242     }
243     if (LABELS_TABLE_NAME.equals(tableName)) {
244       throw new ConstraintException("Cannot alter " + LABELS_TABLE_NAME);
245     }
246   }
247 
248   @Override
249   public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment> ctx,
250       TableName tableName, byte[] c) throws IOException {
251     if (!authorizationEnabled) {
252       return;
253     }
254     if (LABELS_TABLE_NAME.equals(tableName)) {
255       throw new ConstraintException("Cannot alter " + LABELS_TABLE_NAME);
256     }
257   }
258 
259   @Override
260   public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> ctx, TableName tableName)
261       throws IOException {
262     if (!authorizationEnabled) {
263       return;
264     }
265     if (LABELS_TABLE_NAME.equals(tableName)) {
266       throw new ConstraintException("Cannot disable " + LABELS_TABLE_NAME);
267     }
268   }
269 
270   /****************************** Region related hooks ******************************/
271 
272   @Override
273   public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) {
274     // Read the entire labels table and populate the zk
275     if (e.getEnvironment().getRegion().getRegionInfo().getTable().equals(LABELS_TABLE_NAME)) {
276       this.labelsRegion = true;
277       synchronized (this) {
278         this.accessControllerAvailable = CoprocessorHost.getLoadedCoprocessors()
279           .contains(AccessController.class.getName());
280       }
281       // Defer the init of VisibilityLabelService on labels region until it is in recovering state.
282       if (!e.getEnvironment().getRegion().isRecovering()) {
283         initVisibilityLabelService(e.getEnvironment());
284       }
285     } else {
286       checkAuths = e.getEnvironment().getConfiguration()
287           .getBoolean(VisibilityConstants.CHECK_AUTHS_FOR_MUTATION, false);
288       initVisibilityLabelService(e.getEnvironment());
289     }
290   }
291 
292   @Override
293   public void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> e) {
294     if (this.labelsRegion) {
295       initVisibilityLabelService(e.getEnvironment());
296       LOG.debug("post labels region log replay");
297     }
298   }
299 
300   private void initVisibilityLabelService(RegionCoprocessorEnvironment env) {
301     try {
302       this.visibilityLabelService.init(env);
303       this.initialized = true;
304     } catch (IOException ioe) {
305       LOG.error("Error while initializing VisibilityLabelService..", ioe);
306       throw new RuntimeException(ioe);
307     }
308   }
309 
310   @Override
311   public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
312       MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
313     if (c.getEnvironment().getRegion().getRegionInfo().getTable().isSystemTable()) {
314       return;
315     }
316     // TODO this can be made as a global LRU cache at HRS level?
317     Map<String, List<Tag>> labelCache = new HashMap<String, List<Tag>>();
318     for (int i = 0; i < miniBatchOp.size(); i++) {
319       Mutation m = miniBatchOp.getOperation(i);
320       CellVisibility cellVisibility = null;
321       try {
322         cellVisibility = m.getCellVisibility();
323       } catch (DeserializationException de) {
324         miniBatchOp.setOperationStatus(i,
325             new OperationStatus(SANITY_CHECK_FAILURE, de.getMessage()));
326         continue;
327       }
328       boolean sanityFailure = false;
329       boolean modifiedTagFound = false;
330       Pair<Boolean, Tag> pair = new Pair<Boolean, Tag>(false, null);
331       for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
332         pair = checkForReservedVisibilityTagPresence(cellScanner.current(), pair);
333         if (!pair.getFirst()) {
334           // Don't disallow reserved tags if authorization is disabled
335           if (authorizationEnabled) {
336             miniBatchOp.setOperationStatus(i, new OperationStatus(SANITY_CHECK_FAILURE,
337               "Mutation contains cell with reserved type tag"));
338             sanityFailure = true;
339           }
340           break;
341         } else {
342           // Indicates that the cell has a the tag which was modified in the src replication cluster
343           Tag tag = pair.getSecond();
344           if (cellVisibility == null && tag != null) {
345             // May need to store only the first one
346             cellVisibility = new CellVisibility(Bytes.toString(tag.getBuffer(), tag.getTagOffset(),
347                 tag.getTagLength()));
348             modifiedTagFound = true;
349           }
350         }
351       }
352       if (!sanityFailure) {
353         if (cellVisibility != null) {
354           String labelsExp = cellVisibility.getExpression();
355           List<Tag> visibilityTags = labelCache.get(labelsExp);
356           if (visibilityTags == null) {
357             // Don't check user auths for labels with Mutations when the user is super user
358             boolean authCheck = authorizationEnabled && checkAuths && !(isSystemOrSuperUser());
359             try {
360               visibilityTags = this.visibilityLabelService.createVisibilityExpTags(labelsExp, true,
361                   authCheck);
362             } catch (InvalidLabelException e) {
363               miniBatchOp.setOperationStatus(i,
364                   new OperationStatus(SANITY_CHECK_FAILURE, e.getMessage()));
365             }
366             if (visibilityTags != null) {
367               labelCache.put(labelsExp, visibilityTags);
368             }
369           }
370           if (visibilityTags != null) {
371             List<Cell> updatedCells = new ArrayList<Cell>();
372             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
373               Cell cell = cellScanner.current();
374               List<Tag> tags = Tag.asList(cell.getTagsArray(), cell.getTagsOffset(),
375                   cell.getTagsLength());
376               if (modifiedTagFound) {
377                 // Rewrite the tags by removing the modified tags.
378                 removeReplicationVisibilityTag(tags);
379               }
380               tags.addAll(visibilityTags);
381               Cell updatedCell = new TagRewriteCell(cell, Tag.fromList(tags));
382               updatedCells.add(updatedCell);
383             }
384             m.getFamilyCellMap().clear();
385             // Clear and add new Cells to the Mutation.
386             for (Cell cell : updatedCells) {
387               if (m instanceof Put) {
388                 Put p = (Put) m;
389                 p.add(cell);
390               } else if (m instanceof Delete) {
391                 Delete d = (Delete) m;
392                 d.addDeleteMarker(cell);
393               }
394             }
395           }
396         }
397       }
398     }
399   }
400 
401   @Override
402   public void prePrepareTimeStampForDeleteVersion(
403       ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation delete, Cell cell,
404       byte[] byteNow, Get get) throws IOException {
405     // Nothing to do if we are not filtering by visibility
406     if (!authorizationEnabled) {
407       return;
408     }
409 
410     CellVisibility cellVisibility = null;
411     try {
412       cellVisibility = delete.getCellVisibility();
413     } catch (DeserializationException de) {
414       throw new IOException("Invalid cell visibility specified " + delete, de);
415     }
416     // The check for checkForReservedVisibilityTagPresence happens in preBatchMutate happens.
417     // It happens for every mutation and that would be enough.
418     List<Tag> visibilityTags = new ArrayList<Tag>();
419     if (cellVisibility != null) {
420       String labelsExp = cellVisibility.getExpression();
421       try {
422         visibilityTags = this.visibilityLabelService.createVisibilityExpTags(labelsExp, false,
423             false);
424       } catch (InvalidLabelException e) {
425         throw new IOException("Invalid cell visibility specified " + labelsExp, e);
426       }
427     }
428     get.setFilter(new DeleteVersionVisibilityExpressionFilter(visibilityTags,
429         VisibilityConstants.SORTED_ORDINAL_SERIALIZATION_FORMAT));
430     List<Cell> result = ctx.getEnvironment().getRegion().get(get, false);
431 
432     if (result.size() < get.getMaxVersions()) {
433       // Nothing to delete
434       CellUtil.updateLatestStamp(cell, Long.MIN_VALUE);
435       return;
436     }
437     if (result.size() > get.getMaxVersions()) {
438       throw new RuntimeException("Unexpected size: " + result.size()
439           + ". Results more than the max versions obtained.");
440     }
441     Cell getCell = result.get(get.getMaxVersions() - 1);
442     CellUtil.setTimestamp(cell, getCell.getTimestamp());
443 
444     // We are bypassing here because in the HRegion.updateDeleteLatestVersionTimeStamp we would
445     // update with the current timestamp after again doing a get. As the hook as already determined
446     // the needed timestamp we need to bypass here.
447     // TODO : See if HRegion.updateDeleteLatestVersionTimeStamp() could be
448     // called only if the hook is not called.
449     ctx.bypass();
450   }
451 
452   /**
453    * Checks whether cell contains any tag with type as VISIBILITY_TAG_TYPE. This
454    * tag type is reserved and should not be explicitly set by user.
455    *
456    * @param cell
457    *          - the cell under consideration
458    * @param pair - an optional pair of type <Boolean, Tag> which would be reused
459    *               if already set and new one will be created if null is passed
460    * @return a pair<Boolean, Tag> - if the boolean is false then it indicates
461    *         that the cell has a RESERVERD_VIS_TAG and with boolean as true, not
462    *         null tag indicates that a string modified tag was found.
463    */
464   private Pair<Boolean, Tag> checkForReservedVisibilityTagPresence(Cell cell,
465       Pair<Boolean, Tag> pair) throws IOException {
466     if (pair == null) {
467       pair = new Pair<Boolean, Tag>(false, null);
468     } else {
469       pair.setFirst(false);
470       pair.setSecond(null);
471     }
472     // Bypass this check when the operation is done by a system/super user.
473     // This is done because, while Replication, the Cells coming to the peer cluster with reserved
474     // typed tags and this is fine and should get added to the peer cluster table
475     if (isSystemOrSuperUser()) {
476       // Does the cell contain special tag which indicates that the replicated
477       // cell visiblilty tags
478       // have been modified
479       Tag modifiedTag = null;
480       if (cell.getTagsLength() > 0) {
481         Iterator<Tag> tagsIterator = CellUtil.tagsIterator(cell.getTagsArray(),
482             cell.getTagsOffset(), cell.getTagsLength());
483         while (tagsIterator.hasNext()) {
484           Tag tag = tagsIterator.next();
485           if (tag.getType() == TagType.STRING_VIS_TAG_TYPE) {
486             modifiedTag = tag;
487             break;
488           }
489         }
490       }
491       pair.setFirst(true);
492       pair.setSecond(modifiedTag);
493       return pair;
494     }
495     if (cell.getTagsLength() > 0) {
496       Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
497           cell.getTagsLength());
498       while (tagsItr.hasNext()) {
499         if (RESERVED_VIS_TAG_TYPES.contains(tagsItr.next().getType())) {
500           return pair;
501         }
502       }
503     }
504     pair.setFirst(true);
505     return pair;
506   }
507 
508   /**
509    * Checks whether cell contains any tag with type as VISIBILITY_TAG_TYPE. This
510    * tag type is reserved and should not be explicitly set by user. There are
511    * two versions of this method one that accepts pair and other without pair.
512    * In case of preAppend and preIncrement the additional operations are not
513    * needed like checking for STRING_VIS_TAG_TYPE and hence the API without pair
514    * could be used.
515    *
516    * @param cell
517    * @return true or false
518    * @throws IOException
519    */
520   private boolean checkForReservedVisibilityTagPresence(Cell cell) throws IOException {
521     // Bypass this check when the operation is done by a system/super user.
522     // This is done because, while Replication, the Cells coming to the peer
523     // cluster with reserved
524     // typed tags and this is fine and should get added to the peer cluster
525     // table
526     if (isSystemOrSuperUser()) {
527       return true;
528     }
529     if (cell.getTagsLength() > 0) {
530       Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
531           cell.getTagsLength());
532       while (tagsItr.hasNext()) {
533         if (RESERVED_VIS_TAG_TYPES.contains(tagsItr.next().getType())) {
534           return false;
535         }
536       }
537     }
538     return true;
539   }
540 
541   private void removeReplicationVisibilityTag(List<Tag> tags) throws IOException {
542     Iterator<Tag> iterator = tags.iterator();
543     while (iterator.hasNext()) {
544       Tag tag = iterator.next();
545       if (tag.getType() == TagType.STRING_VIS_TAG_TYPE) {
546         iterator.remove();
547         break;
548       }
549     }
550   }
551 
552   @Override
553   public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan,
554       RegionScanner s) throws IOException {
555     if (!initialized) {
556       throw new VisibilityControllerNotReadyException("VisibilityController not yet initialized!");
557     }
558     // Nothing to do if authorization is not enabled
559     if (!authorizationEnabled) {
560       return s;
561     }
562     Region region = e.getEnvironment().getRegion();
563     Authorizations authorizations = null;
564     try {
565       authorizations = scan.getAuthorizations();
566     } catch (DeserializationException de) {
567       throw new IOException(de);
568     }
569     if (authorizations == null) {
570       // No Authorizations present for this scan/Get!
571       // In case of system tables other than "labels" just scan with out visibility check and
572       // filtering. Checking visibility labels for META and NAMESPACE table is not needed.
573       TableName table = region.getRegionInfo().getTable();
574       if (table.isSystemTable() && !table.equals(LABELS_TABLE_NAME)) {
575         return s;
576       }
577     }
578 
579     Filter visibilityLabelFilter = VisibilityUtils.createVisibilityLabelFilter(region,
580         authorizations);
581     if (visibilityLabelFilter != null) {
582       Filter filter = scan.getFilter();
583       if (filter != null) {
584         scan.setFilter(new FilterList(filter, visibilityLabelFilter));
585       } else {
586         scan.setFilter(visibilityLabelFilter);
587       }
588     }
589     return s;
590   }
591 
592   @Override
593   public DeleteTracker postInstantiateDeleteTracker(
594       ObserverContext<RegionCoprocessorEnvironment> ctx, DeleteTracker delTracker)
595       throws IOException {
596     // Nothing to do if we are not filtering by visibility
597     if (!authorizationEnabled) {
598       return delTracker;
599     }
600     Region region = ctx.getEnvironment().getRegion();
601     TableName table = region.getRegionInfo().getTable();
602     if (table.isSystemTable()) {
603       return delTracker;
604     }
605     // We are creating a new type of delete tracker here which is able to track
606     // the timestamps and also the
607     // visibility tags per cell. The covering cells are determined not only
608     // based on the delete type and ts
609     // but also on the visibility expression matching.
610     return new VisibilityScanDeleteTracker();
611   }
612 
613   @Override
614   public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
615       final Scan scan, final RegionScanner s) throws IOException {
616     User user = VisibilityUtils.getActiveUser();
617     if (user != null && user.getShortName() != null) {
618       scannerOwners.put(s, user.getShortName());
619     }
620     return s;
621   }
622 
623   @Override
624   public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
625       final InternalScanner s, final List<Result> result, final int limit, final boolean hasNext)
626       throws IOException {
627     requireScannerOwner(s);
628     return hasNext;
629   }
630 
631   @Override
632   public void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
633       final InternalScanner s) throws IOException {
634     requireScannerOwner(s);
635   }
636 
637   @Override
638   public void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
639       final InternalScanner s) throws IOException {
640     // clean up any associated owner mapping
641     scannerOwners.remove(s);
642   }
643 
644   /**
645    * Verify, when servicing an RPC, that the caller is the scanner owner. If so, we assume that
646    * access control is correctly enforced based on the checks performed in preScannerOpen()
647    */
648   private void requireScannerOwner(InternalScanner s) throws AccessDeniedException {
649     if (!RpcServer.isInRpcCallContext())
650       return;
651     String requestUName = RpcServer.getRequestUserName();
652     String owner = scannerOwners.get(s);
653     if (authorizationEnabled && owner != null && !owner.equals(requestUName)) {
654       throw new AccessDeniedException("User '" + requestUName + "' is not the scanner owner!");
655     }
656   }
657 
658   @Override
659   public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get,
660       List<Cell> results) throws IOException {
661     if (!initialized) {
662       throw new VisibilityControllerNotReadyException("VisibilityController not yet initialized");
663     }
664     // Nothing useful to do if authorization is not enabled
665     if (!authorizationEnabled) {
666       return;
667     }
668     Region region = e.getEnvironment().getRegion();
669     Authorizations authorizations = null;
670     try {
671       authorizations = get.getAuthorizations();
672     } catch (DeserializationException de) {
673       throw new IOException(de);
674     }
675     if (authorizations == null) {
676       // No Authorizations present for this scan/Get!
677       // In case of system tables other than "labels" just scan with out visibility check and
678       // filtering. Checking visibility labels for META and NAMESPACE table is not needed.
679       TableName table = region.getRegionInfo().getTable();
680       if (table.isSystemTable() && !table.equals(LABELS_TABLE_NAME)) {
681         return;
682       }
683     }
684     Filter visibilityLabelFilter = VisibilityUtils.createVisibilityLabelFilter(e.getEnvironment()
685         .getRegion(), authorizations);
686     if (visibilityLabelFilter != null) {
687       Filter filter = get.getFilter();
688       if (filter != null) {
689         get.setFilter(new FilterList(filter, visibilityLabelFilter));
690       } else {
691         get.setFilter(visibilityLabelFilter);
692       }
693     }
694   }
695 
696   private boolean isSystemOrSuperUser() throws IOException {
697     return Superusers.isSuperUser(VisibilityUtils.getActiveUser());
698   }
699 
700   @Override
701   public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append)
702       throws IOException {
703     // If authorization is not enabled, we don't care about reserved tags
704     if (!authorizationEnabled) {
705       return null;
706     }
707     for (CellScanner cellScanner = append.cellScanner(); cellScanner.advance();) {
708       if (!checkForReservedVisibilityTagPresence(cellScanner.current())) {
709         throw new FailedSanityCheckException("Append contains cell with reserved type tag");
710       }
711     }
712     return null;
713   }
714 
715   @Override
716   public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment)
717       throws IOException {
718     // If authorization is not enabled, we don't care about reserved tags
719     if (!authorizationEnabled) {
720       return null;
721     }
722     for (CellScanner cellScanner = increment.cellScanner(); cellScanner.advance();) {
723       if (!checkForReservedVisibilityTagPresence(cellScanner.current())) {
724         throw new FailedSanityCheckException("Increment contains cell with reserved type tag");
725       }
726     }
727     return null;
728   }
729 
730   @Override
731   public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
732       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
733     List<Tag> tags = Lists.newArrayList();
734     CellVisibility cellVisibility = null;
735     try {
736       cellVisibility = mutation.getCellVisibility();
737     } catch (DeserializationException e) {
738       throw new IOException(e);
739     }
740     if (cellVisibility == null) {
741       return newCell;
742     }
743     // Prepend new visibility tags to a new list of tags for the cell
744     // Don't check user auths for labels with Mutations when the user is super user
745     boolean authCheck = authorizationEnabled && checkAuths && !(isSystemOrSuperUser());
746     tags.addAll(this.visibilityLabelService.createVisibilityExpTags(cellVisibility.getExpression(),
747         true, authCheck));
748     // Save an object allocation where we can
749     if (newCell.getTagsLength() > 0) {
750       // Carry forward all other tags
751       Iterator<Tag> tagsItr = CellUtil.tagsIterator(newCell.getTagsArray(),
752           newCell.getTagsOffset(), newCell.getTagsLength());
753       while (tagsItr.hasNext()) {
754         Tag tag = tagsItr.next();
755         if (tag.getType() != TagType.VISIBILITY_TAG_TYPE
756             && tag.getType() != TagType.VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE) {
757           tags.add(tag);
758         }
759       }
760     }
761 
762     Cell rewriteCell = new TagRewriteCell(newCell, Tag.fromList(tags));
763     return rewriteCell;
764   }
765 
766   @Override
767   public Service getService() {
768     return VisibilityLabelsProtos.VisibilityLabelsService.newReflectiveService(this);
769   }
770 
771   /****************************** VisibilityEndpoint service related methods ******************************/
772   @Override
773   public synchronized void addLabels(RpcController controller, VisibilityLabelsRequest request,
774       RpcCallback<VisibilityLabelsResponse> done) {
775     VisibilityLabelsResponse.Builder response = VisibilityLabelsResponse.newBuilder();
776     List<VisibilityLabel> visLabels = request.getVisLabelList();
777     if (!initialized) {
778       setExceptionResults(visLabels.size(),
779         new VisibilityControllerNotReadyException("VisibilityController not yet initialized!"),
780         response);
781     } else {
782       List<byte[]> labels = new ArrayList<byte[]>(visLabels.size());
783       try {
784         if (authorizationEnabled) {
785           checkCallingUserAuth();
786         }
787         RegionActionResult successResult = RegionActionResult.newBuilder().build();
788         for (VisibilityLabel visLabel : visLabels) {
789           byte[] label = visLabel.getLabel().toByteArray();
790           labels.add(label);
791           response.addResult(successResult); // Just mark as success. Later it will get reset
792                                              // based on the result from
793                                              // visibilityLabelService.addLabels ()
794         }
795         if (!labels.isEmpty()) {
796           OperationStatus[] opStatus = this.visibilityLabelService.addLabels(labels);
797           logResult(true, "addLabels", "Adding labels allowed", null, labels, null);
798           int i = 0;
799           for (OperationStatus status : opStatus) {
800             while (response.getResult(i) != successResult)
801               i++;
802             if (status.getOperationStatusCode() != SUCCESS) {
803               RegionActionResult.Builder failureResultBuilder = RegionActionResult.newBuilder();
804               failureResultBuilder.setException(ResponseConverter
805                   .buildException(new DoNotRetryIOException(status.getExceptionMsg())));
806               response.setResult(i, failureResultBuilder.build());
807             }
808             i++;
809           }
810         }
811       } catch (AccessDeniedException e) {
812         logResult(false, "addLabels", e.getMessage(), null, labels, null);
813         LOG.error("User is not having required permissions to add labels", e);
814         setExceptionResults(visLabels.size(), e, response);
815       } catch (IOException e) {
816         LOG.error(e);
817         setExceptionResults(visLabels.size(), e, response);
818       }
819     }
820     done.run(response.build());
821   }
822 
823   private void setExceptionResults(int size, IOException e,
824       VisibilityLabelsResponse.Builder response) {
825     RegionActionResult.Builder failureResultBuilder = RegionActionResult.newBuilder();
826     failureResultBuilder.setException(ResponseConverter.buildException(e));
827     RegionActionResult failureResult = failureResultBuilder.build();
828     for (int i = 0; i < size; i++) {
829       response.addResult(i, failureResult);
830     }
831   }
832 
833   @Override
834   public synchronized void setAuths(RpcController controller, SetAuthsRequest request,
835       RpcCallback<VisibilityLabelsResponse> done) {
836     VisibilityLabelsResponse.Builder response = VisibilityLabelsResponse.newBuilder();
837     List<ByteString> auths = request.getAuthList();
838     if (!initialized) {
839       setExceptionResults(auths.size(),
840         new VisibilityControllerNotReadyException("VisibilityController not yet initialized!"),
841         response);
842     } else {
843       byte[] user = request.getUser().toByteArray();
844       List<byte[]> labelAuths = new ArrayList<byte[]>(auths.size());
845       try {
846         if (authorizationEnabled) {
847           checkCallingUserAuth();
848         }
849         for (ByteString authBS : auths) {
850           labelAuths.add(authBS.toByteArray());
851         }
852         OperationStatus[] opStatus = this.visibilityLabelService.setAuths(user, labelAuths);
853         logResult(true, "setAuths", "Setting authorization for labels allowed", user, labelAuths,
854           null);
855         RegionActionResult successResult = RegionActionResult.newBuilder().build();
856         for (OperationStatus status : opStatus) {
857           if (status.getOperationStatusCode() == SUCCESS) {
858             response.addResult(successResult);
859           } else {
860             RegionActionResult.Builder failureResultBuilder = RegionActionResult.newBuilder();
861             failureResultBuilder.setException(ResponseConverter
862                 .buildException(new DoNotRetryIOException(status.getExceptionMsg())));
863             response.addResult(failureResultBuilder.build());
864           }
865         }
866       } catch (AccessDeniedException e) {
867         logResult(false, "setAuths", e.getMessage(), user, labelAuths, null);
868         LOG.error("User is not having required permissions to set authorization", e);
869         setExceptionResults(auths.size(), e, response);
870       } catch (IOException e) {
871         LOG.error(e);
872         setExceptionResults(auths.size(), e, response);
873       }
874     }
875     done.run(response.build());
876   }
877 
878   private void logResult(boolean isAllowed, String request, String reason, byte[] user,
879       List<byte[]> labelAuths, String regex) {
880     if (AUDITLOG.isTraceEnabled()) {
881       // This is more duplicated code!
882       InetAddress remoteAddr = RpcServer.getRemoteAddress();
883       List<String> labelAuthsStr = new ArrayList<>();
884       if (labelAuths != null) {
885         int labelAuthsSize = labelAuths.size();
886         labelAuthsStr = new ArrayList<>(labelAuthsSize);
887         for (int i = 0; i < labelAuthsSize; i++) {
888           labelAuthsStr.add(Bytes.toString(labelAuths.get(i)));
889         }
890       }
891 
892       User requestingUser = null;
893       try {
894         requestingUser = VisibilityUtils.getActiveUser();
895       } catch (IOException e) {
896         LOG.warn("Failed to get active system user.");
897         LOG.debug("Details on failure to get active system user.", e);
898       }
899       AUDITLOG.trace("Access " + (isAllowed ? "allowed" : "denied") + " for user "
900           + (requestingUser != null ? requestingUser.getShortName() : "UNKNOWN") + "; reason: "
901           + reason + "; remote address: " + (remoteAddr != null ? remoteAddr : "") + "; request: "
902           + request + "; user: " + (user != null ? Bytes.toShort(user) : "null") + "; labels: "
903           + labelAuthsStr + "; regex: " + regex);
904     }
905   }
906 
907   @Override
908   public synchronized void getAuths(RpcController controller, GetAuthsRequest request,
909       RpcCallback<GetAuthsResponse> done) {
910     GetAuthsResponse.Builder response = GetAuthsResponse.newBuilder();
911     if (!initialized) {
912       controller.setFailed("VisibilityController not yet initialized");
913     } else {
914       byte[] user = request.getUser().toByteArray();
915       List<String> labels = null;
916       try {
917         // We do ACL check here as we create scanner directly on region. It will not make calls to
918         // AccessController CP methods.
919         if (authorizationEnabled && accessControllerAvailable && !isSystemOrSuperUser()) {
920           User requestingUser = VisibilityUtils.getActiveUser();
921           throw new AccessDeniedException("User '"
922               + (requestingUser != null ? requestingUser.getShortName() : "null")
923               + "' is not authorized to perform this action.");
924         }
925         if (AuthUtil.isGroupPrincipal(Bytes.toString(user))) {
926           // For backward compatibility. Previous custom visibilityLabelService
927           // implementation may not have getGroupAuths
928           try {
929             this.visibilityLabelService.getClass().getDeclaredMethod("getGroupAuths",
930                 new Class[] { String[].class, Boolean.TYPE });
931           } catch (SecurityException e) {
932             throw new AccessDeniedException("Failed to obtain getGroupAuths implementation");
933           } catch (NoSuchMethodException e) {
934             throw new AccessDeniedException(
935                 "Get group auth is not supported in this implementation");
936           }
937           String group = AuthUtil.getGroupName(Bytes.toString(user));
938           labels = this.visibilityLabelService.getGroupAuths(new String[] { group }, false);
939         } else {
940           labels = this.visibilityLabelService.getAuths(user, false);
941         }
942         logResult(true, "getAuths", "Get authorizations for user allowed", user, null, null);
943       } catch (AccessDeniedException e) {
944         logResult(false, "getAuths", e.getMessage(), user, null, null);
945         ResponseConverter.setControllerException(controller, e);
946       } catch (IOException e) {
947         ResponseConverter.setControllerException(controller, e);
948       }
949       response.setUser(request.getUser());
950       if (labels != null) {
951         for (String label : labels) {
952           response.addAuth(ByteStringer.wrap(Bytes.toBytes(label)));
953         }
954       }
955     }
956     done.run(response.build());
957   }
958 
959   @Override
960   public synchronized void clearAuths(RpcController controller, SetAuthsRequest request,
961       RpcCallback<VisibilityLabelsResponse> done) {
962     VisibilityLabelsResponse.Builder response = VisibilityLabelsResponse.newBuilder();
963     List<ByteString> auths = request.getAuthList();
964     if (!initialized) {
965       setExceptionResults(auths.size(), new CoprocessorException(
966           "VisibilityController not yet initialized"), response);
967     } else {
968       byte[] requestUser = request.getUser().toByteArray();
969       List<byte[]> labelAuths = new ArrayList<byte[]>(auths.size());
970       try {
971         // When AC is ON, do AC based user auth check
972         if (authorizationEnabled && accessControllerAvailable && !isSystemOrSuperUser()) {
973           User user = VisibilityUtils.getActiveUser();
974           throw new AccessDeniedException("User '" + (user != null ? user.getShortName() : "null")
975               + " is not authorized to perform this action.");
976         }
977         if (authorizationEnabled) {
978           checkCallingUserAuth(); // When AC is not in place the calling user should have
979                                   // SYSTEM_LABEL auth to do this action.
980         }
981         for (ByteString authBS : auths) {
982           labelAuths.add(authBS.toByteArray());
983         }
984 
985         OperationStatus[] opStatus =
986             this.visibilityLabelService.clearAuths(requestUser, labelAuths);
987         logResult(true, "clearAuths", "Removing authorization for labels allowed", requestUser,
988           labelAuths, null);
989         RegionActionResult successResult = RegionActionResult.newBuilder().build();
990         for (OperationStatus status : opStatus) {
991           if (status.getOperationStatusCode() == SUCCESS) {
992             response.addResult(successResult);
993           } else {
994             RegionActionResult.Builder failureResultBuilder = RegionActionResult.newBuilder();
995             failureResultBuilder.setException(ResponseConverter
996                 .buildException(new DoNotRetryIOException(status.getExceptionMsg())));
997             response.addResult(failureResultBuilder.build());
998           }
999         }
1000       } catch (AccessDeniedException e) {
1001         logResult(false, "clearAuths", e.getMessage(), requestUser, labelAuths, null);
1002         LOG.error("User is not having required permissions to clear authorization", e);
1003         setExceptionResults(auths.size(), e, response);
1004       } catch (IOException e) {
1005         LOG.error(e);
1006         setExceptionResults(auths.size(), e, response);
1007       }
1008     }
1009     done.run(response.build());
1010   }
1011 
1012   @Override
1013   public synchronized void listLabels(RpcController controller, ListLabelsRequest request,
1014       RpcCallback<ListLabelsResponse> done) {
1015     ListLabelsResponse.Builder response = ListLabelsResponse.newBuilder();
1016     if (!initialized) {
1017       controller.setFailed("VisibilityController not yet initialized");
1018     } else {
1019       List<String> labels = null;
1020       String regex = request.hasRegex() ? request.getRegex() : null;
1021       try {
1022         // We do ACL check here as we create scanner directly on region. It will not make calls to
1023         // AccessController CP methods.
1024         if (authorizationEnabled && accessControllerAvailable && !isSystemOrSuperUser()) {
1025           User requestingUser = VisibilityUtils.getActiveUser();
1026           throw new AccessDeniedException("User '"
1027               + (requestingUser != null ? requestingUser.getShortName() : "null")
1028               + "' is not authorized to perform this action.");
1029         }
1030         labels = this.visibilityLabelService.listLabels(regex);
1031         logResult(false, "listLabels", "Listing labels allowed", null, null, regex);
1032       } catch (AccessDeniedException e) {
1033         logResult(false, "listLabels", e.getMessage(), null, null, regex);
1034         ResponseConverter.setControllerException(controller, e);
1035       } catch (IOException e) {
1036         ResponseConverter.setControllerException(controller, e);
1037       }
1038       if (labels != null && !labels.isEmpty()) {
1039         for (String label : labels) {
1040           response.addLabel(ByteStringer.wrap(Bytes.toBytes(label)));
1041         }
1042       }
1043     }
1044     done.run(response.build());
1045   }
1046 
1047   private void checkCallingUserAuth() throws IOException {
1048     if (!authorizationEnabled) { // Redundant, but just in case
1049       return;
1050     }
1051     if (!accessControllerAvailable) {
1052       User user = VisibilityUtils.getActiveUser();
1053       if (user == null) {
1054         throw new IOException("Unable to retrieve calling user");
1055       }
1056       boolean havingSystemAuth = false;
1057       try {
1058         this.visibilityLabelService.getClass().getDeclaredMethod("havingSystemAuth",
1059             new Class[] { User.class });
1060         havingSystemAuth = this.visibilityLabelService.havingSystemAuth(user);
1061       } catch (SecurityException e) {
1062         // Just consider this as AccessDeniedException
1063       } catch (NoSuchMethodException e) {
1064         // VLS not having havingSystemAuth(User) method. Go with deprecated havingSystemAuth(byte[])
1065         // method invoke
1066         havingSystemAuth = this.visibilityLabelService.havingSystemAuth(Bytes.toBytes(user
1067             .getShortName()));
1068       }
1069       if (!havingSystemAuth) {
1070         throw new AccessDeniedException("User '" + user.getShortName()
1071             + "' is not authorized to perform this action.");
1072       }
1073     }
1074   }
1075 
1076   private static class DeleteVersionVisibilityExpressionFilter extends FilterBase {
1077     private List<Tag> deleteCellVisTags;
1078     private Byte deleteCellVisTagsFormat;
1079 
1080     public DeleteVersionVisibilityExpressionFilter(List<Tag> deleteCellVisTags,
1081         Byte deleteCellVisTagsFormat) {
1082       this.deleteCellVisTags = deleteCellVisTags;
1083       this.deleteCellVisTagsFormat = deleteCellVisTagsFormat;
1084     }
1085 
1086     @Override
1087     public ReturnCode filterKeyValue(Cell cell) throws IOException {
1088       List<Tag> putVisTags = new ArrayList<Tag>();
1089       Byte putCellVisTagsFormat = VisibilityUtils.extractVisibilityTags(cell, putVisTags);
1090       if (putVisTags.isEmpty() && deleteCellVisTags.isEmpty()) {
1091         // Early out if there are no tags in the cell
1092         return ReturnCode.INCLUDE;
1093       }
1094       boolean matchFound = VisibilityLabelServiceManager
1095           .getInstance().getVisibilityLabelService()
1096           .matchVisibility(putVisTags, putCellVisTagsFormat, deleteCellVisTags,
1097               deleteCellVisTagsFormat);
1098       return matchFound ? ReturnCode.INCLUDE : ReturnCode.SKIP;
1099     }
1100 
1101     // Override here explicitly as the method in super class FilterBase might do a KeyValue recreate.
1102     // See HBASE-12068
1103     @Override
1104     public Cell transformCell(Cell v) {
1105       return v;
1106     }
1107   }
1108 
1109   /**
1110    * A RegionServerObserver impl that provides the custom
1111    * VisibilityReplicationEndpoint. This class should be configured as the
1112    * 'hbase.coprocessor.regionserver.classes' for the visibility tags to be
1113    * replicated as string.  The value for the configuration should be
1114    * 'org.apache.hadoop.hbase.security.visibility.VisibilityController$VisibilityReplication'.
1115    */
1116   public static class VisibilityReplication extends BaseRegionServerObserver {
1117     private Configuration conf;
1118     private VisibilityLabelService visibilityLabelService;
1119 
1120     @Override
1121     public void start(CoprocessorEnvironment env) throws IOException {
1122       this.conf = env.getConfiguration();
1123       visibilityLabelService = VisibilityLabelServiceManager.getInstance()
1124           .getVisibilityLabelService(this.conf);
1125     }
1126 
1127     @Override
1128     public void stop(CoprocessorEnvironment env) throws IOException {
1129     }
1130 
1131     @Override
1132     public ReplicationEndpoint postCreateReplicationEndPoint(
1133         ObserverContext<RegionServerCoprocessorEnvironment> ctx, ReplicationEndpoint endpoint) {
1134       return new VisibilityReplicationEndpoint(endpoint, visibilityLabelService);
1135     }
1136   }
1137 }