001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Arrays; 023import java.util.List; 024import org.apache.hadoop.hbase.util.Bytes; 025import org.apache.yetus.audience.InterfaceAudience; 026 027/** 028 * A scan result cache that only returns complete result. 029 */ 030@InterfaceAudience.Private 031class CompleteScanResultCache implements ScanResultCache { 032 033 private int numberOfCompleteRows; 034 035 private final List<Result> partialResults = new ArrayList<>(); 036 037 private Result combine() throws IOException { 038 Result result = Result.createCompleteResult(partialResults); 039 partialResults.clear(); 040 return result; 041 } 042 043 private Result[] prependCombined(Result[] results, int length) throws IOException { 044 if (length == 0) { 045 return new Result[] { combine() }; 046 } 047 // the last part of a partial result may not be marked as partial so here we need to check if 048 // there is a row change. 049 int start; 050 if (Bytes.equals(partialResults.get(0).getRow(), results[0].getRow())) { 051 partialResults.add(results[0]); 052 start = 1; 053 length--; 054 } else { 055 start = 0; 056 } 057 Result[] prependResults = new Result[length + 1]; 058 prependResults[0] = combine(); 059 System.arraycopy(results, start, prependResults, 1, length); 060 return prependResults; 061 } 062 063 private Result[] updateNumberOfCompleteResultsAndReturn(Result... results) { 064 numberOfCompleteRows += results.length; 065 return results; 066 } 067 068 @Override 069 public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException { 070 // If no results were returned it indicates that either we have the all the partial results 071 // necessary to construct the complete result or the server had to send a heartbeat message 072 // to the client to keep the client-server connection alive 073 if (results.length == 0) { 074 // If this response was an empty heartbeat message, then we have not exhausted the region 075 // and thus there may be more partials server side that still need to be added to the partial 076 // list before we form the complete Result 077 if (!partialResults.isEmpty() && !isHeartbeatMessage) { 078 return updateNumberOfCompleteResultsAndReturn(combine()); 079 } 080 return EMPTY_RESULT_ARRAY; 081 } 082 // In every RPC response there should be at most a single partial result. Furthermore, if 083 // there is a partial result, it is guaranteed to be in the last position of the array. 084 Result last = results[results.length - 1]; 085 if (last.mayHaveMoreCellsInRow()) { 086 if (partialResults.isEmpty()) { 087 partialResults.add(last); 088 return updateNumberOfCompleteResultsAndReturn(Arrays.copyOf(results, results.length - 1)); 089 } 090 // We have only one result and it is partial 091 if (results.length == 1) { 092 // check if there is a row change 093 if (Bytes.equals(partialResults.get(0).getRow(), last.getRow())) { 094 partialResults.add(last); 095 return EMPTY_RESULT_ARRAY; 096 } 097 Result completeResult = combine(); 098 partialResults.add(last); 099 return updateNumberOfCompleteResultsAndReturn(completeResult); 100 } 101 // We have some complete results 102 Result[] resultsToReturn = prependCombined(results, results.length - 1); 103 partialResults.add(last); 104 return updateNumberOfCompleteResultsAndReturn(resultsToReturn); 105 } 106 if (!partialResults.isEmpty()) { 107 return updateNumberOfCompleteResultsAndReturn(prependCombined(results, results.length)); 108 } 109 return updateNumberOfCompleteResultsAndReturn(results); 110 } 111 112 @Override 113 public void clear() { 114 partialResults.clear(); 115 } 116 117 @Override 118 public int numberOfCompleteRows() { 119 return numberOfCompleteRows; 120 } 121}