001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.List;
024import org.apache.hadoop.hbase.util.Bytes;
025import org.apache.yetus.audience.InterfaceAudience;
026
027/**
028 * A scan result cache that only returns complete result.
029 */
030@InterfaceAudience.Private
031class CompleteScanResultCache implements ScanResultCache {
032
033  private int numberOfCompleteRows;
034
035  private final List<Result> partialResults = new ArrayList<>();
036
037  private Result combine() throws IOException {
038    Result result = Result.createCompleteResult(partialResults);
039    partialResults.clear();
040    return result;
041  }
042
043  private Result[] prependCombined(Result[] results, int length) throws IOException {
044    if (length == 0) {
045      return new Result[] { combine() };
046    }
047    // the last part of a partial result may not be marked as partial so here we need to check if
048    // there is a row change.
049    int start;
050    if (Bytes.equals(partialResults.get(0).getRow(), results[0].getRow())) {
051      partialResults.add(results[0]);
052      start = 1;
053      length--;
054    } else {
055      start = 0;
056    }
057    Result[] prependResults = new Result[length + 1];
058    prependResults[0] = combine();
059    System.arraycopy(results, start, prependResults, 1, length);
060    return prependResults;
061  }
062
063  private Result[] updateNumberOfCompleteResultsAndReturn(Result... results) {
064    numberOfCompleteRows += results.length;
065    return results;
066  }
067
068  @Override
069  public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
070    // If no results were returned it indicates that either we have the all the partial results
071    // necessary to construct the complete result or the server had to send a heartbeat message
072    // to the client to keep the client-server connection alive
073    if (results.length == 0) {
074      // If this response was an empty heartbeat message, then we have not exhausted the region
075      // and thus there may be more partials server side that still need to be added to the partial
076      // list before we form the complete Result
077      if (!partialResults.isEmpty() && !isHeartbeatMessage) {
078        return updateNumberOfCompleteResultsAndReturn(combine());
079      }
080      return EMPTY_RESULT_ARRAY;
081    }
082    // In every RPC response there should be at most a single partial result. Furthermore, if
083    // there is a partial result, it is guaranteed to be in the last position of the array.
084    Result last = results[results.length - 1];
085    if (last.mayHaveMoreCellsInRow()) {
086      if (partialResults.isEmpty()) {
087        partialResults.add(last);
088        return updateNumberOfCompleteResultsAndReturn(Arrays.copyOf(results, results.length - 1));
089      }
090      // We have only one result and it is partial
091      if (results.length == 1) {
092        // check if there is a row change
093        if (Bytes.equals(partialResults.get(0).getRow(), last.getRow())) {
094          partialResults.add(last);
095          return EMPTY_RESULT_ARRAY;
096        }
097        Result completeResult = combine();
098        partialResults.add(last);
099        return updateNumberOfCompleteResultsAndReturn(completeResult);
100      }
101      // We have some complete results
102      Result[] resultsToReturn = prependCombined(results, results.length - 1);
103      partialResults.add(last);
104      return updateNumberOfCompleteResultsAndReturn(resultsToReturn);
105    }
106    if (!partialResults.isEmpty()) {
107      return updateNumberOfCompleteResultsAndReturn(prependCombined(results, results.length));
108    }
109    return updateNumberOfCompleteResultsAndReturn(results);
110  }
111
112  @Override
113  public void clear() {
114    partialResults.clear();
115  }
116
117  @Override
118  public int numberOfCompleteRows() {
119    return numberOfCompleteRows;
120  }
121}