001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.types; 019 020import org.apache.hadoop.hbase.util.Bytes; 021import org.apache.hadoop.hbase.util.Order; 022import org.apache.hadoop.hbase.util.PositionedByteRange; 023import org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange; 024import org.apache.yetus.audience.InterfaceAudience; 025 026/** 027 * Wraps an existing {@code DataType} implementation as a terminated 028 * version of itself. This has the useful side-effect of turning an existing 029 * {@code DataType} which is not {@code skippable} into a 030 * {@code skippable} variant. 031 */ 032@InterfaceAudience.Public 033public class TerminatedWrapper<T> implements DataType<T> { 034 035 protected final DataType<T> wrapped; 036 protected final byte[] term; 037 038 /** 039 * Create a terminated version of the {@code wrapped}. 040 * @throws IllegalArgumentException when {@code term} is null or empty. 041 */ 042 public TerminatedWrapper(DataType<T> wrapped, byte[] term) { 043 if (null == term || term.length == 0) { 044 throw new IllegalArgumentException("terminator must be non-null and non-empty."); 045 } 046 this.wrapped = wrapped; 047 wrapped.getOrder().apply(term); 048 this.term = term; 049 } 050 051 /** 052 * Create a terminated version of the {@code wrapped}. 053 * {@code term} is converted to a {@code byte[]} using 054 * {@link Bytes#toBytes(String)}. 055 * @throws IllegalArgumentException when {@code term} is null or empty. 056 */ 057 public TerminatedWrapper(DataType<T> wrapped, String term) { 058 this(wrapped, Bytes.toBytes(term)); 059 } 060 061 @Override 062 public boolean isOrderPreserving() { 063 return wrapped.isOrderPreserving(); 064 } 065 066 @Override 067 public Order getOrder() { 068 return wrapped.getOrder(); 069 } 070 071 @Override 072 public boolean isNullable() { 073 return wrapped.isNullable(); 074 } 075 076 @Override 077 public boolean isSkippable() { 078 return true; 079 } 080 081 @Override 082 public int encodedLength(T val) { 083 return wrapped.encodedLength(val) + term.length; 084 } 085 086 @Override 087 public Class<T> encodedClass() { 088 return wrapped.encodedClass(); 089 } 090 091 /** 092 * Return the position at which {@code term} begins within {@code src}, 093 * or {@code -1} if {@code term} is not found. 094 */ 095 protected int terminatorPosition(PositionedByteRange src) { 096 byte[] a = src.getBytes(); 097 final int offset = src.getOffset(); 098 int i; 099 SKIP: for (i = src.getPosition(); i < src.getLength(); i++) { 100 if (a[offset + i] != term[0]) { 101 continue; 102 } 103 int j; 104 for (j = 1; j < term.length && offset + j < src.getLength(); j++) { 105 if (a[offset + i + j] != term[j]) { 106 continue SKIP; 107 } 108 } 109 if (j == term.length) { 110 return i; // success 111 } 112 } 113 return -1; 114 } 115 116 /** 117 * Skip {@code src}'s position forward over one encoded value. 118 * @param src the buffer containing the encoded value. 119 * @return number of bytes skipped. 120 * @throws IllegalArgumentException when the terminator sequence is not found. 121 */ 122 @Override 123 public int skip(PositionedByteRange src) { 124 if (wrapped.isSkippable()) { 125 int ret = wrapped.skip(src); 126 src.setPosition(src.getPosition() + term.length); 127 return ret + term.length; 128 } else { 129 // find the terminator position 130 final int start = src.getPosition(); 131 int skipped = terminatorPosition(src); 132 if (-1 == skipped) { 133 throw new IllegalArgumentException("Terminator sequence not found."); 134 } 135 skipped += term.length; 136 src.setPosition(skipped); 137 return skipped - start; 138 } 139 } 140 141 @Override 142 public T decode(PositionedByteRange src) { 143 if (wrapped.isSkippable()) { 144 T ret = wrapped.decode(src); 145 src.setPosition(src.getPosition() + term.length); 146 return ret; 147 } else { 148 // find the terminator position 149 int term = terminatorPosition(src); 150 if (-1 == term) { 151 throw new IllegalArgumentException("Terminator sequence not found."); 152 } 153 byte[] b = new byte[term - src.getPosition()]; 154 src.get(b); 155 // TODO: should we assert that b.position == b.length? 156 T ret = wrapped.decode(new SimplePositionedMutableByteRange(b)); 157 src.get(this.term); 158 return ret; 159 } 160 } 161 162 /** 163 * Write instance {@code val} into buffer {@code dst}. 164 * @throws IllegalArgumentException when the encoded representation of 165 * {@code val} contains the {@code term} sequence. 166 */ 167 @Override 168 public int encode(PositionedByteRange dst, T val) { 169 final int start = dst.getPosition(); 170 int written = wrapped.encode(dst, val); 171 PositionedByteRange b = dst.shallowCopy(); 172 b.setLength(dst.getPosition()); 173 b.setPosition(start); 174 if (-1 != terminatorPosition(b)) { 175 dst.setPosition(start); 176 throw new IllegalArgumentException("Encoded value contains terminator sequence."); 177 } 178 dst.put(term); 179 return written + term.length; 180 } 181}