001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.types;
019
020import org.apache.hadoop.hbase.util.Bytes;
021import org.apache.hadoop.hbase.util.Order;
022import org.apache.hadoop.hbase.util.PositionedByteRange;
023import org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange;
024import org.apache.yetus.audience.InterfaceAudience;
025
026/**
027 * Wraps an existing {@code DataType} implementation as a terminated
028 * version of itself. This has the useful side-effect of turning an existing
029 * {@code DataType} which is not {@code skippable} into a
030 * {@code skippable} variant.
031 */
032@InterfaceAudience.Public
033public class TerminatedWrapper<T> implements DataType<T> {
034
035  protected final DataType<T> wrapped;
036  protected final byte[] term;
037
038  /**
039   * Create a terminated version of the {@code wrapped}.
040   * @throws IllegalArgumentException when {@code term} is null or empty.
041   */
042  public TerminatedWrapper(DataType<T> wrapped, byte[] term) {
043    if (null == term || term.length == 0) {
044      throw new IllegalArgumentException("terminator must be non-null and non-empty.");
045    }
046    this.wrapped = wrapped;
047    wrapped.getOrder().apply(term);
048    this.term = term;
049  }
050
051  /**
052   * Create a terminated version of the {@code wrapped}.
053   * {@code term} is converted to a {@code byte[]} using
054   * {@link Bytes#toBytes(String)}.
055   * @throws IllegalArgumentException when {@code term} is null or empty.
056   */
057  public TerminatedWrapper(DataType<T> wrapped, String term) {
058    this(wrapped, Bytes.toBytes(term));
059  }
060
061  @Override
062  public boolean isOrderPreserving() {
063    return wrapped.isOrderPreserving();
064  }
065
066  @Override
067  public Order getOrder() {
068    return wrapped.getOrder();
069  }
070
071  @Override
072  public boolean isNullable() {
073    return wrapped.isNullable();
074  }
075
076  @Override
077  public boolean isSkippable() {
078    return true;
079  }
080
081  @Override
082  public int encodedLength(T val) {
083    return wrapped.encodedLength(val) + term.length;
084  }
085
086  @Override
087  public Class<T> encodedClass() {
088    return wrapped.encodedClass();
089  }
090
091  /**
092   * Return the position at which {@code term} begins within {@code src},
093   * or {@code -1} if {@code term} is not found.
094   */
095  protected int terminatorPosition(PositionedByteRange src) {
096    byte[] a = src.getBytes();
097    final int offset = src.getOffset();
098    int i;
099    SKIP: for (i = src.getPosition(); i < src.getLength(); i++) {
100      if (a[offset + i] != term[0]) {
101        continue;
102      }
103      int j;
104      for (j = 1; j < term.length && offset + j < src.getLength(); j++) {
105        if (a[offset + i + j] != term[j]) {
106          continue SKIP;
107        }
108      }
109      if (j == term.length) {
110        return i; // success
111      }
112    }
113    return -1;
114  }
115
116  /**
117   * Skip {@code src}'s position forward over one encoded value.
118   * @param src the buffer containing the encoded value.
119   * @return number of bytes skipped.
120   * @throws IllegalArgumentException when the terminator sequence is not found.
121   */
122  @Override
123  public int skip(PositionedByteRange src) {
124    if (wrapped.isSkippable()) {
125      int ret = wrapped.skip(src);
126      src.setPosition(src.getPosition() + term.length);
127      return ret + term.length;
128    } else {
129      // find the terminator position
130      final int start = src.getPosition();
131      int skipped = terminatorPosition(src);
132      if (-1 == skipped) {
133        throw new IllegalArgumentException("Terminator sequence not found.");
134      }
135      skipped += term.length;
136      src.setPosition(skipped);
137      return skipped - start;
138    }
139  }
140
141  @Override
142  public T decode(PositionedByteRange src) {
143    if (wrapped.isSkippable()) {
144      T ret = wrapped.decode(src);
145      src.setPosition(src.getPosition() + term.length);
146      return ret;
147    } else {
148      // find the terminator position
149      int term = terminatorPosition(src);
150      if (-1 == term) {
151        throw new IllegalArgumentException("Terminator sequence not found.");
152      }
153      byte[] b = new byte[term - src.getPosition()];
154      src.get(b);
155      // TODO: should we assert that b.position == b.length?
156      T ret = wrapped.decode(new SimplePositionedMutableByteRange(b));
157      src.get(this.term);
158      return ret;
159    }
160  }
161
162  /**
163   * Write instance {@code val} into buffer {@code dst}.
164   * @throws IllegalArgumentException when the encoded representation of
165   *           {@code val} contains the {@code term} sequence.
166   */
167  @Override
168  public int encode(PositionedByteRange dst, T val) {
169    final int start = dst.getPosition();
170    int written = wrapped.encode(dst, val);
171    PositionedByteRange b = dst.shallowCopy();
172    b.setLength(dst.getPosition());
173    b.setPosition(start);
174    if (-1 != terminatorPosition(b)) {
175      dst.setPosition(start);
176      throw new IllegalArgumentException("Encoded value contains terminator sequence.");
177    }
178    dst.put(term);
179    return written + term.length;
180  }
181}