001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.types;
019
020import org.apache.hadoop.hbase.util.Bytes;
021import org.apache.hadoop.hbase.util.Order;
022import org.apache.hadoop.hbase.util.PositionedByteRange;
023import org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange;
024import org.apache.yetus.audience.InterfaceAudience;
025
026/**
027 * Wraps an existing {@code DataType} implementation as a terminated version of itself. This has the
028 * useful side-effect of turning an existing {@code DataType} which is not {@code skippable} into a
029 * {@code skippable} variant.
030 */
031@InterfaceAudience.Public
032public class TerminatedWrapper<T> implements DataType<T> {
033
034  protected final DataType<T> wrapped;
035  protected final byte[] term;
036
037  /**
038   * Create a terminated version of the {@code wrapped}.
039   * @throws IllegalArgumentException when {@code term} is null or empty.
040   */
041  public TerminatedWrapper(DataType<T> wrapped, byte[] term) {
042    if (null == term || term.length == 0) {
043      throw new IllegalArgumentException("terminator must be non-null and non-empty.");
044    }
045    this.wrapped = wrapped;
046    wrapped.getOrder().apply(term);
047    this.term = term;
048  }
049
050  /**
051   * Create a terminated version of the {@code wrapped}. {@code term} is converted to a
052   * {@code byte[]} using {@link Bytes#toBytes(String)}.
053   * @throws IllegalArgumentException when {@code term} is null or empty.
054   */
055  public TerminatedWrapper(DataType<T> wrapped, String term) {
056    this(wrapped, Bytes.toBytes(term));
057  }
058
059  @Override
060  public boolean isOrderPreserving() {
061    return wrapped.isOrderPreserving();
062  }
063
064  @Override
065  public Order getOrder() {
066    return wrapped.getOrder();
067  }
068
069  @Override
070  public boolean isNullable() {
071    return wrapped.isNullable();
072  }
073
074  @Override
075  public boolean isSkippable() {
076    return true;
077  }
078
079  @Override
080  public int encodedLength(T val) {
081    return wrapped.encodedLength(val) + term.length;
082  }
083
084  @Override
085  public Class<T> encodedClass() {
086    return wrapped.encodedClass();
087  }
088
089  /**
090   * Return the position at which {@code term} begins within {@code src}, or {@code -1} if
091   * {@code term} is not found.
092   */
093  protected int terminatorPosition(PositionedByteRange src) {
094    byte[] a = src.getBytes();
095    final int offset = src.getOffset();
096    int i;
097    SKIP: for (i = src.getPosition(); i < src.getLength(); i++) {
098      if (a[offset + i] != term[0]) {
099        continue;
100      }
101      int j;
102      for (j = 1; j < term.length && offset + j < src.getLength(); j++) {
103        if (a[offset + i + j] != term[j]) {
104          continue SKIP;
105        }
106      }
107      if (j == term.length) {
108        return i; // success
109      }
110    }
111    return -1;
112  }
113
114  /**
115   * Skip {@code src}'s position forward over one encoded value.
116   * @param src the buffer containing the encoded value.
117   * @return number of bytes skipped.
118   * @throws IllegalArgumentException when the terminator sequence is not found.
119   */
120  @Override
121  public int skip(PositionedByteRange src) {
122    if (wrapped.isSkippable()) {
123      int ret = wrapped.skip(src);
124      src.setPosition(src.getPosition() + term.length);
125      return ret + term.length;
126    } else {
127      // find the terminator position
128      final int start = src.getPosition();
129      int skipped = terminatorPosition(src);
130      if (-1 == skipped) {
131        throw new IllegalArgumentException("Terminator sequence not found.");
132      }
133      skipped += term.length;
134      src.setPosition(skipped);
135      return skipped - start;
136    }
137  }
138
139  @Override
140  public T decode(PositionedByteRange src) {
141    if (wrapped.isSkippable()) {
142      T ret = wrapped.decode(src);
143      src.setPosition(src.getPosition() + term.length);
144      return ret;
145    } else {
146      // find the terminator position
147      int term = terminatorPosition(src);
148      if (-1 == term) {
149        throw new IllegalArgumentException("Terminator sequence not found.");
150      }
151      byte[] b = new byte[term - src.getPosition()];
152      src.get(b);
153      // TODO: should we assert that b.position == b.length?
154      T ret = wrapped.decode(new SimplePositionedMutableByteRange(b));
155      src.get(this.term);
156      return ret;
157    }
158  }
159
160  /**
161   * Write instance {@code val} into buffer {@code dst}.
162   * @throws IllegalArgumentException when the encoded representation of {@code val} contains the
163   *                                  {@code term} sequence.
164   */
165  @Override
166  public int encode(PositionedByteRange dst, T val) {
167    final int start = dst.getPosition();
168    int written = wrapped.encode(dst, val);
169    PositionedByteRange b = dst.shallowCopy();
170    b.setLength(dst.getPosition());
171    b.setPosition(start);
172    if (-1 != terminatorPosition(b)) {
173      dst.setPosition(start);
174      throw new IllegalArgumentException("Encoded value contains terminator sequence.");
175    }
176    dst.put(term);
177    return written + term.length;
178  }
179}