001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.thrift;
021
022import static org.apache.hadoop.hbase.thrift.Constants.COALESCE_INC_KEY;
023import static org.apache.hadoop.hbase.util.Bytes.getBytes;
024
025import java.io.IOException;
026import java.nio.ByteBuffer;
027import java.util.ArrayList;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.TreeMap;
033
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.CellBuilder;
037import org.apache.hadoop.hbase.CellBuilderFactory;
038import org.apache.hadoop.hbase.CellBuilderType;
039import org.apache.hadoop.hbase.CellUtil;
040import org.apache.hadoop.hbase.HColumnDescriptor;
041import org.apache.hadoop.hbase.HConstants;
042import org.apache.hadoop.hbase.HRegionLocation;
043import org.apache.hadoop.hbase.HTableDescriptor;
044import org.apache.hadoop.hbase.KeyValue;
045import org.apache.hadoop.hbase.MetaTableAccessor;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.TableNotFoundException;
049import org.apache.hadoop.hbase.client.Append;
050import org.apache.hadoop.hbase.client.Delete;
051import org.apache.hadoop.hbase.client.Durability;
052import org.apache.hadoop.hbase.client.Get;
053import org.apache.hadoop.hbase.client.Increment;
054import org.apache.hadoop.hbase.client.OperationWithAttributes;
055import org.apache.hadoop.hbase.client.Put;
056import org.apache.hadoop.hbase.client.RegionInfo;
057import org.apache.hadoop.hbase.client.RegionLocator;
058import org.apache.hadoop.hbase.client.Result;
059import org.apache.hadoop.hbase.client.ResultScanner;
060import org.apache.hadoop.hbase.client.Scan;
061import org.apache.hadoop.hbase.client.Table;
062import org.apache.hadoop.hbase.filter.Filter;
063import org.apache.hadoop.hbase.filter.ParseFilter;
064import org.apache.hadoop.hbase.filter.PrefixFilter;
065import org.apache.hadoop.hbase.filter.WhileMatchFilter;
066import org.apache.hadoop.hbase.security.UserProvider;
067import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
068import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
069import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
070import org.apache.hadoop.hbase.thrift.generated.Hbase;
071import org.apache.hadoop.hbase.thrift.generated.IOError;
072import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
073import org.apache.hadoop.hbase.thrift.generated.Mutation;
074import org.apache.hadoop.hbase.thrift.generated.TAppend;
075import org.apache.hadoop.hbase.thrift.generated.TCell;
076import org.apache.hadoop.hbase.thrift.generated.TIncrement;
077import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
078import org.apache.hadoop.hbase.thrift.generated.TRowResult;
079import org.apache.hadoop.hbase.thrift.generated.TScan;
080import org.apache.hadoop.hbase.thrift.generated.TThriftServerType;
081import org.apache.hadoop.hbase.util.Bytes;
082import org.apache.thrift.TException;
083import org.apache.yetus.audience.InterfaceAudience;
084import org.slf4j.Logger;
085import org.slf4j.LoggerFactory;
086
087import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
088
089/**
 * The ThriftHBaseServiceHandler is a glue object that connects Thrift RPC calls to the
 * HBase client API primarily defined in the Admin and Table objects.
092 */
093@InterfaceAudience.Private
094@SuppressWarnings("deprecation")
095public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements Hbase.Iface {
  /** Class-wide SLF4J logger. */
  private static final Logger LOG = LoggerFactory.getLogger(ThriftHBaseServiceHandler.class);

  /** Value written into {@code TRegionInfo.version} by {@link #getTableRegions(ByteBuffer)}. */
  public static final int HREGION_VERSION = 1;

  // nextScannerId and scannerMap are used to manage scanner state
  // Id that addScanner hands out next; it is incremented on every registration.
  private int nextScannerId = 0;
  // Registered scanners keyed by id; accessed through the synchronized
  // addScanner/getScanner/removeScanner methods.
  private HashMap<Integer, ResultScannerWrapper> scannerMap;
  // Created in the constructor with this handler as its argument.
  IncrementCoalescer coalescer;
104
105  /**
106   * Returns a list of all the column families for a given Table.
107   *
108   * @param table table
109   * @throws IOException
110   */
111  byte[][] getAllColumns(Table table) throws IOException {
112    HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
113    byte[][] columns = new byte[cds.length][];
114    for (int i = 0; i < cds.length; i++) {
115      columns[i] = Bytes.add(cds[i].getName(),
116          KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
117    }
118    return columns;
119  }
120
121
122  /**
123   * Assigns a unique ID to the scanner and adds the mapping to an internal
124   * hash-map.
125   *
126   * @param scanner the {@link ResultScanner} to add
127   * @return integer scanner id
128   */
129  protected synchronized int addScanner(ResultScanner scanner, boolean sortColumns) {
130    int id = nextScannerId++;
131    ResultScannerWrapper resultScannerWrapper =
132        new ResultScannerWrapper(scanner, sortColumns);
133    scannerMap.put(id, resultScannerWrapper);
134    return id;
135  }
136
137  /**
138   * Returns the scanner associated with the specified ID.
139   *
140   * @param id the ID of the scanner to get
141   * @return a Scanner, or null if ID was invalid.
142   */
143  private synchronized ResultScannerWrapper getScanner(int id) {
144    return scannerMap.get(id);
145  }
146
147  /**
148   * Removes the scanner associated with the specified ID from the internal
149   * id-&gt;scanner hash-map.
150   *
151   * @param id the ID of the scanner to remove
152   * @return a Scanner, or null if ID was invalid.
153   */
154  private synchronized ResultScannerWrapper removeScanner(int id) {
155    return scannerMap.remove(id);
156  }
157
158  protected ThriftHBaseServiceHandler(final Configuration c,
159      final UserProvider userProvider) throws IOException {
160    super(c, userProvider);
161    scannerMap = new HashMap<>();
162    this.coalescer = new IncrementCoalescer(this);
163  }
164
165
166  @Override
167  public void enableTable(ByteBuffer tableName) throws IOError {
168    try{
169      getAdmin().enableTable(getTableName(tableName));
170    } catch (IOException e) {
171      LOG.warn(e.getMessage(), e);
172      throw getIOError(e);
173    }
174  }
175
176  @Override
177  public void disableTable(ByteBuffer tableName) throws IOError{
178    try{
179      getAdmin().disableTable(getTableName(tableName));
180    } catch (IOException e) {
181      LOG.warn(e.getMessage(), e);
182      throw getIOError(e);
183    }
184  }
185
186  @Override
187  public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
188    try {
189      return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
190    } catch (IOException e) {
191      LOG.warn(e.getMessage(), e);
192      throw getIOError(e);
193    }
194  }
195
196  // ThriftServerRunner.compact should be deprecated and replaced with methods specific to
197  // table and region.
198  @Override
199  public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
200    try {
201      try {
202        getAdmin().compactRegion(getBytes(tableNameOrRegionName));
203      } catch (IllegalArgumentException e) {
204        // Invalid region, try table
205        getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName)));
206      }
207    } catch (IOException e) {
208      LOG.warn(e.getMessage(), e);
209      throw getIOError(e);
210    }
211  }
212
213  // ThriftServerRunner.majorCompact should be deprecated and replaced with methods specific
214  // to table and region.
215  @Override
216  public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
217    try {
218      try {
219        getAdmin().compactRegion(getBytes(tableNameOrRegionName));
220      } catch (IllegalArgumentException e) {
221        // Invalid region, try table
222        getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName)));
223      }
224    } catch (IOException e) {
225      LOG.warn(e.getMessage(), e);
226      throw getIOError(e);
227    }
228  }
229
230  @Override
231  public List<ByteBuffer> getTableNames() throws IOError {
232    try {
233      TableName[] tableNames = this.getAdmin().listTableNames();
234      ArrayList<ByteBuffer> list = new ArrayList<>(tableNames.length);
235      for (TableName tableName : tableNames) {
236        list.add(ByteBuffer.wrap(tableName.getName()));
237      }
238      return list;
239    } catch (IOException e) {
240      LOG.warn(e.getMessage(), e);
241      throw getIOError(e);
242    }
243  }
244
245  /**
246   * @return the list of regions in the given table, or an empty list if the table does not exist
247   */
248  @Override
249  public List<TRegionInfo> getTableRegions(ByteBuffer tableName) throws IOError {
250    try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
251      List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
252      List<TRegionInfo> results = new ArrayList<>(regionLocations.size());
253      for (HRegionLocation regionLocation : regionLocations) {
254        RegionInfo info = regionLocation.getRegionInfo();
255        ServerName serverName = regionLocation.getServerName();
256        TRegionInfo region = new TRegionInfo();
257        region.serverName = ByteBuffer.wrap(
258            Bytes.toBytes(serverName.getHostname()));
259        region.port = serverName.getPort();
260        region.startKey = ByteBuffer.wrap(info.getStartKey());
261        region.endKey = ByteBuffer.wrap(info.getEndKey());
262        region.id = info.getRegionId();
263        region.name = ByteBuffer.wrap(info.getRegionName());
264        region.version = HREGION_VERSION; // HRegion now not versioned, PB encoding used
265        results.add(region);
266      }
267      return results;
268    } catch (TableNotFoundException e) {
269      // Return empty list for non-existing table
270      return Collections.emptyList();
271    } catch (IOException e){
272      LOG.warn(e.getMessage(), e);
273      throw getIOError(e);
274    }
275  }
276
277  @Override
278  public List<TCell> get(
279      ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
280      Map<ByteBuffer, ByteBuffer> attributes)
281      throws IOError {
282    byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
283    if (famAndQf.length == 1) {
284      return get(tableName, row, famAndQf[0], null, attributes);
285    }
286    if (famAndQf.length == 2) {
287      return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
288    }
289    throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
290  }
291
292  /**
293   * Note: this internal interface is slightly different from public APIs in regard to handling
294   * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
295   * we respect qual == null as a request for the entire column family. The caller (
296   * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
297   * column is parse like normal.
298   */
299  protected List<TCell> get(ByteBuffer tableName,
300      ByteBuffer row,
301      byte[] family,
302      byte[] qualifier,
303      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
304    Table table = null;
305    try {
306      table = getTable(tableName);
307      Get get = new Get(getBytes(row));
308      addAttributes(get, attributes);
309      if (qualifier == null) {
310        get.addFamily(family);
311      } else {
312        get.addColumn(family, qualifier);
313      }
314      Result result = table.get(get);
315      return ThriftUtilities.cellFromHBase(result.rawCells());
316    } catch (IOException e) {
317      LOG.warn(e.getMessage(), e);
318      throw getIOError(e);
319    } finally {
320      closeTable(table);
321    }
322  }
323
324  @Override
325  public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
326      int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
327    byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
328    if(famAndQf.length == 1) {
329      return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
330    }
331    if (famAndQf.length == 2) {
332      return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
333    }
334    throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
335
336  }
337
338  /**
339   * Note: this public interface is slightly different from public Java APIs in regard to
340   * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
341   * Rather, we respect qual == null as a request for the entire column family. If you want to
342   * access the entire column family, use
343   * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value
344   * that lacks a {@code ':'}.
345   */
346  public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
347      byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
348
349    Table table = null;
350    try {
351      table = getTable(tableName);
352      Get get = new Get(getBytes(row));
353      addAttributes(get, attributes);
354      if (null == qualifier) {
355        get.addFamily(family);
356      } else {
357        get.addColumn(family, qualifier);
358      }
359      get.setMaxVersions(numVersions);
360      Result result = table.get(get);
361      return ThriftUtilities.cellFromHBase(result.rawCells());
362    } catch (IOException e) {
363      LOG.warn(e.getMessage(), e);
364      throw getIOError(e);
365    } finally{
366      closeTable(table);
367    }
368  }
369
370  @Override
371  public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
372      long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
373    byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
374    if (famAndQf.length == 1) {
375      return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
376    }
377    if (famAndQf.length == 2) {
378      return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
379          attributes);
380    }
381    throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
382  }
383
384  /**
385   * Note: this internal interface is slightly different from public APIs in regard to handling
386   * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
387   * we respect qual == null as a request for the entire column family. The caller (
388   * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS
389   * consistent in that the column is parse like normal.
390   */
391  protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
392      byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
393      throws IOError {
394
395    Table table = null;
396    try {
397      table = getTable(tableName);
398      Get get = new Get(getBytes(row));
399      addAttributes(get, attributes);
400      if (null == qualifier) {
401        get.addFamily(family);
402      } else {
403        get.addColumn(family, qualifier);
404      }
405      get.setTimeRange(0, timestamp);
406      get.setMaxVersions(numVersions);
407      Result result = table.get(get);
408      return ThriftUtilities.cellFromHBase(result.rawCells());
409    } catch (IOException e) {
410      LOG.warn(e.getMessage(), e);
411      throw getIOError(e);
412    } finally{
413      closeTable(table);
414    }
415  }
416
417  @Override
418  public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
419      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
420    return getRowWithColumnsTs(tableName, row, null,
421        HConstants.LATEST_TIMESTAMP,
422        attributes);
423  }
424
425  @Override
426  public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
427      ByteBuffer row,
428      List<ByteBuffer> columns,
429      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
430    return getRowWithColumnsTs(tableName, row, columns,
431        HConstants.LATEST_TIMESTAMP,
432        attributes);
433  }
434
435  @Override
436  public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
437      long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
438    return getRowWithColumnsTs(tableName, row, null,
439        timestamp, attributes);
440  }
441
442  @Override
443  public List<TRowResult> getRowWithColumnsTs(
444      ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
445      long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
446
447    Table table = null;
448    try {
449      table = getTable(tableName);
450      if (columns == null) {
451        Get get = new Get(getBytes(row));
452        addAttributes(get, attributes);
453        get.setTimeRange(0, timestamp);
454        Result result = table.get(get);
455        return ThriftUtilities.rowResultFromHBase(result);
456      }
457      Get get = new Get(getBytes(row));
458      addAttributes(get, attributes);
459      for(ByteBuffer column : columns) {
460        byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
461        if (famAndQf.length == 1) {
462          get.addFamily(famAndQf[0]);
463        } else {
464          get.addColumn(famAndQf[0], famAndQf[1]);
465        }
466      }
467      get.setTimeRange(0, timestamp);
468      Result result = table.get(get);
469      return ThriftUtilities.rowResultFromHBase(result);
470    } catch (IOException e) {
471      LOG.warn(e.getMessage(), e);
472      throw getIOError(e);
473    } finally{
474      closeTable(table);
475    }
476  }
477
478  @Override
479  public List<TRowResult> getRows(ByteBuffer tableName,
480      List<ByteBuffer> rows,
481      Map<ByteBuffer, ByteBuffer> attributes)
482      throws IOError {
483    return getRowsWithColumnsTs(tableName, rows, null,
484        HConstants.LATEST_TIMESTAMP,
485        attributes);
486  }
487
488  @Override
489  public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
490      List<ByteBuffer> rows,
491      List<ByteBuffer> columns,
492      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
493    return getRowsWithColumnsTs(tableName, rows, columns,
494        HConstants.LATEST_TIMESTAMP,
495        attributes);
496  }
497
498  @Override
499  public List<TRowResult> getRowsTs(ByteBuffer tableName,
500      List<ByteBuffer> rows,
501      long timestamp,
502      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
503    return getRowsWithColumnsTs(tableName, rows, null,
504        timestamp, attributes);
505  }
506
507  @Override
508  public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
509      List<ByteBuffer> rows,
510      List<ByteBuffer> columns, long timestamp,
511      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
512
513    Table table= null;
514    try {
515      List<Get> gets = new ArrayList<>(rows.size());
516      table = getTable(tableName);
517      if (metrics != null) {
518        metrics.incNumRowKeysInBatchGet(rows.size());
519      }
520      for (ByteBuffer row : rows) {
521        Get get = new Get(getBytes(row));
522        addAttributes(get, attributes);
523        if (columns != null) {
524
525          for(ByteBuffer column : columns) {
526            byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
527            if (famAndQf.length == 1) {
528              get.addFamily(famAndQf[0]);
529            } else {
530              get.addColumn(famAndQf[0], famAndQf[1]);
531            }
532          }
533        }
534        get.setTimeRange(0, timestamp);
535        gets.add(get);
536      }
537      Result[] result = table.get(gets);
538      return ThriftUtilities.rowResultFromHBase(result);
539    } catch (IOException e) {
540      LOG.warn(e.getMessage(), e);
541      throw getIOError(e);
542    } finally{
543      closeTable(table);
544    }
545  }
546
547  @Override
548  public void deleteAll(
549      ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
550      Map<ByteBuffer, ByteBuffer> attributes)
551      throws IOError {
552    deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
553        attributes);
554  }
555
556  @Override
557  public void deleteAllTs(ByteBuffer tableName,
558      ByteBuffer row,
559      ByteBuffer column,
560      long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
561    Table table = null;
562    try {
563      table = getTable(tableName);
564      Delete delete  = new Delete(getBytes(row));
565      addAttributes(delete, attributes);
566      byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
567      if (famAndQf.length == 1) {
568        delete.addFamily(famAndQf[0], timestamp);
569      } else {
570        delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
571      }
572      table.delete(delete);
573
574    } catch (IOException e) {
575      LOG.warn(e.getMessage(), e);
576      throw getIOError(e);
577    } finally {
578      closeTable(table);
579    }
580  }
581
582  @Override
583  public void deleteAllRow(
584      ByteBuffer tableName, ByteBuffer row,
585      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
586    deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
587  }
588
589  @Override
590  public void deleteAllRowTs(
591      ByteBuffer tableName, ByteBuffer row, long timestamp,
592      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
593    Table table = null;
594    try {
595      table = getTable(tableName);
596      Delete delete  = new Delete(getBytes(row), timestamp);
597      addAttributes(delete, attributes);
598      table.delete(delete);
599    } catch (IOException e) {
600      LOG.warn(e.getMessage(), e);
601      throw getIOError(e);
602    } finally {
603      closeTable(table);
604    }
605  }
606
607  @Override
608  public void createTable(ByteBuffer in_tableName,
609      List<ColumnDescriptor> columnFamilies) throws IOError, IllegalArgument, AlreadyExists {
610    TableName tableName = getTableName(in_tableName);
611    try {
612      if (getAdmin().tableExists(tableName)) {
613        throw new AlreadyExists("table name already in use");
614      }
615      HTableDescriptor desc = new HTableDescriptor(tableName);
616      for (ColumnDescriptor col : columnFamilies) {
617        HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
618        desc.addFamily(colDesc);
619      }
620      getAdmin().createTable(desc);
621    } catch (IOException e) {
622      LOG.warn(e.getMessage(), e);
623      throw getIOError(e);
624    } catch (IllegalArgumentException e) {
625      LOG.warn(e.getMessage(), e);
626      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
627    }
628  }
629
630  private static TableName getTableName(ByteBuffer buffer) {
631    return TableName.valueOf(getBytes(buffer));
632  }
633
634  @Override
635  public void deleteTable(ByteBuffer in_tableName) throws IOError {
636    TableName tableName = getTableName(in_tableName);
637    if (LOG.isDebugEnabled()) {
638      LOG.debug("deleteTable: table={}", tableName);
639    }
640    try {
641      if (!getAdmin().tableExists(tableName)) {
642        throw new IOException("table does not exist");
643      }
644      getAdmin().deleteTable(tableName);
645    } catch (IOException e) {
646      LOG.warn(e.getMessage(), e);
647      throw getIOError(e);
648    }
649  }
650
651  @Override
652  public void mutateRow(ByteBuffer tableName, ByteBuffer row,
653      List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
654      throws IOError, IllegalArgument {
655    mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP, attributes);
656  }
657
658  @Override
659  public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
660      List<Mutation> mutations, long timestamp,
661      Map<ByteBuffer, ByteBuffer> attributes)
662      throws IOError, IllegalArgument {
663    Table table = null;
664    try {
665      table = getTable(tableName);
666      Put put = new Put(getBytes(row), timestamp);
667      addAttributes(put, attributes);
668
669      Delete delete = new Delete(getBytes(row));
670      addAttributes(delete, attributes);
671      if (metrics != null) {
672        metrics.incNumRowKeysInBatchMutate(mutations.size());
673      }
674
675      // I apologize for all this mess :)
676      CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
677      for (Mutation m : mutations) {
678        byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column));
679        if (m.isDelete) {
680          if (famAndQf.length == 1) {
681            delete.addFamily(famAndQf[0], timestamp);
682          } else {
683            delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
684          }
685          delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
686        } else {
687          if(famAndQf.length == 1) {
688            LOG.warn("No column qualifier specified. Delete is the only mutation supported "
689                + "over the whole column family.");
690          } else {
691            put.add(builder.clear()
692                .setRow(put.getRow())
693                .setFamily(famAndQf[0])
694                .setQualifier(famAndQf[1])
695                .setTimestamp(put.getTimestamp())
696                .setType(Cell.Type.Put)
697                .setValue(m.value != null ? getBytes(m.value)
698                    : HConstants.EMPTY_BYTE_ARRAY)
699                .build());
700          }
701          put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
702        }
703      }
704      if (!delete.isEmpty()) {
705        table.delete(delete);
706      }
707      if (!put.isEmpty()) {
708        table.put(put);
709      }
710    } catch (IOException e) {
711      LOG.warn(e.getMessage(), e);
712      throw getIOError(e);
713    } catch (IllegalArgumentException e) {
714      LOG.warn(e.getMessage(), e);
715      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
716    } finally{
717      closeTable(table);
718    }
719  }
720
721  @Override
722  public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
723      Map<ByteBuffer, ByteBuffer> attributes)
724      throws IOError, IllegalArgument, TException {
725    mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
726  }
727
728  @Override
729  public void mutateRowsTs(
730      ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
731      Map<ByteBuffer, ByteBuffer> attributes)
732      throws IOError, IllegalArgument, TException {
733    List<Put> puts = new ArrayList<>();
734    List<Delete> deletes = new ArrayList<>();
735    CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
736    for (BatchMutation batch : rowBatches) {
737      byte[] row = getBytes(batch.row);
738      List<Mutation> mutations = batch.mutations;
739      Delete delete = new Delete(row);
740      addAttributes(delete, attributes);
741      Put put = new Put(row, timestamp);
742      addAttributes(put, attributes);
743      for (Mutation m : mutations) {
744        byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column));
745        if (m.isDelete) {
746          // no qualifier, family only.
747          if (famAndQf.length == 1) {
748            delete.addFamily(famAndQf[0], timestamp);
749          } else {
750            delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
751          }
752          delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
753              : Durability.SKIP_WAL);
754        } else {
755          if (famAndQf.length == 1) {
756            LOG.warn("No column qualifier specified. Delete is the only mutation supported "
757                + "over the whole column family.");
758          }
759          if (famAndQf.length == 2) {
760            try {
761              put.add(builder.clear()
762                  .setRow(put.getRow())
763                  .setFamily(famAndQf[0])
764                  .setQualifier(famAndQf[1])
765                  .setTimestamp(put.getTimestamp())
766                  .setType(Cell.Type.Put)
767                  .setValue(m.value != null ? getBytes(m.value)
768                      : HConstants.EMPTY_BYTE_ARRAY)
769                  .build());
770            } catch (IOException e) {
771              throw new IllegalArgumentException(e);
772            }
773          } else {
774            throw new IllegalArgumentException("Invalid famAndQf provided.");
775          }
776          put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
777        }
778      }
779      if (!delete.isEmpty()) {
780        deletes.add(delete);
781      }
782      if (!put.isEmpty()) {
783        puts.add(put);
784      }
785    }
786
787    Table table = null;
788    try {
789      table = getTable(tableName);
790      if (!puts.isEmpty()) {
791        table.put(puts);
792      }
793      if (!deletes.isEmpty()) {
794        table.delete(deletes);
795      }
796    } catch (IOException e) {
797      LOG.warn(e.getMessage(), e);
798      throw getIOError(e);
799    } catch (IllegalArgumentException e) {
800      LOG.warn(e.getMessage(), e);
801      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
802    } finally{
803      closeTable(table);
804    }
805  }
806
807  @Override
808  public long atomicIncrement(
809      ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
810      throws IOError, IllegalArgument, TException {
811    byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
812    if(famAndQf.length == 1) {
813      return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
814    }
815    return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
816  }
817
818  protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
819      byte [] family, byte [] qualifier, long amount)
820      throws IOError, IllegalArgument, TException {
821    Table table = null;
822    try {
823      table = getTable(tableName);
824      return table.incrementColumnValue(
825          getBytes(row), family, qualifier, amount);
826    } catch (IOException e) {
827      LOG.warn(e.getMessage(), e);
828      throw getIOError(e);
829    } finally {
830      closeTable(table);
831    }
832  }
833
834  @Override
835  public void scannerClose(int id) throws IOError, IllegalArgument {
836    LOG.debug("scannerClose: id={}", id);
837    ResultScannerWrapper resultScannerWrapper = getScanner(id);
838    if (resultScannerWrapper == null) {
839      LOG.warn("scanner ID is invalid");
840      throw new IllegalArgument("scanner ID is invalid");
841    }
842    resultScannerWrapper.getScanner().close();
843    removeScanner(id);
844  }
845
846  @Override
847  public List<TRowResult> scannerGetList(int id,int nbRows)
848      throws IllegalArgument, IOError {
849    LOG.debug("scannerGetList: id={}", id);
850    ResultScannerWrapper resultScannerWrapper = getScanner(id);
851    if (null == resultScannerWrapper) {
852      String message = "scanner ID is invalid";
853      LOG.warn(message);
854      throw new IllegalArgument("scanner ID is invalid");
855    }
856
857    Result [] results;
858    try {
859      results = resultScannerWrapper.getScanner().next(nbRows);
860      if (null == results) {
861        return new ArrayList<>();
862      }
863    } catch (IOException e) {
864      LOG.warn(e.getMessage(), e);
865      throw getIOError(e);
866    }
867    return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
868  }
869
870  @Override
871  public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
872    return scannerGetList(id,1);
873  }
874
875  @Override
876  public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
877      Map<ByteBuffer, ByteBuffer> attributes)
878      throws IOError {
879
880    Table table = null;
881    try {
882      table = getTable(tableName);
883      Scan scan = new Scan();
884      addAttributes(scan, attributes);
885      if (tScan.isSetStartRow()) {
886        scan.setStartRow(tScan.getStartRow());
887      }
888      if (tScan.isSetStopRow()) {
889        scan.setStopRow(tScan.getStopRow());
890      }
891      if (tScan.isSetTimestamp()) {
892        scan.setTimeRange(0, tScan.getTimestamp());
893      }
894      if (tScan.isSetCaching()) {
895        scan.setCaching(tScan.getCaching());
896      }
897      if (tScan.isSetBatchSize()) {
898        scan.setBatch(tScan.getBatchSize());
899      }
900      if (tScan.isSetColumns() && !tScan.getColumns().isEmpty()) {
901        for(ByteBuffer column : tScan.getColumns()) {
902          byte [][] famQf = CellUtil.parseColumn(getBytes(column));
903          if(famQf.length == 1) {
904            scan.addFamily(famQf[0]);
905          } else {
906            scan.addColumn(famQf[0], famQf[1]);
907          }
908        }
909      }
910      if (tScan.isSetFilterString()) {
911        ParseFilter parseFilter = new ParseFilter();
912        scan.setFilter(
913            parseFilter.parseFilterString(tScan.getFilterString()));
914      }
915      if (tScan.isSetReversed()) {
916        scan.setReversed(tScan.isReversed());
917      }
918      if (tScan.isSetCacheBlocks()) {
919        scan.setCacheBlocks(tScan.isCacheBlocks());
920      }
921      return addScanner(table.getScanner(scan), tScan.sortColumns);
922    } catch (IOException e) {
923      LOG.warn(e.getMessage(), e);
924      throw getIOError(e);
925    } finally{
926      closeTable(table);
927    }
928  }
929
930  @Override
931  public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
932      List<ByteBuffer> columns,
933      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
934
935    Table table = null;
936    try {
937      table = getTable(tableName);
938      Scan scan = new Scan(getBytes(startRow));
939      addAttributes(scan, attributes);
940      if(columns != null && !columns.isEmpty()) {
941        for(ByteBuffer column : columns) {
942          byte [][] famQf = CellUtil.parseColumn(getBytes(column));
943          if(famQf.length == 1) {
944            scan.addFamily(famQf[0]);
945          } else {
946            scan.addColumn(famQf[0], famQf[1]);
947          }
948        }
949      }
950      return addScanner(table.getScanner(scan), false);
951    } catch (IOException e) {
952      LOG.warn(e.getMessage(), e);
953      throw getIOError(e);
954    } finally{
955      closeTable(table);
956    }
957  }
958
959  @Override
960  public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
961      ByteBuffer stopRow, List<ByteBuffer> columns,
962      Map<ByteBuffer, ByteBuffer> attributes)
963      throws IOError, TException {
964
965    Table table = null;
966    try {
967      table = getTable(tableName);
968      Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
969      addAttributes(scan, attributes);
970      if(columns != null && !columns.isEmpty()) {
971        for(ByteBuffer column : columns) {
972          byte [][] famQf = CellUtil.parseColumn(getBytes(column));
973          if(famQf.length == 1) {
974            scan.addFamily(famQf[0]);
975          } else {
976            scan.addColumn(famQf[0], famQf[1]);
977          }
978        }
979      }
980      return addScanner(table.getScanner(scan), false);
981    } catch (IOException e) {
982      LOG.warn(e.getMessage(), e);
983      throw getIOError(e);
984    } finally{
985      closeTable(table);
986    }
987  }
988
989  @Override
990  public int scannerOpenWithPrefix(ByteBuffer tableName,
991      ByteBuffer startAndPrefix,
992      List<ByteBuffer> columns,
993      Map<ByteBuffer, ByteBuffer> attributes)
994      throws IOError, TException {
995
996    Table table = null;
997    try {
998      table = getTable(tableName);
999      Scan scan = new Scan(getBytes(startAndPrefix));
1000      addAttributes(scan, attributes);
1001      Filter f = new WhileMatchFilter(
1002          new PrefixFilter(getBytes(startAndPrefix)));
1003      scan.setFilter(f);
1004      if (columns != null && !columns.isEmpty()) {
1005        for(ByteBuffer column : columns) {
1006          byte [][] famQf = CellUtil.parseColumn(getBytes(column));
1007          if(famQf.length == 1) {
1008            scan.addFamily(famQf[0]);
1009          } else {
1010            scan.addColumn(famQf[0], famQf[1]);
1011          }
1012        }
1013      }
1014      return addScanner(table.getScanner(scan), false);
1015    } catch (IOException e) {
1016      LOG.warn(e.getMessage(), e);
1017      throw getIOError(e);
1018    } finally{
1019      closeTable(table);
1020    }
1021  }
1022
1023  @Override
1024  public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1025      List<ByteBuffer> columns, long timestamp,
1026      Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1027
1028    Table table = null;
1029    try {
1030      table = getTable(tableName);
1031      Scan scan = new Scan(getBytes(startRow));
1032      addAttributes(scan, attributes);
1033      scan.setTimeRange(0, timestamp);
1034      if (columns != null && !columns.isEmpty()) {
1035        for (ByteBuffer column : columns) {
1036          byte [][] famQf = CellUtil.parseColumn(getBytes(column));
1037          if(famQf.length == 1) {
1038            scan.addFamily(famQf[0]);
1039          } else {
1040            scan.addColumn(famQf[0], famQf[1]);
1041          }
1042        }
1043      }
1044      return addScanner(table.getScanner(scan), false);
1045    } catch (IOException e) {
1046      LOG.warn(e.getMessage(), e);
1047      throw getIOError(e);
1048    } finally{
1049      closeTable(table);
1050    }
1051  }
1052
1053  @Override
1054  public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1055      ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1056      Map<ByteBuffer, ByteBuffer> attributes)
1057      throws IOError, TException {
1058
1059    Table table = null;
1060    try {
1061      table = getTable(tableName);
1062      Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
1063      addAttributes(scan, attributes);
1064      scan.setTimeRange(0, timestamp);
1065      if (columns != null && !columns.isEmpty()) {
1066        for (ByteBuffer column : columns) {
1067          byte [][] famQf = CellUtil.parseColumn(getBytes(column));
1068          if(famQf.length == 1) {
1069            scan.addFamily(famQf[0]);
1070          } else {
1071            scan.addColumn(famQf[0], famQf[1]);
1072          }
1073        }
1074      }
1075      scan.setTimeRange(0, timestamp);
1076      return addScanner(table.getScanner(scan), false);
1077    } catch (IOException e) {
1078      LOG.warn(e.getMessage(), e);
1079      throw getIOError(e);
1080    } finally{
1081      closeTable(table);
1082    }
1083  }
1084
1085  @Override
1086  public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1087      ByteBuffer tableName) throws IOError, TException {
1088
1089    Table table = null;
1090    try {
1091      TreeMap<ByteBuffer, ColumnDescriptor> columns = new TreeMap<>();
1092
1093      table = getTable(tableName);
1094      HTableDescriptor desc = table.getTableDescriptor();
1095
1096      for (HColumnDescriptor e : desc.getFamilies()) {
1097        ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1098        columns.put(col.name, col);
1099      }
1100      return columns;
1101    } catch (IOException e) {
1102      LOG.warn(e.getMessage(), e);
1103      throw getIOError(e);
1104    } finally {
1105      closeTable(table);
1106    }
1107  }
1108
1109  private void closeTable(Table table) throws IOError {
1110    try{
1111      if(table != null){
1112        table.close();
1113      }
1114    } catch (IOException e){
1115      LOG.error(e.getMessage(), e);
1116      throw getIOError(e);
1117    }
1118  }
1119
1120  @Override
1121  public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
1122    try {
1123      byte[] row = getBytes(searchRow);
1124      Result startRowResult = getReverseScanResult(TableName.META_TABLE_NAME.getName(), row,
1125          HConstants.CATALOG_FAMILY);
1126
1127      if (startRowResult == null) {
1128        throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
1129            + Bytes.toStringBinary(row));
1130      }
1131
1132      // find region start and end keys
1133      RegionInfo regionInfo = MetaTableAccessor.getRegionInfo(startRowResult);
1134      if (regionInfo == null) {
1135        throw new IOException("RegionInfo REGIONINFO was null or " +
1136            " empty in Meta for row="
1137            + Bytes.toStringBinary(row));
1138      }
1139      TRegionInfo region = new TRegionInfo();
1140      region.setStartKey(regionInfo.getStartKey());
1141      region.setEndKey(regionInfo.getEndKey());
1142      region.id = regionInfo.getRegionId();
1143      region.setName(regionInfo.getRegionName());
1144      region.version = HREGION_VERSION; // version not used anymore, PB encoding used.
1145
1146      // find region assignment to server
1147      ServerName serverName = MetaTableAccessor.getServerName(startRowResult, 0);
1148      if (serverName != null) {
1149        region.setServerName(Bytes.toBytes(serverName.getHostname()));
1150        region.port = serverName.getPort();
1151      }
1152      return region;
1153    } catch (IOException e) {
1154      LOG.warn(e.getMessage(), e);
1155      throw getIOError(e);
1156    }
1157  }
1158
1159  private Result getReverseScanResult(byte[] tableName, byte[] row, byte[] family)
1160      throws IOException {
1161    Scan scan = new Scan(row);
1162    scan.setReversed(true);
1163    scan.addFamily(family);
1164    scan.setStartRow(row);
1165    try (Table table = getTable(tableName);
1166         ResultScanner scanner = table.getScanner(scan)) {
1167      return scanner.next();
1168    }
1169  }
1170
1171  @Override
1172  public void increment(TIncrement tincrement) throws IOError, TException {
1173
1174    if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1175      throw new TException("Must supply a table and a row key; can't increment");
1176    }
1177
1178    if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1179      this.coalescer.queueIncrement(tincrement);
1180      return;
1181    }
1182
1183    Table table = null;
1184    try {
1185      table = getTable(tincrement.getTable());
1186      Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1187      table.increment(inc);
1188    } catch (IOException e) {
1189      LOG.warn(e.getMessage(), e);
1190      throw getIOError(e);
1191    } finally{
1192      closeTable(table);
1193    }
1194  }
1195
1196  @Override
1197  public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1198    if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1199      this.coalescer.queueIncrements(tincrements);
1200      return;
1201    }
1202    for (TIncrement tinc : tincrements) {
1203      increment(tinc);
1204    }
1205  }
1206
1207  @Override
1208  public List<TCell> append(TAppend tappend) throws IOError, TException {
1209    if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1210      throw new TException("Must supply a table and a row key; can't append");
1211    }
1212
1213    Table table = null;
1214    try {
1215      table = getTable(tappend.getTable());
1216      Append append = ThriftUtilities.appendFromThrift(tappend);
1217      Result result = table.append(append);
1218      return ThriftUtilities.cellFromHBase(result.rawCells());
1219    } catch (IOException e) {
1220      LOG.warn(e.getMessage(), e);
1221      throw getIOError(e);
1222    } finally{
1223      closeTable(table);
1224    }
1225  }
1226
1227  @Override
1228  public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1229      ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
1230      IllegalArgument, TException {
1231    Put put;
1232    try {
1233      put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
1234      addAttributes(put, attributes);
1235
1236      byte[][] famAndQf = CellUtil.parseColumn(getBytes(mput.column));
1237      put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
1238          .setRow(put.getRow())
1239          .setFamily(famAndQf[0])
1240          .setQualifier(famAndQf[1])
1241          .setTimestamp(put.getTimestamp())
1242          .setType(Cell.Type.Put)
1243          .setValue(mput.value != null ? getBytes(mput.value)
1244              : HConstants.EMPTY_BYTE_ARRAY)
1245          .build());
1246      put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1247    } catch (IOException | IllegalArgumentException e) {
1248      LOG.warn(e.getMessage(), e);
1249      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1250    }
1251
1252    Table table = null;
1253    try {
1254      table = getTable(tableName);
1255      byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
1256      Table.CheckAndMutateBuilder mutateBuilder =
1257          table.checkAndMutate(getBytes(row), famAndQf[0]).qualifier(famAndQf[1]);
1258      if (value != null) {
1259        return mutateBuilder.ifEquals(getBytes(value)).thenPut(put);
1260      } else {
1261        return mutateBuilder.ifNotExists().thenPut(put);
1262      }
1263    } catch (IOException e) {
1264      LOG.warn(e.getMessage(), e);
1265      throw getIOError(e);
1266    } catch (IllegalArgumentException e) {
1267      LOG.warn(e.getMessage(), e);
1268      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1269    } finally {
1270      closeTable(table);
1271    }
1272  }
1273
1274  @Override
1275  public TThriftServerType getThriftServerType() {
1276    return TThriftServerType.ONE;
1277  }
1278
1279  private static IOError getIOError(Throwable throwable) {
1280    IOError error = new IOErrorWithCause(throwable);
1281    error.setMessage(Throwables.getStackTraceAsString(throwable));
1282    return error;
1283  }
1284
1285  /**
1286   * Adds all the attributes into the Operation object
1287   */
1288  private static void addAttributes(OperationWithAttributes op,
1289      Map<ByteBuffer, ByteBuffer> attributes) {
1290    if (attributes == null || attributes.isEmpty()) {
1291      return;
1292    }
1293    for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1294      String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1295      byte[] value =  getBytes(entry.getValue());
1296      op.setAttribute(name, value);
1297    }
1298  }
1299
1300  protected static class ResultScannerWrapper {
1301
1302    private final ResultScanner scanner;
1303    private final boolean sortColumns;
1304    public ResultScannerWrapper(ResultScanner resultScanner,
1305        boolean sortResultColumns) {
1306      scanner = resultScanner;
1307      sortColumns = sortResultColumns;
1308    }
1309
1310    public ResultScanner getScanner() {
1311      return scanner;
1312    }
1313
1314    public boolean isColumnSorted() {
1315      return sortColumns;
1316    }
1317  }
1318
1319  public static class IOErrorWithCause extends IOError {
1320    private final Throwable cause;
1321    public IOErrorWithCause(Throwable cause) {
1322      this.cause = cause;
1323    }
1324
1325    @Override
1326    public synchronized Throwable getCause() {
1327      return cause;
1328    }
1329
1330    @Override
1331    public boolean equals(Object other) {
1332      if (super.equals(other) &&
1333          other instanceof IOErrorWithCause) {
1334        Throwable otherCause = ((IOErrorWithCause) other).getCause();
1335        if (this.getCause() != null) {
1336          return otherCause != null && this.getCause().equals(otherCause);
1337        } else {
1338          return otherCause == null;
1339        }
1340      }
1341      return false;
1342    }
1343
1344    @Override
1345    public int hashCode() {
1346      int result = super.hashCode();
1347      result = 31 * result + (cause != null ? cause.hashCode() : 0);
1348      return result;
1349    }
1350  }
1351
1352
1353}