001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.thrift;
019
020import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
021import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
022import static org.apache.hadoop.hbase.thrift.Constants.COALESCE_INC_KEY;
023import static org.apache.hadoop.hbase.util.Bytes.getBytes;
024
025import java.io.IOException;
026import java.nio.ByteBuffer;
027import java.util.ArrayList;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.TreeMap;
033import java.util.concurrent.TimeUnit;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.CatalogFamilyFormat;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.CellBuilder;
038import org.apache.hadoop.hbase.CellBuilderFactory;
039import org.apache.hadoop.hbase.CellBuilderType;
040import org.apache.hadoop.hbase.CellUtil;
041import org.apache.hadoop.hbase.DoNotRetryIOException;
042import org.apache.hadoop.hbase.HConstants;
043import org.apache.hadoop.hbase.HRegionLocation;
044import org.apache.hadoop.hbase.KeyValue;
045import org.apache.hadoop.hbase.ServerName;
046import org.apache.hadoop.hbase.TableName;
047import org.apache.hadoop.hbase.TableNotFoundException;
048import org.apache.hadoop.hbase.client.Append;
049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
050import org.apache.hadoop.hbase.client.Delete;
051import org.apache.hadoop.hbase.client.Durability;
052import org.apache.hadoop.hbase.client.Get;
053import org.apache.hadoop.hbase.client.Increment;
054import org.apache.hadoop.hbase.client.OperationWithAttributes;
055import org.apache.hadoop.hbase.client.Put;
056import org.apache.hadoop.hbase.client.RegionInfo;
057import org.apache.hadoop.hbase.client.RegionLocator;
058import org.apache.hadoop.hbase.client.Result;
059import org.apache.hadoop.hbase.client.ResultScanner;
060import org.apache.hadoop.hbase.client.Scan;
061import org.apache.hadoop.hbase.client.Table;
062import org.apache.hadoop.hbase.client.TableDescriptor;
063import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
064import org.apache.hadoop.hbase.filter.Filter;
065import org.apache.hadoop.hbase.filter.ParseFilter;
066import org.apache.hadoop.hbase.filter.PrefixFilter;
067import org.apache.hadoop.hbase.filter.WhileMatchFilter;
068import org.apache.hadoop.hbase.security.UserProvider;
069import org.apache.hadoop.hbase.security.access.AccessControlClient;
070import org.apache.hadoop.hbase.security.access.Permission;
071import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
072import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
073import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
074import org.apache.hadoop.hbase.thrift.generated.Hbase;
075import org.apache.hadoop.hbase.thrift.generated.IOError;
076import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
077import org.apache.hadoop.hbase.thrift.generated.Mutation;
078import org.apache.hadoop.hbase.thrift.generated.TAccessControlEntity;
079import org.apache.hadoop.hbase.thrift.generated.TAppend;
080import org.apache.hadoop.hbase.thrift.generated.TCell;
081import org.apache.hadoop.hbase.thrift.generated.TIncrement;
082import org.apache.hadoop.hbase.thrift.generated.TPermissionScope;
083import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
084import org.apache.hadoop.hbase.thrift.generated.TRowResult;
085import org.apache.hadoop.hbase.thrift.generated.TScan;
086import org.apache.hadoop.hbase.thrift.generated.TThriftServerType;
087import org.apache.hadoop.hbase.util.Bytes;
088import org.apache.thrift.TException;
089import org.apache.yetus.audience.InterfaceAudience;
090import org.slf4j.Logger;
091import org.slf4j.LoggerFactory;
092
093import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
094import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
095import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
096
097/**
098 * The HBaseServiceHandler is a glue object that connects Thrift RPC calls to the HBase client API
099 * primarily defined in the Admin and Table objects.
100 */
101@InterfaceAudience.Private
102@SuppressWarnings("deprecation")
103public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements Hbase.Iface {
  private static final Logger LOG = LoggerFactory.getLogger(ThriftHBaseServiceHandler.class);

  // Value reported in TRegionInfo.version by getTableRegions.
  public static final int HREGION_VERSION = 1;

  // nextScannerId and scannerMap are used to manage scanner state
  // nextScannerId is the ID that addScanner hands out next.
  private int nextScannerId = 0;
  // Maps scanner ID to its wrapper; built in the constructor with expire-after-access eviction.
  private Cache<Integer, ResultScannerWrapper> scannerMap;
  // Instantiated in the constructor with this handler as its argument.
  IncrementCoalescer coalescer;
112
113  /**
114   * Returns a list of all the column families for a given Table.
115   */
116  byte[][] getAllColumns(Table table) throws IOException {
117    ColumnFamilyDescriptor[] cds = table.getDescriptor().getColumnFamilies();
118    byte[][] columns = new byte[cds.length][];
119    for (int i = 0; i < cds.length; i++) {
120      columns[i] = Bytes.add(cds[i].getName(), KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
121    }
122    return columns;
123  }
124
125  /**
126   * Assigns a unique ID to the scanner and adds the mapping to an internal hash-map.
127   * @param scanner the {@link ResultScanner} to add
128   * @return integer scanner id
129   */
130  protected synchronized int addScanner(ResultScanner scanner, boolean sortColumns) {
131    int id = nextScannerId++;
132    ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
133    scannerMap.put(id, resultScannerWrapper);
134    return id;
135  }
136
137  /**
138   * Returns the scanner associated with the specified ID.
139   * @param id the ID of the scanner to get
140   * @return a Scanner, or null if ID was invalid.
141   */
142  private synchronized ResultScannerWrapper getScanner(int id) {
143    return scannerMap.getIfPresent(id);
144  }
145
146  /**
147   * Removes the scanner associated with the specified ID from the internal id-&gt;scanner hash-map.
148   * @param id the ID of the scanner to remove
149   */
150  private synchronized void removeScanner(int id) {
151    scannerMap.invalidate(id);
152  }
153
154  protected ThriftHBaseServiceHandler(final Configuration c, final UserProvider userProvider)
155    throws IOException {
156    super(c, userProvider);
157    long cacheTimeout =
158      c.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)
159        * 2;
160
161    scannerMap =
162      CacheBuilder.newBuilder().expireAfterAccess(cacheTimeout, TimeUnit.MILLISECONDS).build();
163
164    this.coalescer = new IncrementCoalescer(this);
165  }
166
167  @Override
168  public void enableTable(ByteBuffer tableName) throws IOError {
169    try {
170      getAdmin().enableTable(getTableName(tableName));
171    } catch (IOException e) {
172      LOG.warn(e.getMessage(), e);
173      throw getIOError(e);
174    }
175  }
176
177  @Override
178  public void disableTable(ByteBuffer tableName) throws IOError {
179    try {
180      getAdmin().disableTable(getTableName(tableName));
181    } catch (IOException e) {
182      LOG.warn(e.getMessage(), e);
183      throw getIOError(e);
184    }
185  }
186
187  @Override
188  public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
189    try {
190      return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
191    } catch (IOException e) {
192      LOG.warn(e.getMessage(), e);
193      throw getIOError(e);
194    }
195  }
196
197  @Override
198  public Map<ByteBuffer, Boolean> getTableNamesWithIsTableEnabled() throws IOError {
199    try {
200      HashMap<ByteBuffer, Boolean> tables = new HashMap<>();
201      for (ByteBuffer tableName : this.getTableNames()) {
202        tables.put(tableName, this.isTableEnabled(tableName));
203      }
204      return tables;
205    } catch (IOError e) {
206      LOG.warn(e.getMessage(), e);
207      throw getIOError(e);
208    }
209  }
210
211  // ThriftServerRunner.compact should be deprecated and replaced with methods specific to
212  // table and region.
213  @Override
214  public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
215    try {
216      try {
217        getAdmin().compactRegion(getBytes(tableNameOrRegionName));
218      } catch (IllegalArgumentException e) {
219        // Invalid region, try table
220        getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName)));
221      }
222    } catch (IOException e) {
223      LOG.warn(e.getMessage(), e);
224      throw getIOError(e);
225    }
226  }
227
228  // ThriftServerRunner.majorCompact should be deprecated and replaced with methods specific
229  // to table and region.
230  @Override
231  public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
232    try {
233      try {
234        getAdmin().compactRegion(getBytes(tableNameOrRegionName));
235      } catch (IllegalArgumentException e) {
236        // Invalid region, try table
237        getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName)));
238      }
239    } catch (IOException e) {
240      LOG.warn(e.getMessage(), e);
241      throw getIOError(e);
242    }
243  }
244
245  @Override
246  public List<ByteBuffer> getTableNames() throws IOError {
247    try {
248      TableName[] tableNames = this.getAdmin().listTableNames();
249      ArrayList<ByteBuffer> list = new ArrayList<>(tableNames.length);
250      for (TableName tableName : tableNames) {
251        list.add(ByteBuffer.wrap(tableName.getName()));
252      }
253      return list;
254    } catch (IOException e) {
255      LOG.warn(e.getMessage(), e);
256      throw getIOError(e);
257    }
258  }
259
260  /**
261   * Returns the list of regions in the given table, or an empty list if the table does not exist
262   */
263  @Override
264  public List<TRegionInfo> getTableRegions(ByteBuffer tableName) throws IOError {
265    try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
266      List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
267      List<TRegionInfo> results = new ArrayList<>(regionLocations.size());
268      for (HRegionLocation regionLocation : regionLocations) {
269        RegionInfo info = regionLocation.getRegion();
270        ServerName serverName = regionLocation.getServerName();
271        TRegionInfo region = new TRegionInfo();
272        region.serverName = ByteBuffer.wrap(Bytes.toBytes(serverName.getHostname()));
273        region.port = serverName.getPort();
274        region.startKey = ByteBuffer.wrap(info.getStartKey());
275        region.endKey = ByteBuffer.wrap(info.getEndKey());
276        region.id = info.getRegionId();
277        region.name = ByteBuffer.wrap(info.getRegionName());
278        region.version = HREGION_VERSION; // HRegion now not versioned, PB encoding used
279        results.add(region);
280      }
281      return results;
282    } catch (TableNotFoundException e) {
283      // Return empty list for non-existing table
284      return Collections.emptyList();
285    } catch (IOException e) {
286      LOG.warn(e.getMessage(), e);
287      throw getIOError(e);
288    }
289  }
290
291  @Override
292  public List<TCell> get(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
293    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
294    byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
295    if (famAndQf.length == 1) {
296      return get(tableName, row, famAndQf[0], null, attributes);
297    }
298    if (famAndQf.length == 2) {
299      return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
300    }
301    throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
302  }
303
304  /**
305   * Note: this internal interface is slightly different from public APIs in regard to handling of
306   * the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather, we
307   * respect qual == null as a request for the entire column family. The caller (
308   * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
309   * column is parse like normal.
310   */
311  protected List<TCell> get(ByteBuffer tableName, ByteBuffer row, byte[] family, byte[] qualifier,
312    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
313    Table table = null;
314    try {
315      table = getTable(tableName);
316      Get get = new Get(getBytes(row));
317      addAttributes(get, attributes);
318      if (qualifier == null) {
319        get.addFamily(family);
320      } else {
321        get.addColumn(family, qualifier);
322      }
323      Result result = table.get(get);
324      return ThriftUtilities.cellFromHBase(result.rawCells());
325    } catch (IOException e) {
326      LOG.warn(e.getMessage(), e);
327      throw getIOError(e);
328    } finally {
329      closeTable(table);
330    }
331  }
332
333  @Override
334  public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
335    int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
336    byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
337    if (famAndQf.length == 1) {
338      return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
339    }
340    if (famAndQf.length == 2) {
341      return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
342    }
343    throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
344
345  }
346
347  /**
348   * Note: this public interface is slightly different from public Java APIs in regard to handling
349   * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather, we
350   * respect qual == null as a request for the entire column family. If you want to access the
351   * entire column family, use {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a
352   * {@code column} value that lacks a {@code ':'}.
353   */
354  public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family, byte[] qualifier,
355    int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
356
357    Table table = null;
358    try {
359      table = getTable(tableName);
360      Get get = new Get(getBytes(row));
361      addAttributes(get, attributes);
362      if (null == qualifier) {
363        get.addFamily(family);
364      } else {
365        get.addColumn(family, qualifier);
366      }
367      get.readVersions(numVersions);
368      Result result = table.get(get);
369      return ThriftUtilities.cellFromHBase(result.rawCells());
370    } catch (IOException e) {
371      LOG.warn(e.getMessage(), e);
372      throw getIOError(e);
373    } finally {
374      closeTable(table);
375    }
376  }
377
378  @Override
379  public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
380    long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
381    byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
382    if (famAndQf.length == 1) {
383      return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
384    }
385    if (famAndQf.length == 2) {
386      return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions, attributes);
387    }
388    throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
389  }
390
391  /**
392   * Note: this internal interface is slightly different from public APIs in regard to handling of
393   * the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather, we
394   * respect qual == null as a request for the entire column family. The caller (
395   * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS consistent
396   * in that the column is parse like normal.
397   */
398  protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
399    byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
400    throws IOError {
401
402    Table table = null;
403    try {
404      table = getTable(tableName);
405      Get get = new Get(getBytes(row));
406      addAttributes(get, attributes);
407      if (null == qualifier) {
408        get.addFamily(family);
409      } else {
410        get.addColumn(family, qualifier);
411      }
412      get.setTimeRange(0, timestamp);
413      get.readVersions(numVersions);
414      Result result = table.get(get);
415      return ThriftUtilities.cellFromHBase(result.rawCells());
416    } catch (IOException e) {
417      LOG.warn(e.getMessage(), e);
418      throw getIOError(e);
419    } finally {
420      closeTable(table);
421    }
422  }
423
424  @Override
425  public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
426    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
427    return getRowWithColumnsTs(tableName, row, null, HConstants.LATEST_TIMESTAMP, attributes);
428  }
429
430  @Override
431  public List<TRowResult> getRowWithColumns(ByteBuffer tableName, ByteBuffer row,
432    List<ByteBuffer> columns, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
433    return getRowWithColumnsTs(tableName, row, columns, HConstants.LATEST_TIMESTAMP, attributes);
434  }
435
436  @Override
437  public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row, long timestamp,
438    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
439    return getRowWithColumnsTs(tableName, row, null, timestamp, attributes);
440  }
441
442  @Override
443  public List<TRowResult> getRowWithColumnsTs(ByteBuffer tableName, ByteBuffer row,
444    List<ByteBuffer> columns, long timestamp, Map<ByteBuffer, ByteBuffer> attributes)
445    throws IOError {
446
447    Table table = null;
448    try {
449      table = getTable(tableName);
450      if (columns == null) {
451        Get get = new Get(getBytes(row));
452        addAttributes(get, attributes);
453        get.setTimeRange(0, timestamp);
454        Result result = table.get(get);
455        return ThriftUtilities.rowResultFromHBase(result);
456      }
457      Get get = new Get(getBytes(row));
458      addAttributes(get, attributes);
459      for (ByteBuffer column : columns) {
460        byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
461        if (famAndQf.length == 1) {
462          get.addFamily(famAndQf[0]);
463        } else {
464          get.addColumn(famAndQf[0], famAndQf[1]);
465        }
466      }
467      get.setTimeRange(0, timestamp);
468      Result result = table.get(get);
469      return ThriftUtilities.rowResultFromHBase(result);
470    } catch (IOException e) {
471      LOG.warn(e.getMessage(), e);
472      throw getIOError(e);
473    } finally {
474      closeTable(table);
475    }
476  }
477
478  @Override
479  public List<TRowResult> getRows(ByteBuffer tableName, List<ByteBuffer> rows,
480    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
481    return getRowsWithColumnsTs(tableName, rows, null, HConstants.LATEST_TIMESTAMP, attributes);
482  }
483
484  @Override
485  public List<TRowResult> getRowsWithColumns(ByteBuffer tableName, List<ByteBuffer> rows,
486    List<ByteBuffer> columns, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
487    return getRowsWithColumnsTs(tableName, rows, columns, HConstants.LATEST_TIMESTAMP, attributes);
488  }
489
490  @Override
491  public List<TRowResult> getRowsTs(ByteBuffer tableName, List<ByteBuffer> rows, long timestamp,
492    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
493    return getRowsWithColumnsTs(tableName, rows, null, timestamp, attributes);
494  }
495
496  @Override
497  public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName, List<ByteBuffer> rows,
498    List<ByteBuffer> columns, long timestamp, Map<ByteBuffer, ByteBuffer> attributes)
499    throws IOError {
500
501    Table table = null;
502    try {
503      List<Get> gets = new ArrayList<>(rows.size());
504      table = getTable(tableName);
505      if (metrics != null) {
506        metrics.incNumRowKeysInBatchGet(rows.size());
507      }
508      for (ByteBuffer row : rows) {
509        Get get = new Get(getBytes(row));
510        addAttributes(get, attributes);
511        if (columns != null) {
512
513          for (ByteBuffer column : columns) {
514            byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
515            if (famAndQf.length == 1) {
516              get.addFamily(famAndQf[0]);
517            } else {
518              get.addColumn(famAndQf[0], famAndQf[1]);
519            }
520          }
521        }
522        get.setTimeRange(0, timestamp);
523        gets.add(get);
524      }
525      Result[] result = table.get(gets);
526      return ThriftUtilities.rowResultFromHBase(result);
527    } catch (IOException e) {
528      LOG.warn(e.getMessage(), e);
529      throw getIOError(e);
530    } finally {
531      closeTable(table);
532    }
533  }
534
535  @Override
536  public void deleteAll(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
537    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
538    deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP, attributes);
539  }
540
541  @Override
542  public void deleteAllTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long timestamp,
543    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
544    Table table = null;
545    try {
546      table = getTable(tableName);
547      Delete delete = new Delete(getBytes(row));
548      addAttributes(delete, attributes);
549      byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
550      if (famAndQf.length == 1) {
551        delete.addFamily(famAndQf[0], timestamp);
552      } else {
553        delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
554      }
555      table.delete(delete);
556
557    } catch (IOException e) {
558      LOG.warn(e.getMessage(), e);
559      throw getIOError(e);
560    } finally {
561      closeTable(table);
562    }
563  }
564
565  @Override
566  public void deleteAllRow(ByteBuffer tableName, ByteBuffer row,
567    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
568    deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
569  }
570
571  @Override
572  public void deleteAllRowTs(ByteBuffer tableName, ByteBuffer row, long timestamp,
573    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
574    Table table = null;
575    try {
576      table = getTable(tableName);
577      Delete delete = new Delete(getBytes(row), timestamp);
578      addAttributes(delete, attributes);
579      table.delete(delete);
580    } catch (IOException e) {
581      LOG.warn(e.getMessage(), e);
582      throw getIOError(e);
583    } finally {
584      closeTable(table);
585    }
586  }
587
588  @Override
589  public void createTable(ByteBuffer in_tableName, List<ColumnDescriptor> columnFamilies)
590    throws IOError, IllegalArgument, AlreadyExists {
591    TableName tableName = getTableName(in_tableName);
592    try {
593      if (getAdmin().tableExists(tableName)) {
594        throw new AlreadyExists("table name already in use");
595      }
596      TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
597      for (ColumnDescriptor col : columnFamilies) {
598        builder.setColumnFamily(ThriftUtilities.colDescFromThrift(col));
599      }
600      getAdmin().createTable(builder.build());
601    } catch (IOException e) {
602      LOG.warn(e.getMessage(), e);
603      throw getIOError(e);
604    } catch (IllegalArgumentException e) {
605      LOG.warn(e.getMessage(), e);
606      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
607    }
608  }
609
610  private static TableName getTableName(ByteBuffer buffer) {
611    return TableName.valueOf(getBytes(buffer));
612  }
613
614  @Override
615  public void deleteTable(ByteBuffer in_tableName) throws IOError {
616    TableName tableName = getTableName(in_tableName);
617    if (LOG.isDebugEnabled()) {
618      LOG.debug("deleteTable: table={}", tableName);
619    }
620    try {
621      if (!getAdmin().tableExists(tableName)) {
622        throw new IOException("table does not exist");
623      }
624      getAdmin().deleteTable(tableName);
625    } catch (IOException e) {
626      LOG.warn(e.getMessage(), e);
627      throw getIOError(e);
628    }
629  }
630
631  @Override
632  public void mutateRow(ByteBuffer tableName, ByteBuffer row, List<Mutation> mutations,
633    Map<ByteBuffer, ByteBuffer> attributes) throws IOError, IllegalArgument {
634    mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP, attributes);
635  }
636
637  @Override
638  public void mutateRowTs(ByteBuffer tableName, ByteBuffer row, List<Mutation> mutations,
639    long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError, IllegalArgument {
640    Table table = null;
641    try {
642      table = getTable(tableName);
643      Put put = new Put(getBytes(row), timestamp);
644      addAttributes(put, attributes);
645
646      Delete delete = new Delete(getBytes(row));
647      addAttributes(delete, attributes);
648      if (metrics != null) {
649        metrics.incNumRowKeysInBatchMutate(mutations.size());
650      }
651
652      // I apologize for all this mess :)
653      CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
654      for (Mutation m : mutations) {
655        byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column));
656        if (m.isDelete) {
657          if (famAndQf.length == 1) {
658            delete.addFamily(famAndQf[0], timestamp);
659          } else {
660            delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
661          }
662          delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
663        } else {
664          if (famAndQf.length == 1) {
665            LOG.warn("No column qualifier specified. Delete is the only mutation supported "
666              + "over the whole column family.");
667          } else {
668            put.add(builder.clear().setRow(put.getRow()).setFamily(famAndQf[0])
669              .setQualifier(famAndQf[1]).setTimestamp(put.getTimestamp()).setType(Cell.Type.Put)
670              .setValue(m.value != null ? getBytes(m.value) : HConstants.EMPTY_BYTE_ARRAY).build());
671          }
672          put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
673        }
674      }
675      if (!delete.isEmpty()) {
676        table.delete(delete);
677      }
678      if (!put.isEmpty()) {
679        table.put(put);
680      }
681    } catch (IOException e) {
682      LOG.warn(e.getMessage(), e);
683      throw getIOError(e);
684    } catch (IllegalArgumentException e) {
685      LOG.warn(e.getMessage(), e);
686      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
687    } finally {
688      closeTable(table);
689    }
690  }
691
692  @Override
693  public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
694    Map<ByteBuffer, ByteBuffer> attributes) throws IOError, IllegalArgument, TException {
695    mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
696  }
697
698  @Override
699  public void mutateRowsTs(ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
700    Map<ByteBuffer, ByteBuffer> attributes) throws IOError, IllegalArgument, TException {
701    List<Put> puts = new ArrayList<>();
702    List<Delete> deletes = new ArrayList<>();
703    CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
704    for (BatchMutation batch : rowBatches) {
705      byte[] row = getBytes(batch.row);
706      List<Mutation> mutations = batch.mutations;
707      Delete delete = new Delete(row);
708      addAttributes(delete, attributes);
709      Put put = new Put(row, timestamp);
710      addAttributes(put, attributes);
711      for (Mutation m : mutations) {
712        byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column));
713        if (m.isDelete) {
714          // no qualifier, family only.
715          if (famAndQf.length == 1) {
716            delete.addFamily(famAndQf[0], timestamp);
717          } else {
718            delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
719          }
720          delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
721        } else {
722          if (famAndQf.length == 1) {
723            LOG.warn("No column qualifier specified. Delete is the only mutation supported "
724              + "over the whole column family.");
725          }
726          if (famAndQf.length == 2) {
727            try {
728              put.add(builder.clear().setRow(put.getRow()).setFamily(famAndQf[0])
729                .setQualifier(famAndQf[1]).setTimestamp(put.getTimestamp()).setType(Cell.Type.Put)
730                .setValue(m.value != null ? getBytes(m.value) : HConstants.EMPTY_BYTE_ARRAY)
731                .build());
732            } catch (IOException e) {
733              throw new IllegalArgumentException(e);
734            }
735          } else {
736            throw new IllegalArgumentException("Invalid famAndQf provided.");
737          }
738          put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
739        }
740      }
741      if (!delete.isEmpty()) {
742        deletes.add(delete);
743      }
744      if (!put.isEmpty()) {
745        puts.add(put);
746      }
747    }
748
749    Table table = null;
750    try {
751      table = getTable(tableName);
752      if (!puts.isEmpty()) {
753        table.put(puts);
754      }
755      if (!deletes.isEmpty()) {
756        table.delete(deletes);
757      }
758    } catch (IOException e) {
759      LOG.warn(e.getMessage(), e);
760      throw getIOError(e);
761    } catch (IllegalArgumentException e) {
762      LOG.warn(e.getMessage(), e);
763      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
764    } finally {
765      closeTable(table);
766    }
767  }
768
769  @Override
770  public long atomicIncrement(ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
771    throws IOError, IllegalArgument, TException {
772    byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
773    if (famAndQf.length == 1) {
774      return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
775    }
776    return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
777  }
778
779  protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row, byte[] family,
780    byte[] qualifier, long amount) throws IOError, IllegalArgument, TException {
781    Table table = null;
782    try {
783      table = getTable(tableName);
784      return table.incrementColumnValue(getBytes(row), family, qualifier, amount);
785    } catch (IOException e) {
786      LOG.warn(e.getMessage(), e);
787      throw getIOError(e);
788    } finally {
789      closeTable(table);
790    }
791  }
792
793  @Override
794  public void scannerClose(int id) throws IOError, IllegalArgument {
795    LOG.debug("scannerClose: id={}", id);
796    ResultScannerWrapper resultScannerWrapper = getScanner(id);
797    if (resultScannerWrapper == null) {
798      LOG.warn("scanner ID is invalid");
799      throw new IllegalArgument("scanner ID is invalid");
800    }
801    resultScannerWrapper.getScanner().close();
802    removeScanner(id);
803  }
804
805  @Override
806  public List<TRowResult> scannerGetList(int id, int nbRows) throws IllegalArgument, IOError {
807    LOG.debug("scannerGetList: id={}", id);
808    ResultScannerWrapper resultScannerWrapper = getScanner(id);
809    if (null == resultScannerWrapper) {
810      String message = "scanner ID is invalid";
811      LOG.warn(message);
812      throw new IllegalArgument("scanner ID is invalid");
813    }
814
815    Result[] results;
816    try {
817      results = resultScannerWrapper.getScanner().next(nbRows);
818      if (null == results) {
819        return new ArrayList<>();
820      }
821    } catch (IOException e) {
822      LOG.warn(e.getMessage(), e);
823      throw getIOError(e);
824    } finally {
825      // Add scanner back to scannerMap; protects against case
826      // where scanner expired during processing of request.
827      scannerMap.put(id, resultScannerWrapper);
828    }
829    return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
830  }
831
  /**
   * Single-row convenience form of {@link #scannerGetList(int, int)}: delegates with a row count
   * of 1 and returns whatever that call returns.
   */
  @Override
  public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
    return scannerGetList(id, 1);
  }
836
837  @Override
838  public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
839    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
840
841    Table table = null;
842    try {
843      table = getTable(tableName);
844      Scan scan = new Scan();
845      addAttributes(scan, attributes);
846      if (tScan.isSetStartRow()) {
847        scan.withStartRow(tScan.getStartRow());
848      }
849      if (tScan.isSetStopRow()) {
850        scan.withStopRow(tScan.getStopRow());
851      }
852      if (tScan.isSetTimestamp()) {
853        scan.setTimeRange(0, tScan.getTimestamp());
854      }
855      if (tScan.isSetCaching()) {
856        scan.setCaching(tScan.getCaching());
857      }
858      if (tScan.isSetBatchSize()) {
859        scan.setBatch(tScan.getBatchSize());
860      }
861      if (tScan.isSetColumns() && !tScan.getColumns().isEmpty()) {
862        for (ByteBuffer column : tScan.getColumns()) {
863          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
864          if (famQf.length == 1) {
865            scan.addFamily(famQf[0]);
866          } else {
867            scan.addColumn(famQf[0], famQf[1]);
868          }
869        }
870      }
871      if (tScan.isSetFilterString()) {
872        ParseFilter parseFilter = new ParseFilter();
873        scan.setFilter(parseFilter.parseFilterString(tScan.getFilterString()));
874      }
875      if (tScan.isSetReversed()) {
876        scan.setReversed(tScan.isReversed());
877      }
878      if (tScan.isSetCacheBlocks()) {
879        scan.setCacheBlocks(tScan.isCacheBlocks());
880      }
881      return addScanner(table.getScanner(scan), tScan.sortColumns);
882    } catch (IOException e) {
883      LOG.warn(e.getMessage(), e);
884      throw getIOError(e);
885    } finally {
886      closeTable(table);
887    }
888  }
889
890  @Override
891  public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow, List<ByteBuffer> columns,
892    Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
893
894    Table table = null;
895    try {
896      table = getTable(tableName);
897      Scan scan = new Scan().withStartRow(getBytes(startRow));
898      addAttributes(scan, attributes);
899      if (columns != null && !columns.isEmpty()) {
900        for (ByteBuffer column : columns) {
901          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
902          if (famQf.length == 1) {
903            scan.addFamily(famQf[0]);
904          } else {
905            scan.addColumn(famQf[0], famQf[1]);
906          }
907        }
908      }
909      return addScanner(table.getScanner(scan), false);
910    } catch (IOException e) {
911      LOG.warn(e.getMessage(), e);
912      throw getIOError(e);
913    } finally {
914      closeTable(table);
915    }
916  }
917
918  @Override
919  public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow, ByteBuffer stopRow,
920    List<ByteBuffer> columns, Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
921
922    Table table = null;
923    try {
924      table = getTable(tableName);
925      Scan scan = new Scan().withStartRow(getBytes(startRow)).withStopRow(getBytes(stopRow));
926      addAttributes(scan, attributes);
927      if (columns != null && !columns.isEmpty()) {
928        for (ByteBuffer column : columns) {
929          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
930          if (famQf.length == 1) {
931            scan.addFamily(famQf[0]);
932          } else {
933            scan.addColumn(famQf[0], famQf[1]);
934          }
935        }
936      }
937      return addScanner(table.getScanner(scan), false);
938    } catch (IOException e) {
939      LOG.warn(e.getMessage(), e);
940      throw getIOError(e);
941    } finally {
942      closeTable(table);
943    }
944  }
945
946  @Override
947  public int scannerOpenWithPrefix(ByteBuffer tableName, ByteBuffer startAndPrefix,
948    List<ByteBuffer> columns, Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
949
950    Table table = null;
951    try {
952      table = getTable(tableName);
953      Scan scan = new Scan().withStartRow(getBytes(startAndPrefix));
954      addAttributes(scan, attributes);
955      Filter f = new WhileMatchFilter(new PrefixFilter(getBytes(startAndPrefix)));
956      scan.setFilter(f);
957      if (columns != null && !columns.isEmpty()) {
958        for (ByteBuffer column : columns) {
959          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
960          if (famQf.length == 1) {
961            scan.addFamily(famQf[0]);
962          } else {
963            scan.addColumn(famQf[0], famQf[1]);
964          }
965        }
966      }
967      return addScanner(table.getScanner(scan), false);
968    } catch (IOException e) {
969      LOG.warn(e.getMessage(), e);
970      throw getIOError(e);
971    } finally {
972      closeTable(table);
973    }
974  }
975
976  @Override
977  public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow, List<ByteBuffer> columns,
978    long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
979
980    Table table = null;
981    try {
982      table = getTable(tableName);
983      Scan scan = new Scan().withStartRow(getBytes(startRow));
984      addAttributes(scan, attributes);
985      scan.setTimeRange(0, timestamp);
986      if (columns != null && !columns.isEmpty()) {
987        for (ByteBuffer column : columns) {
988          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
989          if (famQf.length == 1) {
990            scan.addFamily(famQf[0]);
991          } else {
992            scan.addColumn(famQf[0], famQf[1]);
993          }
994        }
995      }
996      return addScanner(table.getScanner(scan), false);
997    } catch (IOException e) {
998      LOG.warn(e.getMessage(), e);
999      throw getIOError(e);
1000    } finally {
1001      closeTable(table);
1002    }
1003  }
1004
1005  @Override
1006  public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow, ByteBuffer stopRow,
1007    List<ByteBuffer> columns, long timestamp, Map<ByteBuffer, ByteBuffer> attributes)
1008    throws IOError, TException {
1009
1010    Table table = null;
1011    try {
1012      table = getTable(tableName);
1013      Scan scan = new Scan().withStartRow(getBytes(startRow)).withStopRow(getBytes(stopRow));
1014      addAttributes(scan, attributes);
1015      scan.setTimeRange(0, timestamp);
1016      if (columns != null && !columns.isEmpty()) {
1017        for (ByteBuffer column : columns) {
1018          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
1019          if (famQf.length == 1) {
1020            scan.addFamily(famQf[0]);
1021          } else {
1022            scan.addColumn(famQf[0], famQf[1]);
1023          }
1024        }
1025      }
1026      scan.setTimeRange(0, timestamp);
1027      return addScanner(table.getScanner(scan), false);
1028    } catch (IOException e) {
1029      LOG.warn(e.getMessage(), e);
1030      throw getIOError(e);
1031    } finally {
1032      closeTable(table);
1033    }
1034  }
1035
1036  @Override
1037  public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(ByteBuffer tableName)
1038    throws IOError, TException {
1039
1040    Table table = null;
1041    try {
1042      TreeMap<ByteBuffer, ColumnDescriptor> columns = new TreeMap<>();
1043
1044      table = getTable(tableName);
1045      TableDescriptor desc = table.getDescriptor();
1046
1047      for (ColumnFamilyDescriptor e : desc.getColumnFamilies()) {
1048        ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1049        columns.put(col.name, col);
1050      }
1051      return columns;
1052    } catch (IOException e) {
1053      LOG.warn(e.getMessage(), e);
1054      throw getIOError(e);
1055    } finally {
1056      closeTable(table);
1057    }
1058  }
1059
1060  private void closeTable(Table table) throws IOError {
1061    try {
1062      if (table != null) {
1063        table.close();
1064      }
1065    } catch (IOException e) {
1066      LOG.error(e.getMessage(), e);
1067      throw getIOError(e);
1068    }
1069  }
1070
1071  @Override
1072  public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
1073    try {
1074      byte[] row = getBytes(searchRow);
1075      Result startRowResult =
1076        getReverseScanResult(TableName.META_TABLE_NAME.getName(), row, HConstants.CATALOG_FAMILY);
1077
1078      if (startRowResult == null) {
1079        throw new IOException(
1080          "Cannot find row in " + TableName.META_TABLE_NAME + ", row=" + Bytes.toStringBinary(row));
1081      }
1082
1083      // find region start and end keys
1084      RegionInfo regionInfo = CatalogFamilyFormat.getRegionInfo(startRowResult);
1085      if (regionInfo == null) {
1086        throw new IOException("RegionInfo REGIONINFO was null or " + " empty in Meta for row="
1087          + Bytes.toStringBinary(row));
1088      }
1089      TRegionInfo region = new TRegionInfo();
1090      region.setStartKey(regionInfo.getStartKey());
1091      region.setEndKey(regionInfo.getEndKey());
1092      region.id = regionInfo.getRegionId();
1093      region.setName(regionInfo.getRegionName());
1094      region.version = HREGION_VERSION; // version not used anymore, PB encoding used.
1095
1096      // find region assignment to server
1097      ServerName serverName = CatalogFamilyFormat.getServerName(startRowResult, 0);
1098      if (serverName != null) {
1099        region.setServerName(Bytes.toBytes(serverName.getHostname()));
1100        region.port = serverName.getPort();
1101      }
1102      return region;
1103    } catch (IOException e) {
1104      LOG.warn(e.getMessage(), e);
1105      throw getIOError(e);
1106    }
1107  }
1108
1109  private Result getReverseScanResult(byte[] tableName, byte[] row, byte[] family)
1110    throws IOException {
1111    Scan scan = new Scan().withStartRow(row);
1112    scan.setReversed(true);
1113    scan.addFamily(family);
1114    scan.withStartRow(row);
1115    try (Table table = getTable(tableName); ResultScanner scanner = table.getScanner(scan)) {
1116      return scanner.next();
1117    }
1118  }
1119
1120  @Override
1121  public void increment(TIncrement tincrement) throws IOError, TException {
1122
1123    if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1124      throw new TException("Must supply a table and a row key; can't increment");
1125    }
1126
1127    if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1128      this.coalescer.queueIncrement(tincrement);
1129      return;
1130    }
1131
1132    Table table = null;
1133    try {
1134      table = getTable(tincrement.getTable());
1135      Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1136      table.increment(inc);
1137    } catch (IOException e) {
1138      LOG.warn(e.getMessage(), e);
1139      throw getIOError(e);
1140    } finally {
1141      closeTable(table);
1142    }
1143  }
1144
1145  @Override
1146  public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1147    if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1148      this.coalescer.queueIncrements(tincrements);
1149      return;
1150    }
1151    for (TIncrement tinc : tincrements) {
1152      increment(tinc);
1153    }
1154  }
1155
1156  @Override
1157  public List<TCell> append(TAppend tappend) throws IOError, TException {
1158    if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1159      throw new TException("Must supply a table and a row key; can't append");
1160    }
1161
1162    Table table = null;
1163    try {
1164      table = getTable(tappend.getTable());
1165      Append append = ThriftUtilities.appendFromThrift(tappend);
1166      Result result = table.append(append);
1167      return ThriftUtilities.cellFromHBase(result.rawCells());
1168    } catch (IOException e) {
1169      LOG.warn(e.getMessage(), e);
1170      throw getIOError(e);
1171    } finally {
1172      closeTable(table);
1173    }
1174  }
1175
1176  @Override
1177  public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1178    ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes)
1179    throws IOError, IllegalArgument, TException {
1180    Put put;
1181    try {
1182      put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
1183      addAttributes(put, attributes);
1184
1185      byte[][] famAndQf = CellUtil.parseColumn(getBytes(mput.column));
1186      put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
1187        .setFamily(famAndQf[0]).setQualifier(famAndQf[1]).setTimestamp(put.getTimestamp())
1188        .setType(Cell.Type.Put)
1189        .setValue(mput.value != null ? getBytes(mput.value) : HConstants.EMPTY_BYTE_ARRAY).build());
1190      put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1191    } catch (IOException | IllegalArgumentException e) {
1192      LOG.warn(e.getMessage(), e);
1193      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1194    }
1195
1196    Table table = null;
1197    try {
1198      table = getTable(tableName);
1199      byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
1200      Table.CheckAndMutateBuilder mutateBuilder =
1201        table.checkAndMutate(getBytes(row), famAndQf[0]).qualifier(famAndQf[1]);
1202      if (value != null) {
1203        return mutateBuilder.ifEquals(getBytes(value)).thenPut(put);
1204      } else {
1205        return mutateBuilder.ifNotExists().thenPut(put);
1206      }
1207    } catch (IOException e) {
1208      LOG.warn(e.getMessage(), e);
1209      throw getIOError(e);
1210    } catch (IllegalArgumentException e) {
1211      LOG.warn(e.getMessage(), e);
1212      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1213    } finally {
1214      closeTable(table);
1215    }
1216  }
1217
  /**
   * Reports the type of this Thrift server; this handler always answers
   * {@code TThriftServerType.ONE}.
   */
  @Override
  public TThriftServerType getThriftServerType() {
    return TThriftServerType.ONE;
  }
1222
  /**
   * Returns the cluster id as reported by {@code connectionCache.getClusterId()}.
   */
  @Override
  public String getClusterId() throws TException {
    return connectionCache.getClusterId();
  }
1227
1228  @Override
1229  public boolean grant(TAccessControlEntity info) throws IOError, TException {
1230    Permission.Action[] actions = ThriftUtilities.permissionActionsFromString(info.actions);
1231    try {
1232      if (info.scope == TPermissionScope.NAMESPACE) {
1233        AccessControlClient.grant(connectionCache.getAdmin().getConnection(), info.getNsName(),
1234          info.getUsername(), actions);
1235      } else if (info.scope == TPermissionScope.TABLE) {
1236        TableName tableName = TableName.valueOf(info.getTableName());
1237        AccessControlClient.grant(connectionCache.getAdmin().getConnection(), tableName,
1238          info.getUsername(), null, null, actions);
1239      }
1240    } catch (Throwable t) {
1241      if (t instanceof IOException) {
1242        throw getIOError(t);
1243      } else {
1244        throw getIOError(new DoNotRetryIOException(t.getMessage()));
1245      }
1246    }
1247    return true;
1248  }
1249
1250  @Override
1251  public boolean revoke(TAccessControlEntity info) throws IOError, TException {
1252    Permission.Action[] actions = ThriftUtilities.permissionActionsFromString(info.actions);
1253    try {
1254      if (info.scope == TPermissionScope.NAMESPACE) {
1255        AccessControlClient.revoke(connectionCache.getAdmin().getConnection(), info.getNsName(),
1256          info.getUsername(), actions);
1257      } else if (info.scope == TPermissionScope.TABLE) {
1258        TableName tableName = TableName.valueOf(info.getTableName());
1259        AccessControlClient.revoke(connectionCache.getAdmin().getConnection(), tableName,
1260          info.getUsername(), null, null, actions);
1261      }
1262    } catch (Throwable t) {
1263      if (t instanceof IOException) {
1264        throw getIOError(t);
1265      } else {
1266        throw getIOError(new DoNotRetryIOException(t.getMessage()));
1267      }
1268    }
1269    return true;
1270  }
1271
1272  private static IOError getIOError(Throwable throwable) {
1273    IOError error = new IOErrorWithCause(throwable);
1274    error.setCanRetry(!(throwable instanceof DoNotRetryIOException));
1275    error.setMessage(Throwables.getStackTraceAsString(throwable));
1276    return error;
1277  }
1278
1279  /**
1280   * Adds all the attributes into the Operation object
1281   */
1282  private static void addAttributes(OperationWithAttributes op,
1283    Map<ByteBuffer, ByteBuffer> attributes) {
1284    if (attributes == null || attributes.isEmpty()) {
1285      return;
1286    }
1287    for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1288      String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1289      byte[] value = getBytes(entry.getValue());
1290      op.setAttribute(name, value);
1291    }
1292  }
1293
1294  protected static class ResultScannerWrapper {
1295
1296    private final ResultScanner scanner;
1297    private final boolean sortColumns;
1298
1299    public ResultScannerWrapper(ResultScanner resultScanner, boolean sortResultColumns) {
1300      scanner = resultScanner;
1301      sortColumns = sortResultColumns;
1302    }
1303
1304    public ResultScanner getScanner() {
1305      return scanner;
1306    }
1307
1308    public boolean isColumnSorted() {
1309      return sortColumns;
1310    }
1311  }
1312
1313  public static class IOErrorWithCause extends IOError {
1314    private final Throwable cause;
1315
1316    public IOErrorWithCause(Throwable cause) {
1317      this.cause = cause;
1318    }
1319
1320    @Override
1321    public synchronized Throwable getCause() {
1322      return cause;
1323    }
1324
1325    @Override
1326    public boolean equals(Object other) {
1327      if (super.equals(other) && other instanceof IOErrorWithCause) {
1328        Throwable otherCause = ((IOErrorWithCause) other).getCause();
1329        if (this.getCause() != null) {
1330          return otherCause != null && this.getCause().equals(otherCause);
1331        } else {
1332          return otherCause == null;
1333        }
1334      }
1335      return false;
1336    }
1337
1338    @Override
1339    public int hashCode() {
1340      int result = super.hashCode();
1341      result = 31 * result + (cause != null ? cause.hashCode() : 0);
1342      return result;
1343    }
1344  }
1345
1346}