001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.types; 019 020import org.apache.hadoop.hbase.util.Bytes; 021import org.apache.hadoop.hbase.util.Order; 022import org.apache.hadoop.hbase.util.PositionedByteRange; 023import org.apache.hadoop.hbase.util.SimplePositionedMutableByteRange; 024import org.apache.yetus.audience.InterfaceAudience; 025 026/** 027 * Wraps an existing {@code DataType} implementation as a terminated version of itself. This has the 028 * useful side-effect of turning an existing {@code DataType} which is not {@code skippable} into a 029 * {@code skippable} variant. 030 */ 031@InterfaceAudience.Public 032public class TerminatedWrapper<T> implements DataType<T> { 033 034 protected final DataType<T> wrapped; 035 protected final byte[] term; 036 037 /** 038 * Create a terminated version of the {@code wrapped}. 039 * @throws IllegalArgumentException when {@code term} is null or empty. 040 */ 041 public TerminatedWrapper(DataType<T> wrapped, byte[] term) { 042 if (null == term || term.length == 0) { 043 throw new IllegalArgumentException("terminator must be non-null and non-empty."); 044 } 045 this.wrapped = wrapped; 046 wrapped.getOrder().apply(term); 047 this.term = term; 048 } 049 050 /** 051 * Create a terminated version of the {@code wrapped}. {@code term} is converted to a 052 * {@code byte[]} using {@link Bytes#toBytes(String)}. 053 * @throws IllegalArgumentException when {@code term} is null or empty. 054 */ 055 public TerminatedWrapper(DataType<T> wrapped, String term) { 056 this(wrapped, Bytes.toBytes(term)); 057 } 058 059 @Override 060 public boolean isOrderPreserving() { 061 return wrapped.isOrderPreserving(); 062 } 063 064 @Override 065 public Order getOrder() { 066 return wrapped.getOrder(); 067 } 068 069 @Override 070 public boolean isNullable() { 071 return wrapped.isNullable(); 072 } 073 074 @Override 075 public boolean isSkippable() { 076 return true; 077 } 078 079 @Override 080 public int encodedLength(T val) { 081 return wrapped.encodedLength(val) + term.length; 082 } 083 084 @Override 085 public Class<T> encodedClass() { 086 return wrapped.encodedClass(); 087 } 088 089 /** 090 * Return the position at which {@code term} begins within {@code src}, or {@code -1} if 091 * {@code term} is not found. 092 */ 093 protected int terminatorPosition(PositionedByteRange src) { 094 byte[] a = src.getBytes(); 095 final int offset = src.getOffset(); 096 int i; 097 SKIP: for (i = src.getPosition(); i < src.getLength(); i++) { 098 if (a[offset + i] != term[0]) { 099 continue; 100 } 101 int j; 102 for (j = 1; j < term.length && offset + j < src.getLength(); j++) { 103 if (a[offset + i + j] != term[j]) { 104 continue SKIP; 105 } 106 } 107 if (j == term.length) { 108 return i; // success 109 } 110 } 111 return -1; 112 } 113 114 /** 115 * Skip {@code src}'s position forward over one encoded value. 116 * @param src the buffer containing the encoded value. 117 * @return number of bytes skipped. 118 * @throws IllegalArgumentException when the terminator sequence is not found. 119 */ 120 @Override 121 public int skip(PositionedByteRange src) { 122 if (wrapped.isSkippable()) { 123 int ret = wrapped.skip(src); 124 src.setPosition(src.getPosition() + term.length); 125 return ret + term.length; 126 } else { 127 // find the terminator position 128 final int start = src.getPosition(); 129 int skipped = terminatorPosition(src); 130 if (-1 == skipped) { 131 throw new IllegalArgumentException("Terminator sequence not found."); 132 } 133 skipped += term.length; 134 src.setPosition(skipped); 135 return skipped - start; 136 } 137 } 138 139 @Override 140 public T decode(PositionedByteRange src) { 141 if (wrapped.isSkippable()) { 142 T ret = wrapped.decode(src); 143 src.setPosition(src.getPosition() + term.length); 144 return ret; 145 } else { 146 // find the terminator position 147 int term = terminatorPosition(src); 148 if (-1 == term) { 149 throw new IllegalArgumentException("Terminator sequence not found."); 150 } 151 byte[] b = new byte[term - src.getPosition()]; 152 src.get(b); 153 // TODO: should we assert that b.position == b.length? 154 T ret = wrapped.decode(new SimplePositionedMutableByteRange(b)); 155 src.get(this.term); 156 return ret; 157 } 158 } 159 160 /** 161 * Write instance {@code val} into buffer {@code dst}. 162 * @throws IllegalArgumentException when the encoded representation of {@code val} contains the 163 * {@code term} sequence. 164 */ 165 @Override 166 public int encode(PositionedByteRange dst, T val) { 167 final int start = dst.getPosition(); 168 int written = wrapped.encode(dst, val); 169 PositionedByteRange b = dst.shallowCopy(); 170 b.setLength(dst.getPosition()); 171 b.setPosition(start); 172 if (-1 != terminatorPosition(b)) { 173 dst.setPosition(start); 174 throw new IllegalArgumentException("Encoded value contains terminator sequence."); 175 } 176 dst.put(term); 177 return written + term.length; 178 } 179}