001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
020import static org.apache.hadoop.hbase.util.Order.ASCENDING;
021import static org.apache.hadoop.hbase.util.Order.DESCENDING;
022
023import java.math.BigDecimal;
024import java.math.MathContext;
025import java.math.RoundingMode;
026import java.nio.charset.Charset;
027import org.apache.yetus.audience.InterfaceAudience;
028
029/**
030 * Utility class that handles ordered byte arrays. That is, unlike {@link Bytes}, these methods
031 * produce byte arrays which maintain the sort order of the original values.
032 * <h3>Encoding Format summary</h3>
033 * <p>
034 * Each value is encoded as one or more bytes. The first byte of the encoding, its meaning, and a
035 * terse description of the bytes that follow is given by the following table:
036 * </p>
037 * <table summary="Encodings">
038 * <tr>
039 * <th>Content Type</th>
040 * <th>Encoding</th>
041 * </tr>
042 * <tr>
043 * <td>NULL</td>
044 * <td>0x05</td>
045 * </tr>
046 * <tr>
047 * <td>negative infinity</td>
048 * <td>0x07</td>
049 * </tr>
050 * <tr>
051 * <td>negative large</td>
052 * <td>0x08, ~E, ~M</td>
053 * </tr>
054 * <tr>
055 * <td>negative medium</td>
056 * <td>0x13-E, ~M</td>
057 * </tr>
058 * <tr>
059 * <td>negative small</td>
060 * <td>0x14, -E, ~M</td>
061 * </tr>
062 * <tr>
063 * <td>zero</td>
064 * <td>0x15</td>
065 * </tr>
066 * <tr>
067 * <td>positive small</td>
068 * <td>0x16, ~-E, M</td>
069 * </tr>
070 * <tr>
071 * <td>positive medium</td>
072 * <td>0x17+E, M</td>
073 * </tr>
074 * <tr>
075 * <td>positive large</td>
076 * <td>0x22, E, M</td>
077 * </tr>
078 * <tr>
079 * <td>positive infinity</td>
080 * <td>0x23</td>
081 * </tr>
082 * <tr>
083 * <td>NaN</td>
084 * <td>0x25</td>
085 * </tr>
086 * <tr>
087 * <td>fixed-length 32-bit integer</td>
088 * <td>0x27, I</td>
089 * </tr>
090 * <tr>
091 * <td>fixed-length 64-bit integer</td>
092 * <td>0x28, I</td>
093 * </tr>
094 * <tr>
095 * <td>fixed-length 8-bit integer</td>
096 * <td>0x29</td>
097 * </tr>
098 * <tr>
099 * <td>fixed-length 16-bit integer</td>
100 * <td>0x2a</td>
101 * </tr>
102 * <tr>
103 * <td>fixed-length 32-bit float</td>
104 * <td>0x30, F</td>
105 * </tr>
106 * <tr>
107 * <td>fixed-length 64-bit float</td>
108 * <td>0x31, F</td>
109 * </tr>
110 * <tr>
111 * <td>TEXT</td>
112 * <td>0x33, T</td>
113 * </tr>
114 * <tr>
115 * <td>variable length BLOB</td>
116 * <td>0x35, B</td>
117 * </tr>
118 * <tr>
119 * <td>byte-for-byte BLOB</td>
120 * <td>0x36, X</td>
121 * </tr>
122 * </table>
123 * <h3>Null Encoding</h3>
124 * <p>
125 * Each value that is a NULL encodes as a single byte of 0x05. Since every other value encoding
126 * begins with a byte greater than 0x05, this forces NULL values to sort first.
127 * </p>
128 * <h3>Text Encoding</h3>
129 * <p>
130 * Each text value begins with a single byte of 0x33 and ends with a single byte of 0x00. There are
131 * zero or more intervening bytes that encode the text value. The intervening bytes are chosen so
132 * that the encoding will sort in the desired collating order. The intervening bytes may not contain
133 * a 0x00 character; the only 0x00 byte allowed in a text encoding is the final byte.
134 * </p>
135 * <p>
136 * The text encoding ends in 0x00 in order to ensure that when there are two strings where one is a
137 * prefix of the other that the shorter string will sort first.
138 * </p>
139 * <h3>Binary Encoding</h3>
140 * <p>
141 * There are two encoding strategies for binary fields, referred to as "BlobVar" and "BlobCopy".
142 * BlobVar is less efficient in both space and encoding time. It has no limitations on the range of
143 * encoded values. BlobCopy is a byte-for-byte copy of the input data followed by a termination
144 * byte. It is extremely fast to encode and decode. It carries the restriction of not allowing a
145 * 0x00 value in the input byte[] as this value is used as the termination byte.
146 * </p>
147 * <h4>BlobVar</h4>
148 * <p>
149 * "BlobVar" encodes the input byte[] in a manner similar to a variable length integer encoding. As
150 * with the other {@code OrderedBytes} encodings, the first encoded byte is used to indicate what
151 * kind of value follows. This header byte is 0x37 for BlobVar encoded values. As with the
152 * traditional varint encoding, the most significant bit of each subsequent encoded {@code byte} is
153 * used as a continuation marker. The 7 remaining bits contain the 7 most significant bits of the
154 * first unencoded byte. The next encoded byte starts with a continuation marker in the MSB. The
155 * least significant bit from the first unencoded byte follows, and the remaining 6 bits contain the
156 * 6 MSBs of the second unencoded byte. The encoding continues, encoding 7 bytes on to 8 encoded
157 * bytes. The MSB of the final encoded byte contains a termination marker rather than a continuation
158 * marker, and any remaining bits from the final input byte. Any trailing bits in the final encoded
159 * byte are zeros.
160 * </p>
161 * <h4>BlobCopy</h4>
162 * <p>
163 * "BlobCopy" is a simple byte-for-byte copy of the input data. It uses 0x38 as the header byte, and
164 * is terminated by 0x00 in the DESCENDING case. This alternative encoding is faster and more
165 * space-efficient, but it cannot accept values containing a 0x00 byte in DESCENDING order.
166 * </p>
167 * <h3>Variable-length Numeric Encoding</h3>
168 * <p>
169 * Numeric values must be coded so as to sort in numeric order. We assume that numeric values can be
170 * both integer and floating point values. Clients must be careful to use inspection methods for
171 * encoded values (such as {@link #isNumericInfinite(PositionedByteRange)} and
172 * {@link #isNumericNaN(PositionedByteRange)} to protect against decoding values into object which
173 * do not support these numeric concepts (such as {@link Long} and {@link BigDecimal}).
174 * </p>
175 * <p>
176 * Simplest cases first: If the numeric value is a NaN, then the encoding is a single byte of 0x25.
177 * This causes NaN values to sort after every other numeric value.
178 * </p>
179 * <p>
180 * If the numeric value is a negative infinity then the encoding is a single byte of 0x07. Since
181 * every other numeric value except NaN has a larger initial byte, this encoding ensures that
182 * negative infinity will sort prior to every other numeric value other than NaN.
183 * </p>
184 * <p>
185 * If the numeric value is a positive infinity then the encoding is a single byte of 0x23. Every
186 * other numeric value encoding begins with a smaller byte, ensuring that positive infinity always
187 * sorts last among numeric values. 0x23 is also smaller than 0x33, the initial byte of a text
188 * value, ensuring that every numeric value sorts before every text value.
189 * </p>
190 * <p>
191 * If the numeric value is exactly zero then it is encoded as a single byte of 0x15. Finite negative
192 * values will have initial bytes of 0x08 through 0x14 and finite positive values will have initial
193 * bytes of 0x16 through 0x22.
194 * </p>
195 * <p>
196 * For all numeric values, we compute a mantissa M and an exponent E. The mantissa is a base-100
197 * representation of the value. The exponent E determines where to put the decimal point.
198 * </p>
199 * <p>
200 * Each centimal digit of the mantissa is stored in a byte. If the value of the centimal digit is X
201 * (hence X&ge;0 and X&le;99) then the byte value will be 2*X+1 for every byte of the mantissa,
202 * except for the last byte which will be 2*X+0. The mantissa must be the minimum number of bytes
203 * necessary to represent the value; trailing X==0 digits are omitted. This means that the mantissa
204 * will never contain a byte with the value 0x00.
205 * </p>
206 * <p>
207 * If we assume all digits of the mantissa occur to the right of the decimal point, then the
208 * exponent E is the power of one hundred by which one must multiply the mantissa to recover the
209 * original value.
210 * </p>
211 * <p>
212 * Values are classified as large, medium, or small according to the value of E. If E is 11 or more,
213 * the value is large. For E between 0 and 10, the value is medium. For E less than zero, the value
214 * is small.
215 * </p>
216 * <p>
217 * Large positive values are encoded as a single byte 0x22 followed by E as a varint and then M.
218 * Medium positive values are a single byte of 0x17+E followed by M. Small positive values are
219 * encoded as a single byte 0x16 followed by the ones-complement of the varint for -E followed by M.
220 * </p>
221 * <p>
222 * Small negative values are encoded as a single byte 0x14 followed by -E as a varint and then the
223 * ones-complement of M. Medium negative values are encoded as a byte 0x13-E followed by the
224 * ones-complement of M. Large negative values consist of the single byte 0x08 followed by the
225 * ones-complement of the varint encoding of E followed by the ones-complement of M.
226 * </p>
227 * <h3>Fixed-length Integer Encoding</h3>
228 * <p>
229 * All 4-byte integers are serialized to a 5-byte, fixed-width, sortable byte format. All 8-byte
230 * integers are serialized to the equivelant 9-byte format. Serialization is performed by writing a
231 * header byte, inverting the integer sign bit and writing the resulting bytes to the byte array in
232 * big endian order.
233 * </p>
234 * <h3>Fixed-length Floating Point Encoding</h3>
235 * <p>
236 * 32-bit and 64-bit floating point numbers are encoded to a 5-byte and 9-byte encoding format,
237 * respectively. The format is identical, save for the precision respected in each step of the
238 * operation.
239 * <p>
240 * This format ensures the following total ordering of floating point values:
241 * Float.NEGATIVE_INFINITY &lt; -Float.MAX_VALUE &lt; ... &lt; -Float.MIN_VALUE &lt; -0.0 &lt; +0.0;
242 * &lt; Float.MIN_VALUE &lt; ... &lt; Float.MAX_VALUE &lt; Float.POSITIVE_INFINITY &lt; Float.NaN
243 * </p>
244 * <p>
245 * Floating point numbers are encoded as specified in IEEE 754. A 32-bit single precision float
246 * consists of a sign bit, 8-bit unsigned exponent encoded in offset-127 notation, and a 23-bit
247 * significand. The format is described further in the
248 * <a href="http://en.wikipedia.org/wiki/Single_precision"> Single Precision Floating Point
249 * Wikipedia page</a>
250 * </p>
251 * <p>
252 * The value of a normal float is -1 <sup>sign bit</sup> &times; 2<sup>exponent - 127</sup> &times;
253 * 1.significand
254 * </p>
255 * <p>
256 * The IEE754 floating point format already preserves sort ordering for positive floating point
257 * numbers when the raw bytes are compared in most significant byte order. This is discussed further
258 * at <a href= "http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.htm">
259 * http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.htm</a>
260 * </p>
261 * <p>
262 * Thus, we need only ensure that negative numbers sort in the the exact opposite order as positive
263 * numbers (so that say, negative infinity is less than negative 1), and that all negative numbers
264 * compare less than any positive number. To accomplish this, we invert the sign bit of all floating
265 * point numbers, and we also invert the exponent and significand bits if the floating point number
266 * was negative.
267 * </p>
268 * <p>
269 * More specifically, we first store the floating point bits into a 32-bit int {@code j} using
270 * {@link Float#floatToIntBits}. This method collapses all NaNs into a single, canonical NaN value
271 * but otherwise leaves the bits unchanged. We then compute
272 * </p>
273 *
274 * <pre>
275 * j &circ;= (j &gt;&gt; (Integer.SIZE - 1)) | Integer.MIN_SIZE
276 * </pre>
277 * <p>
278 * which inverts the sign bit and XOR's all other bits with the sign bit itself. Comparing the raw
279 * bytes of {@code j} in most significant byte order is equivalent to performing a single precision
280 * floating point comparison on the underlying bits (ignoring NaN comparisons, as NaNs don't compare
281 * equal to anything when performing floating point comparisons).
282 * </p>
283 * <p>
284 * The resulting integer is then converted into a byte array by serializing the integer one byte at
285 * a time in most significant byte order. The serialized integer is prefixed by a single header
286 * byte. All serialized values are 5 bytes in length.
287 * </p>
288 * <p>
289 * {@code OrderedBytes} encodings are heavily influenced by the
290 * <a href="http://sqlite.org/src4/doc/trunk/www/key_encoding.wiki">SQLite4 Key Encoding</a>. Slight
291 * deviations are make in the interest of order correctness and user extensibility. Fixed-width
292 * {@code Long} and {@link Double} encodings are based on implementations from the now defunct
293 * Orderly library.
294 * </p>
295 */
296@InterfaceAudience.Public
297public class OrderedBytes {
298
299  /*
300   * These constants define header bytes used to identify encoded values. Note that the values here
301   * are not exhaustive as the Numeric format encodes portions of its value within the header byte.
302   * The values listed here are directly applied to persisted data -- DO NOT modify the values
303   * specified here. Instead, gaps are placed intentionally between values so that new
304   * implementations can be inserted into the total ordering enforced here.
305   */
306  private static final byte NULL = 0x05;
307  // room for 1 expansion type
308  private static final byte NEG_INF = 0x07;
309  private static final byte NEG_LARGE = 0x08;
310  private static final byte NEG_MED_MIN = 0x09;
311  private static final byte NEG_MED_MAX = 0x13;
312  private static final byte NEG_SMALL = 0x14;
313  private static final byte ZERO = 0x15;
314  private static final byte POS_SMALL = 0x16;
315  private static final byte POS_MED_MIN = 0x17;
316  private static final byte POS_MED_MAX = 0x21;
317  private static final byte POS_LARGE = 0x22;
318  private static final byte POS_INF = 0x23;
319  // room for 2 expansion type
320  private static final byte NAN = 0x26;
321  // room for 2 expansion types
322  private static final byte FIXED_INT8 = 0x29;
323  private static final byte FIXED_INT16 = 0x2a;
324  private static final byte FIXED_INT32 = 0x2b;
325  private static final byte FIXED_INT64 = 0x2c;
326  // room for 3 expansion types
327  private static final byte FIXED_FLOAT32 = 0x30;
328  private static final byte FIXED_FLOAT64 = 0x31;
329  // room for 2 expansion type
330  private static final byte TEXT = 0x34;
331  // room for 2 expansion type
332  private static final byte BLOB_VAR = 0x37;
333  private static final byte BLOB_COPY = 0x38;
334
335  /*
336   * The following constant values are used by encoding implementations
337   */
338
339  public static final Charset UTF8 = Charset.forName("UTF-8");
340  private static final byte TERM = 0x00;
341
342  /**
343   * Max precision guaranteed to fit into a {@code long}.
344   */
345  public static final int MAX_PRECISION = 31;
346
347  /**
348   * The context used to normalize {@link BigDecimal} values.
349   */
350  public static final MathContext DEFAULT_MATH_CONTEXT =
351    new MathContext(MAX_PRECISION, RoundingMode.HALF_UP);
352
353  /**
354   * Creates the standard exception when the encoded header byte is unexpected for the decoding
355   * context.
356   * @param header value used in error message.
357   */
358  private static IllegalArgumentException unexpectedHeader(byte header) {
359    throw new IllegalArgumentException(
360      "unexpected value in first byte: 0x" + Long.toHexString(header));
361  }
362
363  /**
364   * Perform unsigned comparison between two long values. Conforms to the same interface as
365   * {@link org.apache.hadoop.hbase.CellComparator}.
366   */
367  private static int unsignedCmp(long x1, long x2) {
368    int cmp;
369    if ((cmp = (x1 < x2 ? -1 : (x1 == x2 ? 0 : 1))) == 0) return 0;
370    // invert the result when either value is negative
371    if ((x1 < 0) != (x2 < 0)) return -cmp;
372    return cmp;
373  }
374
375  /**
376   * Write a 32-bit unsigned integer to {@code dst} as 4 big-endian bytes.
377   * @return number of bytes written.
378   */
379  private static int putUint32(PositionedByteRange dst, int val) {
380    dst.put((byte) (val >>> 24)).put((byte) (val >>> 16)).put((byte) (val >>> 8)).put((byte) val);
381    return 4;
382  }
383
384  /**
385   * Encode an unsigned 64-bit unsigned integer {@code val} into {@code dst}.
386   * @param dst  The destination to which encoded bytes are written.
387   * @param val  The value to write.
388   * @param comp Compliment the encoded value when {@code comp} is true.
389   * @return number of bytes written.
390   */
391  static int putVaruint64(PositionedByteRange dst, long val, boolean comp) {
392    int w, y, len = 0;
393    final int offset = dst.getOffset(), start = dst.getPosition();
394    byte[] a = dst.getBytes();
395    Order ord = comp ? DESCENDING : ASCENDING;
396    if (-1 == unsignedCmp(val, 241L)) {
397      dst.put((byte) val);
398      len = dst.getPosition() - start;
399      ord.apply(a, offset + start, len);
400      return len;
401    }
402    if (-1 == unsignedCmp(val, 2288L)) {
403      y = (int) (val - 240);
404      dst.put((byte) (y / 256 + 241)).put((byte) (y % 256));
405      len = dst.getPosition() - start;
406      ord.apply(a, offset + start, len);
407      return len;
408    }
409    if (-1 == unsignedCmp(val, 67824L)) {
410      y = (int) (val - 2288);
411      dst.put((byte) 249).put((byte) (y / 256)).put((byte) (y % 256));
412      len = dst.getPosition() - start;
413      ord.apply(a, offset + start, len);
414      return len;
415    }
416    y = (int) val;
417    w = (int) (val >>> 32);
418    if (w == 0) {
419      if (-1 == unsignedCmp(y, 16777216L)) {
420        dst.put((byte) 250).put((byte) (y >>> 16)).put((byte) (y >>> 8)).put((byte) y);
421        len = dst.getPosition() - start;
422        ord.apply(a, offset + start, len);
423        return len;
424      }
425      dst.put((byte) 251);
426      putUint32(dst, y);
427      len = dst.getPosition() - start;
428      ord.apply(a, offset + start, len);
429      return len;
430    }
431    if (-1 == unsignedCmp(w, 256L)) {
432      dst.put((byte) 252).put((byte) w);
433      putUint32(dst, y);
434      len = dst.getPosition() - start;
435      ord.apply(a, offset + start, len);
436      return len;
437    }
438    if (-1 == unsignedCmp(w, 65536L)) {
439      dst.put((byte) 253).put((byte) (w >>> 8)).put((byte) w);
440      putUint32(dst, y);
441      len = dst.getPosition() - start;
442      ord.apply(a, offset + start, len);
443      return len;
444    }
445    if (-1 == unsignedCmp(w, 16777216L)) {
446      dst.put((byte) 254).put((byte) (w >>> 16)).put((byte) (w >>> 8)).put((byte) w);
447      putUint32(dst, y);
448      len = dst.getPosition() - start;
449      ord.apply(a, offset + start, len);
450      return len;
451    }
452    dst.put((byte) 255);
453    putUint32(dst, w);
454    putUint32(dst, y);
455    len = dst.getPosition() - start;
456    ord.apply(a, offset + start, len);
457    return len;
458  }
459
460  /**
461   * Inspect {@code src} for an encoded varuint64 for its length in bytes. Preserves the state of
462   * {@code src}.
463   * @param src  source buffer
464   * @param comp if true, parse the compliment of the value.
465   * @return the number of bytes consumed by this value.
466   */
467  static int lengthVaruint64(PositionedByteRange src, boolean comp) {
468    int a0 = (comp ? DESCENDING : ASCENDING).apply(src.peek()) & 0xff;
469    if (a0 <= 240) return 1;
470    if (a0 <= 248) return 2;
471    if (a0 == 249) return 3;
472    if (a0 == 250) return 4;
473    if (a0 == 251) return 5;
474    if (a0 == 252) return 6;
475    if (a0 == 253) return 7;
476    if (a0 == 254) return 8;
477    if (a0 == 255) return 9;
478    throw unexpectedHeader(src.peek());
479  }
480
481  /**
482   * Skip {@code src} over the encoded varuint64.
483   * @param src source buffer
484   * @param cmp if true, parse the compliment of the value.
485   * @return the number of bytes skipped.
486   */
487  static int skipVaruint64(PositionedByteRange src, boolean cmp) {
488    final int len = lengthVaruint64(src, cmp);
489    src.setPosition(src.getPosition() + len);
490    return len;
491  }
492
493  /**
494   * Decode a sequence of bytes in {@code src} as a varuint64. Compliment the encoded value when
495   * {@code comp} is true.
496   * @return the decoded value.
497   */
498  static long getVaruint64(PositionedByteRange src, boolean comp) {
499    assert src.getRemaining() >= lengthVaruint64(src, comp);
500    final long ret;
501    Order ord = comp ? DESCENDING : ASCENDING;
502    byte x = src.get();
503    final int a0 = ord.apply(x) & 0xff, a1, a2, a3, a4, a5, a6, a7, a8;
504    if (-1 == unsignedCmp(a0, 241)) {
505      return a0;
506    }
507    x = src.get();
508    a1 = ord.apply(x) & 0xff;
509    if (-1 == unsignedCmp(a0, 249)) {
510      return (a0 - 241L) * 256 + a1 + 240;
511    }
512    x = src.get();
513    a2 = ord.apply(x) & 0xff;
514    if (a0 == 249) {
515      return 2288L + 256 * a1 + a2;
516    }
517    x = src.get();
518    a3 = ord.apply(x) & 0xff;
519    if (a0 == 250) {
520      return ((long) a1 << 16L) | (a2 << 8) | a3;
521    }
522    x = src.get();
523    a4 = ord.apply(x) & 0xff;
524    ret = (((long) a1) << 24) | (a2 << 16) | (a3 << 8) | a4;
525    if (a0 == 251) {
526      return ret;
527    }
528    x = src.get();
529    a5 = ord.apply(x) & 0xff;
530    if (a0 == 252) {
531      return (ret << 8) | a5;
532    }
533    x = src.get();
534    a6 = ord.apply(x) & 0xff;
535    if (a0 == 253) {
536      return (ret << 16) | (a5 << 8) | a6;
537    }
538    x = src.get();
539    a7 = ord.apply(x) & 0xff;
540    if (a0 == 254) {
541      return (ret << 24) | (a5 << 16) | (a6 << 8) | a7;
542    }
543    x = src.get();
544    a8 = ord.apply(x) & 0xff;
545    return (ret << 32) | (((long) a5) << 24) | (a6 << 16) | (a7 << 8) | a8;
546  }
547
548  /**
549   * Strip all trailing zeros to ensure that no digit will be zero and round using our default
550   * context to ensure precision doesn't exceed max allowed. From Phoenix's {@code NumberUtil}.
551   * @return new {@link BigDecimal} instance
552   */
553  static BigDecimal normalize(BigDecimal val) {
554    return null == val ? null : val.stripTrailingZeros().round(DEFAULT_MATH_CONTEXT);
555  }
556
557  /**
558   * Read significand digits from {@code src} according to the magnitude of {@code e}.
559   * @param src  The source from which to read encoded digits.
560   * @param e    The magnitude of the first digit read.
561   * @param comp Treat encoded bytes as compliments when {@code comp} is true.
562   * @return The decoded value.
563   * @throws IllegalArgumentException when read exceeds the remaining length of {@code src}.
564   */
565  private static BigDecimal decodeSignificand(PositionedByteRange src, int e, boolean comp) {
566    // TODO: can this be made faster?
567    byte[] a = src.getBytes();
568    final int start = src.getPosition(), offset = src.getOffset(), remaining = src.getRemaining();
569    Order ord = comp ? DESCENDING : ASCENDING;
570    BigDecimal m;
571    StringBuilder sb = new StringBuilder();
572    for (int i = 0;; i++) {
573      if (i > remaining) {
574        // we've exceeded this range's window
575        src.setPosition(start);
576        throw new IllegalArgumentException(
577          "Read exceeds range before termination byte found. offset: " + offset + " position: "
578            + (start + i));
579      }
580      // one byte -> 2 digits
581      // base-100 digits are encoded as val * 2 + 1 except for the termination digit.
582      int twoDigits = (ord.apply(a[offset + start + i]) & 0xff) / 2;
583      sb.append(String.format("%02d", twoDigits));
584      // detect termination digit
585      // Besides, as we will normalise the return value at last,
586      // we only need to decode at most MAX_PRECISION + 2 digits here.
587      if ((ord.apply(a[offset + start + i]) & 1) == 0 || sb.length() > MAX_PRECISION + 1) {
588        src.setPosition(start + i + 1);
589        break;
590      }
591    }
592    m = new BigDecimal(sb.toString());
593    int stepsMoveLeft = sb.charAt(0) != '0' ? m.precision() : m.precision() + 1;
594    stepsMoveLeft -= e * 2;
595
596    return normalize(m.movePointLeft(stepsMoveLeft));
597  }
598
599  /**
600   * Skip {@code src} over the significand bytes.
601   * @param src  The source from which to read encoded digits.
602   * @param comp Treat encoded bytes as compliments when {@code comp} is true.
603   * @return the number of bytes skipped.
604   */
605  private static int skipSignificand(PositionedByteRange src, boolean comp) {
606    byte[] a = src.getBytes();
607    final int offset = src.getOffset(), start = src.getPosition();
608    int i = src.getPosition();
609    while (((comp ? DESCENDING : ASCENDING).apply(a[offset + i++]) & 1) != 0)
610      ;
611    src.setPosition(i);
612    return i - start;
613  }
614
615  /**
616   * <p>
617   * Encode the small magnitude floating point number {@code val} using the key encoding. The caller
618   * guarantees that 1.0 > abs(val) > 0.0.
619   * </p>
620   * <p>
621   * A floating point value is encoded as an integer exponent {@code E} and a mantissa {@code M}.
622   * The original value is equal to {@code (M * 100^E)}. {@code E} is set to the smallest value
623   * possible without making {@code M} greater than or equal to 1.0.
624   * </p>
625   * <p>
626   * For this routine, {@code E} will always be zero or negative, since the original value is less
627   * than one. The encoding written by this routine is the ones-complement of the varint of the
628   * negative of {@code E} followed by the mantissa:
629   *
630   * <pre>
631   *   Encoding:   ~-E  M
632   * </pre>
633   * </p>
634   * @param dst The destination to which encoded digits are written.
635   * @param val The value to encode.
636   * @return the number of bytes written.
637   */
638  private static int encodeNumericSmall(PositionedByteRange dst, BigDecimal val) {
639    // TODO: this can be done faster?
640    // assert 1.0 > abs(val) > 0.0
641    BigDecimal abs = val.abs();
642    assert BigDecimal.ZERO.compareTo(abs) < 0 && BigDecimal.ONE.compareTo(abs) > 0;
643    byte[] a = dst.getBytes();
644    boolean isNeg = val.signum() == -1;
645    final int offset = dst.getOffset(), start = dst.getPosition();
646    int e = 0, startM;
647
648    if (isNeg) { /* Small negative number: 0x14, -E, ~M */
649      dst.put(NEG_SMALL);
650    } else { /* Small positive number: 0x16, ~-E, M */
651      dst.put(POS_SMALL);
652    }
653
654    // normalize abs(val) to determine E
655    int zerosBeforeFirstNonZero = abs.scale() - abs.precision();
656    int lengthToMoveRight =
657      zerosBeforeFirstNonZero % 2 == 0 ? zerosBeforeFirstNonZero : zerosBeforeFirstNonZero - 1;
658    e = lengthToMoveRight / 2;
659    abs = abs.movePointRight(lengthToMoveRight);
660
661    putVaruint64(dst, e, !isNeg); // encode appropriate E value.
662
663    // encode M by peeling off centimal digits, encoding x as 2x+1
664    startM = dst.getPosition();
665    encodeToCentimal(dst, abs);
666    // terminal digit should be 2x
667    a[offset + dst.getPosition() - 1] = (byte) (a[offset + dst.getPosition() - 1] & 0xfe);
668    if (isNeg) {
669      // negative values encoded as ~M
670      DESCENDING.apply(a, offset + startM, dst.getPosition() - startM);
671    }
672    return dst.getPosition() - start;
673  }
674
675  /**
676   * Encode the large magnitude floating point number {@code val} using the key encoding. The caller
677   * guarantees that {@code val} will be finite and abs(val) >= 1.0.
678   * <p>
679   * A floating point value is encoded as an integer exponent {@code E} and a mantissa {@code M}.
680   * The original value is equal to {@code (M * 100^E)}. {@code E} is set to the smallest value
681   * possible without making {@code M} greater than or equal to 1.0.
682   * </p>
683   * <p>
684   * Each centimal digit of the mantissa is stored in a byte. If the value of the centimal digit is
685   * {@code X} (hence {@code X>=0} and {@code X<=99}) then the byte value will be {@code 2*X+1} for
686   * every byte of the mantissa, except for the last byte which will be {@code 2*X+0}. The mantissa
687   * must be the minimum number of bytes necessary to represent the value; trailing {@code X==0}
688   * digits are omitted. This means that the mantissa will never contain a byte with the value
689   * {@code 0x00}.
690   * </p>
691   * <p>
692   * If {@code E > 10}, then this routine writes of {@code E} as a varint followed by the mantissa
693   * as described above. Otherwise, if {@code E <= 10}, this routine only writes the mantissa and
694   * leaves the {@code E} value to be encoded as part of the opening byte of the field by the
695   * calling function.
696   *
697   * <pre>
698   *   Encoding:  M       (if E<=10)
699   *              E M     (if E>10)
700   * </pre>
701   * </p>
702   * @param dst The destination to which encoded digits are written.
703   * @param val The value to encode.
704   * @return the number of bytes written.
705   */
706  private static int encodeNumericLarge(PositionedByteRange dst, BigDecimal val) {
707    // TODO: this can be done faster
708    BigDecimal abs = val.abs();
709    byte[] a = dst.getBytes();
710    boolean isNeg = val.signum() == -1;
711    final int start = dst.getPosition(), offset = dst.getOffset();
712    int e = 0, startM;
713
714    if (isNeg) { /* Large negative number: 0x08, ~E, ~M */
715      dst.put(NEG_LARGE);
716    } else { /* Large positive number: 0x22, E, M */
717      dst.put(POS_LARGE);
718    }
719
720    // normalize abs(val) to determine E
721    int integerDigits = abs.precision() - abs.scale();
722    int lengthToMoveLeft = integerDigits % 2 == 0 ? integerDigits : integerDigits + 1;
723    e = lengthToMoveLeft / 2;
724    abs = abs.movePointLeft(lengthToMoveLeft);
725
726    // encode appropriate header byte and/or E value.
727    if (e > 10) { /* large number, write out {~,}E */
728      putVaruint64(dst, e, isNeg);
729    } else {
730      if (isNeg) { /* Medium negative number: 0x13-E, ~M */
731        dst.put(start, (byte) (NEG_MED_MAX - e));
732      } else { /* Medium positive number: 0x17+E, M */
733        dst.put(start, (byte) (POS_MED_MIN + e));
734      }
735    }
736
737    // encode M by peeling off centimal digits, encoding x as 2x+1
738    startM = dst.getPosition();
739    encodeToCentimal(dst, abs);
740    // terminal digit should be 2x
741    a[offset + dst.getPosition() - 1] = (byte) (a[offset + dst.getPosition() - 1] & 0xfe);
742    if (isNeg) {
743      // negative values encoded as ~M
744      DESCENDING.apply(a, offset + startM, dst.getPosition() - startM);
745    }
746    return dst.getPosition() - start;
747  }
748
749  /**
750   * Encode a value val in [0.01, 1.0) into Centimals. Util function for
751   * {@link OrderedBytes#encodeNumericLarge(PositionedByteRange, BigDecimal) and
752   * {@link OrderedBytes#encodeNumericSmall(PositionedByteRange, BigDecimal)}
753   * @param dst The destination to which encoded digits are written.
754   * @param val A BigDecimal after the normalization. The value must be in [0.01, 1.0).
755   */
756  private static void encodeToCentimal(PositionedByteRange dst, BigDecimal val) {
757    // The input value val must be in [0.01, 1.0)
758    String stringOfAbs = val.stripTrailingZeros().toPlainString();
759    String value = stringOfAbs.substring(stringOfAbs.indexOf('.') + 1);
760    int d;
761
762    // If the first float digit is 0, we will encode one digit more than MAX_PRECISION
763    // We encode at most MAX_PRECISION significant digits into centimals,
764    // because the input value, has been already normalized.
765    int maxPrecision = value.charAt(0) == '0' ? MAX_PRECISION + 1 : MAX_PRECISION;
766    maxPrecision = Math.min(maxPrecision, value.length());
767    for (int i = 0; i < maxPrecision; i += 2) {
768      d = (value.charAt(i) - '0') * 10;
769      if (i + 1 < maxPrecision) {
770        d += (value.charAt(i + 1) - '0');
771      }
772      dst.put((byte) (2 * d + 1));
773    }
774  }
775
776  /**
777   * Encode a numerical value using the variable-length encoding.
778   * @param dst The destination to which encoded digits are written.
779   * @param val The value to encode.
780   * @param ord The {@link Order} to respect while encoding {@code val}.
781   * @return the number of bytes written.
782   */
783  public static int encodeNumeric(PositionedByteRange dst, long val, Order ord) {
784    return encodeNumeric(dst, BigDecimal.valueOf(val), ord);
785  }
786
787  /**
788   * Encode a numerical value using the variable-length encoding.
789   * @param dst The destination to which encoded digits are written.
790   * @param val The value to encode.
791   * @param ord The {@link Order} to respect while encoding {@code val}.
792   * @return the number of bytes written.
793   */
794  public static int encodeNumeric(PositionedByteRange dst, double val, Order ord) {
795    if (val == 0.0) {
796      dst.put(ord.apply(ZERO));
797      return 1;
798    }
799    if (Double.isNaN(val)) {
800      dst.put(ord.apply(NAN));
801      return 1;
802    }
803    if (val == Double.NEGATIVE_INFINITY) {
804      dst.put(ord.apply(NEG_INF));
805      return 1;
806    }
807    if (val == Double.POSITIVE_INFINITY) {
808      dst.put(ord.apply(POS_INF));
809      return 1;
810    }
811    return encodeNumeric(dst, BigDecimal.valueOf(val), ord);
812  }
813
814  /**
815   * Encode a numerical value using the variable-length encoding. If the number of significant
816   * digits of the value exceeds the {@link OrderedBytes#MAX_PRECISION}, the exceeding part will be
817   * lost.
818   * @param dst The destination to which encoded digits are written.
819   * @param val The value to encode.
820   * @param ord The {@link Order} to respect while encoding {@code val}.
821   * @return the number of bytes written.
822   */
823  public static int encodeNumeric(PositionedByteRange dst, BigDecimal val, Order ord) {
824    final int len, offset = dst.getOffset(), start = dst.getPosition();
825    if (null == val) {
826      return encodeNull(dst, ord);
827    } else if (BigDecimal.ZERO.compareTo(val) == 0) {
828      dst.put(ord.apply(ZERO));
829      return 1;
830    }
831    BigDecimal abs = val.abs();
832    if (BigDecimal.ONE.compareTo(abs) <= 0) { // abs(v) >= 1.0
833      len = encodeNumericLarge(dst, normalize(val));
834    } else { // 1.0 > abs(v) >= 0.0
835      len = encodeNumericSmall(dst, normalize(val));
836    }
837    ord.apply(dst.getBytes(), offset + start, len);
838    return len;
839  }
840
841  /**
842   * Decode a {@link BigDecimal} from {@code src}. Assumes {@code src} encodes a value in Numeric
843   * encoding and is within the valid range of {@link BigDecimal} values. {@link BigDecimal} does
844   * not support {@code NaN} or {@code Infinte} values.
845   * @see #decodeNumericAsDouble(PositionedByteRange)
846   */
847  private static BigDecimal decodeNumericValue(PositionedByteRange src) {
848    final int e;
849    byte header = src.get();
850    boolean dsc = -1 == Integer.signum(header);
851    header = dsc ? DESCENDING.apply(header) : header;
852
853    if (header == NULL) return null;
854    if (header == NEG_LARGE) { /* Large negative number: 0x08, ~E, ~M */
855      e = (int) getVaruint64(src, !dsc);
856      return decodeSignificand(src, e, !dsc).negate();
857    }
858    if (header >= NEG_MED_MIN && header <= NEG_MED_MAX) {
859      /* Medium negative number: 0x13-E, ~M */
860      e = NEG_MED_MAX - header;
861      return decodeSignificand(src, e, !dsc).negate();
862    }
863    if (header == NEG_SMALL) { /* Small negative number: 0x14, -E, ~M */
864      e = (int) -getVaruint64(src, dsc);
865      return decodeSignificand(src, e, !dsc).negate();
866    }
867    if (header == ZERO) {
868      return BigDecimal.ZERO;
869    }
870    if (header == POS_SMALL) { /* Small positive number: 0x16, ~-E, M */
871      e = (int) -getVaruint64(src, !dsc);
872      return decodeSignificand(src, e, dsc);
873    }
874    if (header >= POS_MED_MIN && header <= POS_MED_MAX) {
875      /* Medium positive number: 0x17+E, M */
876      e = header - POS_MED_MIN;
877      return decodeSignificand(src, e, dsc);
878    }
879    if (header == POS_LARGE) { /* Large positive number: 0x22, E, M */
880      e = (int) getVaruint64(src, dsc);
881      return decodeSignificand(src, e, dsc);
882    }
883    throw unexpectedHeader(header);
884  }
885
886  /**
887   * Decode a primitive {@code double} value from the Numeric encoding. Numeric encoding is based on
888   * {@link BigDecimal}; in the event the encoded value is larger than can be represented in a
889   * {@code double}, this method performs an implicit narrowing conversion as described in
890   * {@link BigDecimal#doubleValue()}.
891   * @throws NullPointerException     when the encoded value is {@code NULL}.
892   * @throws IllegalArgumentException when the encoded value is not a Numeric.
893   * @see #encodeNumeric(PositionedByteRange, double, Order)
894   * @see BigDecimal#doubleValue()
895   */
896  public static double decodeNumericAsDouble(PositionedByteRange src) {
897    // TODO: should an encoded NULL value throw unexpectedHeader() instead?
898    if (isNull(src)) {
899      throw new NullPointerException("A null value cannot be decoded to a double.");
900    }
901    if (isNumericNaN(src)) {
902      src.get();
903      return Double.NaN;
904    }
905    if (isNumericZero(src)) {
906      src.get();
907      return Double.valueOf(0.0);
908    }
909
910    byte header = -1 == Integer.signum(src.peek()) ? DESCENDING.apply(src.peek()) : src.peek();
911
912    if (header == NEG_INF) {
913      src.get();
914      return Double.NEGATIVE_INFINITY;
915    } else if (header == POS_INF) {
916      src.get();
917      return Double.POSITIVE_INFINITY;
918    } else {
919      return decodeNumericValue(src).doubleValue();
920    }
921  }
922
923  /**
924   * Decode a primitive {@code long} value from the Numeric encoding. Numeric encoding is based on
925   * {@link BigDecimal}; in the event the encoded value is larger than can be represented in a
926   * {@code long}, this method performs an implicit narrowing conversion as described in
927   * {@link BigDecimal#doubleValue()}.
928   * @throws NullPointerException     when the encoded value is {@code NULL}.
929   * @throws IllegalArgumentException when the encoded value is not a Numeric.
930   * @see #encodeNumeric(PositionedByteRange, long, Order)
931   * @see BigDecimal#longValue()
932   */
933  public static long decodeNumericAsLong(PositionedByteRange src) {
934    // TODO: should an encoded NULL value throw unexpectedHeader() instead?
935    if (isNull(src)) throw new NullPointerException();
936    if (!isNumeric(src)) throw unexpectedHeader(src.peek());
937    if (isNumericNaN(src)) throw unexpectedHeader(src.peek());
938    if (isNumericInfinite(src)) throw unexpectedHeader(src.peek());
939
940    if (isNumericZero(src)) {
941      src.get();
942      return Long.valueOf(0);
943    }
944    return decodeNumericValue(src).longValue();
945  }
946
947  /**
948   * Decode a {@link BigDecimal} value from the variable-length encoding.
949   * @throws IllegalArgumentException when the encoded value is not a Numeric.
950   * @see #encodeNumeric(PositionedByteRange, BigDecimal, Order)
951   */
952  public static BigDecimal decodeNumericAsBigDecimal(PositionedByteRange src) {
953    if (isNull(src)) {
954      src.get();
955      return null;
956    }
957    if (!isNumeric(src)) throw unexpectedHeader(src.peek());
958    if (isNumericNaN(src)) throw unexpectedHeader(src.peek());
959    if (isNumericInfinite(src)) throw unexpectedHeader(src.peek());
960    return decodeNumericValue(src);
961  }
962
963  /**
964   * Encode a String value. String encoding is 0x00-terminated and so it does not support
965   * {@code \u0000} codepoints in the value.
966   * @param dst The destination to which the encoded value is written.
967   * @param val The value to encode.
968   * @param ord The {@link Order} to respect while encoding {@code val}.
969   * @return the number of bytes written.
970   * @throws IllegalArgumentException when {@code val} contains a {@code \u0000}.
971   */
972  public static int encodeString(PositionedByteRange dst, String val, Order ord) {
973    if (null == val) {
974      return encodeNull(dst, ord);
975    }
976    if (val.contains("\u0000"))
977      throw new IllegalArgumentException("Cannot encode String values containing '\\u0000'");
978    final int offset = dst.getOffset(), start = dst.getPosition();
979    dst.put(TEXT);
980    // TODO: is there no way to decode into dst directly?
981    dst.put(val.getBytes(UTF8));
982    dst.put(TERM);
983    ord.apply(dst.getBytes(), offset + start, dst.getPosition() - start);
984    return dst.getPosition() - start;
985  }
986
987  /**
988   * Decode a String value.
989   */
990  public static String decodeString(PositionedByteRange src) {
991    final byte header = src.get();
992    if (header == NULL || header == DESCENDING.apply(NULL)) return null;
993    assert header == TEXT || header == DESCENDING.apply(TEXT);
994    Order ord = header == TEXT ? ASCENDING : DESCENDING;
995    byte[] a = src.getBytes();
996    final int offset = src.getOffset(), start = src.getPosition();
997    final byte terminator = ord.apply(TERM);
998    int rawStartPos = offset + start, rawTermPos = rawStartPos;
999    for (; a[rawTermPos] != terminator; rawTermPos++)
1000      ;
1001    src.setPosition(rawTermPos - offset + 1); // advance position to TERM + 1
1002    if (DESCENDING == ord) {
1003      // make a copy so that we don't disturb encoded value with ord.
1004      byte[] copy = new byte[rawTermPos - rawStartPos];
1005      System.arraycopy(a, rawStartPos, copy, 0, copy.length);
1006      ord.apply(copy);
1007      return new String(copy, UTF8);
1008    } else {
1009      return new String(a, rawStartPos, rawTermPos - rawStartPos, UTF8);
1010    }
1011  }
1012
1013  /**
1014   * Calculate the expected BlobVar encoded length based on unencoded length.
1015   */
1016  public static int blobVarEncodedLength(int len) {
1017    if (0 == len) return 2; // 1-byte header + 1-byte terminator
1018    else return (int) Math.ceil((len * 8) // 8-bits per input byte
1019      / 7.0) // 7-bits of input data per encoded byte, rounded up
1020      + 1; // + 1-byte header
1021  }
1022
1023  /**
1024   * Calculate the expected BlobVar decoded length based on encoded length.
1025   */
1026  static int blobVarDecodedLength(int len) {
1027    return ((len - 1) // 1-byte header
1028      * 7) // 7-bits of payload per encoded byte
1029      / 8; // 8-bits per byte
1030  }
1031
1032  /**
1033   * Encode a Blob value using a modified varint encoding scheme.
1034   * <p>
1035   * This format encodes a byte[] value such that no limitations on the input value are imposed. The
1036   * first byte encodes the encoding scheme that follows, {@link #BLOB_VAR}. Each encoded byte
1037   * thereafter consists of a header bit followed by 7 bits of payload. A header bit of '1'
1038   * indicates continuation of the encoding. A header bit of '0' indicates this byte contains the
1039   * last of the payload. An empty input value is encoded as the header byte immediately followed by
1040   * a termination byte {@code 0x00}. This is not ambiguous with the encoded value of {@code []},
1041   * which results in {@code [0x80, 0x00]}.
1042   * </p>
1043   * @return the number of bytes written.
1044   */
1045  public static int encodeBlobVar(PositionedByteRange dst, byte[] val, int voff, int vlen,
1046    Order ord) {
1047    if (null == val) {
1048      return encodeNull(dst, ord);
1049    }
1050    // Empty value is null-terminated. All other values are encoded as 7-bits per byte.
1051    assert dst.getRemaining() >= blobVarEncodedLength(vlen) : "buffer overflow expected.";
1052    final int offset = dst.getOffset(), start = dst.getPosition();
1053    dst.put(BLOB_VAR);
1054    if (0 == vlen) {
1055      dst.put(TERM);
1056    } else {
1057      byte s = 1, t = 0;
1058      for (int i = voff; i < vlen; i++) {
1059        dst.put((byte) (0x80 | t | ((val[i] & 0xff) >>> s)));
1060        if (s < 7) {
1061          t = (byte) (val[i] << (7 - s));
1062          s++;
1063        } else {
1064          dst.put((byte) (0x80 | val[i]));
1065          s = 1;
1066          t = 0;
1067        }
1068      }
1069      if (s > 1) {
1070        dst.put((byte) (0x7f & t));
1071      } else {
1072        dst.getBytes()[offset + dst.getPosition() - 1] =
1073          (byte) (dst.getBytes()[offset + dst.getPosition() - 1] & 0x7f);
1074      }
1075    }
1076    ord.apply(dst.getBytes(), offset + start, dst.getPosition() - start);
1077    return dst.getPosition() - start;
1078  }
1079
1080  /**
1081   * Encode a blob value using a modified varint encoding scheme.
1082   * @return the number of bytes written.
1083   * @see #encodeBlobVar(PositionedByteRange, byte[], int, int, Order)
1084   */
1085  public static int encodeBlobVar(PositionedByteRange dst, byte[] val, Order ord) {
1086    return encodeBlobVar(dst, val, 0, null != val ? val.length : 0, ord);
1087  }
1088
1089  /**
1090   * Decode a blob value that was encoded using BlobVar encoding.
1091   */
1092  public static byte[] decodeBlobVar(PositionedByteRange src) {
1093    final byte header = src.get();
1094    if (header == NULL || header == DESCENDING.apply(NULL)) {
1095      return null;
1096    }
1097    assert header == BLOB_VAR || header == DESCENDING.apply(BLOB_VAR);
1098    Order ord = BLOB_VAR == header ? ASCENDING : DESCENDING;
1099    if (src.peek() == ord.apply(TERM)) {
1100      // skip empty input buffer.
1101      src.get();
1102      return new byte[0];
1103    }
1104    final int offset = src.getOffset(), start = src.getPosition();
1105    int end;
1106    byte[] a = src.getBytes();
1107    for (end = start; (byte) (ord.apply(a[offset + end]) & 0x80) != TERM; end++)
1108      ;
1109    end++; // increment end to 1-past last byte
1110    // create ret buffer using length of encoded data + 1 (header byte)
1111    PositionedByteRange ret =
1112      new SimplePositionedMutableByteRange(blobVarDecodedLength(end - start + 1));
1113    int s = 6;
1114    byte t = (byte) ((ord.apply(a[offset + start]) << 1) & 0xff);
1115    for (int i = start + 1; i < end; i++) {
1116      if (s == 7) {
1117        ret.put((byte) (t | (ord.apply(a[offset + i]) & 0x7f)));
1118        i++;
1119        // explicitly reset t -- clean up overflow buffer after decoding
1120        // a full cycle and retain assertion condition below. This happens
1121        t = 0; // when the LSB in the last encoded byte is 1. (HBASE-9893)
1122      } else {
1123        ret.put((byte) (t | ((ord.apply(a[offset + i]) & 0x7f) >>> s)));
1124      }
1125      if (i == end) break;
1126      t = (byte) ((ord.apply(a[offset + i]) << (8 - s)) & 0xff);
1127      s = s == 1 ? 7 : s - 1;
1128    }
1129    src.setPosition(end);
1130    assert t == 0 : "Unexpected bits remaining after decoding blob.";
1131    assert ret.getPosition() == ret.getLength() : "Allocated unnecessarily large return buffer.";
1132    return ret.getBytes();
1133  }
1134
1135  /**
1136   * Encode a Blob value as a byte-for-byte copy. BlobCopy encoding in DESCENDING order is NULL
1137   * terminated so as to preserve proper sorting of {@code []} and so it does not support
1138   * {@code 0x00} in the value.
1139   * @return the number of bytes written.
1140   * @throws IllegalArgumentException when {@code ord} is DESCENDING and {@code val} contains a
1141   *                                  {@code 0x00} byte.
1142   */
1143  public static int encodeBlobCopy(PositionedByteRange dst, byte[] val, int voff, int vlen,
1144    Order ord) {
1145    if (null == val) {
1146      encodeNull(dst, ord);
1147      if (ASCENDING == ord) return 1;
1148      else {
1149        // DESCENDING ordered BlobCopy requires a termination bit to preserve
1150        // sort-order semantics of null values.
1151        dst.put(ord.apply(TERM));
1152        return 2;
1153      }
1154    }
1155    // Blobs as final entry in a compound key are written unencoded.
1156    assert dst.getRemaining() >= vlen + (ASCENDING == ord ? 1 : 2);
1157    if (DESCENDING == ord) {
1158      for (int i = 0; i < vlen; i++) {
1159        if (TERM == val[voff + i]) {
1160          throw new IllegalArgumentException("0x00 bytes not permitted in value.");
1161        }
1162      }
1163    }
1164    final int offset = dst.getOffset(), start = dst.getPosition();
1165    dst.put(BLOB_COPY);
1166    dst.put(val, voff, vlen);
1167    // DESCENDING ordered BlobCopy requires a termination bit to preserve
1168    // sort-order semantics of null values.
1169    if (DESCENDING == ord) dst.put(TERM);
1170    ord.apply(dst.getBytes(), offset + start, dst.getPosition() - start);
1171    return dst.getPosition() - start;
1172  }
1173
1174  /**
1175   * Encode a Blob value as a byte-for-byte copy. BlobCopy encoding in DESCENDING order is NULL
1176   * terminated so as to preserve proper sorting of {@code []} and so it does not support
1177   * {@code 0x00} in the value.
1178   * @return the number of bytes written.
1179   * @throws IllegalArgumentException when {@code ord} is DESCENDING and {@code val} contains a
1180   *                                  {@code 0x00} byte.
1181   * @see #encodeBlobCopy(PositionedByteRange, byte[], int, int, Order)
1182   */
1183  public static int encodeBlobCopy(PositionedByteRange dst, byte[] val, Order ord) {
1184    return encodeBlobCopy(dst, val, 0, null != val ? val.length : 0, ord);
1185  }
1186
1187  /**
1188   * Decode a Blob value, byte-for-byte copy.
1189   * @see #encodeBlobCopy(PositionedByteRange, byte[], int, int, Order)
1190   */
1191  public static byte[] decodeBlobCopy(PositionedByteRange src) {
1192    byte header = src.get();
1193    if (header == NULL || header == DESCENDING.apply(NULL)) {
1194      return null;
1195    }
1196    assert header == BLOB_COPY || header == DESCENDING.apply(BLOB_COPY);
1197    Order ord = header == BLOB_COPY ? ASCENDING : DESCENDING;
1198    final int length = src.getRemaining() - (ASCENDING == ord ? 0 : 1);
1199    byte[] ret = new byte[length];
1200    src.get(ret);
1201    ord.apply(ret, 0, ret.length);
1202    // DESCENDING ordered BlobCopy requires a termination bit to preserve
1203    // sort-order semantics of null values.
1204    if (DESCENDING == ord) src.get();
1205    return ret;
1206  }
1207
1208  /**
1209   * Encode a null value.
1210   * @param dst The destination to which encoded digits are written.
1211   * @param ord The {@link Order} to respect while encoding {@code val}.
1212   * @return the number of bytes written.
1213   */
1214  public static int encodeNull(PositionedByteRange dst, Order ord) {
1215    dst.put(ord.apply(NULL));
1216    return 1;
1217  }
1218
1219  /**
1220   * Encode an {@code int8} value using the fixed-length encoding.
1221   * @return the number of bytes written.
1222   * @see #encodeInt64(PositionedByteRange, long, Order)
1223   * @see #decodeInt8(PositionedByteRange)
1224   */
1225  public static int encodeInt8(PositionedByteRange dst, byte val, Order ord) {
1226    final int offset = dst.getOffset(), start = dst.getPosition();
1227    dst.put(FIXED_INT8).put((byte) (val ^ 0x80));
1228    ord.apply(dst.getBytes(), offset + start, 2);
1229    return 2;
1230  }
1231
1232  /**
1233   * Decode an {@code int8} value.
1234   * @see #encodeInt8(PositionedByteRange, byte, Order)
1235   */
1236  public static byte decodeInt8(PositionedByteRange src) {
1237    final byte header = src.get();
1238    assert header == FIXED_INT8 || header == DESCENDING.apply(FIXED_INT8);
1239    Order ord = header == FIXED_INT8 ? ASCENDING : DESCENDING;
1240    return (byte) ((ord.apply(src.get()) ^ 0x80) & 0xff);
1241  }
1242
1243  /**
1244   * Encode an {@code int16} value using the fixed-length encoding.
1245   * @return the number of bytes written.
1246   * @see #encodeInt64(PositionedByteRange, long, Order)
1247   * @see #decodeInt16(PositionedByteRange)
1248   */
1249  public static int encodeInt16(PositionedByteRange dst, short val, Order ord) {
1250    final int offset = dst.getOffset(), start = dst.getPosition();
1251    dst.put(FIXED_INT16).put((byte) ((val >> 8) ^ 0x80)).put((byte) val);
1252    ord.apply(dst.getBytes(), offset + start, 3);
1253    return 3;
1254  }
1255
1256  /**
1257   * Decode an {@code int16} value.
1258   * @see #encodeInt16(PositionedByteRange, short, Order)
1259   */
1260  public static short decodeInt16(PositionedByteRange src) {
1261    final byte header = src.get();
1262    assert header == FIXED_INT16 || header == DESCENDING.apply(FIXED_INT16);
1263    Order ord = header == FIXED_INT16 ? ASCENDING : DESCENDING;
1264    short val = (short) ((ord.apply(src.get()) ^ 0x80) & 0xff);
1265    val = (short) ((val << 8) + (ord.apply(src.get()) & 0xff));
1266    return val;
1267  }
1268
1269  /**
1270   * Encode an {@code int32} value using the fixed-length encoding.
1271   * @return the number of bytes written.
1272   * @see #encodeInt64(PositionedByteRange, long, Order)
1273   * @see #decodeInt32(PositionedByteRange)
1274   */
1275  public static int encodeInt32(PositionedByteRange dst, int val, Order ord) {
1276    final int offset = dst.getOffset(), start = dst.getPosition();
1277    dst.put(FIXED_INT32).put((byte) ((val >> 24) ^ 0x80)).put((byte) (val >> 16))
1278      .put((byte) (val >> 8)).put((byte) val);
1279    ord.apply(dst.getBytes(), offset + start, 5);
1280    return 5;
1281  }
1282
1283  /**
1284   * Decode an {@code int32} value.
1285   * @see #encodeInt32(PositionedByteRange, int, Order)
1286   */
1287  public static int decodeInt32(PositionedByteRange src) {
1288    final byte header = src.get();
1289    assert header == FIXED_INT32 || header == DESCENDING.apply(FIXED_INT32);
1290    Order ord = header == FIXED_INT32 ? ASCENDING : DESCENDING;
1291    int val = (ord.apply(src.get()) ^ 0x80) & 0xff;
1292    for (int i = 1; i < 4; i++) {
1293      val = (val << 8) + (ord.apply(src.get()) & 0xff);
1294    }
1295    return val;
1296  }
1297
1298  /**
1299   * Encode an {@code int64} value using the fixed-length encoding.
1300   * <p>
1301   * This format ensures that all longs sort in their natural order, as they would sort when using
1302   * signed long comparison.
1303   * </p>
1304   * <p>
1305   * All Longs are serialized to an 8-byte, fixed-width sortable byte format. Serialization is
1306   * performed by inverting the integer sign bit and writing the resulting bytes to the byte array
1307   * in big endian order. The encoded value is prefixed by the {@link #FIXED_INT64} header byte.
1308   * This encoding is designed to handle java language primitives and so Null values are NOT
1309   * supported by this implementation.
1310   * </p>
1311   * <p>
1312   * For example:
1313   * </p>
1314   *
1315   * <pre>
1316   * Input:   0x0000000000000005 (5)
1317   * Result:  0x288000000000000005
1318   *
1319   * Input:   0xfffffffffffffffb (-4)
1320   * Result:  0x280000000000000004
1321   *
1322   * Input:   0x7fffffffffffffff (Long.MAX_VALUE)
1323   * Result:  0x28ffffffffffffffff
1324   *
1325   * Input:   0x8000000000000000 (Long.MIN_VALUE)
1326   * Result:  0x287fffffffffffffff
1327   * </pre>
1328   * <p>
1329   * This encoding format, and much of this documentation string, is based on Orderly's
1330   * {@code FixedIntWritableRowKey}.
1331   * </p>
1332   * @return the number of bytes written.
1333   * @see #decodeInt64(PositionedByteRange)
1334   */
1335  public static int encodeInt64(PositionedByteRange dst, long val, Order ord) {
1336    final int offset = dst.getOffset(), start = dst.getPosition();
1337    dst.put(FIXED_INT64).put((byte) ((val >> 56) ^ 0x80)).put((byte) (val >> 48))
1338      .put((byte) (val >> 40)).put((byte) (val >> 32)).put((byte) (val >> 24))
1339      .put((byte) (val >> 16)).put((byte) (val >> 8)).put((byte) val);
1340    ord.apply(dst.getBytes(), offset + start, 9);
1341    return 9;
1342  }
1343
1344  /**
1345   * Decode an {@code int64} value.
1346   * @see #encodeInt64(PositionedByteRange, long, Order)
1347   */
1348  public static long decodeInt64(PositionedByteRange src) {
1349    final byte header = src.get();
1350    assert header == FIXED_INT64 || header == DESCENDING.apply(FIXED_INT64);
1351    Order ord = header == FIXED_INT64 ? ASCENDING : DESCENDING;
1352    long val = (ord.apply(src.get()) ^ 0x80) & 0xff;
1353    for (int i = 1; i < 8; i++) {
1354      val = (val << 8) + (ord.apply(src.get()) & 0xff);
1355    }
1356    return val;
1357  }
1358
1359  /**
1360   * Encode a 32-bit floating point value using the fixed-length encoding. Encoding format is
1361   * described at length in {@link #encodeFloat64(PositionedByteRange, double, Order)}.
1362   * @return the number of bytes written.
1363   * @see #decodeFloat32(PositionedByteRange)
1364   * @see #encodeFloat64(PositionedByteRange, double, Order)
1365   */
1366  public static int encodeFloat32(PositionedByteRange dst, float val, Order ord) {
1367    final int offset = dst.getOffset(), start = dst.getPosition();
1368    int i = Float.floatToIntBits(val);
1369    i ^= ((i >> (Integer.SIZE - 1)) | Integer.MIN_VALUE);
1370    dst.put(FIXED_FLOAT32).put((byte) (i >> 24)).put((byte) (i >> 16)).put((byte) (i >> 8))
1371      .put((byte) i);
1372    ord.apply(dst.getBytes(), offset + start, 5);
1373    return 5;
1374  }
1375
1376  /**
1377   * Decode a 32-bit floating point value using the fixed-length encoding.
1378   * @see #encodeFloat32(PositionedByteRange, float, Order)
1379   */
1380  public static float decodeFloat32(PositionedByteRange src) {
1381    final byte header = src.get();
1382    assert header == FIXED_FLOAT32 || header == DESCENDING.apply(FIXED_FLOAT32);
1383    Order ord = header == FIXED_FLOAT32 ? ASCENDING : DESCENDING;
1384    int val = ord.apply(src.get()) & 0xff;
1385    for (int i = 1; i < 4; i++) {
1386      val = (val << 8) + (ord.apply(src.get()) & 0xff);
1387    }
1388    val ^= (~val >> (Integer.SIZE - 1)) | Integer.MIN_VALUE;
1389    return Float.intBitsToFloat(val);
1390  }
1391
1392  /**
1393   * Encode a 64-bit floating point value using the fixed-length encoding.
1394   * <p>
1395   * This format ensures the following total ordering of floating point values:
1396   * Double.NEGATIVE_INFINITY &lt; -Double.MAX_VALUE &lt; ... &lt; -Double.MIN_VALUE &lt; -0.0 &lt;
1397   * +0.0; &lt; Double.MIN_VALUE &lt; ... &lt; Double.MAX_VALUE &lt; Double.POSITIVE_INFINITY &lt;
1398   * Double.NaN
1399   * </p>
1400   * <p>
1401   * Floating point numbers are encoded as specified in IEEE 754. A 64-bit double precision float
1402   * consists of a sign bit, 11-bit unsigned exponent encoded in offset-1023 notation, and a 52-bit
1403   * significand. The format is described further in the
1404   * <a href="http://en.wikipedia.org/wiki/Double_precision"> Double Precision Floating Point
1405   * Wikipedia page</a>
1406   * </p>
1407   * <p>
1408   * The value of a normal float is -1 <sup>sign bit</sup> &times; 2<sup>exponent - 1023</sup>
1409   * &times; 1.significand
1410   * </p>
1411   * <p>
1412   * The IEE754 floating point format already preserves sort ordering for positive floating point
1413   * numbers when the raw bytes are compared in most significant byte order. This is discussed
1414   * further at
1415   * <a href= "http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.htm" >
1416   * http://www.cygnus-software.com/papers/comparingfloats/comparingfloats. htm</a>
1417   * </p>
1418   * <p>
1419   * Thus, we need only ensure that negative numbers sort in the the exact opposite order as
1420   * positive numbers (so that say, negative infinity is less than negative 1), and that all
1421   * negative numbers compare less than any positive number. To accomplish this, we invert the sign
1422   * bit of all floating point numbers, and we also invert the exponent and significand bits if the
1423   * floating point number was negative.
1424   * </p>
1425   * <p>
1426   * More specifically, we first store the floating point bits into a 64-bit long {@code l} using
1427   * {@link Double#doubleToLongBits}. This method collapses all NaNs into a single, canonical NaN
1428   * value but otherwise leaves the bits unchanged. We then compute
1429   * </p>
1430   *
1431   * <pre>
1432   * l &circ;= (l &gt;&gt; (Long.SIZE - 1)) | Long.MIN_SIZE
1433   * </pre>
1434   * <p>
1435   * which inverts the sign bit and XOR's all other bits with the sign bit itself. Comparing the raw
1436   * bytes of {@code l} in most significant byte order is equivalent to performing a double
1437   * precision floating point comparison on the underlying bits (ignoring NaN comparisons, as NaNs
1438   * don't compare equal to anything when performing floating point comparisons).
1439   * </p>
1440   * <p>
1441   * The resulting long integer is then converted into a byte array by serializing the long one byte
1442   * at a time in most significant byte order. The serialized integer is prefixed by a single header
1443   * byte. All serialized values are 9 bytes in length.
1444   * </p>
1445   * <p>
1446   * This encoding format, and much of this highly detailed documentation string, is based on
1447   * Orderly's {@code DoubleWritableRowKey}.
1448   * </p>
1449   * @return the number of bytes written.
1450   * @see #decodeFloat64(PositionedByteRange)
1451   */
1452  public static int encodeFloat64(PositionedByteRange dst, double val, Order ord) {
1453    final int offset = dst.getOffset(), start = dst.getPosition();
1454    long lng = Double.doubleToLongBits(val);
1455    lng ^= ((lng >> (Long.SIZE - 1)) | Long.MIN_VALUE);
1456    dst.put(FIXED_FLOAT64).put((byte) (lng >> 56)).put((byte) (lng >> 48)).put((byte) (lng >> 40))
1457      .put((byte) (lng >> 32)).put((byte) (lng >> 24)).put((byte) (lng >> 16))
1458      .put((byte) (lng >> 8)).put((byte) lng);
1459    ord.apply(dst.getBytes(), offset + start, 9);
1460    return 9;
1461  }
1462
1463  /**
1464   * Decode a 64-bit floating point value using the fixed-length encoding.
1465   * @see #encodeFloat64(PositionedByteRange, double, Order)
1466   */
1467  public static double decodeFloat64(PositionedByteRange src) {
1468    final byte header = src.get();
1469    assert header == FIXED_FLOAT64 || header == DESCENDING.apply(FIXED_FLOAT64);
1470    Order ord = header == FIXED_FLOAT64 ? ASCENDING : DESCENDING;
1471    long val = ord.apply(src.get()) & 0xff;
1472    for (int i = 1; i < 8; i++) {
1473      val = (val << 8) + (ord.apply(src.get()) & 0xff);
1474    }
1475    val ^= (~val >> (Long.SIZE - 1)) | Long.MIN_VALUE;
1476    return Double.longBitsToDouble(val);
1477  }
1478
1479  /**
1480   * Returns true when {@code src} appears to be positioned an encoded value, false otherwise.
1481   */
1482  public static boolean isEncodedValue(PositionedByteRange src) {
1483    return isNull(src) || isNumeric(src) || isFixedInt8(src) || isFixedInt16(src)
1484      || isFixedInt32(src) || isFixedInt64(src) || isFixedFloat32(src) || isFixedFloat64(src)
1485      || isText(src) || isBlobCopy(src) || isBlobVar(src);
1486  }
1487
1488  /**
1489   * Return true when the next encoded value in {@code src} is null, false otherwise.
1490   */
1491  public static boolean isNull(PositionedByteRange src) {
1492    return NULL == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1493  }
1494
1495  /**
1496   * Return true when the next encoded value in {@code src} uses Numeric encoding, false otherwise.
1497   * {@code NaN}, {@code +/-Inf} are valid Numeric values.
1498   */
1499  public static boolean isNumeric(PositionedByteRange src) {
1500    byte x = (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1501    return x >= NEG_INF && x <= NAN;
1502  }
1503
1504  /**
1505   * Return true when the next encoded value in {@code src} uses Numeric encoding and is
1506   * {@code Infinite}, false otherwise.
1507   */
1508  public static boolean isNumericInfinite(PositionedByteRange src) {
1509    byte x = (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1510    return NEG_INF == x || POS_INF == x;
1511  }
1512
1513  /**
1514   * Return true when the next encoded value in {@code src} uses Numeric encoding and is
1515   * {@code NaN}, false otherwise.
1516   */
1517  public static boolean isNumericNaN(PositionedByteRange src) {
1518    return NAN == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1519  }
1520
1521  /**
1522   * Return true when the next encoded value in {@code src} uses Numeric encoding and is {@code 0},
1523   * false otherwise.
1524   */
1525  public static boolean isNumericZero(PositionedByteRange src) {
1526    return ZERO == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1527  }
1528
1529  /**
1530   * Return true when the next encoded value in {@code src} uses fixed-width Int8 encoding, false
1531   * otherwise.
1532   */
1533  public static boolean isFixedInt8(PositionedByteRange src) {
1534    return FIXED_INT8
1535        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1536  }
1537
1538  /**
1539   * Return true when the next encoded value in {@code src} uses fixed-width Int16 encoding, false
1540   * otherwise.
1541   */
1542  public static boolean isFixedInt16(PositionedByteRange src) {
1543    return FIXED_INT16
1544        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1545  }
1546
1547  /**
1548   * Return true when the next encoded value in {@code src} uses fixed-width Int32 encoding, false
1549   * otherwise.
1550   */
1551  public static boolean isFixedInt32(PositionedByteRange src) {
1552    return FIXED_INT32
1553        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1554  }
1555
1556  /**
1557   * Return true when the next encoded value in {@code src} uses fixed-width Int64 encoding, false
1558   * otherwise.
1559   */
1560  public static boolean isFixedInt64(PositionedByteRange src) {
1561    return FIXED_INT64
1562        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1563  }
1564
1565  /**
1566   * Return true when the next encoded value in {@code src} uses fixed-width Float32 encoding, false
1567   * otherwise.
1568   */
1569  public static boolean isFixedFloat32(PositionedByteRange src) {
1570    return FIXED_FLOAT32
1571        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1572  }
1573
1574  /**
1575   * Return true when the next encoded value in {@code src} uses fixed-width Float64 encoding, false
1576   * otherwise.
1577   */
1578  public static boolean isFixedFloat64(PositionedByteRange src) {
1579    return FIXED_FLOAT64
1580        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1581  }
1582
1583  /**
1584   * Return true when the next encoded value in {@code src} uses Text encoding, false otherwise.
1585   */
1586  public static boolean isText(PositionedByteRange src) {
1587    return TEXT == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1588  }
1589
1590  /**
1591   * Return true when the next encoded value in {@code src} uses BlobVar encoding, false otherwise.
1592   */
1593  public static boolean isBlobVar(PositionedByteRange src) {
1594    return BLOB_VAR
1595        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1596  }
1597
1598  /**
1599   * Return true when the next encoded value in {@code src} uses BlobCopy encoding, false otherwise.
1600   */
1601  public static boolean isBlobCopy(PositionedByteRange src) {
1602    return BLOB_COPY
1603        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1604  }
1605
1606  /**
1607   * Skip {@code buff}'s position forward over one encoded value.
1608   * @return number of bytes skipped.
1609   */
1610  public static int skip(PositionedByteRange src) {
1611    final int start = src.getPosition();
1612    byte header = src.get();
1613    Order ord = (-1 == Integer.signum(header)) ? DESCENDING : ASCENDING;
1614    header = ord.apply(header);
1615
1616    switch (header) {
1617      case NULL:
1618      case NEG_INF:
1619        return 1;
1620      case NEG_LARGE: /* Large negative number: 0x08, ~E, ~M */
1621        skipVaruint64(src, DESCENDING != ord);
1622        skipSignificand(src, DESCENDING != ord);
1623        return src.getPosition() - start;
1624      case NEG_MED_MIN: /* Medium negative number: 0x13-E, ~M */
1625      case NEG_MED_MIN + 0x01:
1626      case NEG_MED_MIN + 0x02:
1627      case NEG_MED_MIN + 0x03:
1628      case NEG_MED_MIN + 0x04:
1629      case NEG_MED_MIN + 0x05:
1630      case NEG_MED_MIN + 0x06:
1631      case NEG_MED_MIN + 0x07:
1632      case NEG_MED_MIN + 0x08:
1633      case NEG_MED_MIN + 0x09:
1634      case NEG_MED_MAX:
1635        skipSignificand(src, DESCENDING != ord);
1636        return src.getPosition() - start;
1637      case NEG_SMALL: /* Small negative number: 0x14, -E, ~M */
1638        skipVaruint64(src, DESCENDING == ord);
1639        skipSignificand(src, DESCENDING != ord);
1640        return src.getPosition() - start;
1641      case ZERO:
1642        return 1;
1643      case POS_SMALL: /* Small positive number: 0x16, ~-E, M */
1644        skipVaruint64(src, DESCENDING != ord);
1645        skipSignificand(src, DESCENDING == ord);
1646        return src.getPosition() - start;
1647      case POS_MED_MIN: /* Medium positive number: 0x17+E, M */
1648      case POS_MED_MIN + 0x01:
1649      case POS_MED_MIN + 0x02:
1650      case POS_MED_MIN + 0x03:
1651      case POS_MED_MIN + 0x04:
1652      case POS_MED_MIN + 0x05:
1653      case POS_MED_MIN + 0x06:
1654      case POS_MED_MIN + 0x07:
1655      case POS_MED_MIN + 0x08:
1656      case POS_MED_MIN + 0x09:
1657      case POS_MED_MAX:
1658        skipSignificand(src, DESCENDING == ord);
1659        return src.getPosition() - start;
1660      case POS_LARGE: /* Large positive number: 0x22, E, M */
1661        skipVaruint64(src, DESCENDING == ord);
1662        skipSignificand(src, DESCENDING == ord);
1663        return src.getPosition() - start;
1664      case POS_INF:
1665        return 1;
1666      case NAN:
1667        return 1;
1668      case FIXED_INT8:
1669        src.setPosition(src.getPosition() + 1);
1670        return src.getPosition() - start;
1671      case FIXED_INT16:
1672        src.setPosition(src.getPosition() + 2);
1673        return src.getPosition() - start;
1674      case FIXED_INT32:
1675        src.setPosition(src.getPosition() + 4);
1676        return src.getPosition() - start;
1677      case FIXED_INT64:
1678        src.setPosition(src.getPosition() + 8);
1679        return src.getPosition() - start;
1680      case FIXED_FLOAT32:
1681        src.setPosition(src.getPosition() + 4);
1682        return src.getPosition() - start;
1683      case FIXED_FLOAT64:
1684        src.setPosition(src.getPosition() + 8);
1685        return src.getPosition() - start;
1686      case TEXT:
1687        // for null-terminated values, skip to the end.
1688        do {
1689          header = ord.apply(src.get());
1690        } while (header != TERM);
1691        return src.getPosition() - start;
1692      case BLOB_VAR:
1693        // read until we find a 0 in the MSB
1694        do {
1695          header = ord.apply(src.get());
1696        } while ((byte) (header & 0x80) != TERM);
1697        return src.getPosition() - start;
1698      case BLOB_COPY:
1699        if (Order.DESCENDING == ord) {
1700          // if descending, read to termination byte.
1701          do {
1702            header = ord.apply(src.get());
1703          } while (header != TERM);
1704          return src.getPosition() - start;
1705        } else {
1706          // otherwise, just skip to the end.
1707          src.setPosition(src.getLength());
1708          return src.getPosition() - start;
1709        }
1710      default:
1711        throw unexpectedHeader(header);
1712    }
1713  }
1714
1715  /**
1716   * Return the number of encoded entries remaining in {@code buff}. The state of {@code buff} is
1717   * not modified through use of this method.
1718   */
1719  public static int length(PositionedByteRange buff) {
1720    PositionedByteRange b =
1721      new SimplePositionedMutableByteRange(buff.getBytes(), buff.getOffset(), buff.getLength());
1722    b.setPosition(buff.getPosition());
1723    int cnt = 0;
1724    for (; isEncodedValue(b); skip(b), cnt++)
1725      ;
1726    return cnt;
1727  }
1728}