001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Arrays;
023import java.util.List;
024
025import org.apache.yetus.audience.InterfaceAudience;
026import org.apache.hadoop.hbase.util.Bytes;
027
028/**
029 * A scan result cache that only returns complete result.
030 */
031@InterfaceAudience.Private
032class CompleteScanResultCache implements ScanResultCache {
033
034  private int numberOfCompleteRows;
035
036  private final List<Result> partialResults = new ArrayList<>();
037
038  private Result combine() throws IOException {
039    Result result = Result.createCompleteResult(partialResults);
040    partialResults.clear();
041    return result;
042  }
043
044  private Result[] prependCombined(Result[] results, int length) throws IOException {
045    if (length == 0) {
046      return new Result[] { combine() };
047    }
048    // the last part of a partial result may not be marked as partial so here we need to check if
049    // there is a row change.
050    int start;
051    if (Bytes.equals(partialResults.get(0).getRow(), results[0].getRow())) {
052      partialResults.add(results[0]);
053      start = 1;
054      length--;
055    } else {
056      start = 0;
057    }
058    Result[] prependResults = new Result[length + 1];
059    prependResults[0] = combine();
060    System.arraycopy(results, start, prependResults, 1, length);
061    return prependResults;
062  }
063
064  private Result[] updateNumberOfCompleteResultsAndReturn(Result... results) {
065    numberOfCompleteRows += results.length;
066    return results;
067  }
068
069  @Override
070  public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
071    // If no results were returned it indicates that either we have the all the partial results
072    // necessary to construct the complete result or the server had to send a heartbeat message
073    // to the client to keep the client-server connection alive
074    if (results.length == 0) {
075      // If this response was an empty heartbeat message, then we have not exhausted the region
076      // and thus there may be more partials server side that still need to be added to the partial
077      // list before we form the complete Result
078      if (!partialResults.isEmpty() && !isHeartbeatMessage) {
079        return updateNumberOfCompleteResultsAndReturn(combine());
080      }
081      return EMPTY_RESULT_ARRAY;
082    }
083    // In every RPC response there should be at most a single partial result. Furthermore, if
084    // there is a partial result, it is guaranteed to be in the last position of the array.
085    Result last = results[results.length - 1];
086    if (last.mayHaveMoreCellsInRow()) {
087      if (partialResults.isEmpty()) {
088        partialResults.add(last);
089        return updateNumberOfCompleteResultsAndReturn(Arrays.copyOf(results, results.length - 1));
090      }
091      // We have only one result and it is partial
092      if (results.length == 1) {
093        // check if there is a row change
094        if (Bytes.equals(partialResults.get(0).getRow(), last.getRow())) {
095          partialResults.add(last);
096          return EMPTY_RESULT_ARRAY;
097        }
098        Result completeResult = combine();
099        partialResults.add(last);
100        return updateNumberOfCompleteResultsAndReturn(completeResult);
101      }
102      // We have some complete results
103      Result[] resultsToReturn = prependCombined(results, results.length - 1);
104      partialResults.add(last);
105      return updateNumberOfCompleteResultsAndReturn(resultsToReturn);
106    }
107    if (!partialResults.isEmpty()) {
108      return updateNumberOfCompleteResultsAndReturn(prependCombined(results, results.length));
109    }
110    return updateNumberOfCompleteResultsAndReturn(results);
111  }
112
113  @Override
114  public void clear() {
115    partialResults.clear();
116  }
117
118  @Override
119  public int numberOfCompleteRows() {
120    return numberOfCompleteRows;
121  }
122}