001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.apache.hadoop.hbase.util.Order.ASCENDING;
021import static org.apache.hadoop.hbase.util.Order.DESCENDING;
022
023import java.math.BigDecimal;
024import java.math.BigInteger;
025import java.math.MathContext;
026import java.math.RoundingMode;
027import java.nio.charset.Charset;
028
029import org.apache.yetus.audience.InterfaceAudience;
030
031import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
032
033/**
034 * Utility class that handles ordered byte arrays. That is, unlike
035 * {@link Bytes}, these methods produce byte arrays which maintain the sort
036 * order of the original values.
037 * <h3>Encoding Format summary</h3>
038 * <p>
039 * Each value is encoded as one or more bytes. The first byte of the encoding,
040 * its meaning, and a terse description of the bytes that follow is given by
041 * the following table:
042 * </p>
043 * <table summary="Encodings">
044 * <tr><th>Content Type</th><th>Encoding</th></tr>
045 * <tr><td>NULL</td><td>0x05</td></tr>
046 * <tr><td>negative infinity</td><td>0x07</td></tr>
047 * <tr><td>negative large</td><td>0x08, ~E, ~M</td></tr>
048 * <tr><td>negative medium</td><td>0x13-E, ~M</td></tr>
049 * <tr><td>negative small</td><td>0x14, -E, ~M</td></tr>
050 * <tr><td>zero</td><td>0x15</td></tr>
051 * <tr><td>positive small</td><td>0x16, ~-E, M</td></tr>
052 * <tr><td>positive medium</td><td>0x17+E, M</td></tr>
053 * <tr><td>positive large</td><td>0x22, E, M</td></tr>
054 * <tr><td>positive infinity</td><td>0x23</td></tr>
 * <tr><td>NaN</td><td>0x26</td></tr>
 * <tr><td>fixed-length 32-bit integer</td><td>0x2b, I</td></tr>
 * <tr><td>fixed-length 64-bit integer</td><td>0x2c, I</td></tr>
 * <tr><td>fixed-length 8-bit integer</td><td>0x29, I</td></tr>
 * <tr><td>fixed-length 16-bit integer</td><td>0x2a, I</td></tr>
 * <tr><td>fixed-length 32-bit float</td><td>0x30, F</td></tr>
 * <tr><td>fixed-length 64-bit float</td><td>0x31, F</td></tr>
 * <tr><td>TEXT</td><td>0x34, T</td></tr>
 * <tr><td>variable length BLOB</td><td>0x37, B</td></tr>
 * <tr><td>byte-for-byte BLOB</td><td>0x38, X</td></tr>
065 * </table>
066 *
067 * <h3>Null Encoding</h3>
068 * <p>
069 * Each value that is a NULL encodes as a single byte of 0x05. Since every
070 * other value encoding begins with a byte greater than 0x05, this forces NULL
071 * values to sort first.
072 * </p>
073 * <h3>Text Encoding</h3>
074 * <p>
 * Each text value begins with a single byte of 0x34 and ends with a single
076 * byte of 0x00. There are zero or more intervening bytes that encode the text
077 * value. The intervening bytes are chosen so that the encoding will sort in
078 * the desired collating order. The intervening bytes may not contain a 0x00
079 * character; the only 0x00 byte allowed in a text encoding is the final byte.
080 * </p>
081 * <p>
082 * The text encoding ends in 0x00 in order to ensure that when there are two
083 * strings where one is a prefix of the other that the shorter string will
084 * sort first.
085 * </p>
086 * <h3>Binary Encoding</h3>
087 * <p>
088 * There are two encoding strategies for binary fields, referred to as
089 * "BlobVar" and "BlobCopy". BlobVar is less efficient in both space and
090 * encoding time. It has no limitations on the range of encoded values.
091 * BlobCopy is a byte-for-byte copy of the input data followed by a
092 * termination byte. It is extremely fast to encode and decode. It carries the
093 * restriction of not allowing a 0x00 value in the input byte[] as this value
094 * is used as the termination byte.
095 * </p>
096 * <h4>BlobVar</h4>
097 * <p>
098 * "BlobVar" encodes the input byte[] in a manner similar to a variable length
099 * integer encoding. As with the other {@code OrderedBytes} encodings,
100 * the first encoded byte is used to indicate what kind of value follows. This
101 * header byte is 0x37 for BlobVar encoded values. As with the traditional
102 * varint encoding, the most significant bit of each subsequent encoded
103 * {@code byte} is used as a continuation marker. The 7 remaining bits
104 * contain the 7 most significant bits of the first unencoded byte. The next
105 * encoded byte starts with a continuation marker in the MSB. The least
106 * significant bit from the first unencoded byte follows, and the remaining 6
107 * bits contain the 6 MSBs of the second unencoded byte. The encoding
108 * continues, encoding 7 bytes on to 8 encoded bytes. The MSB of the final
109 * encoded byte contains a termination marker rather than a continuation
110 * marker, and any remaining bits from the final input byte. Any trailing bits
111 * in the final encoded byte are zeros.
112 * </p>
113 * <h4>BlobCopy</h4>
114 * <p>
115 * "BlobCopy" is a simple byte-for-byte copy of the input data. It uses 0x38
116 * as the header byte, and is terminated by 0x00 in the DESCENDING case. This
117 * alternative encoding is faster and more space-efficient, but it cannot
118 * accept values containing a 0x00 byte in DESCENDING order.
119 * </p>
120 * <h3>Variable-length Numeric Encoding</h3>
121 * <p>
122 * Numeric values must be coded so as to sort in numeric order. We assume that
123 * numeric values can be both integer and floating point values. Clients must
 * be careful to use inspection methods for encoded values (such as
 * {@link #isNumericInfinite(PositionedByteRange)} and
 * {@link #isNumericNaN(PositionedByteRange)}) to protect against decoding
 * values into objects which do not support these numeric concepts (such as
 * {@link Long} and {@link BigDecimal}).
129 * </p>
130 * <p>
 * Simplest cases first: If the numeric value is a NaN, then the encoding is a
 * single byte of 0x26. This causes NaN values to sort after every other
 * numeric value.
134 * </p>
135 * <p>
 * If the numeric value is a negative infinity then the encoding is a single
 * byte of 0x07. Since every other numeric value has a larger initial byte,
 * this encoding ensures that negative infinity will sort prior to every
 * other numeric value.
140 * </p>
141 * <p>
 * If the numeric value is a positive infinity then the encoding is a single
 * byte of 0x23. Every other numeric value encoding except NaN begins with a
 * smaller byte, ensuring that positive infinity sorts after every numeric
 * value other than NaN. All numeric header bytes are also smaller than 0x34,
 * the initial byte of a text value, ensuring that every numeric value sorts
 * before every text value.
147 * </p>
148 * <p>
149 * If the numeric value is exactly zero then it is encoded as a single byte of
150 * 0x15. Finite negative values will have initial bytes of 0x08 through 0x14
151 * and finite positive values will have initial bytes of 0x16 through 0x22.
152 * </p>
153 * <p>
154 * For all numeric values, we compute a mantissa M and an exponent E. The
155 * mantissa is a base-100 representation of the value. The exponent E
156 * determines where to put the decimal point.
157 * </p>
158 * <p>
159 * Each centimal digit of the mantissa is stored in a byte. If the value of
160 * the centimal digit is X (hence X&ge;0 and X&le;99) then the byte value will
161 * be 2*X+1 for every byte of the mantissa, except for the last byte which
162 * will be 2*X+0. The mantissa must be the minimum number of bytes necessary
163 * to represent the value; trailing X==0 digits are omitted. This means that
164 * the mantissa will never contain a byte with the value 0x00.
165 * </p>
166 * <p>
167 * If we assume all digits of the mantissa occur to the right of the decimal
168 * point, then the exponent E is the power of one hundred by which one must
169 * multiply the mantissa to recover the original value.
170 * </p>
171 * <p>
172 * Values are classified as large, medium, or small according to the value of
173 * E. If E is 11 or more, the value is large. For E between 0 and 10, the
174 * value is medium. For E less than zero, the value is small.
175 * </p>
176 * <p>
177 * Large positive values are encoded as a single byte 0x22 followed by E as a
178 * varint and then M. Medium positive values are a single byte of 0x17+E
179 * followed by M. Small positive values are encoded as a single byte 0x16
180 * followed by the ones-complement of the varint for -E followed by M.
181 * </p>
182 * <p>
183 * Small negative values are encoded as a single byte 0x14 followed by -E as a
184 * varint and then the ones-complement of M. Medium negative values are
185 * encoded as a byte 0x13-E followed by the ones-complement of M. Large
186 * negative values consist of the single byte 0x08 followed by the
187 * ones-complement of the varint encoding of E followed by the ones-complement
188 * of M.
189 * </p>
190 * <h3>Fixed-length Integer Encoding</h3>
191 * <p>
192 * All 4-byte integers are serialized to a 5-byte, fixed-width, sortable byte
 * format. All 8-byte integers are serialized to the equivalent 9-byte format.
194 * Serialization is performed by writing a header byte, inverting the integer
195 * sign bit and writing the resulting bytes to the byte array in big endian
196 * order.
197 * </p>
198 * <h3>Fixed-length Floating Point Encoding</h3>
199 * <p>
200 * 32-bit and 64-bit floating point numbers are encoded to a 5-byte and 9-byte
201 * encoding format, respectively. The format is identical, save for the
202 * precision respected in each step of the operation.
 * </p>
 * <p>
204 * This format ensures the following total ordering of floating point values:
205 * Float.NEGATIVE_INFINITY &lt; -Float.MAX_VALUE &lt; ... &lt;
 * -Float.MIN_VALUE &lt; -0.0 &lt; +0.0 &lt; Float.MIN_VALUE &lt; ... &lt;
207 * Float.MAX_VALUE &lt; Float.POSITIVE_INFINITY &lt; Float.NaN
208 * </p>
209 * <p>
210 * Floating point numbers are encoded as specified in IEEE 754. A 32-bit
211 * single precision float consists of a sign bit, 8-bit unsigned exponent
212 * encoded in offset-127 notation, and a 23-bit significand. The format is
213 * described further in the <a
214 * href="http://en.wikipedia.org/wiki/Single_precision"> Single Precision
215 * Floating Point Wikipedia page</a>
216 * </p>
217 * <p>
218 * The value of a normal float is -1 <sup>sign bit</sup> &times;
219 * 2<sup>exponent - 127</sup> &times; 1.significand
220 * </p>
221 * <p>
 * The IEEE 754 floating point format already preserves sort ordering for
223 * positive floating point numbers when the raw bytes are compared in most
224 * significant byte order. This is discussed further at <a href=
225 * "http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.htm">
226 * http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.htm</a>
227 * </p>
228 * <p>
 * Thus, we need only ensure that negative numbers sort in the exact
230 * opposite order as positive numbers (so that say, negative infinity is less
231 * than negative 1), and that all negative numbers compare less than any
232 * positive number. To accomplish this, we invert the sign bit of all floating
233 * point numbers, and we also invert the exponent and significand bits if the
234 * floating point number was negative.
235 * </p>
236 * <p>
237 * More specifically, we first store the floating point bits into a 32-bit int
238 * {@code j} using {@link Float#floatToIntBits}. This method collapses
239 * all NaNs into a single, canonical NaN value but otherwise leaves the bits
240 * unchanged. We then compute
241 * </p>
242 *
243 * <pre>
 * j ^= (j &gt;&gt; (Integer.SIZE - 1)) | Integer.MIN_VALUE
245 * </pre>
246 * <p>
247 * which inverts the sign bit and XOR's all other bits with the sign bit
248 * itself. Comparing the raw bytes of {@code j} in most significant byte
249 * order is equivalent to performing a single precision floating point
250 * comparison on the underlying bits (ignoring NaN comparisons, as NaNs don't
251 * compare equal to anything when performing floating point comparisons).
252 * </p>
253 * <p>
254 * The resulting integer is then converted into a byte array by serializing
255 * the integer one byte at a time in most significant byte order. The
256 * serialized integer is prefixed by a single header byte. All serialized
257 * values are 5 bytes in length.
258 * </p>
259 * <p>
260 * {@code OrderedBytes} encodings are heavily influenced by the
261 * <a href="http://sqlite.org/src4/doc/trunk/www/key_encoding.wiki">SQLite4 Key
 * Encoding</a>. Slight deviations are made in the interest of order
263 * correctness and user extensibility. Fixed-width {@code Long} and
264 * {@link Double} encodings are based on implementations from the now defunct
265 * Orderly library.
266 * </p>
267 */
268@InterfaceAudience.Public
269public class OrderedBytes {
270
  /*
   * These constants define header bytes used to identify encoded values. Note
   * that the values here are not exhaustive as the Numeric format encodes
   * portions of its value within the header byte. The values listed here are
   * directly applied to persisted data -- DO NOT modify the values specified
   * here. Instead, gaps are placed intentionally between values so that new
   * implementations can be inserted into the total ordering enforced here.
   */
  // header byte of an encoded null
  private static final byte NULL = 0x05;
  // room for 1 expansion type
  // headers of the variable-length numeric encoding, listed in ascending byte order
  private static final byte NEG_INF = 0x07;
  private static final byte NEG_LARGE = 0x08;
  // medium negative values use one header per exponent: encodeNumericLarge writes NEG_MED_MAX - E
  private static final byte NEG_MED_MIN = 0x09;
  private static final byte NEG_MED_MAX = 0x13;
  private static final byte NEG_SMALL = 0x14;
  private static final byte ZERO = 0x15;
  private static final byte POS_SMALL = 0x16;
  // medium positive values use one header per exponent: encodeNumericLarge writes POS_MED_MIN + E
  private static final byte POS_MED_MIN = 0x17;
  private static final byte POS_MED_MAX = 0x21;
  private static final byte POS_LARGE = 0x22;
  private static final byte POS_INF = 0x23;
  // room for 2 expansion type
  private static final byte NAN = 0x26;
  // room for 2 expansion types
  // headers of the fixed-width integer encodings
  private static final byte FIXED_INT8 = 0x29;
  private static final byte FIXED_INT16 = 0x2a;
  private static final byte FIXED_INT32 = 0x2b;
  private static final byte FIXED_INT64 = 0x2c;
  // room for 3 expansion types
  // headers of the fixed-width floating point encodings
  private static final byte FIXED_FLOAT32 = 0x30;
  private static final byte FIXED_FLOAT64 = 0x31;
  // room for 2 expansion type
  private static final byte TEXT = 0x34;
  // room for 2 expansion type
  // headers of the two binary encodings
  private static final byte BLOB_VAR = 0x37;
  private static final byte BLOB_COPY = 0x38;

  /*
   * The following constant values are used by encoding implementations
   */

  /** The UTF-8 charset. Presumably used by the text encoding -- confirm against its callers. */
  public static final Charset UTF8 = Charset.forName("UTF-8");
  // 0x00 byte; presumably the terminator of the text and blob-copy encodings -- TODO confirm
  private static final byte TERM = 0x00;
  // thresholds used by encodeNumericLarge to scale abs(val) down several digits per step
  private static final BigDecimal E8 = BigDecimal.valueOf(1e8);
  private static final BigDecimal E32 = BigDecimal.valueOf(1e32);
  // thresholds used by encodeNumericSmall to scale abs(val) up several digits per step
  private static final BigDecimal EN2 = BigDecimal.valueOf(1e-2);
  private static final BigDecimal EN10 = BigDecimal.valueOf(1e-10);

  /**
   * Max precision guaranteed to fit into a {@code long}.
   * <p>
   * NOTE(review): the value is 31 decimal digits, while a {@code long} holds at most 19; the
   * sentence above looks inaccurate. Within this file the constant is only used as the precision
   * of {@link #DEFAULT_MATH_CONTEXT} -- confirm the intended meaning before relying on it.
   * </p>
   */
  public static final int MAX_PRECISION = 31;

  /**
   * The context used to normalize {@link BigDecimal} values: {@link #MAX_PRECISION} digits,
   * rounding half-up.
   */
  public static final MathContext DEFAULT_MATH_CONTEXT =
      new MathContext(MAX_PRECISION, RoundingMode.HALF_UP);
329
330  /**
331   * Creates the standard exception when the encoded header byte is unexpected for the decoding
332   * context.
333   * @param header value used in error message.
334   */
335  private static IllegalArgumentException unexpectedHeader(byte header) {
336    throw new IllegalArgumentException("unexpected value in first byte: 0x"
337        + Long.toHexString(header));
338  }
339
340  /**
341   * Perform unsigned comparison between two long values. Conforms to the same interface as
342   * {@link org.apache.hadoop.hbase.CellComparator}.
343   */
344  private static int unsignedCmp(long x1, long x2) {
345    int cmp;
346    if ((cmp = (x1 < x2 ? -1 : (x1 == x2 ? 0 : 1))) == 0) return 0;
347    // invert the result when either value is negative
348    if ((x1 < 0) != (x2 < 0)) return -cmp;
349    return cmp;
350  }
351
352  /**
353   * Write a 32-bit unsigned integer to {@code dst} as 4 big-endian bytes.
354   * @return number of bytes written.
355   */
356  private static int putUint32(PositionedByteRange dst, int val) {
357    dst.put((byte) (val >>> 24))
358       .put((byte) (val >>> 16))
359       .put((byte) (val >>> 8))
360       .put((byte) val);
361    return 4;
362  }
363
364  /**
365   * Encode an unsigned 64-bit unsigned integer {@code val} into {@code dst}.
366   * @param dst The destination to which encoded bytes are written.
367   * @param val The value to write.
368   * @param comp Compliment the encoded value when {@code comp} is true.
369   * @return number of bytes written.
370   */
371  @VisibleForTesting
372  static int putVaruint64(PositionedByteRange dst, long val, boolean comp) {
373    int w, y, len = 0;
374    final int offset = dst.getOffset(), start = dst.getPosition();
375    byte[] a = dst.getBytes();
376    Order ord = comp ? DESCENDING : ASCENDING;
377    if (-1 == unsignedCmp(val, 241L)) {
378      dst.put((byte) val);
379      len = dst.getPosition() - start;
380      ord.apply(a, offset + start, len);
381      return len;
382    }
383    if (-1 == unsignedCmp(val, 2288L)) {
384      y = (int) (val - 240);
385      dst.put((byte) (y / 256 + 241))
386         .put((byte) (y % 256));
387      len = dst.getPosition() - start;
388      ord.apply(a, offset + start, len);
389      return len;
390    }
391    if (-1 == unsignedCmp(val, 67824L)) {
392      y = (int) (val - 2288);
393      dst.put((byte) 249)
394         .put((byte) (y / 256))
395         .put((byte) (y % 256));
396      len = dst.getPosition() - start;
397      ord.apply(a, offset + start, len);
398      return len;
399    }
400    y = (int) val;
401    w = (int) (val >>> 32);
402    if (w == 0) {
403      if (-1 == unsignedCmp(y, 16777216L)) {
404        dst.put((byte) 250)
405           .put((byte) (y >>> 16))
406           .put((byte) (y >>> 8))
407           .put((byte) y);
408        len = dst.getPosition() - start;
409        ord.apply(a, offset + start, len);
410        return len;
411      }
412      dst.put((byte) 251);
413      putUint32(dst, y);
414      len = dst.getPosition() - start;
415      ord.apply(a, offset + start, len);
416      return len;
417    }
418    if (-1 == unsignedCmp(w, 256L)) {
419      dst.put((byte) 252)
420         .put((byte) w);
421      putUint32(dst, y);
422      len = dst.getPosition() - start;
423      ord.apply(a, offset + start, len);
424      return len;
425    }
426    if (-1 == unsignedCmp(w, 65536L)) {
427      dst.put((byte) 253)
428         .put((byte) (w >>> 8))
429         .put((byte) w);
430      putUint32(dst, y);
431      len = dst.getPosition() - start;
432      ord.apply(a, offset + start, len);
433      return len;
434    }
435    if (-1 == unsignedCmp(w, 16777216L)) {
436      dst.put((byte) 254)
437         .put((byte) (w >>> 16))
438         .put((byte) (w >>> 8))
439         .put((byte) w);
440      putUint32(dst, y);
441      len = dst.getPosition() - start;
442      ord.apply(a, offset + start, len);
443      return len;
444    }
445    dst.put((byte) 255);
446    putUint32(dst, w);
447    putUint32(dst, y);
448    len = dst.getPosition() - start;
449    ord.apply(a, offset + start, len);
450    return len;
451  }
452
453  /**
454   * Inspect {@code src} for an encoded varuint64 for its length in bytes.
455   * Preserves the state of {@code src}.
456   * @param src source buffer
457   * @param comp if true, parse the compliment of the value.
458   * @return the number of bytes consumed by this value.
459   */
460  @VisibleForTesting
461  static int lengthVaruint64(PositionedByteRange src, boolean comp) {
462    int a0 = (comp ? DESCENDING : ASCENDING).apply(src.peek()) & 0xff;
463    if (a0 <= 240) return 1;
464    if (a0 <= 248) return 2;
465    if (a0 == 249) return 3;
466    if (a0 == 250) return 4;
467    if (a0 == 251) return 5;
468    if (a0 == 252) return 6;
469    if (a0 == 253) return 7;
470    if (a0 == 254) return 8;
471    if (a0 == 255) return 9;
472    throw unexpectedHeader(src.peek());
473  }
474
475  /**
476   * Skip {@code src} over the encoded varuint64.
477   * @param src source buffer
478   * @param cmp if true, parse the compliment of the value.
479   * @return the number of bytes skipped.
480   */
481  @VisibleForTesting
482  static int skipVaruint64(PositionedByteRange src, boolean cmp) {
483    final int len = lengthVaruint64(src, cmp);
484    src.setPosition(src.getPosition() + len);
485    return len;
486  }
487
488  /**
489   * Decode a sequence of bytes in {@code src} as a varuint64. Compliment the
490   * encoded value when {@code comp} is true.
491   * @return the decoded value.
492   */
493  @VisibleForTesting
494  static long getVaruint64(PositionedByteRange src, boolean comp) {
495    assert src.getRemaining() >= lengthVaruint64(src, comp);
496    final long ret;
497    Order ord = comp ? DESCENDING : ASCENDING;
498    byte x = src.get();
499    final int a0 = ord.apply(x) & 0xff, a1, a2, a3, a4, a5, a6, a7, a8;
500    if (-1 == unsignedCmp(a0, 241)) {
501      return a0;
502    }
503    x = src.get();
504    a1 = ord.apply(x) & 0xff;
505    if (-1 == unsignedCmp(a0, 249)) {
506      return (a0 - 241L) * 256 + a1 + 240;
507    }
508    x = src.get();
509    a2 = ord.apply(x) & 0xff;
510    if (a0 == 249) {
511      return 2288L + 256 * a1 + a2;
512    }
513    x = src.get();
514    a3 = ord.apply(x) & 0xff;
515    if (a0 == 250) {
516      return ((long) a1 << 16L) | (a2 << 8) | a3;
517    }
518    x = src.get();
519    a4 = ord.apply(x) & 0xff;
520    ret = (((long) a1) << 24) | (a2 << 16) | (a3 << 8) | a4;
521    if (a0 == 251) {
522      return ret;
523    }
524    x = src.get();
525    a5 = ord.apply(x) & 0xff;
526    if (a0 == 252) {
527      return (ret << 8) | a5;
528    }
529    x = src.get();
530    a6 = ord.apply(x) & 0xff;
531    if (a0 == 253) {
532      return (ret << 16) | (a5 << 8) | a6;
533    }
534    x = src.get();
535    a7 = ord.apply(x) & 0xff;
536    if (a0 == 254) {
537      return (ret << 24) | (a5 << 16) | (a6 << 8) | a7;
538    }
539    x = src.get();
540    a8 = ord.apply(x) & 0xff;
541    return (ret << 32) | (((long) a5) << 24) | (a6 << 16) | (a7 << 8) | a8;
542  }
543
544  /**
545   * Strip all trailing zeros to ensure that no digit will be zero and round
546   * using our default context to ensure precision doesn't exceed max allowed.
547   * From Phoenix's {@code NumberUtil}.
548   * @return new {@link BigDecimal} instance
549   */
550  @VisibleForTesting
551  static BigDecimal normalize(BigDecimal val) {
552    return null == val ? null : val.stripTrailingZeros().round(DEFAULT_MATH_CONTEXT);
553  }
554
555  /**
556   * Read significand digits from {@code src} according to the magnitude
557   * of {@code e}.
558   * @param src The source from which to read encoded digits.
559   * @param e The magnitude of the first digit read.
560   * @param comp Treat encoded bytes as compliments when {@code comp} is true.
561   * @return The decoded value.
562   * @throws IllegalArgumentException when read exceeds the remaining length
563   *     of {@code src}.
564   */
565  private static BigDecimal decodeSignificand(PositionedByteRange src, int e, boolean comp) {
566    // TODO: can this be made faster?
567    byte[] a = src.getBytes();
568    final int start = src.getPosition(), offset = src.getOffset(), remaining = src.getRemaining();
569    Order ord = comp ? DESCENDING : ASCENDING;
570    BigDecimal m = BigDecimal.ZERO;
571    e--;
572    for (int i = 0;; i++) {
573      if (i > remaining) {
574        // we've exceeded this range's window
575        src.setPosition(start);
576        throw new IllegalArgumentException(
577            "Read exceeds range before termination byte found. offset: " + offset + " position: "
578                + (start + i));
579      }
580      // base-100 digits are encoded as val * 2 + 1 except for the termination digit.
581      m = m.add( // m +=
582        new BigDecimal(BigInteger.ONE, e * -2).multiply( // 100 ^ p * [decoded digit]
583          BigDecimal.valueOf((ord.apply(a[offset + start + i]) & 0xff) / 2)));
584      e--;
585      // detect termination digit
586      if ((ord.apply(a[offset + start + i]) & 1) == 0) {
587        src.setPosition(start + i + 1);
588        break;
589      }
590    }
591    return normalize(m);
592  }
593
594  /**
595   * Skip {@code src} over the significand bytes.
596   * @param src The source from which to read encoded digits.
597   * @param comp Treat encoded bytes as compliments when {@code comp} is true.
598   * @return the number of bytes skipped.
599   */
600  private static int skipSignificand(PositionedByteRange src, boolean comp) {
601    byte[] a = src.getBytes();
602    final int offset = src.getOffset(), start = src.getPosition();
603    int i = src.getPosition();
604    while (((comp ? DESCENDING : ASCENDING).apply(a[offset + i++]) & 1) != 0)
605      ;
606    src.setPosition(i);
607    return i - start;
608  }
609
610  /**
611   * <p>
612   * Encode the small magnitude floating point number {@code val} using the
613   * key encoding. The caller guarantees that 1.0 > abs(val) > 0.0.
614   * </p>
615   * <p>
616   * A floating point value is encoded as an integer exponent {@code E} and a
617   * mantissa {@code M}. The original value is equal to {@code (M * 100^E)}.
618   * {@code E} is set to the smallest value possible without making {@code M}
619   * greater than or equal to 1.0.
620   * </p>
621   * <p>
622   * For this routine, {@code E} will always be zero or negative, since the
623   * original value is less than one. The encoding written by this routine is
624   * the ones-complement of the varint of the negative of {@code E} followed
625   * by the mantissa:
626   * <pre>
627   *   Encoding:   ~-E  M
628   * </pre>
629   * </p>
630   * @param dst The destination to which encoded digits are written.
631   * @param val The value to encode.
632   * @return the number of bytes written.
633   */
634  private static int encodeNumericSmall(PositionedByteRange dst, BigDecimal val) {
635    // TODO: this can be done faster?
636    // assert 1.0 > abs(val) > 0.0
637    BigDecimal abs = val.abs();
638    assert BigDecimal.ZERO.compareTo(abs) < 0 && BigDecimal.ONE.compareTo(abs) > 0;
639    byte[] a = dst.getBytes();
640    boolean isNeg = val.signum() == -1;
641    final int offset = dst.getOffset(), start = dst.getPosition();
642    int e = 0, d, startM;
643
644    if (isNeg) { /* Small negative number: 0x14, -E, ~M */
645      dst.put(NEG_SMALL);
646    } else { /* Small positive number: 0x16, ~-E, M */
647      dst.put(POS_SMALL);
648    }
649
650    // normalize abs(val) to determine E
651    while (abs.compareTo(EN10) < 0) { abs = abs.movePointRight(8); e += 4; }
652    while (abs.compareTo(EN2) < 0) { abs = abs.movePointRight(2); e++; }
653
654    putVaruint64(dst, e, !isNeg); // encode appropriate E value.
655
656    // encode M by peeling off centimal digits, encoding x as 2x+1
657    startM = dst.getPosition();
658    // TODO: 18 is an arbitrary encoding limit. Reevaluate once we have a better handling of
659    // numeric scale.
660    for (int i = 0; i < 18 && abs.compareTo(BigDecimal.ZERO) != 0; i++) {
661      abs = abs.movePointRight(2);
662      d = abs.intValue();
663      dst.put((byte) ((2 * d + 1) & 0xff));
664      abs = abs.subtract(BigDecimal.valueOf(d));
665    }
666    // terminal digit should be 2x
667    a[offset + dst.getPosition() - 1] = (byte) (a[offset + dst.getPosition() - 1] & 0xfe);
668    if (isNeg) {
669      // negative values encoded as ~M
670      DESCENDING.apply(a, offset + startM, dst.getPosition() - startM);
671    }
672    return dst.getPosition() - start;
673  }
674
675  /**
676   * Encode the large magnitude floating point number {@code val} using
677   * the key encoding. The caller guarantees that {@code val} will be
678   * finite and abs(val) >= 1.0.
679   * <p>
680   * A floating point value is encoded as an integer exponent {@code E}
681   * and a mantissa {@code M}. The original value is equal to
682   * {@code (M * 100^E)}. {@code E} is set to the smallest value
683   * possible without making {@code M} greater than or equal to 1.0.
684   * </p>
685   * <p>
686   * Each centimal digit of the mantissa is stored in a byte. If the value of
687   * the centimal digit is {@code X} (hence {@code X>=0} and
688   * {@code X<=99}) then the byte value will be {@code 2*X+1} for
689   * every byte of the mantissa, except for the last byte which will be
690   * {@code 2*X+0}. The mantissa must be the minimum number of bytes
691   * necessary to represent the value; trailing {@code X==0} digits are
692   * omitted. This means that the mantissa will never contain a byte with the
693   * value {@code 0x00}.
694   * </p>
695   * <p>
696   * If {@code E > 10}, then this routine writes of {@code E} as a
697   * varint followed by the mantissa as described above. Otherwise, if
698   * {@code E <= 10}, this routine only writes the mantissa and leaves
699   * the {@code E} value to be encoded as part of the opening byte of the
700   * field by the calling function.
701   *
702   * <pre>
703   *   Encoding:  M       (if E<=10)
704   *              E M     (if E>10)
705   * </pre>
706   * </p>
707   * @param dst The destination to which encoded digits are written.
708   * @param val The value to encode.
709   * @return the number of bytes written.
710   */
711  private static int encodeNumericLarge(PositionedByteRange dst, BigDecimal val) {
712    // TODO: this can be done faster
713    BigDecimal abs = val.abs();
714    byte[] a = dst.getBytes();
715    boolean isNeg = val.signum() == -1;
716    final int start = dst.getPosition(), offset = dst.getOffset();
717    int e = 0, d, startM;
718
719    if (isNeg) { /* Large negative number: 0x08, ~E, ~M */
720      dst.put(NEG_LARGE);
721    } else { /* Large positive number: 0x22, E, M */
722      dst.put(POS_LARGE);
723    }
724
725    // normalize abs(val) to determine E
726    while (abs.compareTo(E32) >= 0 && e <= 350) { abs = abs.movePointLeft(32); e +=16; }
727    while (abs.compareTo(E8) >= 0 && e <= 350) { abs = abs.movePointLeft(8); e+= 4; }
728    while (abs.compareTo(BigDecimal.ONE) >= 0 && e <= 350) { abs = abs.movePointLeft(2); e++; }
729
730    // encode appropriate header byte and/or E value.
731    if (e > 10) { /* large number, write out {~,}E */
732      putVaruint64(dst, e, isNeg);
733    } else {
734      if (isNeg) { /* Medium negative number: 0x13-E, ~M */
735        dst.put(start, (byte) (NEG_MED_MAX - e));
736      } else { /* Medium positive number: 0x17+E, M */
737        dst.put(start, (byte) (POS_MED_MIN + e));
738      }
739    }
740
741    // encode M by peeling off centimal digits, encoding x as 2x+1
742    startM = dst.getPosition();
743    // TODO: 18 is an arbitrary encoding limit. Reevaluate once we have a better handling of
744    // numeric scale.
745    for (int i = 0; i < 18 && abs.compareTo(BigDecimal.ZERO) != 0; i++) {
746      abs = abs.movePointRight(2);
747      d = abs.intValue();
748      dst.put((byte) (2 * d + 1));
749      abs = abs.subtract(BigDecimal.valueOf(d));
750    }
751    // terminal digit should be 2x
752    a[offset + dst.getPosition() - 1] = (byte) (a[offset + dst.getPosition() - 1] & 0xfe);
753    if (isNeg) {
754      // negative values encoded as ~M
755      DESCENDING.apply(a, offset + startM, dst.getPosition() - startM);
756    }
757    return dst.getPosition() - start;
758  }
759
760  /**
761   * Encode a numerical value using the variable-length encoding.
762   * @param dst The destination to which encoded digits are written.
763   * @param val The value to encode.
764   * @param ord The {@link Order} to respect while encoding {@code val}.
765   * @return the number of bytes written.
766   */
767  public static int encodeNumeric(PositionedByteRange dst, long val, Order ord) {
768    return encodeNumeric(dst, BigDecimal.valueOf(val), ord);
769  }
770
771  /**
772   * Encode a numerical value using the variable-length encoding.
773   * @param dst The destination to which encoded digits are written.
774   * @param val The value to encode.
775   * @param ord The {@link Order} to respect while encoding {@code val}.
776   * @return the number of bytes written.
777   */
778  public static int encodeNumeric(PositionedByteRange dst, double val, Order ord) {
779    if (val == 0.0) {
780      dst.put(ord.apply(ZERO));
781      return 1;
782    }
783    if (Double.isNaN(val)) {
784      dst.put(ord.apply(NAN));
785      return 1;
786    }
787    if (val == Double.NEGATIVE_INFINITY) {
788      dst.put(ord.apply(NEG_INF));
789      return 1;
790    }
791    if (val == Double.POSITIVE_INFINITY) {
792      dst.put(ord.apply(POS_INF));
793      return 1;
794    }
795    return encodeNumeric(dst, BigDecimal.valueOf(val), ord);
796  }
797
798  /**
799   * Encode a numerical value using the variable-length encoding.
800   * @param dst The destination to which encoded digits are written.
801   * @param val The value to encode.
802   * @param ord The {@link Order} to respect while encoding {@code val}.
803   * @return the number of bytes written.
804   */
805  public static int encodeNumeric(PositionedByteRange dst, BigDecimal val, Order ord) {
806    final int len, offset = dst.getOffset(), start = dst.getPosition();
807    if (null == val) {
808      return encodeNull(dst, ord);
809    } else if (BigDecimal.ZERO.compareTo(val) == 0) {
810      dst.put(ord.apply(ZERO));
811      return 1;
812    }
813    BigDecimal abs = val.abs();
814    if (BigDecimal.ONE.compareTo(abs) <= 0) { // abs(v) >= 1.0
815      len = encodeNumericLarge(dst, normalize(val));
816    } else { // 1.0 > abs(v) >= 0.0
817      len = encodeNumericSmall(dst, normalize(val));
818    }
819    ord.apply(dst.getBytes(), offset + start, len);
820    return len;
821  }
822
823  /**
824   * Decode a {@link BigDecimal} from {@code src}. Assumes {@code src} encodes
825   * a value in Numeric encoding and is within the valid range of
826   * {@link BigDecimal} values. {@link BigDecimal} does not support {@code NaN}
827   * or {@code Infinte} values.
828   * @see #decodeNumericAsDouble(PositionedByteRange)
829   */
830  private static BigDecimal decodeNumericValue(PositionedByteRange src) {
831    final int e;
832    byte header = src.get();
833    boolean dsc = -1 == Integer.signum(header);
834    header = dsc ? DESCENDING.apply(header) : header;
835
836    if (header == NULL) return null;
837    if (header == NEG_LARGE) { /* Large negative number: 0x08, ~E, ~M */
838      e = (int) getVaruint64(src, !dsc);
839      return decodeSignificand(src, e, !dsc).negate();
840    }
841    if (header >= NEG_MED_MIN && header <= NEG_MED_MAX) {
842      /* Medium negative number: 0x13-E, ~M */
843      e = NEG_MED_MAX - header;
844      return decodeSignificand(src, e, !dsc).negate();
845    }
846    if (header == NEG_SMALL) { /* Small negative number: 0x14, -E, ~M */
847      e = (int) -getVaruint64(src, dsc);
848      return decodeSignificand(src, e, !dsc).negate();
849    }
850    if (header == ZERO) {
851      return BigDecimal.ZERO;
852    }
853    if (header == POS_SMALL) { /* Small positive number: 0x16, ~-E, M */
854      e = (int) -getVaruint64(src, !dsc);
855      return decodeSignificand(src, e, dsc);
856    }
857    if (header >= POS_MED_MIN && header <= POS_MED_MAX) {
858      /* Medium positive number: 0x17+E, M */
859      e = header - POS_MED_MIN;
860      return decodeSignificand(src, e, dsc);
861    }
862    if (header == POS_LARGE) { /* Large positive number: 0x22, E, M */
863      e = (int) getVaruint64(src, dsc);
864      return decodeSignificand(src, e, dsc);
865    }
866    throw unexpectedHeader(header);
867  }
868
869  /**
870   * Decode a primitive {@code double} value from the Numeric encoding. Numeric
871   * encoding is based on {@link BigDecimal}; in the event the encoded value is
872   * larger than can be represented in a {@code double}, this method performs
873   * an implicit narrowing conversion as described in
874   * {@link BigDecimal#doubleValue()}.
875   * @throws NullPointerException when the encoded value is {@code NULL}.
876   * @throws IllegalArgumentException when the encoded value is not a Numeric.
877   * @see #encodeNumeric(PositionedByteRange, double, Order)
878   * @see BigDecimal#doubleValue()
879   */
880  public static double decodeNumericAsDouble(PositionedByteRange src) {
881    // TODO: should an encoded NULL value throw unexpectedHeader() instead?
882    if (isNull(src)) {
883      throw new NullPointerException("A null value cannot be decoded to a double.");
884    }
885    if (isNumericNaN(src)) {
886      src.get();
887      return Double.NaN;
888    }
889    if (isNumericZero(src)) {
890      src.get();
891      return Double.valueOf(0.0);
892    }
893
894    byte header = -1 == Integer.signum(src.peek()) ? DESCENDING.apply(src.peek()) : src.peek();
895
896    if (header == NEG_INF) {
897      src.get();
898      return Double.NEGATIVE_INFINITY;
899    } else if (header == POS_INF) {
900      src.get();
901      return Double.POSITIVE_INFINITY;
902    } else {
903      return decodeNumericValue(src).doubleValue();
904    }
905  }
906
907  /**
908   * Decode a primitive {@code long} value from the Numeric encoding. Numeric
909   * encoding is based on {@link BigDecimal}; in the event the encoded value is
910   * larger than can be represented in a {@code long}, this method performs an
911   * implicit narrowing conversion as described in
912   * {@link BigDecimal#doubleValue()}.
913   * @throws NullPointerException when the encoded value is {@code NULL}.
914   * @throws IllegalArgumentException when the encoded value is not a Numeric.
915   * @see #encodeNumeric(PositionedByteRange, long, Order)
916   * @see BigDecimal#longValue()
917   */
918  public static long decodeNumericAsLong(PositionedByteRange src) {
919    // TODO: should an encoded NULL value throw unexpectedHeader() instead?
920    if (isNull(src)) throw new NullPointerException();
921    if (!isNumeric(src)) throw unexpectedHeader(src.peek());
922    if (isNumericNaN(src)) throw unexpectedHeader(src.peek());
923    if (isNumericInfinite(src)) throw unexpectedHeader(src.peek());
924
925    if (isNumericZero(src)) {
926      src.get();
927      return Long.valueOf(0);
928    }
929    return decodeNumericValue(src).longValue();
930  }
931
932  /**
933   * Decode a {@link BigDecimal} value from the variable-length encoding.
934   * @throws IllegalArgumentException when the encoded value is not a Numeric.
935   * @see #encodeNumeric(PositionedByteRange, BigDecimal, Order)
936   */
937  public static BigDecimal decodeNumericAsBigDecimal(PositionedByteRange src) {
938    if (isNull(src)) {
939      src.get();
940      return null;
941    }
942    if (!isNumeric(src)) throw unexpectedHeader(src.peek());
943    if (isNumericNaN(src)) throw unexpectedHeader(src.peek());
944    if (isNumericInfinite(src)) throw unexpectedHeader(src.peek());
945    return decodeNumericValue(src);
946  }
947
948  /**
949   * Encode a String value. String encoding is 0x00-terminated and so it does
950   * not support {@code \u0000} codepoints in the value.
951   * @param dst The destination to which the encoded value is written.
952   * @param val The value to encode.
953   * @param ord The {@link Order} to respect while encoding {@code val}.
954   * @return the number of bytes written.
955   * @throws IllegalArgumentException when {@code val} contains a {@code \u0000}.
956   */
957  public static int encodeString(PositionedByteRange dst, String val, Order ord) {
958    if (null == val) {
959      return encodeNull(dst, ord);
960    }
961    if (val.contains("\u0000"))
962      throw new IllegalArgumentException("Cannot encode String values containing '\\u0000'");
963    final int offset = dst.getOffset(), start = dst.getPosition();
964    dst.put(TEXT);
965    // TODO: is there no way to decode into dst directly?
966    dst.put(val.getBytes(UTF8));
967    dst.put(TERM);
968    ord.apply(dst.getBytes(), offset + start, dst.getPosition() - start);
969    return dst.getPosition() - start;
970  }
971
972  /**
973   * Decode a String value.
974   */
975  public static String decodeString(PositionedByteRange src) {
976    final byte header = src.get();
977    if (header == NULL || header == DESCENDING.apply(NULL))
978      return null;
979    assert header == TEXT || header == DESCENDING.apply(TEXT);
980    Order ord = header == TEXT ? ASCENDING : DESCENDING;
981    byte[] a = src.getBytes();
982    final int offset = src.getOffset(), start = src.getPosition();
983    final byte terminator = ord.apply(TERM);
984    int rawStartPos = offset + start, rawTermPos = rawStartPos;
985    for (; a[rawTermPos] != terminator; rawTermPos++)
986      ;
987    src.setPosition(rawTermPos - offset + 1); // advance position to TERM + 1
988    if (DESCENDING == ord) {
989      // make a copy so that we don't disturb encoded value with ord.
990      byte[] copy = new byte[rawTermPos - rawStartPos];
991      System.arraycopy(a, rawStartPos, copy, 0, copy.length);
992      ord.apply(copy);
993      return new String(copy, UTF8);
994    } else {
995      return new String(a, rawStartPos, rawTermPos - rawStartPos, UTF8);
996    }
997  }
998
999  /**
1000   * Calculate the expected BlobVar encoded length based on unencoded length.
1001   */
1002  public static int blobVarEncodedLength(int len) {
1003    if (0 == len)
1004      return 2; // 1-byte header + 1-byte terminator
1005    else
1006      return (int)
1007          Math.ceil(
1008            (len * 8) // 8-bits per input byte
1009            / 7.0)    // 7-bits of input data per encoded byte, rounded up
1010          + 1;        // + 1-byte header
1011  }
1012
1013  /**
1014   * Calculate the expected BlobVar decoded length based on encoded length.
1015   */
1016  @VisibleForTesting
1017  static int blobVarDecodedLength(int len) {
1018    return
1019        ((len
1020          - 1) // 1-byte header
1021          * 7) // 7-bits of payload per encoded byte
1022          / 8; // 8-bits per byte
1023  }
1024
1025  /**
1026   * Encode a Blob value using a modified varint encoding scheme.
1027   * <p>
1028   * This format encodes a byte[] value such that no limitations on the input
1029   * value are imposed. The first byte encodes the encoding scheme that
1030   * follows, {@link #BLOB_VAR}. Each encoded byte thereafter consists of a
1031   * header bit followed by 7 bits of payload. A header bit of '1' indicates
1032   * continuation of the encoding. A header bit of '0' indicates this byte
1033   * contains the last of the payload. An empty input value is encoded as the
1034   * header byte immediately followed by a termination byte {@code 0x00}. This
1035   * is not ambiguous with the encoded value of {@code []}, which results in
1036   * {@code [0x80, 0x00]}.
1037   * </p>
1038   * @return the number of bytes written.
1039   */
1040  public static int encodeBlobVar(PositionedByteRange dst, byte[] val, int voff, int vlen,
1041      Order ord) {
1042    if (null == val) {
1043      return encodeNull(dst, ord);
1044    }
1045    // Empty value is null-terminated. All other values are encoded as 7-bits per byte.
1046    assert dst.getRemaining() >= blobVarEncodedLength(vlen) : "buffer overflow expected.";
1047    final int offset = dst.getOffset(), start = dst.getPosition();
1048    dst.put(BLOB_VAR);
1049    if (0 == vlen) {
1050      dst.put(TERM);
1051    } else {
1052      byte s = 1, t = 0;
1053      for (int i = voff; i < vlen; i++) {
1054        dst.put((byte) (0x80 | t | ((val[i] & 0xff) >>> s)));
1055        if (s < 7) {
1056          t = (byte) (val[i] << (7 - s));
1057          s++;
1058        } else {
1059          dst.put((byte) (0x80 | val[i]));
1060          s = 1;
1061          t = 0;
1062        }
1063      }
1064      if (s > 1) {
1065        dst.put((byte) (0x7f & t));
1066      } else {
1067        dst.getBytes()[offset + dst.getPosition() - 1] =
1068          (byte) (dst.getBytes()[offset + dst.getPosition() - 1] & 0x7f);
1069      }
1070    }
1071    ord.apply(dst.getBytes(), offset + start, dst.getPosition() - start);
1072    return dst.getPosition() - start;
1073  }
1074
1075  /**
1076   * Encode a blob value using a modified varint encoding scheme.
1077   * @return the number of bytes written.
1078   * @see #encodeBlobVar(PositionedByteRange, byte[], int, int, Order)
1079   */
1080  public static int encodeBlobVar(PositionedByteRange dst, byte[] val, Order ord) {
1081    return encodeBlobVar(dst, val, 0, null != val ? val.length : 0, ord);
1082  }
1083
1084  /**
1085   * Decode a blob value that was encoded using BlobVar encoding.
1086   */
1087  public static byte[] decodeBlobVar(PositionedByteRange src) {
1088    final byte header = src.get();
1089    if (header == NULL || header == DESCENDING.apply(NULL)) {
1090      return null;
1091    }
1092    assert header == BLOB_VAR || header == DESCENDING.apply(BLOB_VAR);
1093    Order ord = BLOB_VAR == header ? ASCENDING : DESCENDING;
1094    if (src.peek() == ord.apply(TERM)) {
1095      // skip empty input buffer.
1096      src.get();
1097      return new byte[0];
1098    }
1099    final int offset = src.getOffset(), start = src.getPosition();
1100    int end;
1101    byte[] a = src.getBytes();
1102    for (end = start; (byte) (ord.apply(a[offset + end]) & 0x80) != TERM; end++)
1103      ;
1104    end++; // increment end to 1-past last byte
1105    // create ret buffer using length of encoded data + 1 (header byte)
1106    PositionedByteRange ret = new SimplePositionedMutableByteRange(blobVarDecodedLength(end - start
1107        + 1));
1108    int s = 6;
1109    byte t = (byte) ((ord.apply(a[offset + start]) << 1) & 0xff);
1110    for (int i = start + 1; i < end; i++) {
1111      if (s == 7) {
1112        ret.put((byte) (t | (ord.apply(a[offset + i]) & 0x7f)));
1113        i++;
1114               // explicitly reset t -- clean up overflow buffer after decoding
1115               // a full cycle and retain assertion condition below. This happens
1116        t = 0; // when the LSB in the last encoded byte is 1. (HBASE-9893)
1117      } else {
1118        ret.put((byte) (t | ((ord.apply(a[offset + i]) & 0x7f) >>> s)));
1119      }
1120      if (i == end) break;
1121      t = (byte) ((ord.apply(a[offset + i]) << (8 - s)) & 0xff);
1122      s = s == 1 ? 7 : s - 1;
1123    }
1124    src.setPosition(end);
1125    assert t == 0 : "Unexpected bits remaining after decoding blob.";
1126    assert ret.getPosition() == ret.getLength() : "Allocated unnecessarily large return buffer.";
1127    return ret.getBytes();
1128  }
1129
1130  /**
1131   * Encode a Blob value as a byte-for-byte copy. BlobCopy encoding in
1132   * DESCENDING order is NULL terminated so as to preserve proper sorting of
1133   * {@code []} and so it does not support {@code 0x00} in the value.
1134   * @return the number of bytes written.
1135   * @throws IllegalArgumentException when {@code ord} is DESCENDING and
1136   *    {@code val} contains a {@code 0x00} byte.
1137   */
1138  public static int encodeBlobCopy(PositionedByteRange dst, byte[] val, int voff, int vlen,
1139      Order ord) {
1140    if (null == val) {
1141      encodeNull(dst, ord);
1142      if (ASCENDING == ord) return 1;
1143      else {
1144        // DESCENDING ordered BlobCopy requires a termination bit to preserve
1145        // sort-order semantics of null values.
1146        dst.put(ord.apply(TERM));
1147        return 2;
1148      }
1149    }
1150    // Blobs as final entry in a compound key are written unencoded.
1151    assert dst.getRemaining() >= vlen + (ASCENDING == ord ? 1 : 2);
1152    if (DESCENDING == ord) {
1153      for (int i = 0; i < vlen; i++) {
1154        if (TERM == val[voff + i]) {
1155          throw new IllegalArgumentException("0x00 bytes not permitted in value.");
1156        }
1157      }
1158    }
1159    final int offset = dst.getOffset(), start = dst.getPosition();
1160    dst.put(BLOB_COPY);
1161    dst.put(val, voff, vlen);
1162    // DESCENDING ordered BlobCopy requires a termination bit to preserve
1163    // sort-order semantics of null values.
1164    if (DESCENDING == ord) dst.put(TERM);
1165    ord.apply(dst.getBytes(), offset + start, dst.getPosition() - start);
1166    return dst.getPosition() - start;
1167  }
1168
1169  /**
1170   * Encode a Blob value as a byte-for-byte copy. BlobCopy encoding in
1171   * DESCENDING order is NULL terminated so as to preserve proper sorting of
1172   * {@code []} and so it does not support {@code 0x00} in the value.
1173   * @return the number of bytes written.
1174   * @throws IllegalArgumentException when {@code ord} is DESCENDING and
1175   *    {@code val} contains a {@code 0x00} byte.
1176   * @see #encodeBlobCopy(PositionedByteRange, byte[], int, int, Order)
1177   */
1178  public static int encodeBlobCopy(PositionedByteRange dst, byte[] val, Order ord) {
1179    return encodeBlobCopy(dst, val, 0, null != val ? val.length : 0, ord);
1180  }
1181
1182  /**
1183   * Decode a Blob value, byte-for-byte copy.
1184   * @see #encodeBlobCopy(PositionedByteRange, byte[], int, int, Order)
1185   */
1186  public static byte[] decodeBlobCopy(PositionedByteRange src) {
1187    byte header = src.get();
1188    if (header == NULL || header == DESCENDING.apply(NULL)) {
1189      return null;
1190    }
1191    assert header == BLOB_COPY || header == DESCENDING.apply(BLOB_COPY);
1192    Order ord = header == BLOB_COPY ? ASCENDING : DESCENDING;
1193    final int length = src.getRemaining() - (ASCENDING == ord ? 0 : 1);
1194    byte[] ret = new byte[length];
1195    src.get(ret);
1196    ord.apply(ret, 0, ret.length);
1197    // DESCENDING ordered BlobCopy requires a termination bit to preserve
1198    // sort-order semantics of null values.
1199    if (DESCENDING == ord) src.get();
1200    return ret;
1201  }
1202
1203  /**
1204   * Encode a null value.
1205   * @param dst The destination to which encoded digits are written.
1206   * @param ord The {@link Order} to respect while encoding {@code val}.
1207   * @return the number of bytes written.
1208   */
1209  public static int encodeNull(PositionedByteRange dst, Order ord) {
1210    dst.put(ord.apply(NULL));
1211    return 1;
1212  }
1213
1214  /**
1215   * Encode an {@code int8} value using the fixed-length encoding.
1216   * @return the number of bytes written.
1217   * @see #encodeInt64(PositionedByteRange, long, Order)
1218   * @see #decodeInt8(PositionedByteRange)
1219   */
1220  public static int encodeInt8(PositionedByteRange dst, byte val, Order ord) {
1221    final int offset = dst.getOffset(), start = dst.getPosition();
1222    dst.put(FIXED_INT8)
1223       .put((byte) (val ^ 0x80));
1224    ord.apply(dst.getBytes(), offset + start, 2);
1225    return 2;
1226  }
1227
1228  /**
1229   * Decode an {@code int8} value.
1230   * @see #encodeInt8(PositionedByteRange, byte, Order)
1231   */
1232  public static byte decodeInt8(PositionedByteRange src) {
1233    final byte header = src.get();
1234    assert header == FIXED_INT8 || header == DESCENDING.apply(FIXED_INT8);
1235    Order ord = header == FIXED_INT8 ? ASCENDING : DESCENDING;
1236    return (byte)((ord.apply(src.get()) ^ 0x80) & 0xff);
1237  }
1238
1239  /**
1240   * Encode an {@code int16} value using the fixed-length encoding.
1241   * @return the number of bytes written.
1242   * @see #encodeInt64(PositionedByteRange, long, Order)
1243   * @see #decodeInt16(PositionedByteRange)
1244   */
1245  public static int encodeInt16(PositionedByteRange dst, short val, Order ord) {
1246    final int offset = dst.getOffset(), start = dst.getPosition();
1247    dst.put(FIXED_INT16)
1248       .put((byte) ((val >> 8) ^ 0x80))
1249       .put((byte) val);
1250    ord.apply(dst.getBytes(), offset + start, 3);
1251    return 3;
1252  }
1253
1254  /**
1255   * Decode an {@code int16} value.
1256   * @see #encodeInt16(PositionedByteRange, short, Order)
1257   */
1258  public static short decodeInt16(PositionedByteRange src) {
1259    final byte header = src.get();
1260    assert header == FIXED_INT16 || header == DESCENDING.apply(FIXED_INT16);
1261    Order ord = header == FIXED_INT16 ? ASCENDING : DESCENDING;
1262    short val = (short) ((ord.apply(src.get()) ^ 0x80) & 0xff);
1263    val = (short) ((val << 8) + (ord.apply(src.get()) & 0xff));
1264    return val;
1265  }
1266
1267  /**
1268   * Encode an {@code int32} value using the fixed-length encoding.
1269   * @return the number of bytes written.
1270   * @see #encodeInt64(PositionedByteRange, long, Order)
1271   * @see #decodeInt32(PositionedByteRange)
1272   */
1273  public static int encodeInt32(PositionedByteRange dst, int val, Order ord) {
1274    final int offset = dst.getOffset(), start = dst.getPosition();
1275    dst.put(FIXED_INT32)
1276        .put((byte) ((val >> 24) ^ 0x80))
1277        .put((byte) (val >> 16))
1278        .put((byte) (val >> 8))
1279        .put((byte) val);
1280    ord.apply(dst.getBytes(), offset + start, 5);
1281    return 5;
1282  }
1283
1284  /**
1285   * Decode an {@code int32} value.
1286   * @see #encodeInt32(PositionedByteRange, int, Order)
1287   */
1288  public static int decodeInt32(PositionedByteRange src) {
1289    final byte header = src.get();
1290    assert header == FIXED_INT32 || header == DESCENDING.apply(FIXED_INT32);
1291    Order ord = header == FIXED_INT32 ? ASCENDING : DESCENDING;
1292    int val = (ord.apply(src.get()) ^ 0x80) & 0xff;
1293    for (int i = 1; i < 4; i++) {
1294      val = (val << 8) + (ord.apply(src.get()) & 0xff);
1295    }
1296    return val;
1297  }
1298
1299  /**
1300   * Encode an {@code int64} value using the fixed-length encoding.
1301   * <p>
1302   * This format ensures that all longs sort in their natural order, as they
1303   * would sort when using signed long comparison.
1304   * </p>
1305   * <p>
1306   * All Longs are serialized to an 8-byte, fixed-width sortable byte format.
1307   * Serialization is performed by inverting the integer sign bit and writing
1308   * the resulting bytes to the byte array in big endian order. The encoded
1309   * value is prefixed by the {@link #FIXED_INT64} header byte. This encoding
1310   * is designed to handle java language primitives and so Null values are NOT
1311   * supported by this implementation.
1312   * </p>
1313   * <p>
1314   * For example:
1315   * </p>
1316   * <pre>
1317   * Input:   0x0000000000000005 (5)
1318   * Result:  0x288000000000000005
1319   *
1320   * Input:   0xfffffffffffffffb (-4)
1321   * Result:  0x280000000000000004
1322   *
1323   * Input:   0x7fffffffffffffff (Long.MAX_VALUE)
1324   * Result:  0x28ffffffffffffffff
1325   *
1326   * Input:   0x8000000000000000 (Long.MIN_VALUE)
1327   * Result:  0x287fffffffffffffff
1328   * </pre>
1329   * <p>
1330   * This encoding format, and much of this documentation string, is based on
1331   * Orderly's {@code FixedIntWritableRowKey}.
1332   * </p>
1333   * @return the number of bytes written.
1334   * @see #decodeInt64(PositionedByteRange)
1335   */
1336  public static int encodeInt64(PositionedByteRange dst, long val, Order ord) {
1337    final int offset = dst.getOffset(), start = dst.getPosition();
1338    dst.put(FIXED_INT64)
1339       .put((byte) ((val >> 56) ^ 0x80))
1340       .put((byte) (val >> 48))
1341       .put((byte) (val >> 40))
1342       .put((byte) (val >> 32))
1343       .put((byte) (val >> 24))
1344       .put((byte) (val >> 16))
1345       .put((byte) (val >> 8))
1346       .put((byte) val);
1347    ord.apply(dst.getBytes(), offset + start, 9);
1348    return 9;
1349  }
1350
1351  /**
1352   * Decode an {@code int64} value.
1353   * @see #encodeInt64(PositionedByteRange, long, Order)
1354   */
1355  public static long decodeInt64(PositionedByteRange src) {
1356    final byte header = src.get();
1357    assert header == FIXED_INT64 || header == DESCENDING.apply(FIXED_INT64);
1358    Order ord = header == FIXED_INT64 ? ASCENDING : DESCENDING;
1359    long val = (ord.apply(src.get()) ^ 0x80) & 0xff;
1360    for (int i = 1; i < 8; i++) {
1361      val = (val << 8) + (ord.apply(src.get()) & 0xff);
1362    }
1363    return val;
1364  }
1365
1366  /**
1367   * Encode a 32-bit floating point value using the fixed-length encoding.
1368   * Encoding format is described at length in
1369   * {@link #encodeFloat64(PositionedByteRange, double, Order)}.
1370   * @return the number of bytes written.
1371   * @see #decodeFloat32(PositionedByteRange)
1372   * @see #encodeFloat64(PositionedByteRange, double, Order)
1373   */
1374  public static int encodeFloat32(PositionedByteRange dst, float val, Order ord) {
1375    final int offset = dst.getOffset(), start = dst.getPosition();
1376    int i = Float.floatToIntBits(val);
1377    i ^= ((i >> (Integer.SIZE - 1)) | Integer.MIN_VALUE);
1378    dst.put(FIXED_FLOAT32)
1379        .put((byte) (i >> 24))
1380        .put((byte) (i >> 16))
1381        .put((byte) (i >> 8))
1382        .put((byte) i);
1383    ord.apply(dst.getBytes(), offset + start, 5);
1384    return 5;
1385  }
1386
1387  /**
1388   * Decode a 32-bit floating point value using the fixed-length encoding.
1389   * @see #encodeFloat32(PositionedByteRange, float, Order)
1390   */
1391  public static float decodeFloat32(PositionedByteRange src) {
1392    final byte header = src.get();
1393    assert header == FIXED_FLOAT32 || header == DESCENDING.apply(FIXED_FLOAT32);
1394    Order ord = header == FIXED_FLOAT32 ? ASCENDING : DESCENDING;
1395    int val = ord.apply(src.get()) & 0xff;
1396    for (int i = 1; i < 4; i++) {
1397      val = (val << 8) + (ord.apply(src.get()) & 0xff);
1398    }
1399    val ^= (~val >> (Integer.SIZE - 1)) | Integer.MIN_VALUE;
1400    return Float.intBitsToFloat(val);
1401  }
1402
1403  /**
1404   * Encode a 64-bit floating point value using the fixed-length encoding.
1405   * <p>
1406   * This format ensures the following total ordering of floating point
1407   * values: Double.NEGATIVE_INFINITY &lt; -Double.MAX_VALUE &lt; ... &lt;
1408   * -Double.MIN_VALUE &lt; -0.0 &lt; +0.0; &lt; Double.MIN_VALUE &lt; ...
1409   * &lt; Double.MAX_VALUE &lt; Double.POSITIVE_INFINITY &lt; Double.NaN
1410   * </p>
1411   * <p>
1412   * Floating point numbers are encoded as specified in IEEE 754. A 64-bit
1413   * double precision float consists of a sign bit, 11-bit unsigned exponent
1414   * encoded in offset-1023 notation, and a 52-bit significand. The format is
1415   * described further in the <a
1416   * href="http://en.wikipedia.org/wiki/Double_precision"> Double Precision
1417   * Floating Point Wikipedia page</a> </p>
1418   * <p>
1419   * The value of a normal float is -1 <sup>sign bit</sup> &times;
1420   * 2<sup>exponent - 1023</sup> &times; 1.significand
1421   * </p>
1422   * <p>
1423   * The IEE754 floating point format already preserves sort ordering for
1424   * positive floating point numbers when the raw bytes are compared in most
1425   * significant byte order. This is discussed further at <a href=
1426   * "http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.htm"
1427   * > http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.
1428   * htm</a>
1429   * </p>
1430   * <p>
1431   * Thus, we need only ensure that negative numbers sort in the the exact
1432   * opposite order as positive numbers (so that say, negative infinity is
1433   * less than negative 1), and that all negative numbers compare less than
1434   * any positive number. To accomplish this, we invert the sign bit of all
1435   * floating point numbers, and we also invert the exponent and significand
1436   * bits if the floating point number was negative.
1437   * </p>
1438   * <p>
1439   * More specifically, we first store the floating point bits into a 64-bit
1440   * long {@code l} using {@link Double#doubleToLongBits}. This method
1441   * collapses all NaNs into a single, canonical NaN value but otherwise
1442   * leaves the bits unchanged. We then compute
1443   * </p>
1444   * <pre>
1445   * l &circ;= (l &gt;&gt; (Long.SIZE - 1)) | Long.MIN_SIZE
1446   * </pre>
1447   * <p>
1448   * which inverts the sign bit and XOR's all other bits with the sign bit
1449   * itself. Comparing the raw bytes of {@code l} in most significant
1450   * byte order is equivalent to performing a double precision floating point
1451   * comparison on the underlying bits (ignoring NaN comparisons, as NaNs
1452   * don't compare equal to anything when performing floating point
1453   * comparisons).
1454   * </p>
1455   * <p>
1456   * The resulting long integer is then converted into a byte array by
1457   * serializing the long one byte at a time in most significant byte order.
1458   * The serialized integer is prefixed by a single header byte. All
1459   * serialized values are 9 bytes in length.
1460   * </p>
1461   * <p>
1462   * This encoding format, and much of this highly detailed documentation
1463   * string, is based on Orderly's {@code DoubleWritableRowKey}.
1464   * </p>
1465   * @return the number of bytes written.
1466   * @see #decodeFloat64(PositionedByteRange)
1467   */
1468  public static int encodeFloat64(PositionedByteRange dst, double val, Order ord) {
1469    final int offset = dst.getOffset(), start = dst.getPosition();
1470    long lng = Double.doubleToLongBits(val);
1471    lng ^= ((lng >> (Long.SIZE - 1)) | Long.MIN_VALUE);
1472    dst.put(FIXED_FLOAT64)
1473        .put((byte) (lng >> 56))
1474        .put((byte) (lng >> 48))
1475        .put((byte) (lng >> 40))
1476        .put((byte) (lng >> 32))
1477        .put((byte) (lng >> 24))
1478        .put((byte) (lng >> 16))
1479        .put((byte) (lng >> 8))
1480        .put((byte) lng);
1481    ord.apply(dst.getBytes(), offset + start, 9);
1482    return 9;
1483  }
1484
1485  /**
1486   * Decode a 64-bit floating point value using the fixed-length encoding.
1487   * @see #encodeFloat64(PositionedByteRange, double, Order)
1488   */
1489  public static double decodeFloat64(PositionedByteRange src) {
1490    final byte header = src.get();
1491    assert header == FIXED_FLOAT64 || header == DESCENDING.apply(FIXED_FLOAT64);
1492    Order ord = header == FIXED_FLOAT64 ? ASCENDING : DESCENDING;
1493    long val = ord.apply(src.get()) & 0xff;
1494    for (int i = 1; i < 8; i++) {
1495      val = (val << 8) + (ord.apply(src.get()) & 0xff);
1496    }
1497    val ^= (~val >> (Long.SIZE - 1)) | Long.MIN_VALUE;
1498    return Double.longBitsToDouble(val);
1499  }
1500
1501  /**
1502   * Returns true when {@code src} appears to be positioned an encoded value,
1503   * false otherwise.
1504   */
1505  public static boolean isEncodedValue(PositionedByteRange src) {
1506    return isNull(src) || isNumeric(src) || isFixedInt8(src) || isFixedInt16(src)
1507        || isFixedInt32(src) || isFixedInt64(src)
1508        || isFixedFloat32(src) || isFixedFloat64(src) || isText(src) || isBlobCopy(src)
1509        || isBlobVar(src);
1510  }
1511
1512  /**
1513   * Return true when the next encoded value in {@code src} is null, false
1514   * otherwise.
1515   */
1516  public static boolean isNull(PositionedByteRange src) {
1517    return NULL ==
1518        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1519  }
1520
1521  /**
1522   * Return true when the next encoded value in {@code src} uses Numeric
1523   * encoding, false otherwise. {@code NaN}, {@code +/-Inf} are valid Numeric
1524   * values.
1525   */
1526  public static boolean isNumeric(PositionedByteRange src) {
1527    byte x = (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1528    return x >= NEG_INF && x <= NAN;
1529  }
1530
1531  /**
1532   * Return true when the next encoded value in {@code src} uses Numeric
1533   * encoding and is {@code Infinite}, false otherwise.
1534   */
1535  public static boolean isNumericInfinite(PositionedByteRange src) {
1536    byte x = (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1537    return NEG_INF == x || POS_INF == x;
1538  }
1539
1540  /**
1541   * Return true when the next encoded value in {@code src} uses Numeric
1542   * encoding and is {@code NaN}, false otherwise.
1543   */
1544  public static boolean isNumericNaN(PositionedByteRange src) {
1545    return NAN == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1546  }
1547
1548  /**
1549   * Return true when the next encoded value in {@code src} uses Numeric
1550   * encoding and is {@code 0}, false otherwise.
1551   */
1552  public static boolean isNumericZero(PositionedByteRange src) {
1553    return ZERO ==
1554        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1555  }
1556
1557  /**
1558   * Return true when the next encoded value in {@code src} uses fixed-width
1559   * Int8 encoding, false otherwise.
1560   */
1561  public static boolean isFixedInt8(PositionedByteRange src) {
1562    return FIXED_INT8 ==
1563        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1564  }
1565
1566  /**
1567   * Return true when the next encoded value in {@code src} uses fixed-width
1568   * Int16 encoding, false otherwise.
1569   */
1570  public static boolean isFixedInt16(PositionedByteRange src) {
1571    return FIXED_INT16 ==
1572        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1573  }
1574
1575  /**
1576   * Return true when the next encoded value in {@code src} uses fixed-width
1577   * Int32 encoding, false otherwise.
1578   */
1579  public static boolean isFixedInt32(PositionedByteRange src) {
1580    return FIXED_INT32 ==
1581        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1582  }
1583
1584  /**
1585   * Return true when the next encoded value in {@code src} uses fixed-width
1586   * Int64 encoding, false otherwise.
1587   */
1588  public static boolean isFixedInt64(PositionedByteRange src) {
1589    return FIXED_INT64 ==
1590        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1591  }
1592
1593  /**
1594   * Return true when the next encoded value in {@code src} uses fixed-width
1595   * Float32 encoding, false otherwise.
1596   */
1597  public static boolean isFixedFloat32(PositionedByteRange src) {
1598    return FIXED_FLOAT32 ==
1599        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1600  }
1601
1602  /**
1603   * Return true when the next encoded value in {@code src} uses fixed-width
1604   * Float64 encoding, false otherwise.
1605   */
1606  public static boolean isFixedFloat64(PositionedByteRange src) {
1607    return FIXED_FLOAT64 ==
1608        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1609  }
1610
1611  /**
1612   * Return true when the next encoded value in {@code src} uses Text encoding,
1613   * false otherwise.
1614   */
1615  public static boolean isText(PositionedByteRange src) {
1616    return TEXT ==
1617        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1618  }
1619
1620  /**
1621   * Return true when the next encoded value in {@code src} uses BlobVar
1622   * encoding, false otherwise.
1623   */
1624  public static boolean isBlobVar(PositionedByteRange src) {
1625    return BLOB_VAR ==
1626        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1627  }
1628
1629  /**
1630   * Return true when the next encoded value in {@code src} uses BlobCopy
1631   * encoding, false otherwise.
1632   */
1633  public static boolean isBlobCopy(PositionedByteRange src) {
1634    return BLOB_COPY ==
1635        (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1636  }
1637
1638  /**
1639   * Skip {@code buff}'s position forward over one encoded value.
1640   * @return number of bytes skipped.
1641   */
1642  public static int skip(PositionedByteRange src) {
1643    final int start = src.getPosition();
1644    byte header = src.get();
1645    Order ord = (-1 == Integer.signum(header)) ? DESCENDING : ASCENDING;
1646    header = ord.apply(header);
1647
1648    switch (header) {
1649      case NULL:
1650      case NEG_INF:
1651        return 1;
1652      case NEG_LARGE: /* Large negative number: 0x08, ~E, ~M */
1653        skipVaruint64(src, DESCENDING != ord);
1654        skipSignificand(src, DESCENDING != ord);
1655        return src.getPosition() - start;
1656      case NEG_MED_MIN: /* Medium negative number: 0x13-E, ~M */
1657      case NEG_MED_MIN + 0x01:
1658      case NEG_MED_MIN + 0x02:
1659      case NEG_MED_MIN + 0x03:
1660      case NEG_MED_MIN + 0x04:
1661      case NEG_MED_MIN + 0x05:
1662      case NEG_MED_MIN + 0x06:
1663      case NEG_MED_MIN + 0x07:
1664      case NEG_MED_MIN + 0x08:
1665      case NEG_MED_MIN + 0x09:
1666      case NEG_MED_MAX:
1667        skipSignificand(src, DESCENDING != ord);
1668        return src.getPosition() - start;
1669      case NEG_SMALL: /* Small negative number: 0x14, -E, ~M */
1670        skipVaruint64(src, DESCENDING == ord);
1671        skipSignificand(src, DESCENDING != ord);
1672        return src.getPosition() - start;
1673      case ZERO:
1674        return 1;
1675      case POS_SMALL: /* Small positive number: 0x16, ~-E, M */
1676        skipVaruint64(src, DESCENDING != ord);
1677        skipSignificand(src, DESCENDING == ord);
1678        return src.getPosition() - start;
1679      case POS_MED_MIN: /* Medium positive number: 0x17+E, M */
1680      case POS_MED_MIN + 0x01:
1681      case POS_MED_MIN + 0x02:
1682      case POS_MED_MIN + 0x03:
1683      case POS_MED_MIN + 0x04:
1684      case POS_MED_MIN + 0x05:
1685      case POS_MED_MIN + 0x06:
1686      case POS_MED_MIN + 0x07:
1687      case POS_MED_MIN + 0x08:
1688      case POS_MED_MIN + 0x09:
1689      case POS_MED_MAX:
1690        skipSignificand(src, DESCENDING == ord);
1691        return src.getPosition() - start;
1692      case POS_LARGE: /* Large positive number: 0x22, E, M */
1693        skipVaruint64(src, DESCENDING == ord);
1694        skipSignificand(src, DESCENDING == ord);
1695        return src.getPosition() - start;
1696      case POS_INF:
1697        return 1;
1698      case NAN:
1699        return 1;
1700      case FIXED_INT8:
1701        src.setPosition(src.getPosition() + 1);
1702        return src.getPosition() - start;
1703      case FIXED_INT16:
1704        src.setPosition(src.getPosition() + 2);
1705        return src.getPosition() - start;
1706      case FIXED_INT32:
1707        src.setPosition(src.getPosition() + 4);
1708        return src.getPosition() - start;
1709      case FIXED_INT64:
1710        src.setPosition(src.getPosition() + 8);
1711        return src.getPosition() - start;
1712      case FIXED_FLOAT32:
1713        src.setPosition(src.getPosition() + 4);
1714        return src.getPosition() - start;
1715      case FIXED_FLOAT64:
1716        src.setPosition(src.getPosition() + 8);
1717        return src.getPosition() - start;
1718      case TEXT:
1719        // for null-terminated values, skip to the end.
1720        do {
1721          header = ord.apply(src.get());
1722        } while (header != TERM);
1723        return src.getPosition() - start;
1724      case BLOB_VAR:
1725        // read until we find a 0 in the MSB
1726        do {
1727          header = ord.apply(src.get());
1728        } while ((byte) (header & 0x80) != TERM);
1729        return src.getPosition() - start;
1730      case BLOB_COPY:
1731        if (Order.DESCENDING == ord) {
1732          // if descending, read to termination byte.
1733          do {
1734            header = ord.apply(src.get());
1735          } while (header != TERM);
1736          return src.getPosition() - start;
1737        } else {
1738          // otherwise, just skip to the end.
1739          src.setPosition(src.getLength());
1740          return src.getPosition() - start;
1741        }
1742      default:
1743        throw unexpectedHeader(header);
1744    }
1745  }
1746
1747  /**
1748   * Return the number of encoded entries remaining in {@code buff}. The
1749   * state of {@code buff} is not modified through use of this method.
1750   */
1751  public static int length(PositionedByteRange buff) {
1752    PositionedByteRange b =
1753        new SimplePositionedMutableByteRange(buff.getBytes(), buff.getOffset(), buff.getLength());
1754    b.setPosition(buff.getPosition());
1755    int cnt = 0;
1756    for (; isEncodedValue(b); skip(b), cnt++)
1757      ;
1758    return cnt;
1759  }
1760}