View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.types;
19  
20  import org.apache.hadoop.hbase.classification.InterfaceAudience;
21  import org.apache.hadoop.hbase.classification.InterfaceStability;
22  import org.apache.hadoop.hbase.util.Bytes;
23  import org.apache.hadoop.hbase.util.Order;
24  import org.apache.hadoop.hbase.util.PositionedByteRange;
25  import org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange;
26  
27  /**
28   * Wraps an existing {@code DataType} implementation as a terminated
29   * version of itself. This has the useful side-effect of turning an existing
30   * {@code DataType} which is not {@code skippable} into a
31   * {@code skippable} variant.
32   */
33  @InterfaceAudience.Public
34  @InterfaceStability.Evolving
35  public class TerminatedWrapper<T> implements DataType<T> {
36  
37    protected final DataType<T> wrapped;
38    protected final byte[] term;
39  
40    /**
41     * Create a terminated version of the {@code wrapped}.
42     * @throws IllegalArgumentException when {@code term} is null or empty.
43     */
44    public TerminatedWrapper(DataType<T> wrapped, byte[] term) {
45      if (null == term || term.length == 0)
46        throw new IllegalArgumentException("terminator must be non-null and non-empty.");
47      this.wrapped = wrapped;
48      wrapped.getOrder().apply(term);
49      this.term = term;
50    }
51  
52    /**
53     * Create a terminated version of the {@code wrapped}.
54     * {@code term} is converted to a {@code byte[]} using
55     * {@link Bytes#toBytes(String)}.
56     * @throws IllegalArgumentException when {@code term} is null or empty.
57     */
58    public TerminatedWrapper(DataType<T> wrapped, String term) {
59      this(wrapped, Bytes.toBytes(term));
60    }
61  
62    @Override
63    public boolean isOrderPreserving() { return wrapped.isOrderPreserving(); }
64  
65    @Override
66    public Order getOrder() { return wrapped.getOrder(); }
67  
68    @Override
69    public boolean isNullable() { return wrapped.isNullable(); }
70  
71    @Override
72    public boolean isSkippable() { return true; }
73  
74    @Override
75    public int encodedLength(T val) {
76      return wrapped.encodedLength(val) + term.length;
77    }
78  
79    @Override
80    public Class<T> encodedClass() { return wrapped.encodedClass(); }
81  
82    /**
83     * Return the position at which {@code term} begins within {@code src},
84     * or {@code -1} if {@code term} is not found.
85     */
86    protected int terminatorPosition(PositionedByteRange src) {
87      byte[] a = src.getBytes();
88      final int offset = src.getOffset();
89      int i;
90      SKIP: for (i = src.getPosition(); i < src.getLength(); i++) {
91        if (a[offset + i] != term[0]) continue;
92        int j;
93        for (j = 1; j < term.length && offset + j < src.getLength(); j++) {
94          if (a[offset + i + j] != term[j]) continue SKIP;
95        }
96        if (j == term.length) return i; // success
97      }
98      return -1;
99    }
100 
101   /**
102    * Skip {@code src}'s position forward over one encoded value.
103    * @param src the buffer containing the encoded value.
104    * @return number of bytes skipped.
105    * @throws IllegalArgumentException when the terminator sequence is not found.
106    */
107   @Override
108   public int skip(PositionedByteRange src) {
109     if (wrapped.isSkippable()) {
110       int ret = wrapped.skip(src);
111       src.setPosition(src.getPosition() + term.length);
112       return ret + term.length;
113     } else {
114       // find the terminator position
115       final int start = src.getPosition();
116       int skipped = terminatorPosition(src);
117       if (-1 == skipped) throw new IllegalArgumentException("Terminator sequence not found.");
118       skipped += term.length;
119       src.setPosition(skipped);
120       return skipped - start;
121     }
122   }
123 
124   @Override
125   public T decode(PositionedByteRange src) {
126     if (wrapped.isSkippable()) {
127       T ret = wrapped.decode(src);
128       src.setPosition(src.getPosition() + term.length);
129       return ret;
130     } else {
131       // find the terminator position
132       int term = terminatorPosition(src);
133       if (-1 == term) throw new IllegalArgumentException("Terminator sequence not found.");
134       byte[] b = new byte[term - src.getPosition()];
135       src.get(b);
136       // TODO: should we assert that b.position == b.length?
137       T ret = wrapped.decode(new SimplePositionedMutableByteRange(b));
138       src.get(this.term);
139       return ret;
140     }
141   }
142 
143   /**
144    * Write instance {@code val} into buffer {@code dst}.
145    * @throws IllegalArgumentException when the encoded representation of
146    *           {@code val} contains the {@code term} sequence.
147    */
148   @Override
149   public int encode(PositionedByteRange dst, T val) {
150     final int start = dst.getPosition();
151     int written = wrapped.encode(dst, val);
152     PositionedByteRange b = dst.shallowCopy();
153     b.setLength(dst.getPosition());
154     b.setPosition(start);
155     if (-1 != terminatorPosition(b)) {
156       dst.setPosition(start);
157       throw new IllegalArgumentException("Encoded value contains terminator sequence.");
158     }
159     dst.put(term);
160     return written + term.length;
161   }
162 }