View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.codec;
19  
20  import java.io.EOFException;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.PushbackInputStream;
24  
25  import edu.umd.cs.findbugs.annotations.NonNull;
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.hbase.Cell;
29  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  
32  /**
33   * TODO javadoc
34   */
35  @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
36  public abstract class BaseDecoder implements Codec.Decoder {
37    protected static final Log LOG = LogFactory.getLog(BaseDecoder.class);
38  
39    protected final InputStream in;
40    private Cell current = null;
41  
42    protected static class PBIS extends PushbackInputStream {
43      public PBIS(InputStream in, int size) {
44        super(in, size);
45      }
46  
47      public void resetBuf(int size) {
48        this.buf = new byte[size];
49        this.pos = size;
50      }
51    }
52  
53    public BaseDecoder(final InputStream in) {
54      this.in = new PBIS(in, 1);
55    }
56  
57    @Override
58    public boolean advance() throws IOException {
59      int firstByte = in.read();
60      if (firstByte == -1) {
61        return false;
62      } else {
63        ((PBIS)in).unread(firstByte);
64      }
65  
66      try {
67        this.current = parseCell();
68      } catch (IOException ioEx) {
69        ((PBIS)in).resetBuf(1); // reset the buffer in case the underlying stream is read from upper layers
70        rethrowEofException(ioEx);
71      }
72      return true;
73    }
74  
75    private void rethrowEofException(IOException ioEx) throws IOException {
76      boolean isEof = false;
77      try {
78        isEof = this.in.available() == 0;
79      } catch (Throwable t) {
80        LOG.trace("Error getting available for error message - ignoring", t);
81      }
82      if (!isEof) throw ioEx;
83      if (LOG.isTraceEnabled()) {
84        LOG.trace("Partial cell read caused by EOF", ioEx);
85      }
86      EOFException eofEx = new EOFException("Partial cell read");
87      eofEx.initCause(ioEx);
88      throw eofEx;
89    }
90  
91    protected InputStream getInputStream() {
92      return in;
93    }
94  
95    /**
96     * Extract a Cell.
97     * @return a parsed Cell or throws an Exception. EOFException or a generic IOException maybe
98     * thrown if EOF is reached prematurely. Does not return null.
99     * @throws IOException
100    */
101   @NonNull
102   protected abstract Cell parseCell() throws IOException;
103 
104   @Override
105   public Cell current() {
106     return this.current;
107   }
108 }