001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.apache.hadoop.hbase.util.Order.ASCENDING;
021import static org.apache.hadoop.hbase.util.Order.DESCENDING;
022
023import java.math.BigDecimal;
024import java.math.BigInteger;
025import java.math.MathContext;
026import java.math.RoundingMode;
027import java.nio.charset.Charset;
028
029import org.apache.yetus.audience.InterfaceAudience;
030
031/**
032 * Utility class that handles ordered byte arrays. That is, unlike
033 * {@link Bytes}, these methods produce byte arrays which maintain the sort
034 * order of the original values.
035 * <h3>Encoding Format summary</h3>
036 * <p>
037 * Each value is encoded as one or more bytes. The first byte of the encoding,
038 * its meaning, and a terse description of the bytes that follow is given by
039 * the following table:
040 * </p>
041 * <table summary="Encodings">
042 * <tr><th>Content Type</th><th>Encoding</th></tr>
043 * <tr><td>NULL</td><td>0x05</td></tr>
044 * <tr><td>negative infinity</td><td>0x07</td></tr>
045 * <tr><td>negative large</td><td>0x08, ~E, ~M</td></tr>
046 * <tr><td>negative medium</td><td>0x13-E, ~M</td></tr>
047 * <tr><td>negative small</td><td>0x14, -E, ~M</td></tr>
048 * <tr><td>zero</td><td>0x15</td></tr>
049 * <tr><td>positive small</td><td>0x16, ~-E, M</td></tr>
050 * <tr><td>positive medium</td><td>0x17+E, M</td></tr>
051 * <tr><td>positive large</td><td>0x22, E, M</td></tr>
052 * <tr><td>positive infinity</td><td>0x23</td></tr>
053 * <tr><td>NaN</td><td>0x25</td></tr>
054 * <tr><td>fixed-length 32-bit integer</td><td>0x27, I</td></tr>
055 * <tr><td>fixed-length 64-bit integer</td><td>0x28, I</td></tr>
056 * <tr><td>fixed-length 8-bit integer</td><td>0x29</td></tr>
057 * <tr><td>fixed-length 16-bit integer</td><td>0x2a</td></tr>
058 * <tr><td>fixed-length 32-bit float</td><td>0x30, F</td></tr>
059 * <tr><td>fixed-length 64-bit float</td><td>0x31, F</td></tr>
060 * <tr><td>TEXT</td><td>0x33, T</td></tr>
061 * <tr><td>variable length BLOB</td><td>0x35, B</td></tr>
062 * <tr><td>byte-for-byte BLOB</td><td>0x36, X</td></tr>
063 * </table>
064 *
065 * <h3>Null Encoding</h3>
066 * <p>
067 * Each value that is a NULL encodes as a single byte of 0x05. Since every
068 * other value encoding begins with a byte greater than 0x05, this forces NULL
069 * values to sort first.
070 * </p>
071 * <h3>Text Encoding</h3>
072 * <p>
073 * Each text value begins with a single byte of 0x33 and ends with a single
074 * byte of 0x00. There are zero or more intervening bytes that encode the text
075 * value. The intervening bytes are chosen so that the encoding will sort in
076 * the desired collating order. The intervening bytes may not contain a 0x00
077 * character; the only 0x00 byte allowed in a text encoding is the final byte.
078 * </p>
079 * <p>
080 * The text encoding ends in 0x00 in order to ensure that when there are two
081 * strings where one is a prefix of the other that the shorter string will
082 * sort first.
083 * </p>
084 * <h3>Binary Encoding</h3>
085 * <p>
086 * There are two encoding strategies for binary fields, referred to as
087 * "BlobVar" and "BlobCopy". BlobVar is less efficient in both space and
088 * encoding time. It has no limitations on the range of encoded values.
089 * BlobCopy is a byte-for-byte copy of the input data followed by a
090 * termination byte. It is extremely fast to encode and decode. It carries the
091 * restriction of not allowing a 0x00 value in the input byte[] as this value
092 * is used as the termination byte.
093 * </p>
094 * <h4>BlobVar</h4>
095 * <p>
096 * "BlobVar" encodes the input byte[] in a manner similar to a variable length
097 * integer encoding. As with the other {@code OrderedBytes} encodings,
098 * the first encoded byte is used to indicate what kind of value follows. This
099 * header byte is 0x37 for BlobVar encoded values. As with the traditional
100 * varint encoding, the most significant bit of each subsequent encoded
101 * {@code byte} is used as a continuation marker. The 7 remaining bits
102 * contain the 7 most significant bits of the first unencoded byte. The next
103 * encoded byte starts with a continuation marker in the MSB. The least
104 * significant bit from the first unencoded byte follows, and the remaining 6
105 * bits contain the 6 MSBs of the second unencoded byte. The encoding
106 * continues, encoding 7 bytes on to 8 encoded bytes. The MSB of the final
107 * encoded byte contains a termination marker rather than a continuation
108 * marker, and any remaining bits from the final input byte. Any trailing bits
109 * in the final encoded byte are zeros.
110 * </p>
111 * <h4>BlobCopy</h4>
112 * <p>
113 * "BlobCopy" is a simple byte-for-byte copy of the input data. It uses 0x38
114 * as the header byte, and is terminated by 0x00 in the DESCENDING case. This
115 * alternative encoding is faster and more space-efficient, but it cannot
116 * accept values containing a 0x00 byte in DESCENDING order.
117 * </p>
118 * <h3>Variable-length Numeric Encoding</h3>
119 * <p>
120 * Numeric values must be coded so as to sort in numeric order. We assume that
121 * numeric values can be both integer and floating point values. Clients must
122 * be careful to use inspection methods for encoded values (such as
123 * {@link #isNumericInfinite(PositionedByteRange)} and
124 * {@link #isNumericNaN(PositionedByteRange)} to protect against decoding
125 * values into object which do not support these numeric concepts (such as
126 * {@link Long} and {@link BigDecimal}).
127 * </p>
128 * <p>
129 * Simplest cases first: If the numeric value is a NaN, then the encoding is a
130 * single byte of 0x25. This causes NaN values to sort after every other
131 * numeric value.
132 * </p>
133 * <p>
134 * If the numeric value is a negative infinity then the encoding is a single
135 * byte of 0x07. Since every other numeric value except NaN has a larger
136 * initial byte, this encoding ensures that negative infinity will sort prior
137 * to every other numeric value other than NaN.
138 * </p>
139 * <p>
140 * If the numeric value is a positive infinity then the encoding is a single
141 * byte of 0x23. Every other numeric value encoding begins with a smaller
142 * byte, ensuring that positive infinity always sorts last among numeric
143 * values. 0x23 is also smaller than 0x33, the initial byte of a text value,
144 * ensuring that every numeric value sorts before every text value.
145 * </p>
146 * <p>
147 * If the numeric value is exactly zero then it is encoded as a single byte of
148 * 0x15. Finite negative values will have initial bytes of 0x08 through 0x14
149 * and finite positive values will have initial bytes of 0x16 through 0x22.
150 * </p>
151 * <p>
152 * For all numeric values, we compute a mantissa M and an exponent E. The
153 * mantissa is a base-100 representation of the value. The exponent E
154 * determines where to put the decimal point.
155 * </p>
156 * <p>
157 * Each centimal digit of the mantissa is stored in a byte. If the value of
158 * the centimal digit is X (hence X&ge;0 and X&le;99) then the byte value will
159 * be 2*X+1 for every byte of the mantissa, except for the last byte which
160 * will be 2*X+0. The mantissa must be the minimum number of bytes necessary
161 * to represent the value; trailing X==0 digits are omitted. This means that
162 * the mantissa will never contain a byte with the value 0x00.
163 * </p>
164 * <p>
165 * If we assume all digits of the mantissa occur to the right of the decimal
166 * point, then the exponent E is the power of one hundred by which one must
167 * multiply the mantissa to recover the original value.
168 * </p>
169 * <p>
170 * Values are classified as large, medium, or small according to the value of
171 * E. If E is 11 or more, the value is large. For E between 0 and 10, the
172 * value is medium. For E less than zero, the value is small.
173 * </p>
174 * <p>
175 * Large positive values are encoded as a single byte 0x22 followed by E as a
176 * varint and then M. Medium positive values are a single byte of 0x17+E
177 * followed by M. Small positive values are encoded as a single byte 0x16
178 * followed by the ones-complement of the varint for -E followed by M.
179 * </p>
180 * <p>
181 * Small negative values are encoded as a single byte 0x14 followed by -E as a
182 * varint and then the ones-complement of M. Medium negative values are
183 * encoded as a byte 0x13-E followed by the ones-complement of M. Large
184 * negative values consist of the single byte 0x08 followed by the
185 * ones-complement of the varint encoding of E followed by the ones-complement
186 * of M.
187 * </p>
188 * <h3>Fixed-length Integer Encoding</h3>
189 * <p>
190 * All 4-byte integers are serialized to a 5-byte, fixed-width, sortable byte
191 * format. All 8-byte integers are serialized to the equivelant 9-byte format.
192 * Serialization is performed by writing a header byte, inverting the integer
193 * sign bit and writing the resulting bytes to the byte array in big endian
194 * order.
195 * </p>
196 * <h3>Fixed-length Floating Point Encoding</h3>
197 * <p>
198 * 32-bit and 64-bit floating point numbers are encoded to a 5-byte and 9-byte
199 * encoding format, respectively. The format is identical, save for the
200 * precision respected in each step of the operation.
201 * <p>
202 * This format ensures the following total ordering of floating point values:
203 * Float.NEGATIVE_INFINITY &lt; -Float.MAX_VALUE &lt; ... &lt;
204 * -Float.MIN_VALUE &lt; -0.0 &lt; +0.0; &lt; Float.MIN_VALUE &lt; ... &lt;
205 * Float.MAX_VALUE &lt; Float.POSITIVE_INFINITY &lt; Float.NaN
206 * </p>
207 * <p>
208 * Floating point numbers are encoded as specified in IEEE 754. A 32-bit
209 * single precision float consists of a sign bit, 8-bit unsigned exponent
210 * encoded in offset-127 notation, and a 23-bit significand. The format is
211 * described further in the <a
212 * href="http://en.wikipedia.org/wiki/Single_precision"> Single Precision
213 * Floating Point Wikipedia page</a>
214 * </p>
215 * <p>
216 * The value of a normal float is -1 <sup>sign bit</sup> &times;
217 * 2<sup>exponent - 127</sup> &times; 1.significand
218 * </p>
219 * <p>
220 * The IEE754 floating point format already preserves sort ordering for
221 * positive floating point numbers when the raw bytes are compared in most
222 * significant byte order. This is discussed further at <a href=
223 * "http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.htm">
224 * http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.htm</a>
225 * </p>
226 * <p>
227 * Thus, we need only ensure that negative numbers sort in the the exact
228 * opposite order as positive numbers (so that say, negative infinity is less
229 * than negative 1), and that all negative numbers compare less than any
230 * positive number. To accomplish this, we invert the sign bit of all floating
231 * point numbers, and we also invert the exponent and significand bits if the
232 * floating point number was negative.
233 * </p>
234 * <p>
235 * More specifically, we first store the floating point bits into a 32-bit int
236 * {@code j} using {@link Float#floatToIntBits}. This method collapses
237 * all NaNs into a single, canonical NaN value but otherwise leaves the bits
238 * unchanged. We then compute
239 * </p>
240 *
241 * <pre>
242 * j &circ;= (j &gt;&gt; (Integer.SIZE - 1)) | Integer.MIN_SIZE
243 * </pre>
244 * <p>
245 * which inverts the sign bit and XOR's all other bits with the sign bit
246 * itself. Comparing the raw bytes of {@code j} in most significant byte
247 * order is equivalent to performing a single precision floating point
248 * comparison on the underlying bits (ignoring NaN comparisons, as NaNs don't
249 * compare equal to anything when performing floating point comparisons).
250 * </p>
251 * <p>
252 * The resulting integer is then converted into a byte array by serializing
253 * the integer one byte at a time in most significant byte order. The
254 * serialized integer is prefixed by a single header byte. All serialized
255 * values are 5 bytes in length.
256 * </p>
257 * <p>
258 * {@code OrderedBytes} encodings are heavily influenced by the
259 * <a href="http://sqlite.org/src4/doc/trunk/www/key_encoding.wiki">SQLite4 Key
260 * Encoding</a>. Slight deviations are make in the interest of order
261 * correctness and user extensibility. Fixed-width {@code Long} and
262 * {@link Double} encodings are based on implementations from the now defunct
263 * Orderly library.
264 * </p>
265 */
266@InterfaceAudience.Public
267public class OrderedBytes {
268
269  /*
270   * These constants define header bytes used to identify encoded values. Note
271   * that the values here are not exhaustive as the Numeric format encodes
272   * portions of its value within the header byte. The values listed here are
273   * directly applied to persisted data -- DO NOT modify the values specified
274   * here. Instead, gaps are placed intentionally between values so that new
275   * implementations can be inserted into the total ordering enforced here.
276   */
277  private static final byte NULL = 0x05;
278  // room for 1 expansion type
279  private static final byte NEG_INF = 0x07;
280  private static final byte NEG_LARGE = 0x08;
281  private static final byte NEG_MED_MIN = 0x09;
282  private static final byte NEG_MED_MAX = 0x13;
283  private static final byte NEG_SMALL = 0x14;
284  private static final byte ZERO = 0x15;
285  private static final byte POS_SMALL = 0x16;
286  private static final byte POS_MED_MIN = 0x17;
287  private static final byte POS_MED_MAX = 0x21;
288  private static final byte POS_LARGE = 0x22;
289  private static final byte POS_INF = 0x23;
290  // room for 2 expansion type
291  private static final byte NAN = 0x26;
292  // room for 2 expansion types
293  private static final byte FIXED_INT8 = 0x29;
294  private static final byte FIXED_INT16 = 0x2a;
295  private static final byte FIXED_INT32 = 0x2b;
296  private static final byte FIXED_INT64 = 0x2c;
297  // room for 3 expansion types
298  private static final byte FIXED_FLOAT32 = 0x30;
299  private static final byte FIXED_FLOAT64 = 0x31;
300  // room for 2 expansion type
301  private static final byte TEXT = 0x34;
302  // room for 2 expansion type
303  private static final byte BLOB_VAR = 0x37;
304  private static final byte BLOB_COPY = 0x38;
305
306  /*
307   * The following constant values are used by encoding implementations
308   */
309
310  public static final Charset UTF8 = Charset.forName("UTF-8");
311  private static final byte TERM = 0x00;
312  private static final BigDecimal E8 = BigDecimal.valueOf(1e8);
313  private static final BigDecimal E32 = BigDecimal.valueOf(1e32);
314  private static final BigDecimal EN2 = BigDecimal.valueOf(1e-2);
315  private static final BigDecimal EN10 = BigDecimal.valueOf(1e-10);
316
317  /**
318   * Max precision guaranteed to fit into a {@code long}.
319   */
320  public static final int MAX_PRECISION = 31;
321
322  /**
323   * The context used to normalize {@link BigDecimal} values.
324   */
325  public static final MathContext DEFAULT_MATH_CONTEXT =
326      new MathContext(MAX_PRECISION, RoundingMode.HALF_UP);
327
328  /**
329   * Creates the standard exception when the encoded header byte is unexpected for the decoding
330   * context.
331   * @param header value used in error message.
332   */
333  private static IllegalArgumentException unexpectedHeader(byte header) {
334    throw new IllegalArgumentException("unexpected value in first byte: 0x"
335        + Long.toHexString(header));
336  }
337
338  /**
339   * Perform unsigned comparison between two long values. Conforms to the same interface as
340   * {@link org.apache.hadoop.hbase.CellComparator}.
341   */
342  private static int unsignedCmp(long x1, long x2) {
343    int cmp;
344    if ((cmp = (x1 < x2 ? -1 : (x1 == x2 ? 0 : 1))) == 0) return 0;
345    // invert the result when either value is negative
346    if ((x1 < 0) != (x2 < 0)) return -cmp;
347    return cmp;
348  }
349
350  /**
351   * Write a 32-bit unsigned integer to {@code dst} as 4 big-endian bytes.
352   * @return number of bytes written.
353   */
354  private static int putUint32(PositionedByteRange dst, int val) {
355    dst.put((byte) (val >>> 24))
356       .put((byte) (val >>> 16))
357       .put((byte) (val >>> 8))
358       .put((byte) val);
359    return 4;
360  }
361
362  /**
363   * Encode an unsigned 64-bit unsigned integer {@code val} into {@code dst}.
364   * @param dst The destination to which encoded bytes are written.
365   * @param val The value to write.
366   * @param comp Compliment the encoded value when {@code comp} is true.
367   * @return number of bytes written.
368   */
369  @InterfaceAudience.Private
370  static int putVaruint64(PositionedByteRange dst, long val, boolean comp) {
371    int w, y, len = 0;
372    final int offset = dst.getOffset(), start = dst.getPosition();
373    byte[] a = dst.getBytes();
374    Order ord = comp ? DESCENDING : ASCENDING;
375    if (-1 == unsignedCmp(val, 241L)) {
376      dst.put((byte) val);
377      len = dst.getPosition() - start;
378      ord.apply(a, offset + start, len);
379      return len;
380    }
381    if (-1 == unsignedCmp(val, 2288L)) {
382      y = (int) (val - 240);
383      dst.put((byte) (y / 256 + 241))
384         .put((byte) (y % 256));
385      len = dst.getPosition() - start;
386      ord.apply(a, offset + start, len);
387      return len;
388    }
389    if (-1 == unsignedCmp(val, 67824L)) {
390      y = (int) (val - 2288);
391      dst.put((byte) 249)
392         .put((byte) (y / 256))
393         .put((byte) (y % 256));
394      len = dst.getPosition() - start;
395      ord.apply(a, offset + start, len);
396      return len;
397    }
398    y = (int) val;
399    w = (int) (val >>> 32);
400    if (w == 0) {
401      if (-1 == unsignedCmp(y, 16777216L)) {
402        dst.put((byte) 250)
403           .put((byte) (y >>> 16))
404           .put((byte) (y >>> 8))
405           .put((byte) y);
406        len = dst.getPosition() - start;
407        ord.apply(a, offset + start, len);
408        return len;
409      }
410      dst.put((byte) 251);
411      putUint32(dst, y);
412      len = dst.getPosition() - start;
413      ord.apply(a, offset + start, len);
414      return len;
415    }
416    if (-1 == unsignedCmp(w, 256L)) {
417      dst.put((byte) 252)
418         .put((byte) w);
419      putUint32(dst, y);
420      len = dst.getPosition() - start;
421      ord.apply(a, offset + start, len);
422      return len;
423    }
424    if (-1 == unsignedCmp(w, 65536L)) {
425      dst.put((byte) 253)
426         .put((byte) (w >>> 8))
427         .put((byte) w);
428      putUint32(dst, y);
429      len = dst.getPosition() - start;
430      ord.apply(a, offset + start, len);
431      return len;
432    }
433    if (-1 == unsignedCmp(w, 16777216L)) {
434      dst.put((byte) 254)
435         .put((byte) (w >>> 16))
436         .put((byte) (w >>> 8))
437         .put((byte) w);
438      putUint32(dst, y);
439      len = dst.getPosition() - start;
440      ord.apply(a, offset + start, len);
441      return len;
442    }
443    dst.put((byte) 255);
444    putUint32(dst, w);
445    putUint32(dst, y);
446    len = dst.getPosition() - start;
447    ord.apply(a, offset + start, len);
448    return len;
449  }
450
451  /**
452   * Inspect {@code src} for an encoded varuint64 for its length in bytes.
453   * Preserves the state of {@code src}.
454   * @param src source buffer
455   * @param comp if true, parse the compliment of the value.
456   * @return the number of bytes consumed by this value.
457   */
458  @InterfaceAudience.Private
459  static int lengthVaruint64(PositionedByteRange src, boolean comp) {
460    int a0 = (comp ? DESCENDING : ASCENDING).apply(src.peek()) & 0xff;
461    if (a0 <= 240) return 1;
462    if (a0 <= 248) return 2;
463    if (a0 == 249) return 3;
464    if (a0 == 250) return 4;
465    if (a0 == 251) return 5;
466    if (a0 == 252) return 6;
467    if (a0 == 253) return 7;
468    if (a0 == 254) return 8;
469    if (a0 == 255) return 9;
470    throw unexpectedHeader(src.peek());
471  }
472
473  /**
474   * Skip {@code src} over the encoded varuint64.
475   * @param src source buffer
476   * @param cmp if true, parse the compliment of the value.
477   * @return the number of bytes skipped.
478   */
479  @InterfaceAudience.Private
480  static int skipVaruint64(PositionedByteRange src, boolean cmp) {
481    final int len = lengthVaruint64(src, cmp);
482    src.setPosition(src.getPosition() + len);
483    return len;
484  }
485
486  /**
487   * Decode a sequence of bytes in {@code src} as a varuint64. Compliment the
488   * encoded value when {@code comp} is true.
489   * @return the decoded value.
490   */
491  @InterfaceAudience.Private
492  static long getVaruint64(PositionedByteRange src, boolean comp) {
493    assert src.getRemaining() >= lengthVaruint64(src, comp);
494    final long ret;
495    Order ord = comp ? DESCENDING : ASCENDING;
496    byte x = src.get();
497    final int a0 = ord.apply(x) & 0xff, a1, a2, a3, a4, a5, a6, a7, a8;
498    if (-1 == unsignedCmp(a0, 241)) {
499      return a0;
500    }
501    x = src.get();
502    a1 = ord.apply(x) & 0xff;
503    if (-1 == unsignedCmp(a0, 249)) {
504      return (a0 - 241L) * 256 + a1 + 240;
505    }
506    x = src.get();
507    a2 = ord.apply(x) & 0xff;
508    if (a0 == 249) {
509      return 2288L + 256 * a1 + a2;
510    }
511    x = src.get();
512    a3 = ord.apply(x) & 0xff;
513    if (a0 == 250) {
514      return ((long) a1 << 16L) | (a2 << 8) | a3;
515    }
516    x = src.get();
517    a4 = ord.apply(x) & 0xff;
518    ret = (((long) a1) << 24) | (a2 << 16) | (a3 << 8) | a4;
519    if (a0 == 251) {
520      return ret;
521    }
522    x = src.get();
523    a5 = ord.apply(x) & 0xff;
524    if (a0 == 252) {
525      return (ret << 8) | a5;
526    }
527    x = src.get();
528    a6 = ord.apply(x) & 0xff;
529    if (a0 == 253) {
530      return (ret << 16) | (a5 << 8) | a6;
531    }
532    x = src.get();
533    a7 = ord.apply(x) & 0xff;
534    if (a0 == 254) {
535      return (ret << 24) | (a5 << 16) | (a6 << 8) | a7;
536    }
537    x = src.get();
538    a8 = ord.apply(x) & 0xff;
539    return (ret << 32) | (((long) a5) << 24) | (a6 << 16) | (a7 << 8) | a8;
540  }
541
542  /**
543   * Strip all trailing zeros to ensure that no digit will be zero and round
544   * using our default context to ensure precision doesn't exceed max allowed.
545   * From Phoenix's {@code NumberUtil}.
546   * @return new {@link BigDecimal} instance
547   */
548  @InterfaceAudience.Private
549  static BigDecimal normalize(BigDecimal val) {
550    return null == val ? null : val.stripTrailingZeros().round(DEFAULT_MATH_CONTEXT);
551  }
552
553  /**
554   * Read significand digits from {@code src} according to the magnitude
555   * of {@code e}.
556   * @param src The source from which to read encoded digits.
557   * @param e The magnitude of the first digit read.
558   * @param comp Treat encoded bytes as compliments when {@code comp} is true.
559   * @return The decoded value.
560   * @throws IllegalArgumentException when read exceeds the remaining length
561   *     of {@code src}.
562   */
563  private static BigDecimal decodeSignificand(PositionedByteRange src, int e, boolean comp) {
564    // TODO: can this be made faster?
565    byte[] a = src.getBytes();
566    final int start = src.getPosition(), offset = src.getOffset(), remaining = src.getRemaining();
567    Order ord = comp ? DESCENDING : ASCENDING;
568    BigDecimal m = BigDecimal.ZERO;
569    e--;
570    for (int i = 0;; i++) {
571      if (i > remaining) {
572        // we've exceeded this range's window
573        src.setPosition(start);
574        throw new IllegalArgumentException(
575            "Read exceeds range before termination byte found. offset: " + offset + " position: "
576                + (start + i));
577      }
578      // base-100 digits are encoded as val * 2 + 1 except for the termination digit.
579      m = m.add( // m +=
580        new BigDecimal(BigInteger.ONE, e * -2).multiply( // 100 ^ p * [decoded digit]
581          BigDecimal.valueOf((ord.apply(a[offset + start + i]) & 0xff) / 2)));
582      e--;
583      // detect termination digit
584      if ((ord.apply(a[offset + start + i]) & 1) == 0) {
585        src.setPosition(start + i + 1);
586        break;
587      }
588    }
589    return normalize(m);
590  }
591
592  /**
593   * Skip {@code src} over the significand bytes.
594   * @param src The source from which to read encoded digits.
595   * @param comp Treat encoded bytes as compliments when {@code comp} is true.
596   * @return the number of bytes skipped.
597   */
598  private static int skipSignificand(PositionedByteRange src, boolean comp) {
599    byte[] a = src.getBytes();
600    final int offset = src.getOffset(), start = src.getPosition();
601    int i = src.getPosition();
602    while (((comp ? DESCENDING : ASCENDING).apply(a[offset + i++]) & 1) != 0)
603      ;
604    src.setPosition(i);
605    return i - start;
606  }
607
608  /**
609   * <p>
610   * Encode the small magnitude floating point number {@code val} using the
611   * key encoding. The caller guarantees that 1.0 > abs(val) > 0.0.
612   * </p>
613   * <p>
614   * A floating point value is encoded as an integer exponent {@code E} and a
615   * mantissa {@code M}. The original value is equal to {@code (M * 100^E)}.
616   * {@code E} is set to the smallest value possible without making {@code M}
617   * greater than or equal to 1.0.
618   * </p>
619   * <p>
620   * For this routine, {@code E} will always be zero or negative, since the
621   * original value is less than one. The encoding written by this routine is
622   * the ones-complement of the varint of the negative of {@code E} followed
623   * by the mantissa:
624   * <pre>
625   *   Encoding:   ~-E  M
626   * </pre>
627   * </p>
628   * @param dst The destination to which encoded digits are written.
629   * @param val The value to encode.
630   * @return the number of bytes written.
631   */
632  private static int encodeNumericSmall(PositionedByteRange dst, BigDecimal val) {
633    // TODO: this can be done faster?
634    // assert 1.0 > abs(val) > 0.0
635    BigDecimal abs = val.abs();
636    assert BigDecimal.ZERO.compareTo(abs) < 0 && BigDecimal.ONE.compareTo(abs) > 0;
637    byte[] a = dst.getBytes();
638    boolean isNeg = val.signum() == -1;
639    final int offset = dst.getOffset(), start = dst.getPosition();
640    int e = 0, d, startM;
641
642    if (isNeg) { /* Small negative number: 0x14, -E, ~M */
643      dst.put(NEG_SMALL);
644    } else { /* Small positive number: 0x16, ~-E, M */
645      dst.put(POS_SMALL);
646    }
647
648    // normalize abs(val) to determine E
649    while (abs.compareTo(EN10) < 0) { abs = abs.movePointRight(8); e += 4; }
650    while (abs.compareTo(EN2) < 0) { abs = abs.movePointRight(2); e++; }
651
652    putVaruint64(dst, e, !isNeg); // encode appropriate E value.
653
654    // encode M by peeling off centimal digits, encoding x as 2x+1
655    startM = dst.getPosition();
656    // TODO: 18 is an arbitrary encoding limit. Reevaluate once we have a better handling of
657    // numeric scale.
658    for (int i = 0; i < 18 && abs.compareTo(BigDecimal.ZERO) != 0; i++) {
659      abs = abs.movePointRight(2);
660      d = abs.intValue();
661      dst.put((byte) ((2 * d + 1) & 0xff));
662      abs = abs.subtract(BigDecimal.valueOf(d));
663    }
664    // terminal digit should be 2x
665    a[offset + dst.getPosition() - 1] = (byte) (a[offset + dst.getPosition() - 1] & 0xfe);
666    if (isNeg) {
667      // negative values encoded as ~M
668      DESCENDING.apply(a, offset + startM, dst.getPosition() - startM);
669    }
670    return dst.getPosition() - start;
671  }
672
673  /**
674   * Encode the large magnitude floating point number {@code val} using
675   * the key encoding. The caller guarantees that {@code val} will be
676   * finite and abs(val) >= 1.0.
677   * <p>
678   * A floating point value is encoded as an integer exponent {@code E}
679   * and a mantissa {@code M}. The original value is equal to
680   * {@code (M * 100^E)}. {@code E} is set to the smallest value
681   * possible without making {@code M} greater than or equal to 1.0.
682   * </p>
683   * <p>
684   * Each centimal digit of the mantissa is stored in a byte. If the value of
685   * the centimal digit is {@code X} (hence {@code X>=0} and
686   * {@code X<=99}) then the byte value will be {@code 2*X+1} for
687   * every byte of the mantissa, except for the last byte which will be
688   * {@code 2*X+0}. The mantissa must be the minimum number of bytes
689   * necessary to represent the value; trailing {@code X==0} digits are
690   * omitted. This means that the mantissa will never contain a byte with the
691   * value {@code 0x00}.
692   * </p>
693   * <p>
694   * If {@code E > 10}, then this routine writes of {@code E} as a
695   * varint followed by the mantissa as described above. Otherwise, if
696   * {@code E <= 10}, this routine only writes the mantissa and leaves
697   * the {@code E} value to be encoded as part of the opening byte of the
698   * field by the calling function.
699   *
700   * <pre>
701   *   Encoding:  M       (if E<=10)
702   *              E M     (if E>10)
703   * </pre>
704   * </p>
705   * @param dst The destination to which encoded digits are written.
706   * @param val The value to encode.
707   * @return the number of bytes written.
708   */
709  private static int encodeNumericLarge(PositionedByteRange dst, BigDecimal val) {
710    // TODO: this can be done faster
711    BigDecimal abs = val.abs();
712    byte[] a = dst.getBytes();
713    boolean isNeg = val.signum() == -1;
714    final int start = dst.getPosition(), offset = dst.getOffset();
715    int e = 0, d, startM;
716
717    if (isNeg) { /* Large negative number: 0x08, ~E, ~M */
718      dst.put(NEG_LARGE);
719    } else { /* Large positive number: 0x22, E, M */
720      dst.put(POS_LARGE);
721    }
722
723    // normalize abs(val) to determine E
724    while (abs.compareTo(E32) >= 0 && e <= 350) { abs = abs.movePointLeft(32); e +=16; }
725    while (abs.compareTo(E8) >= 0 && e <= 350) { abs = abs.movePointLeft(8); e+= 4; }
726    while (abs.compareTo(BigDecimal.ONE) >= 0 && e <= 350) { abs = abs.movePointLeft(2); e++; }
727
728    // encode appropriate header byte and/or E value.
729    if (e > 10) { /* large number, write out {~,}E */
730      putVaruint64(dst, e, isNeg);
731    } else {
732      if (isNeg) { /* Medium negative number: 0x13-E, ~M */
733        dst.put(start, (byte) (NEG_MED_MAX - e));
734      } else { /* Medium positive number: 0x17+E, M */
735        dst.put(start, (byte) (POS_MED_MIN + e));
736      }
737    }
738
739    // encode M by peeling off centimal digits, encoding x as 2x+1
740    startM = dst.getPosition();
741    // TODO: 18 is an arbitrary encoding limit. Reevaluate once we have a better handling of
742    // numeric scale.
743    for (int i = 0; i < 18 && abs.compareTo(BigDecimal.ZERO) != 0; i++) {
744      abs = abs.movePointRight(2);
745      d = abs.intValue();
746      dst.put((byte) (2 * d + 1));
747      abs = abs.subtract(BigDecimal.valueOf(d));
748    }
749    // terminal digit should be 2x
750    a[offset + dst.getPosition() - 1] = (byte) (a[offset + dst.getPosition() - 1] & 0xfe);
751    if (isNeg) {
752      // negative values encoded as ~M
753      DESCENDING.apply(a, offset + startM, dst.getPosition() - startM);
754    }
755    return dst.getPosition() - start;
756  }
757
758  /**
759   * Encode a numerical value using the variable-length encoding.
760   * @param dst The destination to which encoded digits are written.
761   * @param val The value to encode.
762   * @param ord The {@link Order} to respect while encoding {@code val}.
763   * @return the number of bytes written.
764   */
765  public static int encodeNumeric(PositionedByteRange dst, long val, Order ord) {
766    return encodeNumeric(dst, BigDecimal.valueOf(val), ord);
767  }
768
769  /**
770   * Encode a numerical value using the variable-length encoding.
771   * @param dst The destination to which encoded digits are written.
772   * @param val The value to encode.
773   * @param ord The {@link Order} to respect while encoding {@code val}.
774   * @return the number of bytes written.
775   */
776  public static int encodeNumeric(PositionedByteRange dst, double val, Order ord) {
777    if (val == 0.0) {
778      dst.put(ord.apply(ZERO));
779      return 1;
780    }
781    if (Double.isNaN(val)) {
782      dst.put(ord.apply(NAN));
783      return 1;
784    }
785    if (val == Double.NEGATIVE_INFINITY) {
786      dst.put(ord.apply(NEG_INF));
787      return 1;
788    }
789    if (val == Double.POSITIVE_INFINITY) {
790      dst.put(ord.apply(POS_INF));
791      return 1;
792    }
793    return encodeNumeric(dst, BigDecimal.valueOf(val), ord);
794  }
795
796  /**
797   * Encode a numerical value using the variable-length encoding.
798   * @param dst The destination to which encoded digits are written.
799   * @param val The value to encode.
800   * @param ord The {@link Order} to respect while encoding {@code val}.
801   * @return the number of bytes written.
802   */
803  public static int encodeNumeric(PositionedByteRange dst, BigDecimal val, Order ord) {
804    final int len, offset = dst.getOffset(), start = dst.getPosition();
805    if (null == val) {
806      return encodeNull(dst, ord);
807    } else if (BigDecimal.ZERO.compareTo(val) == 0) {
808      dst.put(ord.apply(ZERO));
809      return 1;
810    }
811    BigDecimal abs = val.abs();
812    if (BigDecimal.ONE.compareTo(abs) <= 0) { // abs(v) >= 1.0
813      len = encodeNumericLarge(dst, normalize(val));
814    } else { // 1.0 > abs(v) >= 0.0
815      len = encodeNumericSmall(dst, normalize(val));
816    }
817    ord.apply(dst.getBytes(), offset + start, len);
818    return len;
819  }
820
821  /**
822   * Decode a {@link BigDecimal} from {@code src}. Assumes {@code src} encodes
823   * a value in Numeric encoding and is within the valid range of
824   * {@link BigDecimal} values. {@link BigDecimal} does not support {@code NaN}
825   * or {@code Infinte} values.
826   * @see #decodeNumericAsDouble(PositionedByteRange)
827   */
828  private static BigDecimal decodeNumericValue(PositionedByteRange src) {
829    final int e;
830    byte header = src.get();
831    boolean dsc = -1 == Integer.signum(header);
832    header = dsc ? DESCENDING.apply(header) : header;
833
834    if (header == NULL) return null;
835    if (header == NEG_LARGE) { /* Large negative number: 0x08, ~E, ~M */
836      e = (int) getVaruint64(src, !dsc);
837      return decodeSignificand(src, e, !dsc).negate();
838    }
839    if (header >= NEG_MED_MIN && header <= NEG_MED_MAX) {
840      /* Medium negative number: 0x13-E, ~M */
841      e = NEG_MED_MAX - header;
842      return decodeSignificand(src, e, !dsc).negate();
843    }
844    if (header == NEG_SMALL) { /* Small negative number: 0x14, -E, ~M */
845      e = (int) -getVaruint64(src, dsc);
846      return decodeSignificand(src, e, !dsc).negate();
847    }
848    if (header == ZERO) {
849      return BigDecimal.ZERO;
850    }
851    if (header == POS_SMALL) { /* Small positive number: 0x16, ~-E, M */
852      e = (int) -getVaruint64(src, !dsc);
853      return decodeSignificand(src, e, dsc);
854    }
855    if (header >= POS_MED_MIN && header <= POS_MED_MAX) {
856      /* Medium positive number: 0x17+E, M */
857      e = header - POS_MED_MIN;
858      return decodeSignificand(src, e, dsc);
859    }
860    if (header == POS_LARGE) { /* Large positive number: 0x22, E, M */
861      e = (int) getVaruint64(src, dsc);
862      return decodeSignificand(src, e, dsc);
863    }
864    throw unexpectedHeader(header);
865  }
866
867  /**
868   * Decode a primitive {@code double} value from the Numeric encoding. Numeric
869   * encoding is based on {@link BigDecimal}; in the event the encoded value is
870   * larger than can be represented in a {@code double}, this method performs
871   * an implicit narrowing conversion as described in
872   * {@link BigDecimal#doubleValue()}.
873   * @throws NullPointerException when the encoded value is {@code NULL}.
874   * @throws IllegalArgumentException when the encoded value is not a Numeric.
875   * @see #encodeNumeric(PositionedByteRange, double, Order)
876   * @see BigDecimal#doubleValue()
877   */
878  public static double decodeNumericAsDouble(PositionedByteRange src) {
879    // TODO: should an encoded NULL value throw unexpectedHeader() instead?
880    if (isNull(src)) {
881      throw new NullPointerException("A null value cannot be decoded to a double.");
882    }
883    if (isNumericNaN(src)) {
884      src.get();
885      return Double.NaN;
886    }
887    if (isNumericZero(src)) {
888      src.get();
889      return Double.valueOf(0.0);
890    }
891
892    byte header = -1 == Integer.signum(src.peek()) ? DESCENDING.apply(src.peek()) : src.peek();
893
894    if (header == NEG_INF) {
895      src.get();
896      return Double.NEGATIVE_INFINITY;
897    } else if (header == POS_INF) {
898      src.get();
899      return Double.POSITIVE_INFINITY;
900    } else {
901      return decodeNumericValue(src).doubleValue();
902    }
903  }
904
905  /**
906   * Decode a primitive {@code long} value from the Numeric encoding. Numeric
907   * encoding is based on {@link BigDecimal}; in the event the encoded value is
908   * larger than can be represented in a {@code long}, this method performs an
909   * implicit narrowing conversion as described in
910   * {@link BigDecimal#doubleValue()}.
911   * @throws NullPointerException when the encoded value is {@code NULL}.
912   * @throws IllegalArgumentException when the encoded value is not a Numeric.
913   * @see #encodeNumeric(PositionedByteRange, long, Order)
914   * @see BigDecimal#longValue()
915   */
916  public static long decodeNumericAsLong(PositionedByteRange src) {
917    // TODO: should an encoded NULL value throw unexpectedHeader() instead?
918    if (isNull(src)) throw new NullPointerException();
919    if (!isNumeric(src)) throw unexpectedHeader(src.peek());
920    if (isNumericNaN(src)) throw unexpectedHeader(src.peek());
921    if (isNumericInfinite(src)) throw unexpectedHeader(src.peek());
922
923    if (isNumericZero(src)) {
924      src.get();
925      return Long.valueOf(0);
926    }
927    return decodeNumericValue(src).longValue();
928  }
929
930  /**
931   * Decode a {@link BigDecimal} value from the variable-length encoding.
932   * @throws IllegalArgumentException when the encoded value is not a Numeric.
933   * @see #encodeNumeric(PositionedByteRange, BigDecimal, Order)
934   */
935  public static BigDecimal decodeNumericAsBigDecimal(PositionedByteRange src) {
936    if (isNull(src)) {
937      src.get();
938      return null;
939    }
940    if (!isNumeric(src)) throw unexpectedHeader(src.peek());
941    if (isNumericNaN(src)) throw unexpectedHeader(src.peek());
942    if (isNumericInfinite(src)) throw unexpectedHeader(src.peek());
943    return decodeNumericValue(src);
944  }
945
946  /**
947   * Encode a String value. String encoding is 0x00-terminated and so it does
948   * not support {@code \u0000} codepoints in the value.
949   * @param dst The destination to which the encoded value is written.
950   * @param val The value to encode.
951   * @param ord The {@link Order} to respect while encoding {@code val}.
952   * @return the number of bytes written.
953   * @throws IllegalArgumentException when {@code val} contains a {@code \u0000}.
954   */
955  public static int encodeString(PositionedByteRange dst, String val, Order ord) {
956    if (null == val) {
957      return encodeNull(dst, ord);
958    }
959    if (val.contains("\u0000"))
960      throw new IllegalArgumentException("Cannot encode String values containing '\\u0000'");
961    final int offset = dst.getOffset(), start = dst.getPosition();
962    dst.put(TEXT);
963    // TODO: is there no way to decode into dst directly?
964    dst.put(val.getBytes(UTF8));
965    dst.put(TERM);
966    ord.apply(dst.getBytes(), offset + start, dst.getPosition() - start);
967    return dst.getPosition() - start;
968  }
969
970  /**
971   * Decode a String value.
972   */
973  public static String decodeString(PositionedByteRange src) {
974    final byte header = src.get();
975    if (header == NULL || header == DESCENDING.apply(NULL))
976      return null;
977    assert header == TEXT || header == DESCENDING.apply(TEXT);
978    Order ord = header == TEXT ? ASCENDING : DESCENDING;
979    byte[] a = src.getBytes();
980    final int offset = src.getOffset(), start = src.getPosition();
981    final byte terminator = ord.apply(TERM);
982    int rawStartPos = offset + start, rawTermPos = rawStartPos;
983    for (; a[rawTermPos] != terminator; rawTermPos++)
984      ;
985    src.setPosition(rawTermPos - offset + 1); // advance position to TERM + 1
986    if (DESCENDING == ord) {
987      // make a copy so that we don't disturb encoded value with ord.
988      byte[] copy = new byte[rawTermPos - rawStartPos];
989      System.arraycopy(a, rawStartPos, copy, 0, copy.length);
990      ord.apply(copy);
991      return new String(copy, UTF8);
992    } else {
993      return new String(a, rawStartPos, rawTermPos - rawStartPos, UTF8);
994    }
995  }
996
997  /**
998   * Calculate the expected BlobVar encoded length based on unencoded length.
999   */
1000  public static int blobVarEncodedLength(int len) {
1001    if (0 == len)
1002      return 2; // 1-byte header + 1-byte terminator
1003    else
1004      return (int)
1005          Math.ceil(
1006            (len * 8) // 8-bits per input byte
1007            / 7.0)    // 7-bits of input data per encoded byte, rounded up
1008          + 1;        // + 1-byte header
1009  }
1010
1011  /**
1012   * Calculate the expected BlobVar decoded length based on encoded length.
1013   */
1014  @InterfaceAudience.Private
1015  static int blobVarDecodedLength(int len) {
1016    return
1017        ((len
1018          - 1) // 1-byte header
1019          * 7) // 7-bits of payload per encoded byte
1020          / 8; // 8-bits per byte
1021  }
1022
1023  /**
1024   * Encode a Blob value using a modified varint encoding scheme.
1025   * <p>
1026   * This format encodes a byte[] value such that no limitations on the input
1027   * value are imposed. The first byte encodes the encoding scheme that
1028   * follows, {@link #BLOB_VAR}. Each encoded byte thereafter consists of a
1029   * header bit followed by 7 bits of payload. A header bit of '1' indicates
1030   * continuation of the encoding. A header bit of '0' indicates this byte
1031   * contains the last of the payload. An empty input value is encoded as the
1032   * header byte immediately followed by a termination byte {@code 0x00}. This
1033   * is not ambiguous with the encoded value of {@code []}, which results in
1034   * {@code [0x80, 0x00]}.
1035   * </p>
1036   * @return the number of bytes written.
1037   */
1038  public static int encodeBlobVar(PositionedByteRange dst, byte[] val, int voff, int vlen,
1039      Order ord) {
1040    if (null == val) {
1041      return encodeNull(dst, ord);
1042    }
1043    // Empty value is null-terminated. All other values are encoded as 7-bits per byte.
1044    assert dst.getRemaining() >= blobVarEncodedLength(vlen) : "buffer overflow expected.";
1045    final int offset = dst.getOffset(), start = dst.getPosition();
1046    dst.put(BLOB_VAR);
1047    if (0 == vlen) {
1048      dst.put(TERM);
1049    } else {
1050      byte s = 1, t = 0;
1051      for (int i = voff; i < vlen; i++) {
1052        dst.put((byte) (0x80 | t | ((val[i] & 0xff) >>> s)));
1053        if (s < 7) {
1054          t = (byte) (val[i] << (7 - s));
1055          s++;
1056        } else {
1057          dst.put((byte) (0x80 | val[i]));
1058          s = 1;
1059          t = 0;
1060        }
1061      }
1062      if (s > 1) {
1063        dst.put((byte) (0x7f & t));
1064      } else {
1065        dst.getBytes()[offset + dst.getPosition() - 1] =
1066          (byte) (dst.getBytes()[offset + dst.getPosition() - 1] & 0x7f);
1067      }
1068    }
1069    ord.apply(dst.getBytes(), offset + start, dst.getPosition() - start);
1070    return dst.getPosition() - start;
1071  }
1072
1073  /**
1074   * Encode a blob value using a modified varint encoding scheme.
1075   * @return the number of bytes written.
1076   * @see #encodeBlobVar(PositionedByteRange, byte[], int, int, Order)
1077   */
1078  public static int encodeBlobVar(PositionedByteRange dst, byte[] val, Order ord) {
1079    return encodeBlobVar(dst, val, 0, null != val ? val.length : 0, ord);
1080  }
1081
1082  /**
1083   * Decode a blob value that was encoded using BlobVar encoding.
1084   */
1085  public static byte[] decodeBlobVar(PositionedByteRange src) {
1086    final byte header = src.get();
1087    if (header == NULL || header == DESCENDING.apply(NULL)) {
1088      return null;
1089    }
1090    assert header == BLOB_VAR || header == DESCENDING.apply(BLOB_VAR);
1091    Order ord = BLOB_VAR == header ? ASCENDING : DESCENDING;
1092    if (src.peek() == ord.apply(TERM)) {
1093      // skip empty input buffer.
1094      src.get();
1095      return new byte[0];
1096    }
1097    final int offset = src.getOffset(), start = src.getPosition();
1098    int end;
1099    byte[] a = src.getBytes();
1100    for (end = start; (byte) (ord.apply(a[offset + end]) & 0x80) != TERM; end++)
1101      ;
1102    end++; // increment end to 1-past last byte
1103    // create ret buffer using length of encoded data + 1 (header byte)
1104    PositionedByteRange ret = new SimplePositionedMutableByteRange(blobVarDecodedLength(end - start
1105        + 1));
1106    int s = 6;
1107    byte t = (byte) ((ord.apply(a[offset + start]) << 1) & 0xff);
1108    for (int i = start + 1; i < end; i++) {
1109      if (s == 7) {
1110        ret.put((byte) (t | (ord.apply(a[offset + i]) & 0x7f)));
1111        i++;
1112               // explicitly reset t -- clean up overflow buffer after decoding
1113               // a full cycle and retain assertion condition below. This happens
1114        t = 0; // when the LSB in the last encoded byte is 1. (HBASE-9893)
1115      } else {
1116        ret.put((byte) (t | ((ord.apply(a[offset + i]) & 0x7f) >>> s)));
1117      }
1118      if (i == end) break;
1119      t = (byte) ((ord.apply(a[offset + i]) << (8 - s)) & 0xff);
1120      s = s == 1 ? 7 : s - 1;
1121    }
1122    src.setPosition(end);
1123    assert t == 0 : "Unexpected bits remaining after decoding blob.";
1124    assert ret.getPosition() == ret.getLength() : "Allocated unnecessarily large return buffer.";
1125    return ret.getBytes();
1126  }
1127
1128  /**
1129   * Encode a Blob value as a byte-for-byte copy. BlobCopy encoding in
1130   * DESCENDING order is NULL terminated so as to preserve proper sorting of
1131   * {@code []} and so it does not support {@code 0x00} in the value.
1132   * @return the number of bytes written.
1133   * @throws IllegalArgumentException when {@code ord} is DESCENDING and
1134   *    {@code val} contains a {@code 0x00} byte.
1135   */
1136  public static int encodeBlobCopy(PositionedByteRange dst, byte[] val, int voff, int vlen,
1137      Order ord) {
1138    if (null == val) {
1139      encodeNull(dst, ord);
1140      if (ASCENDING == ord) return 1;
1141      else {
1142        // DESCENDING ordered BlobCopy requires a termination bit to preserve
1143        // sort-order semantics of null values.
1144        dst.put(ord.apply(TERM));
1145        return 2;
1146      }
1147    }
1148    // Blobs as final entry in a compound key are written unencoded.
1149    assert dst.getRemaining() >= vlen + (ASCENDING == ord ? 1 : 2);
1150    if (DESCENDING == ord) {
1151      for (int i = 0; i < vlen; i++) {
1152        if (TERM == val[voff + i]) {
1153          throw new IllegalArgumentException("0x00 bytes not permitted in value.");
1154        }
1155      }
1156    }
1157    final int offset = dst.getOffset(), start = dst.getPosition();
1158    dst.put(BLOB_COPY);
1159    dst.put(val, voff, vlen);
1160    // DESCENDING ordered BlobCopy requires a termination bit to preserve
1161    // sort-order semantics of null values.
1162    if (DESCENDING == ord) dst.put(TERM);
1163    ord.apply(dst.getBytes(), offset + start, dst.getPosition() - start);
1164    return dst.getPosition() - start;
1165  }
1166
1167  /**
1168   * Encode a Blob value as a byte-for-byte copy. BlobCopy encoding in
1169   * DESCENDING order is NULL terminated so as to preserve proper sorting of
1170   * {@code []} and so it does not support {@code 0x00} in the value.
1171   * @return the number of bytes written.
1172   * @throws IllegalArgumentException when {@code ord} is DESCENDING and
1173   *    {@code val} contains a {@code 0x00} byte.
1174   * @see #encodeBlobCopy(PositionedByteRange, byte[], int, int, Order)
1175   */
1176  public static int encodeBlobCopy(PositionedByteRange dst, byte[] val, Order ord) {
1177    return encodeBlobCopy(dst, val, 0, null != val ? val.length : 0, ord);
1178  }
1179
1180  /**
1181   * Decode a Blob value, byte-for-byte copy.
1182   * @see #encodeBlobCopy(PositionedByteRange, byte[], int, int, Order)
1183   */
1184  public static byte[] decodeBlobCopy(PositionedByteRange src) {
1185    byte header = src.get();
1186    if (header == NULL || header == DESCENDING.apply(NULL)) {
1187      return null;
1188    }
1189    assert header == BLOB_COPY || header == DESCENDING.apply(BLOB_COPY);
1190    Order ord = header == BLOB_COPY ? ASCENDING : DESCENDING;
1191    final int length = src.getRemaining() - (ASCENDING == ord ? 0 : 1);
1192    byte[] ret = new byte[length];
1193    src.get(ret);
1194    ord.apply(ret, 0, ret.length);
1195    // DESCENDING ordered BlobCopy requires a termination bit to preserve
1196    // sort-order semantics of null values.
1197    if (DESCENDING == ord) src.get();
1198    return ret;
1199  }
1200
1201  /**
1202   * Encode a null value.
1203   * @param dst The destination to which encoded digits are written.
1204   * @param ord The {@link Order} to respect while encoding {@code val}.
1205   * @return the number of bytes written.
1206   */
1207  public static int encodeNull(PositionedByteRange dst, Order ord) {
1208    dst.put(ord.apply(NULL));
1209    return 1;
1210  }
1211
1212  /**
1213   * Encode an {@code int8} value using the fixed-length encoding.
1214   * @return the number of bytes written.
1215   * @see #encodeInt64(PositionedByteRange, long, Order)
1216   * @see #decodeInt8(PositionedByteRange)
1217   */
1218  public static int encodeInt8(PositionedByteRange dst, byte val, Order ord) {
1219    final int offset = dst.getOffset(), start = dst.getPosition();
1220    dst.put(FIXED_INT8)
1221       .put((byte) (val ^ 0x80));
1222    ord.apply(dst.getBytes(), offset + start, 2);
1223    return 2;
1224  }
1225
1226  /**
1227   * Decode an {@code int8} value.
1228   * @see #encodeInt8(PositionedByteRange, byte, Order)
1229   */
1230  public static byte decodeInt8(PositionedByteRange src) {
1231    final byte header = src.get();
1232    assert header == FIXED_INT8 || header == DESCENDING.apply(FIXED_INT8);
1233    Order ord = header == FIXED_INT8 ? ASCENDING : DESCENDING;
1234    return (byte)((ord.apply(src.get()) ^ 0x80) & 0xff);
1235  }
1236
1237  /**
1238   * Encode an {@code int16} value using the fixed-length encoding.
1239   * @return the number of bytes written.
1240   * @see #encodeInt64(PositionedByteRange, long, Order)
1241   * @see #decodeInt16(PositionedByteRange)
1242   */
1243  public static int encodeInt16(PositionedByteRange dst, short val, Order ord) {
1244    final int offset = dst.getOffset(), start = dst.getPosition();
1245    dst.put(FIXED_INT16)
1246       .put((byte) ((val >> 8) ^ 0x80))
1247       .put((byte) val);
1248    ord.apply(dst.getBytes(), offset + start, 3);
1249    return 3;
1250  }
1251
1252  /**
1253   * Decode an {@code int16} value.
1254   * @see #encodeInt16(PositionedByteRange, short, Order)
1255   */
1256  public static short decodeInt16(PositionedByteRange src) {
1257    final byte header = src.get();
1258    assert header == FIXED_INT16 || header == DESCENDING.apply(FIXED_INT16);
1259    Order ord = header == FIXED_INT16 ? ASCENDING : DESCENDING;
1260    short val = (short) ((ord.apply(src.get()) ^ 0x80) & 0xff);
1261    val = (short) ((val << 8) + (ord.apply(src.get()) & 0xff));
1262    return val;
1263  }
1264
1265  /**
1266   * Encode an {@code int32} value using the fixed-length encoding.
1267   * @return the number of bytes written.
1268   * @see #encodeInt64(PositionedByteRange, long, Order)
1269   * @see #decodeInt32(PositionedByteRange)
1270   */
1271  public static int encodeInt32(PositionedByteRange dst, int val, Order ord) {
1272    final int offset = dst.getOffset(), start = dst.getPosition();
1273    dst.put(FIXED_INT32)
1274        .put((byte) ((val >> 24) ^ 0x80))
1275        .put((byte) (val >> 16))
1276        .put((byte) (val >> 8))
1277        .put((byte) val);
1278    ord.apply(dst.getBytes(), offset + start, 5);
1279    return 5;
1280  }
1281
1282  /**
1283   * Decode an {@code int32} value.
1284   * @see #encodeInt32(PositionedByteRange, int, Order)
1285   */
1286  public static int decodeInt32(PositionedByteRange src) {
1287    final byte header = src.get();
1288    assert header == FIXED_INT32 || header == DESCENDING.apply(FIXED_INT32);
1289    Order ord = header == FIXED_INT32 ? ASCENDING : DESCENDING;
1290    int val = (ord.apply(src.get()) ^ 0x80) & 0xff;
1291    for (int i = 1; i < 4; i++) {
1292      val = (val << 8) + (ord.apply(src.get()) & 0xff);
1293    }
1294    return val;
1295  }
1296
1297  /**
1298   * Encode an {@code int64} value using the fixed-length encoding.
1299   * <p>
1300   * This format ensures that all longs sort in their natural order, as they
1301   * would sort when using signed long comparison.
1302   * </p>
1303   * <p>
1304   * All Longs are serialized to an 8-byte, fixed-width sortable byte format.
1305   * Serialization is performed by inverting the integer sign bit and writing
1306   * the resulting bytes to the byte array in big endian order. The encoded
1307   * value is prefixed by the {@link #FIXED_INT64} header byte. This encoding
1308   * is designed to handle java language primitives and so Null values are NOT
1309   * supported by this implementation.
1310   * </p>
1311   * <p>
1312   * For example:
1313   * </p>
1314   * <pre>
1315   * Input:   0x0000000000000005 (5)
1316   * Result:  0x288000000000000005
1317   *
1318   * Input:   0xfffffffffffffffb (-4)
1319   * Result:  0x280000000000000004
1320   *
1321   * Input:   0x7fffffffffffffff (Long.MAX_VALUE)
1322   * Result:  0x28ffffffffffffffff
1323   *
1324   * Input:   0x8000000000000000 (Long.MIN_VALUE)
1325   * Result:  0x287fffffffffffffff
1326   * </pre>
1327   * <p>
1328   * This encoding format, and much of this documentation string, is based on
1329   * Orderly's {@code FixedIntWritableRowKey}.
1330   * </p>
1331   * @return the number of bytes written.
1332   * @see #decodeInt64(PositionedByteRange)
1333   */
1334  public static int encodeInt64(PositionedByteRange dst, long val, Order ord) {
1335    final int offset = dst.getOffset(), start = dst.getPosition();
1336    dst.put(FIXED_INT64)
1337       .put((byte) ((val >> 56) ^ 0x80))
1338       .put((byte) (val >> 48))
1339       .put((byte) (val >> 40))
1340       .put((byte) (val >> 32))
1341       .put((byte) (val >> 24))
1342       .put((byte) (val >> 16))
1343       .put((byte) (val >> 8))
1344       .put((byte) val);
1345    ord.apply(dst.getBytes(), offset + start, 9);
1346    return 9;
1347  }
1348
1349  /**
1350   * Decode an {@code int64} value.
1351   * @see #encodeInt64(PositionedByteRange, long, Order)
1352   */
1353  public static long decodeInt64(PositionedByteRange src) {
1354    final byte header = src.get();
1355    assert header == FIXED_INT64 || header == DESCENDING.apply(FIXED_INT64);
1356    Order ord = header == FIXED_INT64 ? ASCENDING : DESCENDING;
1357    long val = (ord.apply(src.get()) ^ 0x80) & 0xff;
1358    for (int i = 1; i < 8; i++) {
1359      val = (val << 8) + (ord.apply(src.get()) & 0xff);
1360    }
1361    return val;
1362  }
1363
1364  /**
1365   * Encode a 32-bit floating point value using the fixed-length encoding.
1366   * Encoding format is described at length in
1367   * {@link #encodeFloat64(PositionedByteRange, double, Order)}.
1368   * @return the number of bytes written.
1369   * @see #decodeFloat32(PositionedByteRange)
1370   * @see #encodeFloat64(PositionedByteRange, double, Order)
1371   */
1372  public static int encodeFloat32(PositionedByteRange dst, float val, Order ord) {
1373    final int offset = dst.getOffset(), start = dst.getPosition();
1374    int i = Float.floatToIntBits(val);
1375    i ^= ((i >> (Integer.SIZE - 1)) | Integer.MIN_VALUE);
1376    dst.put(FIXED_FLOAT32)
1377        .put((byte) (i >> 24))
1378        .put((byte) (i >> 16))
1379        .put((byte) (i >> 8))
1380        .put((byte) i);
1381    ord.apply(dst.getBytes(), offset + start, 5);
1382    return 5;
1383  }
1384
1385  /**
1386   * Decode a 32-bit floating point value using the fixed-length encoding.
1387   * @see #encodeFloat32(PositionedByteRange, float, Order)
1388   */
1389  public static float decodeFloat32(PositionedByteRange src) {
1390    final byte header = src.get();
1391    assert header == FIXED_FLOAT32 || header == DESCENDING.apply(FIXED_FLOAT32);
1392    Order ord = header == FIXED_FLOAT32 ? ASCENDING : DESCENDING;
1393    int val = ord.apply(src.get()) & 0xff;
1394    for (int i = 1; i < 4; i++) {
1395      val = (val << 8) + (ord.apply(src.get()) & 0xff);
1396    }
1397    val ^= (~val >> (Integer.SIZE - 1)) | Integer.MIN_VALUE;
1398    return Float.intBitsToFloat(val);
1399  }
1400
1401  /**
1402   * Encode a 64-bit floating point value using the fixed-length encoding.
1403   * <p>
1404   * This format ensures the following total ordering of floating point
1405   * values: Double.NEGATIVE_INFINITY &lt; -Double.MAX_VALUE &lt; ... &lt;
1406   * -Double.MIN_VALUE &lt; -0.0 &lt; +0.0; &lt; Double.MIN_VALUE &lt; ...
1407   * &lt; Double.MAX_VALUE &lt; Double.POSITIVE_INFINITY &lt; Double.NaN
1408   * </p>
1409   * <p>
1410   * Floating point numbers are encoded as specified in IEEE 754. A 64-bit
1411   * double precision float consists of a sign bit, 11-bit unsigned exponent
1412   * encoded in offset-1023 notation, and a 52-bit significand. The format is
1413   * described further in the <a
1414   * href="http://en.wikipedia.org/wiki/Double_precision"> Double Precision
1415   * Floating Point Wikipedia page</a> </p>
1416   * <p>
1417   * The value of a normal float is -1 <sup>sign bit</sup> &times;
1418   * 2<sup>exponent - 1023</sup> &times; 1.significand
1419   * </p>
1420   * <p>
1421   * The IEE754 floating point format already preserves sort ordering for
1422   * positive floating point numbers when the raw bytes are compared in most
1423   * significant byte order. This is discussed further at <a href=
1424   * "http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.htm"
1425   * > http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.
1426   * htm</a>
1427   * </p>
1428   * <p>
1429   * Thus, we need only ensure that negative numbers sort in the the exact
1430   * opposite order as positive numbers (so that say, negative infinity is
1431   * less than negative 1), and that all negative numbers compare less than
1432   * any positive number. To accomplish this, we invert the sign bit of all
1433   * floating point numbers, and we also invert the exponent and significand
1434   * bits if the floating point number was negative.
1435   * </p>
1436   * <p>
1437   * More specifically, we first store the floating point bits into a 64-bit
1438   * long {@code l} using {@link Double#doubleToLongBits}. This method
1439   * collapses all NaNs into a single, canonical NaN value but otherwise
1440   * leaves the bits unchanged. We then compute
1441   * </p>
1442   * <pre>
1443   * l &circ;= (l &gt;&gt; (Long.SIZE - 1)) | Long.MIN_SIZE
1444   * </pre>
1445   * <p>
1446   * which inverts the sign bit and XOR's all other bits with the sign bit
1447   * itself. Comparing the raw bytes of {@code l} in most significant
1448   * byte order is equivalent to performing a double precision floating point
1449   * comparison on the underlying bits (ignoring NaN comparisons, as NaNs
1450   * don't compare equal to anything when performing floating point
1451   * comparisons).
1452   * </p>
1453   * <p>
1454   * The resulting long integer is then converted into a byte array by
1455   * serializing the long one byte at a time in most significant byte order.
1456   * The serialized integer is prefixed by a single header byte. All
1457   * serialized values are 9 bytes in length.
1458   * </p>
1459   * <p>
1460   * This encoding format, and much of this highly detailed documentation
1461   * string, is based on Orderly's {@code DoubleWritableRowKey}.
1462   * </p>
1463   * @return the number of bytes written.
1464   * @see #decodeFloat64(PositionedByteRange)
1465   */
1466  public static int encodeFloat64(PositionedByteRange dst, double val, Order ord) {
1467    final int offset = dst.getOffset(), start = dst.getPosition();
1468    long lng = Double.doubleToLongBits(val);
1469    lng ^= ((lng >> (Long.SIZE - 1)) | Long.MIN_VALUE);
1470    dst.put(FIXED_FLOAT64)
1471        .put((byte) (lng >> 56))
1472        .put((byte) (lng >> 48))
1473        .put((byte) (lng >> 40))
1474        .put((byte) (lng >> 32))
1475        .put((byte) (lng >> 24))
1476        .put((byte) (lng >> 16))
1477        .put((byte) (lng >> 8))
1478        .put((byte) lng);
1479    ord.apply(dst.getBytes(), offset + start, 9);
1480    return 9;
1481  }
1482
1483  /**
1484   * Decode a 64-bit floating point value using the fixed-length encoding.
1485   * @see #encodeFloat64(PositionedByteRange, double, Order)
1486   */
1487  public static double decodeFloat64(PositionedByteRange src) {
1488    final byte header = src.get();
1489    assert header == FIXED_FLOAT64 || header == DESCENDING.apply(FIXED_FLOAT64);
1490    Order ord = header == FIXED_FLOAT64 ? ASCENDING : DESCENDING;
1491    long val = ord.apply(src.get()) & 0xff;
1492    for (int i = 1; i < 8; i++) {
1493      val = (val << 8) + (ord.apply(src.get()) & 0xff);
1494    }
1495    val ^= (~val >> (Long.SIZE - 1)) | Long.MIN_VALUE;
1496    return Double.longBitsToDouble(val);
1497  }
1498
1499  /**
1500   * Returns true when {@code src} appears to be positioned an encoded value,
1501   * false otherwise.
1502   */
1503  public static boolean isEncodedValue(PositionedByteRange src) {
1504    return isNull(src) || isNumeric(src) || isFixedInt8(src) || isFixedInt16(src)
1505        || isFixedInt32(src) || isFixedInt64(src)
1506        || isFixedFloat32(src) || isFixedFloat64(src) || isText(src) || isBlobCopy(src)
1507        || isBlobVar(src);
1508  }
1509
1510  /**
1511   * Return true when the next encoded value in {@code src} is null, false
1512   * otherwise.
1513   */
1514  public static boolean isNull(PositionedByteRange src) {
1515    return NULL ==
1516        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1517  }
1518
1519  /**
1520   * Return true when the next encoded value in {@code src} uses Numeric
1521   * encoding, false otherwise. {@code NaN}, {@code +/-Inf} are valid Numeric
1522   * values.
1523   */
1524  public static boolean isNumeric(PositionedByteRange src) {
1525    byte x = (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1526    return x >= NEG_INF && x <= NAN;
1527  }
1528
1529  /**
1530   * Return true when the next encoded value in {@code src} uses Numeric
1531   * encoding and is {@code Infinite}, false otherwise.
1532   */
1533  public static boolean isNumericInfinite(PositionedByteRange src) {
1534    byte x = (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1535    return NEG_INF == x || POS_INF == x;
1536  }
1537
1538  /**
1539   * Return true when the next encoded value in {@code src} uses Numeric
1540   * encoding and is {@code NaN}, false otherwise.
1541   */
1542  public static boolean isNumericNaN(PositionedByteRange src) {
1543    return NAN == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1544  }
1545
1546  /**
1547   * Return true when the next encoded value in {@code src} uses Numeric
1548   * encoding and is {@code 0}, false otherwise.
1549   */
1550  public static boolean isNumericZero(PositionedByteRange src) {
1551    return ZERO ==
1552        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1553  }
1554
1555  /**
1556   * Return true when the next encoded value in {@code src} uses fixed-width
1557   * Int8 encoding, false otherwise.
1558   */
1559  public static boolean isFixedInt8(PositionedByteRange src) {
1560    return FIXED_INT8 ==
1561        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1562  }
1563
1564  /**
1565   * Return true when the next encoded value in {@code src} uses fixed-width
1566   * Int16 encoding, false otherwise.
1567   */
1568  public static boolean isFixedInt16(PositionedByteRange src) {
1569    return FIXED_INT16 ==
1570        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1571  }
1572
1573  /**
1574   * Return true when the next encoded value in {@code src} uses fixed-width
1575   * Int32 encoding, false otherwise.
1576   */
1577  public static boolean isFixedInt32(PositionedByteRange src) {
1578    return FIXED_INT32 ==
1579        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1580  }
1581
1582  /**
1583   * Return true when the next encoded value in {@code src} uses fixed-width
1584   * Int64 encoding, false otherwise.
1585   */
1586  public static boolean isFixedInt64(PositionedByteRange src) {
1587    return FIXED_INT64 ==
1588        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1589  }
1590
1591  /**
1592   * Return true when the next encoded value in {@code src} uses fixed-width
1593   * Float32 encoding, false otherwise.
1594   */
1595  public static boolean isFixedFloat32(PositionedByteRange src) {
1596    return FIXED_FLOAT32 ==
1597        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1598  }
1599
1600  /**
1601   * Return true when the next encoded value in {@code src} uses fixed-width
1602   * Float64 encoding, false otherwise.
1603   */
1604  public static boolean isFixedFloat64(PositionedByteRange src) {
1605    return FIXED_FLOAT64 ==
1606        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1607  }
1608
1609  /**
1610   * Return true when the next encoded value in {@code src} uses Text encoding,
1611   * false otherwise.
1612   */
1613  public static boolean isText(PositionedByteRange src) {
1614    return TEXT ==
1615        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1616  }
1617
1618  /**
1619   * Return true when the next encoded value in {@code src} uses BlobVar
1620   * encoding, false otherwise.
1621   */
1622  public static boolean isBlobVar(PositionedByteRange src) {
1623    return BLOB_VAR ==
1624        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1625  }
1626
1627  /**
1628   * Return true when the next encoded value in {@code src} uses BlobCopy
1629   * encoding, false otherwise.
1630   */
1631  public static boolean isBlobCopy(PositionedByteRange src) {
1632    return BLOB_COPY ==
1633        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1634  }
1635
1636  /**
1637   * Skip {@code buff}'s position forward over one encoded value.
1638   * @return number of bytes skipped.
1639   */
1640  public static int skip(PositionedByteRange src) {
1641    final int start = src.getPosition();
1642    byte header = src.get();
1643    Order ord = (-1 == Integer.signum(header)) ? DESCENDING : ASCENDING;
1644    header = ord.apply(header);
1645
1646    switch (header) {
1647      case NULL:
1648      case NEG_INF:
1649        return 1;
1650      case NEG_LARGE: /* Large negative number: 0x08, ~E, ~M */
1651        skipVaruint64(src, DESCENDING != ord);
1652        skipSignificand(src, DESCENDING != ord);
1653        return src.getPosition() - start;
1654      case NEG_MED_MIN: /* Medium negative number: 0x13-E, ~M */
1655      case NEG_MED_MIN + 0x01:
1656      case NEG_MED_MIN + 0x02:
1657      case NEG_MED_MIN + 0x03:
1658      case NEG_MED_MIN + 0x04:
1659      case NEG_MED_MIN + 0x05:
1660      case NEG_MED_MIN + 0x06:
1661      case NEG_MED_MIN + 0x07:
1662      case NEG_MED_MIN + 0x08:
1663      case NEG_MED_MIN + 0x09:
1664      case NEG_MED_MAX:
1665        skipSignificand(src, DESCENDING != ord);
1666        return src.getPosition() - start;
1667      case NEG_SMALL: /* Small negative number: 0x14, -E, ~M */
1668        skipVaruint64(src, DESCENDING == ord);
1669        skipSignificand(src, DESCENDING != ord);
1670        return src.getPosition() - start;
1671      case ZERO:
1672        return 1;
1673      case POS_SMALL: /* Small positive number: 0x16, ~-E, M */
1674        skipVaruint64(src, DESCENDING != ord);
1675        skipSignificand(src, DESCENDING == ord);
1676        return src.getPosition() - start;
1677      case POS_MED_MIN: /* Medium positive number: 0x17+E, M */
1678      case POS_MED_MIN + 0x01:
1679      case POS_MED_MIN + 0x02:
1680      case POS_MED_MIN + 0x03:
1681      case POS_MED_MIN + 0x04:
1682      case POS_MED_MIN + 0x05:
1683      case POS_MED_MIN + 0x06:
1684      case POS_MED_MIN + 0x07:
1685      case POS_MED_MIN + 0x08:
1686      case POS_MED_MIN + 0x09:
1687      case POS_MED_MAX:
1688        skipSignificand(src, DESCENDING == ord);
1689        return src.getPosition() - start;
1690      case POS_LARGE: /* Large positive number: 0x22, E, M */
1691        skipVaruint64(src, DESCENDING == ord);
1692        skipSignificand(src, DESCENDING == ord);
1693        return src.getPosition() - start;
1694      case POS_INF:
1695        return 1;
1696      case NAN:
1697        return 1;
1698      case FIXED_INT8:
1699        src.setPosition(src.getPosition() + 1);
1700        return src.getPosition() - start;
1701      case FIXED_INT16:
1702        src.setPosition(src.getPosition() + 2);
1703        return src.getPosition() - start;
1704      case FIXED_INT32:
1705        src.setPosition(src.getPosition() + 4);
1706        return src.getPosition() - start;
1707      case FIXED_INT64:
1708        src.setPosition(src.getPosition() + 8);
1709        return src.getPosition() - start;
1710      case FIXED_FLOAT32:
1711        src.setPosition(src.getPosition() + 4);
1712        return src.getPosition() - start;
1713      case FIXED_FLOAT64:
1714        src.setPosition(src.getPosition() + 8);
1715        return src.getPosition() - start;
1716      case TEXT:
1717        // for null-terminated values, skip to the end.
1718        do {
1719          header = ord.apply(src.get());
1720        } while (header != TERM);
1721        return src.getPosition() - start;
1722      case BLOB_VAR:
1723        // read until we find a 0 in the MSB
1724        do {
1725          header = ord.apply(src.get());
1726        } while ((byte) (header & 0x80) != TERM);
1727        return src.getPosition() - start;
1728      case BLOB_COPY:
1729        if (Order.DESCENDING == ord) {
1730          // if descending, read to termination byte.
1731          do {
1732            header = ord.apply(src.get());
1733          } while (header != TERM);
1734          return src.getPosition() - start;
1735        } else {
1736          // otherwise, just skip to the end.
1737          src.setPosition(src.getLength());
1738          return src.getPosition() - start;
1739        }
1740      default:
1741        throw unexpectedHeader(header);
1742    }
1743  }
1744
1745  /**
1746   * Return the number of encoded entries remaining in {@code buff}. The
1747   * state of {@code buff} is not modified through use of this method.
1748   */
1749  public static int length(PositionedByteRange buff) {
1750    PositionedByteRange b =
1751        new SimplePositionedMutableByteRange(buff.getBytes(), buff.getOffset(), buff.getLength());
1752    b.setPosition(buff.getPosition());
1753    int cnt = 0;
1754    for (; isEncodedValue(b); skip(b), cnt++)
1755      ;
1756    return cnt;
1757  }
1758}