001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.codec;
019
020import java.io.EOFException;
021import java.io.IOException;
022import java.io.InputStream;
023import java.io.PushbackInputStream;
024
025import edu.umd.cs.findbugs.annotations.NonNull;
026
027import org.apache.hadoop.hbase.Cell;
028import org.apache.hadoop.hbase.HBaseInterfaceAudience;
029import org.apache.yetus.audience.InterfaceAudience;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033/**
034 * TODO javadoc
035 */
036@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
037public abstract class BaseDecoder implements Codec.Decoder {
038  protected static final Logger LOG = LoggerFactory.getLogger(BaseDecoder.class);
039
040  protected final InputStream in;
041  private Cell current = null;
042
043  protected static class PBIS extends PushbackInputStream {
044    public PBIS(InputStream in, int size) {
045      super(in, size);
046    }
047
048    public void resetBuf(int size) {
049      this.buf = new byte[size];
050      this.pos = size;
051    }
052  }
053
054  public BaseDecoder(final InputStream in) {
055    this.in = new PBIS(in, 1);
056  }
057
058  @Override
059  public boolean advance() throws IOException {
060    int firstByte = in.read();
061    if (firstByte == -1) {
062      return false;
063    } else {
064      ((PBIS)in).unread(firstByte);
065    }
066
067    try {
068      this.current = parseCell();
069    } catch (IOException ioEx) {
070      ((PBIS)in).resetBuf(1); // reset the buffer in case the underlying stream is read from upper layers
071      rethrowEofException(ioEx);
072    }
073    return true;
074  }
075
076  private void rethrowEofException(IOException ioEx) throws IOException {
077    boolean isEof = false;
078    try {
079      isEof = this.in.available() == 0;
080    } catch (Throwable t) {
081      LOG.trace("Error getting available for error message - ignoring", t);
082    }
083    if (!isEof) throw ioEx;
084    if (LOG.isTraceEnabled()) {
085      LOG.trace("Partial cell read caused by EOF", ioEx);
086    }
087    EOFException eofEx = new EOFException("Partial cell read");
088    eofEx.initCause(ioEx);
089    throw eofEx;
090  }
091
092  protected InputStream getInputStream() {
093    return in;
094  }
095
096  /**
097   * Extract a Cell.
098   * @return a parsed Cell or throws an Exception. EOFException or a generic IOException maybe
099   * thrown if EOF is reached prematurely. Does not return null.
100   * @throws IOException
101   */
102  @NonNull
103  protected abstract Cell parseCell() throws IOException;
104
105  @Override
106  public Cell current() {
107    return this.current;
108  }
109}