001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;
021
022import java.io.IOException;
023import java.util.ArrayDeque;
024import java.util.ArrayList;
025import java.util.Deque;
026import java.util.List;
027import org.apache.hadoop.hbase.CellUtil;
028import org.apache.hadoop.hbase.ExtendedCell;
029import org.apache.hadoop.hbase.util.Bytes;
030import org.apache.yetus.audience.InterfaceAudience;
031
032/**
033 * A scan result cache for batched scan, i.e,
034 * {@code scan.getBatch() > 0 && !scan.getAllowPartialResults()}.
035 * <p>
036 * If user setBatch(5) and rpc returns 3+5+5+5+3 cells, we should return 5+5+5+5+1 to user. setBatch
037 * doesn't mean setAllowPartialResult(true).
038 * @since 2.0.0
039 */
040@InterfaceAudience.Private
041public class BatchScanResultCache implements ScanResultCache {
042
043  private final int batch;
044
045  // used to filter out the cells that already returned to user as we always start from the
046  // beginning of a row when retry.
047  private ExtendedCell lastCell;
048
049  private boolean lastResultPartial;
050
051  private final Deque<Result> partialResults = new ArrayDeque<>();
052
053  private int numCellsOfPartialResults;
054
055  private int numberOfCompleteRows;
056
057  public BatchScanResultCache(int batch) {
058    this.batch = batch;
059  }
060
061  private void recordLastResult(Result result) {
062    lastCell = result.rawExtendedCells()[result.rawExtendedCells().length - 1];
063    lastResultPartial = result.mayHaveMoreCellsInRow();
064  }
065
066  private Result createCompletedResult() throws IOException {
067    numberOfCompleteRows++;
068    Result result = Result.createCompleteResult(partialResults);
069    partialResults.clear();
070    numCellsOfPartialResults = 0;
071    return result;
072  }
073
074  // Add new result to the partial list and return a batched Result if caching size exceed batching
075  // limit. As the RS will also respect the scan.getBatch, we can make sure that we will get only
076  // one Result back at most(or null, which means we do not have enough cells).
077  private Result regroupResults(Result result) {
078    partialResults.addLast(result);
079    numCellsOfPartialResults += result.size();
080    if (numCellsOfPartialResults < batch) {
081      return null;
082    }
083    ExtendedCell[] cells = new ExtendedCell[batch];
084    int cellCount = 0;
085    boolean stale = false;
086    for (;;) {
087      Result r = partialResults.pollFirst();
088      stale = stale || r.isStale();
089      int newCellCount = cellCount + r.size();
090      if (newCellCount > batch) {
091        // We have more cells than expected, so split the current result
092        int len = batch - cellCount;
093        System.arraycopy(r.rawCells(), 0, cells, cellCount, len);
094        ExtendedCell[] remainingCells = new ExtendedCell[r.size() - len];
095        System.arraycopy(r.rawExtendedCells(), len, remainingCells, 0, r.size() - len);
096        partialResults.addFirst(
097          Result.create(remainingCells, r.getExists(), r.isStale(), r.mayHaveMoreCellsInRow()));
098        break;
099      }
100      System.arraycopy(r.rawCells(), 0, cells, cellCount, r.size());
101      if (newCellCount == batch) {
102        break;
103      }
104      cellCount = newCellCount;
105    }
106    numCellsOfPartialResults -= batch;
107    return Result.create(cells, null, stale,
108      result.mayHaveMoreCellsInRow() || !partialResults.isEmpty());
109  }
110
111  @Override
112  public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
113    if (results.length == 0) {
114      if (!isHeartbeatMessage) {
115        if (!partialResults.isEmpty()) {
116          return new Result[] { createCompletedResult() };
117        }
118        if (lastResultPartial) {
119          // An empty non heartbeat result indicate that there must be a row change. So if the
120          // lastResultPartial is true then we need to increase numberOfCompleteRows.
121          numberOfCompleteRows++;
122        }
123      }
124      return EMPTY_RESULT_ARRAY;
125    }
126    List<Result> regroupedResults = new ArrayList<>();
127    for (Result result : results) {
128      result = filterCells(result, lastCell);
129      if (result == null) {
130        continue;
131      }
132      if (!partialResults.isEmpty()) {
133        if (!Bytes.equals(partialResults.peek().getRow(), result.getRow())) {
134          // there is a row change
135          regroupedResults.add(createCompletedResult());
136        }
137      } else if (lastResultPartial && !CellUtil.matchingRows(lastCell, result.getRow())) {
138        // As for batched scan we may return partial results to user if we reach the batch limit, so
139        // here we need to use lastCell to determine if there is row change and increase
140        // numberOfCompleteRows.
141        numberOfCompleteRows++;
142      }
143      // check if we have a row change
144      if (
145        !partialResults.isEmpty() && !Bytes.equals(partialResults.peek().getRow(), result.getRow())
146      ) {
147        regroupedResults.add(createCompletedResult());
148      }
149      Result regroupedResult = regroupResults(result);
150      if (regroupedResult != null) {
151        if (!regroupedResult.mayHaveMoreCellsInRow()) {
152          numberOfCompleteRows++;
153        }
154        regroupedResults.add(regroupedResult);
155        // only update last cell when we actually return it to user.
156        recordLastResult(regroupedResult);
157      }
158      if (!result.mayHaveMoreCellsInRow() && !partialResults.isEmpty()) {
159        // We are done for this row
160        regroupedResults.add(createCompletedResult());
161      }
162    }
163    return regroupedResults.toArray(new Result[0]);
164  }
165
166  @Override
167  public void clear() {
168    partialResults.clear();
169    numCellsOfPartialResults = 0;
170  }
171
172  @Override
173  public int numberOfCompleteRows() {
174    return numberOfCompleteRows;
175  }
176}