001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;
021
022import java.io.IOException;
023import java.util.ArrayDeque;
024import java.util.ArrayList;
025import java.util.Deque;
026import java.util.List;
027
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.CellUtil;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.apache.hadoop.hbase.util.Bytes;
032
033/**
034 * A scan result cache for batched scan, i.e,
035 * {@code scan.getBatch() > 0 && !scan.getAllowPartialResults()}.
036 * <p>
037 * If user setBatch(5) and rpc returns 3+5+5+5+3 cells, we should return 5+5+5+5+1 to user. setBatch
038 * doesn't mean setAllowPartialResult(true).
039 * @since 2.0.0
040 */
041@InterfaceAudience.Private
042public class BatchScanResultCache implements ScanResultCache {
043
044  private final int batch;
045
046  // used to filter out the cells that already returned to user as we always start from the
047  // beginning of a row when retry.
048  private Cell lastCell;
049
050  private boolean lastResultPartial;
051
052  private final Deque<Result> partialResults = new ArrayDeque<>();
053
054  private int numCellsOfPartialResults;
055
056  private int numberOfCompleteRows;
057
058  public BatchScanResultCache(int batch) {
059    this.batch = batch;
060  }
061
062  private void recordLastResult(Result result) {
063    lastCell = result.rawCells()[result.rawCells().length - 1];
064    lastResultPartial = result.mayHaveMoreCellsInRow();
065  }
066
067  private Result createCompletedResult() throws IOException {
068    numberOfCompleteRows++;
069    Result result = Result.createCompleteResult(partialResults);
070    partialResults.clear();
071    numCellsOfPartialResults = 0;
072    return result;
073  }
074
075  // Add new result to the partial list and return a batched Result if caching size exceed batching
076  // limit. As the RS will also respect the scan.getBatch, we can make sure that we will get only
077  // one Result back at most(or null, which means we do not have enough cells).
078  private Result regroupResults(Result result) {
079    partialResults.addLast(result);
080    numCellsOfPartialResults += result.size();
081    if (numCellsOfPartialResults < batch) {
082      return null;
083    }
084    Cell[] cells = new Cell[batch];
085    int cellCount = 0;
086    boolean stale = false;
087    for (;;) {
088      Result r = partialResults.pollFirst();
089      stale = stale || r.isStale();
090      int newCellCount = cellCount + r.size();
091      if (newCellCount > batch) {
092        // We have more cells than expected, so split the current result
093        int len = batch - cellCount;
094        System.arraycopy(r.rawCells(), 0, cells, cellCount, len);
095        Cell[] remainingCells = new Cell[r.size() - len];
096        System.arraycopy(r.rawCells(), len, remainingCells, 0, r.size() - len);
097        partialResults.addFirst(
098          Result.create(remainingCells, r.getExists(), r.isStale(), r.mayHaveMoreCellsInRow()));
099        break;
100      }
101      System.arraycopy(r.rawCells(), 0, cells, cellCount, r.size());
102      if (newCellCount == batch) {
103        break;
104      }
105      cellCount = newCellCount;
106    }
107    numCellsOfPartialResults -= batch;
108    return Result.create(cells, null, stale,
109      result.mayHaveMoreCellsInRow() || !partialResults.isEmpty());
110  }
111
112  @Override
113  public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
114    if (results.length == 0) {
115      if (!isHeartbeatMessage) {
116        if (!partialResults.isEmpty()) {
117          return new Result[] { createCompletedResult() };
118        }
119        if (lastResultPartial) {
120          // An empty non heartbeat result indicate that there must be a row change. So if the
121          // lastResultPartial is true then we need to increase numberOfCompleteRows.
122          numberOfCompleteRows++;
123        }
124      }
125      return EMPTY_RESULT_ARRAY;
126    }
127    List<Result> regroupedResults = new ArrayList<>();
128    for (Result result : results) {
129      result = filterCells(result, lastCell);
130      if (result == null) {
131        continue;
132      }
133      if (!partialResults.isEmpty()) {
134        if (!Bytes.equals(partialResults.peek().getRow(), result.getRow())) {
135          // there is a row change
136          regroupedResults.add(createCompletedResult());
137        }
138      } else if (lastResultPartial && !CellUtil.matchingRows(lastCell, result.getRow())) {
139        // As for batched scan we may return partial results to user if we reach the batch limit, so
140        // here we need to use lastCell to determine if there is row change and increase
141        // numberOfCompleteRows.
142        numberOfCompleteRows++;
143      }
144      // check if we have a row change
145      if (!partialResults.isEmpty() &&
146          !Bytes.equals(partialResults.peek().getRow(), result.getRow())) {
147        regroupedResults.add(createCompletedResult());
148      }
149      Result regroupedResult = regroupResults(result);
150      if (regroupedResult != null) {
151        if (!regroupedResult.mayHaveMoreCellsInRow()) {
152          numberOfCompleteRows++;
153        }
154        regroupedResults.add(regroupedResult);
155        // only update last cell when we actually return it to user.
156        recordLastResult(regroupedResult);
157      }
158      if (!result.mayHaveMoreCellsInRow() && !partialResults.isEmpty()) {
159        // We are done for this row
160        regroupedResults.add(createCompletedResult());
161      }
162    }
163    return regroupedResults.toArray(new Result[0]);
164  }
165
166  @Override
167  public void clear() {
168    partialResults.clear();
169    numCellsOfPartialResults = 0;
170  }
171
172  @Override
173  public int numberOfCompleteRows() {
174    return numberOfCompleteRows;
175  }
176}