001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.util.Iterator;
021import java.util.List;
022import java.util.concurrent.ExecutorService;
023import org.apache.hadoop.hbase.TableName;
024import org.apache.hadoop.hbase.client.coprocessor.Batch;
025import org.apache.yetus.audience.InterfaceAudience;
026import org.apache.yetus.audience.InterfaceStability;
027
028/**
029 * Contains the attributes of a task which will be executed by
030 * {@link org.apache.hadoop.hbase.client.AsyncProcess}. The attributes will be validated by
031 * AsyncProcess. It's intended for advanced client applications.
032 * @param <T> The type of response from server-side
033 */
034@InterfaceAudience.Private
035@InterfaceStability.Evolving
036public class AsyncProcessTask<T> {
037  /**
038   * The number of processed rows. The AsyncProcess has traffic control which may reject some rows.
039   */
040  public enum SubmittedRows {
041    ALL,
042    AT_LEAST_ONE,
043    NORMAL
044  }
045
046  public static <T> Builder<T> newBuilder(final Batch.Callback<T> callback) {
047    return new Builder<>(callback);
048  }
049
050  public static Builder newBuilder() {
051    return new Builder();
052  }
053
054  public static class Builder<T> {
055
056    private ExecutorService pool;
057    private TableName tableName;
058    private RowAccess<? extends Row> rows;
059    private SubmittedRows submittedRows = SubmittedRows.ALL;
060    private Batch.Callback<T> callback;
061    private boolean needResults;
062    private int rpcTimeout;
063    private int operationTimeout;
064    private CancellableRegionServerCallable callable;
065    private Object[] results;
066
067    private Builder() {
068    }
069
070    private Builder(Batch.Callback<T> callback) {
071      this.callback = callback;
072    }
073
074    Builder<T> setResults(Object[] results) {
075      this.results = results;
076      if (results != null && results.length != 0) {
077        setNeedResults(true);
078      }
079      return this;
080    }
081
082    public Builder<T> setPool(ExecutorService pool) {
083      this.pool = pool;
084      return this;
085    }
086
087    public Builder<T> setRpcTimeout(int rpcTimeout) {
088      this.rpcTimeout = rpcTimeout;
089      return this;
090    }
091
092    public Builder<T> setOperationTimeout(int operationTimeout) {
093      this.operationTimeout = operationTimeout;
094      return this;
095    }
096
097    public Builder<T> setTableName(TableName tableName) {
098      this.tableName = tableName;
099      return this;
100    }
101
102    public Builder<T> setRowAccess(List<? extends Row> rows) {
103      this.rows = new ListRowAccess<>(rows);
104      return this;
105    }
106
107    public Builder<T> setRowAccess(RowAccess<? extends Row> rows) {
108      this.rows = rows;
109      return this;
110    }
111
112    public Builder<T> setSubmittedRows(SubmittedRows submittedRows) {
113      this.submittedRows = submittedRows;
114      return this;
115    }
116
117    public Builder<T> setNeedResults(boolean needResults) {
118      this.needResults = needResults;
119      return this;
120    }
121
122    Builder<T> setCallable(CancellableRegionServerCallable callable) {
123      this.callable = callable;
124      return this;
125    }
126
127    public AsyncProcessTask<T> build() {
128      return new AsyncProcessTask<>(pool, tableName, rows, submittedRows, callback, callable,
129        needResults, rpcTimeout, operationTimeout, results);
130    }
131  }
132
133  private final ExecutorService pool;
134  private final TableName tableName;
135  private final RowAccess<? extends Row> rows;
136  private final SubmittedRows submittedRows;
137  private final Batch.Callback<T> callback;
138  private final CancellableRegionServerCallable callable;
139  private final boolean needResults;
140  private final int rpcTimeout;
141  private final int operationTimeout;
142  private final Object[] results;
143
144  AsyncProcessTask(AsyncProcessTask<T> task) {
145    this(task.getPool(), task.getTableName(), task.getRowAccess(), task.getSubmittedRows(),
146      task.getCallback(), task.getCallable(), task.getNeedResults(), task.getRpcTimeout(),
147      task.getOperationTimeout(), task.getResults());
148  }
149
150  AsyncProcessTask(ExecutorService pool, TableName tableName, RowAccess<? extends Row> rows,
151    SubmittedRows size, Batch.Callback<T> callback, CancellableRegionServerCallable callable,
152    boolean needResults, int rpcTimeout, int operationTimeout, Object[] results) {
153    this.pool = pool;
154    this.tableName = tableName;
155    this.rows = rows;
156    this.submittedRows = size;
157    this.callback = callback;
158    this.callable = callable;
159    this.needResults = needResults;
160    this.rpcTimeout = rpcTimeout;
161    this.operationTimeout = operationTimeout;
162    this.results = results;
163  }
164
165  public int getOperationTimeout() {
166    return operationTimeout;
167  }
168
169  public ExecutorService getPool() {
170    return pool;
171  }
172
173  public TableName getTableName() {
174    return tableName;
175  }
176
177  public RowAccess<? extends Row> getRowAccess() {
178    return rows;
179  }
180
181  public SubmittedRows getSubmittedRows() {
182    return submittedRows;
183  }
184
185  public Batch.Callback<T> getCallback() {
186    return callback;
187  }
188
189  CancellableRegionServerCallable getCallable() {
190    return callable;
191  }
192
193  Object[] getResults() {
194    return results;
195  }
196
197  public boolean getNeedResults() {
198    return needResults;
199  }
200
201  public int getRpcTimeout() {
202    return rpcTimeout;
203  }
204
205  static class ListRowAccess<T> implements RowAccess<T> {
206
207    private final List<T> data;
208
209    ListRowAccess(final List<T> data) {
210      this.data = data;
211    }
212
213    @Override
214    public int size() {
215      return data.size();
216    }
217
218    @Override
219    public boolean isEmpty() {
220      return data.isEmpty();
221    }
222
223    @Override
224    public Iterator<T> iterator() {
225      return data.iterator();
226    }
227  }
228}