001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.codec;
019
020import edu.umd.cs.findbugs.annotations.NonNull;
021import java.io.EOFException;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.PushbackInputStream;
025import org.apache.hadoop.hbase.ExtendedCell;
026import org.apache.hadoop.hbase.HBaseInterfaceAudience;
027import org.apache.yetus.audience.InterfaceAudience;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031/**
032 * Base implementation for {@link Codec.Decoder}.
033 */
034@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX })
035public abstract class BaseDecoder implements Codec.Decoder {
036  protected static final Logger LOG = LoggerFactory.getLogger(BaseDecoder.class);
037
038  protected final InputStream in;
039  private ExtendedCell current = null;
040
041  protected static class PBIS extends PushbackInputStream {
042    public PBIS(InputStream in, int size) {
043      super(in, size);
044    }
045
046    public void resetBuf(int size) {
047      this.buf = new byte[size];
048      this.pos = size;
049    }
050  }
051
052  public BaseDecoder(final InputStream in) {
053    this.in = new PBIS(in, 1);
054  }
055
056  @Override
057  public boolean advance() throws IOException {
058    int firstByte = in.read();
059    if (firstByte == -1) {
060      return false;
061    } else {
062      ((PBIS) in).unread(firstByte);
063    }
064
065    try {
066      this.current = parseCell();
067    } catch (IOException ioEx) {
068      ((PBIS) in).resetBuf(1); // reset the buffer in case the underlying stream is read from upper
069                               // layers
070      rethrowEofException(ioEx);
071    }
072    return true;
073  }
074
075  private void rethrowEofException(IOException ioEx) throws IOException {
076    boolean isEof = false;
077    try {
078      isEof = this.in.available() == 0;
079    } catch (Throwable t) {
080      LOG.trace("Error getting available for error message - ignoring", t);
081    }
082    if (!isEof) throw ioEx;
083    if (LOG.isTraceEnabled()) {
084      LOG.trace("Partial cell read caused by EOF", ioEx);
085    }
086    EOFException eofEx = new EOFException("Partial cell read");
087    eofEx.initCause(ioEx);
088    throw eofEx;
089  }
090
091  protected InputStream getInputStream() {
092    return in;
093  }
094
095  /**
096   * Extract a Cell.
097   * @return a parsed Cell or throws an Exception. EOFException or a generic IOException maybe
098   *         thrown if EOF is reached prematurely. Does not return null.
099   */
100  @NonNull
101  protected abstract ExtendedCell parseCell() throws IOException;
102
103  @Override
104  public ExtendedCell current() {
105    return this.current;
106  }
107}