View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.security.visibility;
19  
20  import static org.apache.hadoop.hbase.TagType.VISIBILITY_TAG_TYPE;
21  import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_FAMILY;
22  import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABELS_TABLE_NAME;
23  import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.LABEL_QUALIFIER;
24  import static org.apache.hadoop.hbase.security.visibility.VisibilityConstants.SORTED_ORDINAL_SERIALIZATION_FORMAT;
25  import static org.apache.hadoop.hbase.security.visibility.VisibilityUtils.SYSTEM_LABEL;
26  
27  import java.io.ByteArrayOutputStream;
28  import java.io.DataOutputStream;
29  import java.io.IOException;
30  import java.util.ArrayList;
31  import java.util.BitSet;
32  import java.util.Collections;
33  import java.util.HashMap;
34  import java.util.HashSet;
35  import java.util.Iterator;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.Set;
39  import java.util.concurrent.atomic.AtomicInteger;
40  import java.util.regex.Pattern;
41  
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  import org.apache.hadoop.conf.Configuration;
45  import org.apache.hadoop.hbase.AuthUtil;
46  import org.apache.hadoop.hbase.Cell;
47  import org.apache.hadoop.hbase.CellUtil;
48  import org.apache.hadoop.hbase.HConstants;
49  import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
50  import org.apache.hadoop.hbase.Tag;
51  import org.apache.hadoop.hbase.TagType;
52  import org.apache.hadoop.hbase.classification.InterfaceAudience;
53  import org.apache.hadoop.hbase.client.Delete;
54  import org.apache.hadoop.hbase.client.Mutation;
55  import org.apache.hadoop.hbase.client.Put;
56  import org.apache.hadoop.hbase.client.Scan;
57  import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
58  import org.apache.hadoop.hbase.filter.Filter;
59  import org.apache.hadoop.hbase.io.util.StreamUtils;
60  import org.apache.hadoop.hbase.regionserver.OperationStatus;
61  import org.apache.hadoop.hbase.regionserver.Region;
62  import org.apache.hadoop.hbase.regionserver.RegionScanner;
63  import org.apache.hadoop.hbase.security.Superusers;
64  import org.apache.hadoop.hbase.security.User;
65  import org.apache.hadoop.hbase.util.Bytes;
66  import org.apache.hadoop.hbase.util.Pair;
67  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
68  
69  @InterfaceAudience.Private
70  public class DefaultVisibilityLabelServiceImpl implements VisibilityLabelService {
71  
72    private static final Log LOG = LogFactory.getLog(DefaultVisibilityLabelServiceImpl.class);
73  
74    // "system" label is having an ordinal value 1.
75    private static final int SYSTEM_LABEL_ORDINAL = 1;
76    private static final Tag[] LABELS_TABLE_TAGS = new Tag[1];
77    private static final byte[] DUMMY_VALUE = new byte[0];
78  
79    private AtomicInteger ordinalCounter = new AtomicInteger(-1);
80    private Configuration conf;
81    private Region labelsRegion;
82    private VisibilityLabelsCache labelsCache;
83    private List<ScanLabelGenerator> scanLabelGenerators;
84  
85    static {
86      ByteArrayOutputStream baos = new ByteArrayOutputStream();
87      DataOutputStream dos = new DataOutputStream(baos);
88      try {
89        StreamUtils.writeRawVInt32(dos, SYSTEM_LABEL_ORDINAL);
90      } catch (IOException e) {
91        // We write to a byte array. No Exception can happen.
92      }
93      LABELS_TABLE_TAGS[0] = new Tag(VISIBILITY_TAG_TYPE, baos.toByteArray());
94    }
95  
96    public DefaultVisibilityLabelServiceImpl() {
97  
98    }
99  
100   @Override
101   public void setConf(Configuration conf) {
102     this.conf = conf;
103   }
104 
105   @Override
106   public Configuration getConf() {
107     return this.conf;
108   }
109 
110   @Override
111   public void init(RegionCoprocessorEnvironment e) throws IOException {
112     ZooKeeperWatcher zk = e.getRegionServerServices().getZooKeeper();
113     try {
114       labelsCache = VisibilityLabelsCache.createAndGet(zk, this.conf);
115     } catch (IOException ioe) {
116       LOG.error("Error creating VisibilityLabelsCache", ioe);
117       throw ioe;
118     }
119     this.scanLabelGenerators = VisibilityUtils.getScanLabelGenerators(this.conf);
120     if (e.getRegion().getRegionInfo().getTable().equals(LABELS_TABLE_NAME)) {
121       this.labelsRegion = e.getRegion();
122       Pair<Map<String, Integer>, Map<String, List<Integer>>> labelsAndUserAuths =
123           extractLabelsAndAuths(getExistingLabelsWithAuths());
124       Map<String, Integer> labels = labelsAndUserAuths.getFirst();
125       Map<String, List<Integer>> userAuths = labelsAndUserAuths.getSecond();
126       // Add the "system" label if it is not added into the system yet
127       addSystemLabel(this.labelsRegion, labels, userAuths);
128       int ordinal = SYSTEM_LABEL_ORDINAL; // Ordinal 1 is reserved for "system" label.
129       for (Integer i : labels.values()) {
130         if (i > ordinal) {
131           ordinal = i;
132         }
133       }
134       this.ordinalCounter.set(ordinal + 1);
135       if (labels.size() > 0) {
136         // If there is no data need not write to zk
137         byte[] serialized = VisibilityUtils.getDataToWriteToZooKeeper(labels);
138         this.labelsCache.writeToZookeeper(serialized, true);
139         this.labelsCache.refreshLabelsCache(serialized);
140       }
141       if (userAuths.size() > 0) {
142         byte[] serialized = VisibilityUtils.getUserAuthsDataToWriteToZooKeeper(userAuths);
143         this.labelsCache.writeToZookeeper(serialized, false);
144         this.labelsCache.refreshUserAuthsCache(serialized);
145       }
146     }
147   }
148 
149   protected List<List<Cell>> getExistingLabelsWithAuths() throws IOException {
150     Scan scan = new Scan();
151     RegionScanner scanner = labelsRegion.getScanner(scan);
152     List<List<Cell>> existingLabels = new ArrayList<List<Cell>>();
153     try {
154       while (true) {
155         List<Cell> cells = new ArrayList<Cell>();
156         scanner.next(cells);
157         if (cells.isEmpty()) {
158           break;
159         }
160         existingLabels.add(cells);
161       }
162     } finally {
163       scanner.close();
164     }
165     return existingLabels;
166   }
167 
168   protected Pair<Map<String, Integer>, Map<String, List<Integer>>> extractLabelsAndAuths(
169       List<List<Cell>> labelDetails) {
170     Map<String, Integer> labels = new HashMap<String, Integer>();
171     Map<String, List<Integer>> userAuths = new HashMap<String, List<Integer>>();
172     for (List<Cell> cells : labelDetails) {
173       for (Cell cell : cells) {
174         if (Bytes.equals(cell.getQualifierArray(), cell.getQualifierOffset(),
175             cell.getQualifierLength(), LABEL_QUALIFIER, 0, LABEL_QUALIFIER.length)) {
176           labels.put(
177               Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()),
178               Bytes.toInt(cell.getRowArray(), cell.getRowOffset()));
179         } else {
180           // These are user cells who has authorization for this label
181           String user = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
182               cell.getQualifierLength());
183           List<Integer> auths = userAuths.get(user);
184           if (auths == null) {
185             auths = new ArrayList<Integer>();
186             userAuths.put(user, auths);
187           }
188           auths.add(Bytes.toInt(cell.getRowArray(), cell.getRowOffset()));
189         }
190       }
191     }
192     return new Pair<Map<String, Integer>, Map<String, List<Integer>>>(labels, userAuths);
193   }
194 
195   protected void addSystemLabel(Region region, Map<String, Integer> labels,
196       Map<String, List<Integer>> userAuths) throws IOException {
197     if (!labels.containsKey(SYSTEM_LABEL)) {
198       Put p = new Put(Bytes.toBytes(SYSTEM_LABEL_ORDINAL));
199       p.addImmutable(LABELS_TABLE_FAMILY, LABEL_QUALIFIER, Bytes.toBytes(SYSTEM_LABEL));
200       region.put(p);
201       labels.put(SYSTEM_LABEL, SYSTEM_LABEL_ORDINAL);
202     }
203   }
204 
205   @Override
206   public OperationStatus[] addLabels(List<byte[]> labels) throws IOException {
207     assert labelsRegion != null;
208     OperationStatus[] finalOpStatus = new OperationStatus[labels.size()];
209     List<Mutation> puts = new ArrayList<Mutation>(labels.size());
210     int i = 0;
211     for (byte[] label : labels) {
212       String labelStr = Bytes.toString(label);
213       if (this.labelsCache.getLabelOrdinal(labelStr) > 0) {
214         finalOpStatus[i] = new OperationStatus(OperationStatusCode.FAILURE,
215             new LabelAlreadyExistsException("Label '" + labelStr + "' already exists"));
216       } else {
217         Put p = new Put(Bytes.toBytes(ordinalCounter.get()));
218         p.addImmutable(LABELS_TABLE_FAMILY, LABEL_QUALIFIER, label, LABELS_TABLE_TAGS);
219         if (LOG.isDebugEnabled()) {
220           LOG.debug("Adding the label " + labelStr);
221         }
222         puts.add(p);
223         ordinalCounter.incrementAndGet();
224       }
225       i++;
226     }
227     if (mutateLabelsRegion(puts, finalOpStatus)) {
228       updateZk(true);
229     }
230     return finalOpStatus;
231   }
232 
233   @Override
234   public OperationStatus[] setAuths(byte[] user, List<byte[]> authLabels) throws IOException {
235     assert labelsRegion != null;
236     OperationStatus[] finalOpStatus = new OperationStatus[authLabels.size()];
237     List<Mutation> puts = new ArrayList<Mutation>(authLabels.size());
238     int i = 0;
239     for (byte[] auth : authLabels) {
240       String authStr = Bytes.toString(auth);
241       int labelOrdinal = this.labelsCache.getLabelOrdinal(authStr);
242       if (labelOrdinal == 0) {
243         // This label is not yet added. 1st this should be added to the system
244         finalOpStatus[i] = new OperationStatus(OperationStatusCode.FAILURE,
245             new InvalidLabelException("Label '" + authStr + "' doesn't exists"));
246       } else {
247         Put p = new Put(Bytes.toBytes(labelOrdinal));
248         p.addImmutable(LABELS_TABLE_FAMILY, user, DUMMY_VALUE, LABELS_TABLE_TAGS);
249         puts.add(p);
250       }
251       i++;
252     }
253     if (mutateLabelsRegion(puts, finalOpStatus)) {
254       updateZk(false);
255     }
256     return finalOpStatus;
257   }
258 
259   @Override
260   public OperationStatus[] clearAuths(byte[] user, List<byte[]> authLabels) throws IOException {
261     assert labelsRegion != null;
262     OperationStatus[] finalOpStatus = new OperationStatus[authLabels.size()];
263     List<String> currentAuths;
264     if (AuthUtil.isGroupPrincipal(Bytes.toString(user))) {
265       String group = AuthUtil.getGroupName(Bytes.toString(user));
266       currentAuths = this.getGroupAuths(new String[]{group}, true);
267     }
268     else {
269       currentAuths = this.getUserAuths(user, true);
270     }
271     List<Mutation> deletes = new ArrayList<Mutation>(authLabels.size());
272     int i = 0;
273     for (byte[] authLabel : authLabels) {
274       String authLabelStr = Bytes.toString(authLabel);
275       if (currentAuths.contains(authLabelStr)) {
276         int labelOrdinal = this.labelsCache.getLabelOrdinal(authLabelStr);
277         assert labelOrdinal > 0;
278         Delete d = new Delete(Bytes.toBytes(labelOrdinal));
279         d.deleteColumns(LABELS_TABLE_FAMILY, user);
280         deletes.add(d);
281       } else {
282         // This label is not set for the user.
283         finalOpStatus[i] = new OperationStatus(OperationStatusCode.FAILURE,
284             new InvalidLabelException("Label '" + authLabelStr + "' is not set for the user "
285                 + Bytes.toString(user)));
286       }
287       i++;
288     }
289     if (mutateLabelsRegion(deletes, finalOpStatus)) {
290       updateZk(false);
291     }
292     return finalOpStatus;
293   }
294 
295   /**
296    * Adds the mutations to labels region and set the results to the finalOpStatus. finalOpStatus
297    * might have some entries in it where the OpStatus is FAILURE. We will leave those and set in
298    * others in the order.
299    * @param mutations
300    * @param finalOpStatus
301    * @return whether we need a ZK update or not.
302    */
303   private boolean mutateLabelsRegion(List<Mutation> mutations, OperationStatus[] finalOpStatus)
304       throws IOException {
305     OperationStatus[] opStatus = this.labelsRegion.batchMutate(mutations
306       .toArray(new Mutation[mutations.size()]), HConstants.NO_NONCE, HConstants.NO_NONCE);
307     int i = 0;
308     boolean updateZk = false;
309     for (OperationStatus status : opStatus) {
310       // Update the zk when atleast one of the mutation was added successfully.
311       updateZk = updateZk || (status.getOperationStatusCode() == OperationStatusCode.SUCCESS);
312       for (; i < finalOpStatus.length; i++) {
313         if (finalOpStatus[i] == null) {
314           finalOpStatus[i] = status;
315           break;
316         }
317       }
318     }
319     return updateZk;
320   }
321 
322   @Override
323   @Deprecated
324   public List<String> getAuths(byte[] user, boolean systemCall)
325       throws IOException {
326     return getUserAuths(user, systemCall);
327   }
328 
329   @Override
330   public List<String> getUserAuths(byte[] user, boolean systemCall)
331       throws IOException {
332     assert (labelsRegion != null || systemCall);
333     if (systemCall || labelsRegion == null) {
334       return this.labelsCache.getUserAuths(Bytes.toString(user));
335     }
336     Scan s = new Scan();
337     if (user != null && user.length > 0) {
338       s.addColumn(LABELS_TABLE_FAMILY, user);
339     }
340     Filter filter = VisibilityUtils.createVisibilityLabelFilter(this.labelsRegion,
341       new Authorizations(SYSTEM_LABEL));
342     s.setFilter(filter);
343     ArrayList<String> auths = new ArrayList<String>();
344     RegionScanner scanner = this.labelsRegion.getScanner(s);
345     try {
346       List<Cell> results = new ArrayList<Cell>(1);
347       while (true) {
348         scanner.next(results);
349         if (results.isEmpty()) break;
350         Cell cell = results.get(0);
351         int ordinal = Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
352         String label = this.labelsCache.getLabel(ordinal);
353         if (label != null) {
354           auths.add(label);
355         }
356         results.clear();
357       }
358     } finally {
359       scanner.close();
360     }
361     return auths;
362   }
363 
364   @Override
365   public List<String> getGroupAuths(String[] groups, boolean systemCall)
366       throws IOException {
367     assert (labelsRegion != null || systemCall);
368     if (systemCall || labelsRegion == null) {
369       return this.labelsCache.getGroupAuths(groups);
370     }
371     Scan s = new Scan();
372     if (groups != null && groups.length > 0) {
373       for (String group : groups) {
374         s.addColumn(LABELS_TABLE_FAMILY, Bytes.toBytes(AuthUtil.toGroupEntry(group)));
375       }
376     }
377     Filter filter = VisibilityUtils.createVisibilityLabelFilter(this.labelsRegion,
378         new Authorizations(SYSTEM_LABEL));
379     s.setFilter(filter);
380     Set<String> auths = new HashSet<String>();
381     RegionScanner scanner = this.labelsRegion.getScanner(s);
382     try {
383       List<Cell> results = new ArrayList<Cell>(1);
384       while (true) {
385         scanner.next(results);
386         if (results.isEmpty()) break;
387         Cell cell = results.get(0);
388         int ordinal = Bytes.toInt(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
389         String label = this.labelsCache.getLabel(ordinal);
390         if (label != null) {
391           auths.add(label);
392         }
393         results.clear();
394       }
395     } finally {
396       scanner.close();
397     }
398     return new ArrayList<String>(auths);
399   }
400 
401   @Override
402   public List<String> listLabels(String regex) throws IOException {
403     assert (labelsRegion != null);
404     Pair<Map<String, Integer>, Map<String, List<Integer>>> labelsAndUserAuths =
405         extractLabelsAndAuths(getExistingLabelsWithAuths());
406     Map<String, Integer> labels = labelsAndUserAuths.getFirst();
407     labels.remove(SYSTEM_LABEL);
408     if (regex != null) {
409       Pattern pattern = Pattern.compile(regex);
410       ArrayList<String> matchedLabels = new ArrayList<String>();
411       for (String label : labels.keySet()) {
412         if (pattern.matcher(label).matches()) {
413           matchedLabels.add(label);
414         }
415       }
416       return matchedLabels;
417     }
418     return new ArrayList<String>(labels.keySet());
419   }
420 
421   @Override
422   public List<Tag> createVisibilityExpTags(String visExpression, boolean withSerializationFormat,
423       boolean checkAuths) throws IOException {
424     Set<Integer> auths = new HashSet<Integer>();
425     if (checkAuths) {
426       User user = VisibilityUtils.getActiveUser();
427       auths.addAll(this.labelsCache.getUserAuthsAsOrdinals(user.getShortName()));
428       auths.addAll(this.labelsCache.getGroupAuthsAsOrdinals(user.getGroupNames()));
429     }
430     return VisibilityUtils.createVisibilityExpTags(visExpression, withSerializationFormat,
431         checkAuths, auths, labelsCache);
432   }
433 
434   protected void updateZk(boolean labelAddition) throws IOException {
435     // We will add to zookeeper here.
436     // TODO we should add the delta only to zk. Else this will be a very heavy op and when there are
437     // so many labels and auth in the system, we will end up adding lots of data to zk. Most
438     // possibly we will exceed zk node data limit!
439     Pair<Map<String, Integer>, Map<String, List<Integer>>> labelsAndUserAuths =
440         extractLabelsAndAuths(getExistingLabelsWithAuths());
441     Map<String, Integer> existingLabels = labelsAndUserAuths.getFirst();
442     Map<String, List<Integer>> userAuths = labelsAndUserAuths.getSecond();
443     if (labelAddition) {
444       byte[] serialized = VisibilityUtils.getDataToWriteToZooKeeper(existingLabels);
445       this.labelsCache.writeToZookeeper(serialized, true);
446     } else {
447       byte[] serialized = VisibilityUtils.getUserAuthsDataToWriteToZooKeeper(userAuths);
448       this.labelsCache.writeToZookeeper(serialized, false);
449     }
450   }
451 
452   @Override
453   public VisibilityExpEvaluator getVisibilityExpEvaluator(Authorizations authorizations)
454       throws IOException {
455     // If a super user issues a get/scan, he should be able to scan the cells
456     // irrespective of the Visibility labels
457     if (isReadFromSystemAuthUser()) {
458       return new VisibilityExpEvaluator() {
459         @Override
460         public boolean evaluate(Cell cell) throws IOException {
461           return true;
462         }
463       };
464     }
465     List<String> authLabels = null;
466     for (ScanLabelGenerator scanLabelGenerator : scanLabelGenerators) {
467       try {
468         // null authorizations to be handled inside SLG impl.
469         authLabels = scanLabelGenerator.getLabels(VisibilityUtils.getActiveUser(), authorizations);
470         authLabels = (authLabels == null) ? new ArrayList<String>() : authLabels;
471         authorizations = new Authorizations(authLabels);
472       } catch (Throwable t) {
473         LOG.error(t);
474         throw new IOException(t);
475       }
476     }
477     int labelsCount = this.labelsCache.getLabelsCount();
478     final BitSet bs = new BitSet(labelsCount + 1); // ordinal is index 1 based
479     if (authLabels != null) {
480       for (String authLabel : authLabels) {
481         int labelOrdinal = this.labelsCache.getLabelOrdinal(authLabel);
482         if (labelOrdinal != 0) {
483           bs.set(labelOrdinal);
484         }
485       }
486     }
487 
488     return new VisibilityExpEvaluator() {
489       @Override
490       public boolean evaluate(Cell cell) throws IOException {
491         boolean visibilityTagPresent = false;
492         // Save an object allocation where we can
493         if (cell.getTagsLength() > 0) {
494           Iterator<Tag> tagsItr = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
495               cell.getTagsLength());
496           while (tagsItr.hasNext()) {
497             boolean includeKV = true;
498             Tag tag = tagsItr.next();
499             if (tag.getType() == VISIBILITY_TAG_TYPE) {
500               visibilityTagPresent = true;
501               int offset = tag.getTagOffset();
502               int endOffset = offset + tag.getTagLength();
503               while (offset < endOffset) {
504                 Pair<Integer, Integer> result = StreamUtils
505                     .readRawVarint32(tag.getBuffer(), offset);
506                 int currLabelOrdinal = result.getFirst();
507                 if (currLabelOrdinal < 0) {
508                   // check for the absence of this label in the Scan Auth labels
509                   // ie. to check BitSet corresponding bit is 0
510                   int temp = -currLabelOrdinal;
511                   if (bs.get(temp)) {
512                     includeKV = false;
513                     break;
514                   }
515                 } else {
516                   if (!bs.get(currLabelOrdinal)) {
517                     includeKV = false;
518                     break;
519                   }
520                 }
521                 offset += result.getSecond();
522               }
523               if (includeKV) {
524                 // We got one visibility expression getting evaluated to true. Good to include this
525                 // KV in the result then.
526                 return true;
527               }
528             }
529           }
530         }
531         return !(visibilityTagPresent);
532       }
533     };
534   }
535 
536   protected boolean isReadFromSystemAuthUser() throws IOException {
537     User user = VisibilityUtils.getActiveUser();
538     return havingSystemAuth(user);
539   }
540 
541   @Override
542   @Deprecated
543   public boolean havingSystemAuth(byte[] user) throws IOException {
544     // Implementation for backward compatibility
545     if (Superusers.isSuperUser(Bytes.toString(user))) {
546       return true;
547     }
548     List<String> auths = this.getUserAuths(user, true);
549     if (LOG.isTraceEnabled()) {
550       LOG.trace("The auths for user " + Bytes.toString(user) + " are " + auths);
551     }
552     return auths.contains(SYSTEM_LABEL);
553   }
554 
555   @Override
556   public boolean havingSystemAuth(User user) throws IOException {
557     // A super user has 'system' auth.
558     if (Superusers.isSuperUser(user)) {
559       return true;
560     }
561     // A user can also be explicitly granted 'system' auth.
562     List<String> auths = this.getUserAuths(Bytes.toBytes(user.getShortName()), true);
563     if (LOG.isTraceEnabled()) {
564       LOG.trace("The auths for user " + user.getShortName() + " are " + auths);
565     }
566     if (auths.contains(SYSTEM_LABEL)) {
567       return true;
568     }
569     auths = this.getGroupAuths(user.getGroupNames(), true);
570     if (LOG.isTraceEnabled()) {
571       LOG.trace("The auths for groups of user " + user.getShortName() + " are " + auths);
572     }
573     return auths.contains(SYSTEM_LABEL);
574   }
575 
576   @Override
577   public boolean matchVisibility(List<Tag> putVisTags, Byte putTagsFormat, List<Tag> deleteVisTags,
578       Byte deleteTagsFormat) throws IOException {
579       // Early out if there are no tags in both of cell and delete
580       if (putVisTags.isEmpty() && deleteVisTags.isEmpty()) {
581         return true;
582       }
583       // Early out if one of the tags is empty
584       if (putVisTags.isEmpty() ^ deleteVisTags.isEmpty()) {
585         return false;
586       }
587     if ((deleteTagsFormat != null && deleteTagsFormat == SORTED_ORDINAL_SERIALIZATION_FORMAT)
588         && (putTagsFormat == null || putTagsFormat == SORTED_ORDINAL_SERIALIZATION_FORMAT)) {
589       if (putTagsFormat == null) {
590         return matchUnSortedVisibilityTags(putVisTags, deleteVisTags);
591       } else {
592         return matchOrdinalSortedVisibilityTags(putVisTags, deleteVisTags);
593       }
594     }
595     throw new IOException("Unexpected tag format passed for comparison, deleteTagsFormat : "
596         + deleteTagsFormat + ", putTagsFormat : " + putTagsFormat);
597   }
598 
599   /**
600    * @param putVisTags Visibility tags in Put Mutation
601    * @param deleteVisTags Visibility tags in Delete Mutation
602    * @return true when all the visibility tags in Put matches with visibility tags in Delete.
603    * This is used when, at least one set of tags are not sorted based on the label ordinal.
604    */
605   private static boolean matchUnSortedVisibilityTags(List<Tag> putVisTags,
606       List<Tag> deleteVisTags) throws IOException {
607     return compareTagsOrdinals(sortTagsBasedOnOrdinal(putVisTags),
608         sortTagsBasedOnOrdinal(deleteVisTags));
609   }
610 
611   /**
612    * @param putVisTags Visibility tags in Put Mutation
613    * @param deleteVisTags Visibility tags in Delete Mutation
614    * @return true when all the visibility tags in Put matches with visibility tags in Delete.
615    * This is used when both the set of tags are sorted based on the label ordinal.
616    */
617   private static boolean matchOrdinalSortedVisibilityTags(List<Tag> putVisTags,
618       List<Tag> deleteVisTags) {
619     boolean matchFound = false;
620     // If the size does not match. Definitely we are not comparing the equal tags.
621     if ((deleteVisTags.size()) == putVisTags.size()) {
622       for (Tag tag : deleteVisTags) {
623         matchFound = false;
624         for (Tag givenTag : putVisTags) {
625           if (Bytes.equals(tag.getBuffer(), tag.getTagOffset(), tag.getTagLength(),
626               givenTag.getBuffer(), givenTag.getTagOffset(), givenTag.getTagLength())) {
627             matchFound = true;
628             break;
629           }
630         }
631         if (!matchFound) break;
632       }
633     }
634     return matchFound;
635   }
636 
637   private static List<List<Integer>> sortTagsBasedOnOrdinal(List<Tag> tags) throws IOException {
638     List<List<Integer>> fullTagsList = new ArrayList<List<Integer>>();
639     for (Tag tag : tags) {
640       if (tag.getType() == VISIBILITY_TAG_TYPE) {
641         getSortedTagOrdinals(fullTagsList, tag);
642       }
643     }
644     return fullTagsList;
645   }
646 
647   private static void getSortedTagOrdinals(List<List<Integer>> fullTagsList, Tag tag)
648       throws IOException {
649     List<Integer> tagsOrdinalInSortedOrder = new ArrayList<Integer>();
650     int offset = tag.getTagOffset();
651     int endOffset = offset + tag.getTagLength();
652     while (offset < endOffset) {
653       Pair<Integer, Integer> result = StreamUtils.readRawVarint32(tag.getBuffer(), offset);
654       tagsOrdinalInSortedOrder.add(result.getFirst());
655       offset += result.getSecond();
656     }
657     Collections.sort(tagsOrdinalInSortedOrder);
658     fullTagsList.add(tagsOrdinalInSortedOrder);
659   }
660 
661   /*
662    * @return true when all the visibility tags in Put matches with visibility tags in Delete.
663    */
664   private static boolean compareTagsOrdinals(List<List<Integer>> putVisTags,
665       List<List<Integer>> deleteVisTags) {
666     boolean matchFound = false;
667     if (deleteVisTags.size() == putVisTags.size()) {
668       for (List<Integer> deleteTagOrdinals : deleteVisTags) {
669         matchFound = false;
670         for (List<Integer> tagOrdinals : putVisTags) {
671           if (deleteTagOrdinals.equals(tagOrdinals)) {
672             matchFound = true;
673             break;
674           }
675         }
676         if (!matchFound) break;
677       }
678     }
679     return matchFound;
680   }
681 
682   @Override
683   public byte[] encodeVisibilityForReplication(final List<Tag> tags, final Byte serializationFormat)
684       throws IOException {
685     if (tags.size() > 0
686         && (serializationFormat == null ||
687         serializationFormat == SORTED_ORDINAL_SERIALIZATION_FORMAT)) {
688       return createModifiedVisExpression(tags);
689     }
690     return null;
691   }
692 
693   /**
694    * @param tags
695    *          - all the visibility tags associated with the current Cell
696    * @return - the modified visibility expression as byte[]
697    */
698   private byte[] createModifiedVisExpression(final List<Tag> tags)
699       throws IOException {
700     StringBuilder visibilityString = new StringBuilder();
701     for (Tag tag : tags) {
702       if (tag.getType() == TagType.VISIBILITY_TAG_TYPE) {
703         if (visibilityString.length() != 0) {
704           visibilityString.append(VisibilityConstants.CLOSED_PARAN).append(
705               VisibilityConstants.OR_OPERATOR);
706         }
707         int offset = tag.getTagOffset();
708         int endOffset = offset + tag.getTagLength();
709         boolean expressionStart = true;
710         while (offset < endOffset) {
711           Pair<Integer, Integer> result = StreamUtils.readRawVarint32(tag.getBuffer(), offset);
712           int currLabelOrdinal = result.getFirst();
713           if (currLabelOrdinal < 0) {
714             int temp = -currLabelOrdinal;
715             String label = this.labelsCache.getLabel(temp);
716             if (expressionStart) {
717               // Quote every label in case of unicode characters if present
718               visibilityString.append(VisibilityConstants.OPEN_PARAN)
719                   .append(VisibilityConstants.NOT_OPERATOR).append(CellVisibility.quote(label));
720             } else {
721               visibilityString.append(VisibilityConstants.AND_OPERATOR)
722                   .append(VisibilityConstants.NOT_OPERATOR).append(CellVisibility.quote(label));
723             }
724           } else {
725             String label = this.labelsCache.getLabel(currLabelOrdinal);
726             if (expressionStart) {
727               visibilityString.append(VisibilityConstants.OPEN_PARAN).append(
728                   CellVisibility.quote(label));
729             } else {
730               visibilityString.append(VisibilityConstants.AND_OPERATOR).append(
731                   CellVisibility.quote(label));
732             }
733           }
734           expressionStart = false;
735           offset += result.getSecond();
736         }
737       }
738     }
739     if (visibilityString.length() != 0) {
740       visibilityString.append(VisibilityConstants.CLOSED_PARAN);
741       // Return the string formed as byte[]
742       return Bytes.toBytes(visibilityString.toString());
743     }
744     return null;
745   }
746 }