View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.client;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.concurrent.ExecutorService;
26  
27  import org.apache.hadoop.conf.Configuration;
28  import org.apache.hadoop.hbase.HTableDescriptor;
29  import org.apache.hadoop.hbase.TableName;
30  import org.apache.hadoop.hbase.client.Append;
31  import org.apache.hadoop.hbase.client.ClusterConnection;
32  import org.apache.hadoop.hbase.client.Delete;
33  import org.apache.hadoop.hbase.client.Durability;
34  import org.apache.hadoop.hbase.client.Get;
35  import org.apache.hadoop.hbase.client.HTable;
36  import org.apache.hadoop.hbase.client.HTableInterface;
37  import org.apache.hadoop.hbase.client.Increment;
38  import org.apache.hadoop.hbase.client.Put;
39  import org.apache.hadoop.hbase.client.Result;
40  import org.apache.hadoop.hbase.client.ResultScanner;
41  import org.apache.hadoop.hbase.client.Row;
42  import org.apache.hadoop.hbase.client.RowMutations;
43  import org.apache.hadoop.hbase.client.Scan;
44  import org.apache.hadoop.hbase.client.coprocessor.Batch;
45  import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
46  import org.apache.hadoop.hbase.coprocessor.CoprocessorHost.Environment;
47  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
48  import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
49  import org.apache.hadoop.io.MultipleIOException;
50  
51  import com.google.protobuf.Descriptors.MethodDescriptor;
52  import com.google.protobuf.Message;
53  import com.google.protobuf.Service;
54  import com.google.protobuf.ServiceException;
55  
56  /**
57   * A wrapper for HTable. Can be used to restrict privilege.
58   *
59   * Currently it just helps to track tables opened by a Coprocessor and
60   * facilitate close of them if it is aborted.
61   *
62   * We also disallow row locking.
63   *
64   * There is nothing now that will stop a coprocessor from using HTable
65   * objects directly instead of this API, but in the future we intend to
66   * analyze coprocessor implementations as they are loaded and reject those
67   * which attempt to use objects and methods outside the Environment
68   * sandbox.
69   */
70  public class HTableWrapper implements HTableInterface {
71  
72    private TableName tableName;
73    private HTable table;
74    private ClusterConnection connection;
75    private final List<HTableInterface> openTables;
76  
77    /**
78     * @param openTables External list of tables used for tracking wrappers.
79     * @throws IOException 
80     */
81    public static HTableInterface createWrapper(List<HTableInterface> openTables,
82        TableName tableName, Environment env, ExecutorService pool) throws IOException {
83      return new HTableWrapper(openTables, tableName,
84          CoprocessorHConnection.getConnectionForEnvironment(env), pool);
85    }
86  
87    private HTableWrapper(List<HTableInterface> openTables, TableName tableName,
88        ClusterConnection connection, ExecutorService pool)
89        throws IOException {
90      this.tableName = tableName;
91      this.table = new HTable(tableName, connection, pool);
92      this.connection = connection;
93      this.openTables = openTables;
94      this.openTables.add(this);
95    }
96  
97    public void internalClose() throws IOException {
98      List<IOException> exceptions = new ArrayList<IOException>(2);
99      try {
100     table.close();
101     } catch (IOException e) {
102       exceptions.add(e);
103     }
104     try {
105       // have to self-manage our connection, as per the HTable contract
106       if (this.connection != null) {
107         this.connection.close();
108       }
109     } catch (IOException e) {
110       exceptions.add(e);
111     }
112     if (!exceptions.isEmpty()) {
113       throw MultipleIOException.createIOException(exceptions);
114     }
115   }
116 
117   public Configuration getConfiguration() {
118     return table.getConfiguration();
119   }
120 
121   public void close() throws IOException {
122     try {
123       internalClose();
124     } finally {
125       openTables.remove(this);
126     }
127   }
128 
129   public Result getRowOrBefore(byte[] row, byte[] family)
130       throws IOException {
131     return table.getRowOrBefore(row, family);
132   }
133 
134   public Result get(Get get) throws IOException {
135     return table.get(get);
136   }
137 
138   public boolean exists(Get get) throws IOException {
139     return table.exists(get);
140   }
141 
142   public Boolean[] exists(List<Get> gets) throws IOException{
143     return table.exists(gets);
144   }
145 
146   public void put(Put put) throws IOException {
147     table.put(put);
148   }
149 
150   public void put(List<Put> puts) throws IOException {
151     table.put(puts);
152   }
153 
154   public void delete(Delete delete) throws IOException {
155     table.delete(delete);
156   }
157 
158   public void delete(List<Delete> deletes) throws IOException {
159     table.delete(deletes);
160   }
161 
162   public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
163       byte[] value, Put put) throws IOException {
164     return table.checkAndPut(row, family, qualifier, value, put);
165   }
166 
167   public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
168       CompareOp compareOp, byte[] value, Put put) throws IOException {
169     return table.checkAndPut(row, family, qualifier, compareOp, value, put);
170   }
171 
172   public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
173       byte[] value, Delete delete) throws IOException {
174     return table.checkAndDelete(row, family, qualifier, value, delete);
175   }
176 
177   public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
178       CompareOp compareOp, byte[] value, Delete delete) throws IOException {
179     return table.checkAndDelete(row, family, qualifier, compareOp, value, delete);
180   }
181 
182   public long incrementColumnValue(byte[] row, byte[] family,
183       byte[] qualifier, long amount) throws IOException {
184     return table.incrementColumnValue(row, family, qualifier, amount);
185   }
186 
187   public long incrementColumnValue(byte[] row, byte[] family,
188       byte[] qualifier, long amount, Durability durability)
189       throws IOException {
190     return table.incrementColumnValue(row, family, qualifier, amount,
191         durability);
192   }
193 
194   @Override
195   public Result append(Append append) throws IOException {
196     return table.append(append);
197   }
198 
199   @Override
200   public Result increment(Increment increment) throws IOException {
201     return table.increment(increment);
202   }
203 
204   public void flushCommits() throws IOException {
205     table.flushCommits();
206   }
207 
208   public boolean isAutoFlush() {
209     return table.isAutoFlush();
210   }
211 
212   public ResultScanner getScanner(Scan scan) throws IOException {
213     return table.getScanner(scan);
214   }
215 
216   public ResultScanner getScanner(byte[] family) throws IOException {
217     return table.getScanner(family);
218   }
219 
220   public ResultScanner getScanner(byte[] family, byte[] qualifier)
221       throws IOException {
222     return table.getScanner(family, qualifier);
223   }
224 
225   public HTableDescriptor getTableDescriptor() throws IOException {
226     return table.getTableDescriptor();
227   }
228 
229   @Override
230   public byte[] getTableName() {
231     return tableName.getName();
232   }
233 
234   @Override
235   public TableName getName() {
236     return table.getName();
237   }
238 
239   @Override
240   public void batch(List<? extends Row> actions, Object[] results)
241       throws IOException, InterruptedException {
242     table.batch(actions, results);
243   }
244 
245   /**
246    * {@inheritDoc}
247    * @deprecated If any exception is thrown by one of the actions, there is no way to
248    * retrieve the partially executed results. Use {@link #batch(List, Object[])} instead.
249    */
250   @Override
251   public Object[] batch(List<? extends Row> actions)
252       throws IOException, InterruptedException {
253     return table.batch(actions);
254   }
255 
256   @Override
257   public <R> void batchCallback(List<? extends Row> actions, Object[] results,
258       Batch.Callback<R> callback) throws IOException, InterruptedException {
259     table.batchCallback(actions, results, callback);
260   }
261 
262   /**
263    * {@inheritDoc}
264    * @deprecated If any exception is thrown by one of the actions, there is no way to
265    * retrieve the partially executed results. Use 
266    * {@link #batchCallback(List, Object[], org.apache.hadoop.hbase.client.coprocessor.Batch.Callback)}
267    * instead.
268    */
269   @Override
270   public <R> Object[] batchCallback(List<? extends Row> actions,
271       Batch.Callback<R> callback) throws IOException, InterruptedException {
272     return table.batchCallback(actions, callback);
273   }
274 
275   @Override
276   public Result[] get(List<Get> gets) throws IOException {
277     return table.get(gets);
278   }
279 
280   @Override
281   public CoprocessorRpcChannel coprocessorService(byte[] row) {
282     return table.coprocessorService(row);
283   }
284 
285   @Override
286   public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
287       byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
288       throws ServiceException, Throwable {
289     return table.coprocessorService(service, startKey, endKey, callable);
290   }
291 
292   @Override
293   public <T extends Service, R> void coprocessorService(Class<T> service,
294       byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback)
295       throws ServiceException, Throwable {
296     table.coprocessorService(service, startKey, endKey, callable, callback);
297   }
298 
299   @Override
300   public void mutateRow(RowMutations rm) throws IOException {
301     table.mutateRow(rm);
302   }
303 
304   @Override
305   public void setAutoFlush(boolean autoFlush) {
306     table.setAutoFlush(autoFlush, autoFlush);
307   }
308 
309   @Override
310   public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
311     table.setAutoFlush(autoFlush, clearBufferOnFail);
312   }
313 
314   @Override
315   public void setAutoFlushTo(boolean autoFlush) {
316     table.setAutoFlushTo(autoFlush);
317   }
318 
319   @Override
320   public long getWriteBufferSize() {
321      return table.getWriteBufferSize();
322   }
323 
324   @Override
325   public void setWriteBufferSize(long writeBufferSize) throws IOException {
326     table.setWriteBufferSize(writeBufferSize);
327   }
328 
329   @Override
330   public long incrementColumnValue(byte[] row, byte[] family,
331       byte[] qualifier, long amount, boolean writeToWAL) throws IOException {
332     return table.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
333   }
334 
335   @Override
336   public <R extends Message> Map<byte[], R> batchCoprocessorService(
337       MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey,
338       R responsePrototype) throws ServiceException, Throwable {
339     return table.batchCoprocessorService(methodDescriptor, request, startKey, endKey,
340       responsePrototype);
341   }
342 
343   @Override
344   public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor,
345       Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
346       throws ServiceException, Throwable {
347     table.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
348       callback);
349   }
350 }