001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.thrift;
021
022import static org.apache.hadoop.hbase.thrift.Constants.COALESCE_INC_KEY;
023import static org.apache.hadoop.hbase.util.Bytes.getBytes;
024
025import java.io.IOException;
026import java.nio.ByteBuffer;
027import java.util.ArrayList;
028import java.util.Collections;
029import java.util.HashMap;
030import java.util.List;
031import java.util.Map;
032import java.util.TreeMap;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.hbase.CatalogFamilyFormat;
035import org.apache.hadoop.hbase.Cell;
036import org.apache.hadoop.hbase.CellBuilder;
037import org.apache.hadoop.hbase.CellBuilderFactory;
038import org.apache.hadoop.hbase.CellBuilderType;
039import org.apache.hadoop.hbase.CellUtil;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.HRegionLocation;
042import org.apache.hadoop.hbase.KeyValue;
043import org.apache.hadoop.hbase.ServerName;
044import org.apache.hadoop.hbase.TableName;
045import org.apache.hadoop.hbase.TableNotFoundException;
046import org.apache.hadoop.hbase.client.Append;
047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
048import org.apache.hadoop.hbase.client.Delete;
049import org.apache.hadoop.hbase.client.Durability;
050import org.apache.hadoop.hbase.client.Get;
051import org.apache.hadoop.hbase.client.Increment;
052import org.apache.hadoop.hbase.client.OperationWithAttributes;
053import org.apache.hadoop.hbase.client.Put;
054import org.apache.hadoop.hbase.client.RegionInfo;
055import org.apache.hadoop.hbase.client.RegionLocator;
056import org.apache.hadoop.hbase.client.Result;
057import org.apache.hadoop.hbase.client.ResultScanner;
058import org.apache.hadoop.hbase.client.Scan;
059import org.apache.hadoop.hbase.client.Table;
060import org.apache.hadoop.hbase.client.TableDescriptor;
061import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
062import org.apache.hadoop.hbase.filter.Filter;
063import org.apache.hadoop.hbase.filter.ParseFilter;
064import org.apache.hadoop.hbase.filter.PrefixFilter;
065import org.apache.hadoop.hbase.filter.WhileMatchFilter;
066import org.apache.hadoop.hbase.security.UserProvider;
067import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
068import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
069import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
070import org.apache.hadoop.hbase.thrift.generated.Hbase;
071import org.apache.hadoop.hbase.thrift.generated.IOError;
072import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
073import org.apache.hadoop.hbase.thrift.generated.Mutation;
074import org.apache.hadoop.hbase.thrift.generated.TAppend;
075import org.apache.hadoop.hbase.thrift.generated.TCell;
076import org.apache.hadoop.hbase.thrift.generated.TIncrement;
077import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
078import org.apache.hadoop.hbase.thrift.generated.TRowResult;
079import org.apache.hadoop.hbase.thrift.generated.TScan;
080import org.apache.hadoop.hbase.thrift.generated.TThriftServerType;
081import org.apache.hadoop.hbase.util.Bytes;
082import org.apache.thrift.TException;
083import org.apache.yetus.audience.InterfaceAudience;
084import org.slf4j.Logger;
085import org.slf4j.LoggerFactory;
086
087import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
088
089/**
 * The ThriftHBaseServiceHandler is a glue object that connects Thrift RPC calls to the
 * HBase client API primarily defined in the Admin and Table objects.
092 */
093@InterfaceAudience.Private
094@SuppressWarnings("deprecation")
095public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements Hbase.Iface {
  private static final Logger LOG = LoggerFactory.getLogger(ThriftHBaseServiceHandler.class);

  // Value written into TRegionInfo.version by getTableRegions.
  public static final int HREGION_VERSION = 1;

  // nextScannerId and scannerMap are used to manage scanner state
  // In the visible part of this class both are touched only by the synchronized
  // addScanner/getScanner/removeScanner helpers.
  private int nextScannerId = 0;
  private HashMap<Integer, ResultScannerWrapper> scannerMap;
  // Created in the constructor with a reference to this handler.
  IncrementCoalescer coalescer;
104
105  /**
106   * Returns a list of all the column families for a given Table.
107   */
108  byte[][] getAllColumns(Table table) throws IOException {
109    ColumnFamilyDescriptor[] cds = table.getDescriptor().getColumnFamilies();
110    byte[][] columns = new byte[cds.length][];
111    for (int i = 0; i < cds.length; i++) {
112      columns[i] = Bytes.add(cds[i].getName(), KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
113    }
114    return columns;
115  }
116
117
118  /**
119   * Assigns a unique ID to the scanner and adds the mapping to an internal
120   * hash-map.
121   *
122   * @param scanner the {@link ResultScanner} to add
123   * @return integer scanner id
124   */
125  protected synchronized int addScanner(ResultScanner scanner, boolean sortColumns) {
126    int id = nextScannerId++;
127    ResultScannerWrapper resultScannerWrapper =
128        new ResultScannerWrapper(scanner, sortColumns);
129    scannerMap.put(id, resultScannerWrapper);
130    return id;
131  }
132
133  /**
134   * Returns the scanner associated with the specified ID.
135   *
136   * @param id the ID of the scanner to get
137   * @return a Scanner, or null if ID was invalid.
138   */
139  private synchronized ResultScannerWrapper getScanner(int id) {
140    return scannerMap.get(id);
141  }
142
143  /**
144   * Removes the scanner associated with the specified ID from the internal
145   * id-&gt;scanner hash-map.
146   *
147   * @param id the ID of the scanner to remove
148   * @return a Scanner, or null if ID was invalid.
149   */
150  private synchronized ResultScannerWrapper removeScanner(int id) {
151    return scannerMap.remove(id);
152  }
153
154  protected ThriftHBaseServiceHandler(final Configuration c,
155      final UserProvider userProvider) throws IOException {
156    super(c, userProvider);
157    scannerMap = new HashMap<>();
158    this.coalescer = new IncrementCoalescer(this);
159  }
160
161
162  @Override
163  public void enableTable(ByteBuffer tableName) throws IOError {
164    try{
165      getAdmin().enableTable(getTableName(tableName));
166    } catch (IOException e) {
167      LOG.warn(e.getMessage(), e);
168      throw getIOError(e);
169    }
170  }
171
172  @Override
173  public void disableTable(ByteBuffer tableName) throws IOError{
174    try{
175      getAdmin().disableTable(getTableName(tableName));
176    } catch (IOException e) {
177      LOG.warn(e.getMessage(), e);
178      throw getIOError(e);
179    }
180  }
181
182  @Override
183  public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
184    try {
185      return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
186    } catch (IOException e) {
187      LOG.warn(e.getMessage(), e);
188      throw getIOError(e);
189    }
190  }
191
192  // ThriftServerRunner.compact should be deprecated and replaced with methods specific to
193  // table and region.
194  @Override
195  public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
196    try {
197      try {
198        getAdmin().compactRegion(getBytes(tableNameOrRegionName));
199      } catch (IllegalArgumentException e) {
200        // Invalid region, try table
201        getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName)));
202      }
203    } catch (IOException e) {
204      LOG.warn(e.getMessage(), e);
205      throw getIOError(e);
206    }
207  }
208
209  // ThriftServerRunner.majorCompact should be deprecated and replaced with methods specific
210  // to table and region.
211  @Override
212  public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
213    try {
214      try {
215        getAdmin().compactRegion(getBytes(tableNameOrRegionName));
216      } catch (IllegalArgumentException e) {
217        // Invalid region, try table
218        getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName)));
219      }
220    } catch (IOException e) {
221      LOG.warn(e.getMessage(), e);
222      throw getIOError(e);
223    }
224  }
225
226  @Override
227  public List<ByteBuffer> getTableNames() throws IOError {
228    try {
229      TableName[] tableNames = this.getAdmin().listTableNames();
230      ArrayList<ByteBuffer> list = new ArrayList<>(tableNames.length);
231      for (TableName tableName : tableNames) {
232        list.add(ByteBuffer.wrap(tableName.getName()));
233      }
234      return list;
235    } catch (IOException e) {
236      LOG.warn(e.getMessage(), e);
237      throw getIOError(e);
238    }
239  }
240
241  /**
242   * @return the list of regions in the given table, or an empty list if the table does not exist
243   */
244  @Override
245  public List<TRegionInfo> getTableRegions(ByteBuffer tableName) throws IOError {
246    try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
247      List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
248      List<TRegionInfo> results = new ArrayList<>(regionLocations.size());
249      for (HRegionLocation regionLocation : regionLocations) {
250        RegionInfo info = regionLocation.getRegion();
251        ServerName serverName = regionLocation.getServerName();
252        TRegionInfo region = new TRegionInfo();
253        region.serverName = ByteBuffer.wrap(
254            Bytes.toBytes(serverName.getHostname()));
255        region.port = serverName.getPort();
256        region.startKey = ByteBuffer.wrap(info.getStartKey());
257        region.endKey = ByteBuffer.wrap(info.getEndKey());
258        region.id = info.getRegionId();
259        region.name = ByteBuffer.wrap(info.getRegionName());
260        region.version = HREGION_VERSION; // HRegion now not versioned, PB encoding used
261        results.add(region);
262      }
263      return results;
264    } catch (TableNotFoundException e) {
265      // Return empty list for non-existing table
266      return Collections.emptyList();
267    } catch (IOException e){
268      LOG.warn(e.getMessage(), e);
269      throw getIOError(e);
270    }
271  }
272
273  @Override
274  public List<TCell> get(
275      ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
276      Map<ByteBuffer, ByteBuffer> attributes)
277      throws IOError {
278    byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
279    if (famAndQf.length == 1) {
280      return get(tableName, row, famAndQf[0], null, attributes);
281    }
282    if (famAndQf.length == 2) {
283      return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
284    }
285    throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
286  }
287
288  /**
289   * Note: this internal interface is slightly different from public APIs in regard to handling
290   * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
291   * we respect qual == null as a request for the entire column family. The caller (
292   * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that the
293   * column is parse like normal.
294   */
295  protected List<TCell> get(ByteBuffer tableName,
296      ByteBuffer row,
297      byte[] family,
298      byte[] qualifier,
299      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
300    Table table = null;
301    try {
302      table = getTable(tableName);
303      Get get = new Get(getBytes(row));
304      addAttributes(get, attributes);
305      if (qualifier == null) {
306        get.addFamily(family);
307      } else {
308        get.addColumn(family, qualifier);
309      }
310      Result result = table.get(get);
311      return ThriftUtilities.cellFromHBase(result.rawCells());
312    } catch (IOException e) {
313      LOG.warn(e.getMessage(), e);
314      throw getIOError(e);
315    } finally {
316      closeTable(table);
317    }
318  }
319
320  @Override
321  public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
322      int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
323    byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
324    if(famAndQf.length == 1) {
325      return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
326    }
327    if (famAndQf.length == 2) {
328      return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
329    }
330    throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
331
332  }
333
334  /**
335   * Note: this public interface is slightly different from public Java APIs in regard to
336   * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
337   * Rather, we respect qual == null as a request for the entire column family. If you want to
338   * access the entire column family, use
339   * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value
340   * that lacks a {@code ':'}.
341   */
342  public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
343      byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
344
345    Table table = null;
346    try {
347      table = getTable(tableName);
348      Get get = new Get(getBytes(row));
349      addAttributes(get, attributes);
350      if (null == qualifier) {
351        get.addFamily(family);
352      } else {
353        get.addColumn(family, qualifier);
354      }
355      get.readVersions(numVersions);
356      Result result = table.get(get);
357      return ThriftUtilities.cellFromHBase(result.rawCells());
358    } catch (IOException e) {
359      LOG.warn(e.getMessage(), e);
360      throw getIOError(e);
361    } finally{
362      closeTable(table);
363    }
364  }
365
366  @Override
367  public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
368      long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
369    byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
370    if (famAndQf.length == 1) {
371      return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
372    }
373    if (famAndQf.length == 2) {
374      return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
375          attributes);
376    }
377    throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
378  }
379
380  /**
381   * Note: this internal interface is slightly different from public APIs in regard to handling
382   * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
383   * we respect qual == null as a request for the entire column family. The caller (
384   * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS
385   * consistent in that the column is parse like normal.
386   */
387  protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
388      byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
389      throws IOError {
390
391    Table table = null;
392    try {
393      table = getTable(tableName);
394      Get get = new Get(getBytes(row));
395      addAttributes(get, attributes);
396      if (null == qualifier) {
397        get.addFamily(family);
398      } else {
399        get.addColumn(family, qualifier);
400      }
401      get.setTimeRange(0, timestamp);
402      get.readVersions(numVersions);
403      Result result = table.get(get);
404      return ThriftUtilities.cellFromHBase(result.rawCells());
405    } catch (IOException e) {
406      LOG.warn(e.getMessage(), e);
407      throw getIOError(e);
408    } finally{
409      closeTable(table);
410    }
411  }
412
413  @Override
414  public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
415      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
416    return getRowWithColumnsTs(tableName, row, null,
417        HConstants.LATEST_TIMESTAMP,
418        attributes);
419  }
420
421  @Override
422  public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
423      ByteBuffer row,
424      List<ByteBuffer> columns,
425      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
426    return getRowWithColumnsTs(tableName, row, columns,
427        HConstants.LATEST_TIMESTAMP,
428        attributes);
429  }
430
431  @Override
432  public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
433      long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
434    return getRowWithColumnsTs(tableName, row, null,
435        timestamp, attributes);
436  }
437
438  @Override
439  public List<TRowResult> getRowWithColumnsTs(
440      ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
441      long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
442
443    Table table = null;
444    try {
445      table = getTable(tableName);
446      if (columns == null) {
447        Get get = new Get(getBytes(row));
448        addAttributes(get, attributes);
449        get.setTimeRange(0, timestamp);
450        Result result = table.get(get);
451        return ThriftUtilities.rowResultFromHBase(result);
452      }
453      Get get = new Get(getBytes(row));
454      addAttributes(get, attributes);
455      for(ByteBuffer column : columns) {
456        byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
457        if (famAndQf.length == 1) {
458          get.addFamily(famAndQf[0]);
459        } else {
460          get.addColumn(famAndQf[0], famAndQf[1]);
461        }
462      }
463      get.setTimeRange(0, timestamp);
464      Result result = table.get(get);
465      return ThriftUtilities.rowResultFromHBase(result);
466    } catch (IOException e) {
467      LOG.warn(e.getMessage(), e);
468      throw getIOError(e);
469    } finally{
470      closeTable(table);
471    }
472  }
473
474  @Override
475  public List<TRowResult> getRows(ByteBuffer tableName,
476      List<ByteBuffer> rows,
477      Map<ByteBuffer, ByteBuffer> attributes)
478      throws IOError {
479    return getRowsWithColumnsTs(tableName, rows, null,
480        HConstants.LATEST_TIMESTAMP,
481        attributes);
482  }
483
484  @Override
485  public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
486      List<ByteBuffer> rows,
487      List<ByteBuffer> columns,
488      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
489    return getRowsWithColumnsTs(tableName, rows, columns,
490        HConstants.LATEST_TIMESTAMP,
491        attributes);
492  }
493
494  @Override
495  public List<TRowResult> getRowsTs(ByteBuffer tableName,
496      List<ByteBuffer> rows,
497      long timestamp,
498      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
499    return getRowsWithColumnsTs(tableName, rows, null,
500        timestamp, attributes);
501  }
502
503  @Override
504  public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
505      List<ByteBuffer> rows,
506      List<ByteBuffer> columns, long timestamp,
507      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
508
509    Table table= null;
510    try {
511      List<Get> gets = new ArrayList<>(rows.size());
512      table = getTable(tableName);
513      if (metrics != null) {
514        metrics.incNumRowKeysInBatchGet(rows.size());
515      }
516      for (ByteBuffer row : rows) {
517        Get get = new Get(getBytes(row));
518        addAttributes(get, attributes);
519        if (columns != null) {
520
521          for(ByteBuffer column : columns) {
522            byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
523            if (famAndQf.length == 1) {
524              get.addFamily(famAndQf[0]);
525            } else {
526              get.addColumn(famAndQf[0], famAndQf[1]);
527            }
528          }
529        }
530        get.setTimeRange(0, timestamp);
531        gets.add(get);
532      }
533      Result[] result = table.get(gets);
534      return ThriftUtilities.rowResultFromHBase(result);
535    } catch (IOException e) {
536      LOG.warn(e.getMessage(), e);
537      throw getIOError(e);
538    } finally{
539      closeTable(table);
540    }
541  }
542
543  @Override
544  public void deleteAll(
545      ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
546      Map<ByteBuffer, ByteBuffer> attributes)
547      throws IOError {
548    deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
549        attributes);
550  }
551
552  @Override
553  public void deleteAllTs(ByteBuffer tableName,
554      ByteBuffer row,
555      ByteBuffer column,
556      long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
557    Table table = null;
558    try {
559      table = getTable(tableName);
560      Delete delete  = new Delete(getBytes(row));
561      addAttributes(delete, attributes);
562      byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
563      if (famAndQf.length == 1) {
564        delete.addFamily(famAndQf[0], timestamp);
565      } else {
566        delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
567      }
568      table.delete(delete);
569
570    } catch (IOException e) {
571      LOG.warn(e.getMessage(), e);
572      throw getIOError(e);
573    } finally {
574      closeTable(table);
575    }
576  }
577
578  @Override
579  public void deleteAllRow(
580      ByteBuffer tableName, ByteBuffer row,
581      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
582    deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
583  }
584
585  @Override
586  public void deleteAllRowTs(
587      ByteBuffer tableName, ByteBuffer row, long timestamp,
588      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
589    Table table = null;
590    try {
591      table = getTable(tableName);
592      Delete delete  = new Delete(getBytes(row), timestamp);
593      addAttributes(delete, attributes);
594      table.delete(delete);
595    } catch (IOException e) {
596      LOG.warn(e.getMessage(), e);
597      throw getIOError(e);
598    } finally {
599      closeTable(table);
600    }
601  }
602
603  @Override
604  public void createTable(ByteBuffer in_tableName, List<ColumnDescriptor> columnFamilies)
605    throws IOError, IllegalArgument, AlreadyExists {
606    TableName tableName = getTableName(in_tableName);
607    try {
608      if (getAdmin().tableExists(tableName)) {
609        throw new AlreadyExists("table name already in use");
610      }
611      TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
612      for (ColumnDescriptor col : columnFamilies) {
613        builder.setColumnFamily(ThriftUtilities.colDescFromThrift(col));
614      }
615      getAdmin().createTable(builder.build());
616    } catch (IOException e) {
617      LOG.warn(e.getMessage(), e);
618      throw getIOError(e);
619    } catch (IllegalArgumentException e) {
620      LOG.warn(e.getMessage(), e);
621      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
622    }
623  }
624
625  private static TableName getTableName(ByteBuffer buffer) {
626    return TableName.valueOf(getBytes(buffer));
627  }
628
629  @Override
630  public void deleteTable(ByteBuffer in_tableName) throws IOError {
631    TableName tableName = getTableName(in_tableName);
632    if (LOG.isDebugEnabled()) {
633      LOG.debug("deleteTable: table={}", tableName);
634    }
635    try {
636      if (!getAdmin().tableExists(tableName)) {
637        throw new IOException("table does not exist");
638      }
639      getAdmin().deleteTable(tableName);
640    } catch (IOException e) {
641      LOG.warn(e.getMessage(), e);
642      throw getIOError(e);
643    }
644  }
645
646  @Override
647  public void mutateRow(ByteBuffer tableName, ByteBuffer row,
648      List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
649      throws IOError, IllegalArgument {
650    mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP, attributes);
651  }
652
653  @Override
654  public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
655      List<Mutation> mutations, long timestamp,
656      Map<ByteBuffer, ByteBuffer> attributes)
657      throws IOError, IllegalArgument {
658    Table table = null;
659    try {
660      table = getTable(tableName);
661      Put put = new Put(getBytes(row), timestamp);
662      addAttributes(put, attributes);
663
664      Delete delete = new Delete(getBytes(row));
665      addAttributes(delete, attributes);
666      if (metrics != null) {
667        metrics.incNumRowKeysInBatchMutate(mutations.size());
668      }
669
670      // I apologize for all this mess :)
671      CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
672      for (Mutation m : mutations) {
673        byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column));
674        if (m.isDelete) {
675          if (famAndQf.length == 1) {
676            delete.addFamily(famAndQf[0], timestamp);
677          } else {
678            delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
679          }
680          delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
681        } else {
682          if(famAndQf.length == 1) {
683            LOG.warn("No column qualifier specified. Delete is the only mutation supported "
684                + "over the whole column family.");
685          } else {
686            put.add(builder.clear()
687                .setRow(put.getRow())
688                .setFamily(famAndQf[0])
689                .setQualifier(famAndQf[1])
690                .setTimestamp(put.getTimestamp())
691                .setType(Cell.Type.Put)
692                .setValue(m.value != null ? getBytes(m.value)
693                    : HConstants.EMPTY_BYTE_ARRAY)
694                .build());
695          }
696          put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
697        }
698      }
699      if (!delete.isEmpty()) {
700        table.delete(delete);
701      }
702      if (!put.isEmpty()) {
703        table.put(put);
704      }
705    } catch (IOException e) {
706      LOG.warn(e.getMessage(), e);
707      throw getIOError(e);
708    } catch (IllegalArgumentException e) {
709      LOG.warn(e.getMessage(), e);
710      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
711    } finally{
712      closeTable(table);
713    }
714  }
715
716  @Override
717  public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
718      Map<ByteBuffer, ByteBuffer> attributes)
719      throws IOError, IllegalArgument, TException {
720    mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
721  }
722
723  @Override
724  public void mutateRowsTs(
725      ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
726      Map<ByteBuffer, ByteBuffer> attributes)
727      throws IOError, IllegalArgument, TException {
728    List<Put> puts = new ArrayList<>();
729    List<Delete> deletes = new ArrayList<>();
730    CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
731    for (BatchMutation batch : rowBatches) {
732      byte[] row = getBytes(batch.row);
733      List<Mutation> mutations = batch.mutations;
734      Delete delete = new Delete(row);
735      addAttributes(delete, attributes);
736      Put put = new Put(row, timestamp);
737      addAttributes(put, attributes);
738      for (Mutation m : mutations) {
739        byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column));
740        if (m.isDelete) {
741          // no qualifier, family only.
742          if (famAndQf.length == 1) {
743            delete.addFamily(famAndQf[0], timestamp);
744          } else {
745            delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
746          }
747          delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
748              : Durability.SKIP_WAL);
749        } else {
750          if (famAndQf.length == 1) {
751            LOG.warn("No column qualifier specified. Delete is the only mutation supported "
752                + "over the whole column family.");
753          }
754          if (famAndQf.length == 2) {
755            try {
756              put.add(builder.clear()
757                  .setRow(put.getRow())
758                  .setFamily(famAndQf[0])
759                  .setQualifier(famAndQf[1])
760                  .setTimestamp(put.getTimestamp())
761                  .setType(Cell.Type.Put)
762                  .setValue(m.value != null ? getBytes(m.value)
763                      : HConstants.EMPTY_BYTE_ARRAY)
764                  .build());
765            } catch (IOException e) {
766              throw new IllegalArgumentException(e);
767            }
768          } else {
769            throw new IllegalArgumentException("Invalid famAndQf provided.");
770          }
771          put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
772        }
773      }
774      if (!delete.isEmpty()) {
775        deletes.add(delete);
776      }
777      if (!put.isEmpty()) {
778        puts.add(put);
779      }
780    }
781
782    Table table = null;
783    try {
784      table = getTable(tableName);
785      if (!puts.isEmpty()) {
786        table.put(puts);
787      }
788      if (!deletes.isEmpty()) {
789        table.delete(deletes);
790      }
791    } catch (IOException e) {
792      LOG.warn(e.getMessage(), e);
793      throw getIOError(e);
794    } catch (IllegalArgumentException e) {
795      LOG.warn(e.getMessage(), e);
796      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
797    } finally{
798      closeTable(table);
799    }
800  }
801
802  @Override
803  public long atomicIncrement(
804      ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
805      throws IOError, IllegalArgument, TException {
806    byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
807    if(famAndQf.length == 1) {
808      return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
809    }
810    return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
811  }
812
813  protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
814      byte [] family, byte [] qualifier, long amount)
815      throws IOError, IllegalArgument, TException {
816    Table table = null;
817    try {
818      table = getTable(tableName);
819      return table.incrementColumnValue(
820          getBytes(row), family, qualifier, amount);
821    } catch (IOException e) {
822      LOG.warn(e.getMessage(), e);
823      throw getIOError(e);
824    } finally {
825      closeTable(table);
826    }
827  }
828
829  @Override
830  public void scannerClose(int id) throws IOError, IllegalArgument {
831    LOG.debug("scannerClose: id={}", id);
832    ResultScannerWrapper resultScannerWrapper = getScanner(id);
833    if (resultScannerWrapper == null) {
834      LOG.warn("scanner ID is invalid");
835      throw new IllegalArgument("scanner ID is invalid");
836    }
837    resultScannerWrapper.getScanner().close();
838    removeScanner(id);
839  }
840
841  @Override
842  public List<TRowResult> scannerGetList(int id,int nbRows)
843      throws IllegalArgument, IOError {
844    LOG.debug("scannerGetList: id={}", id);
845    ResultScannerWrapper resultScannerWrapper = getScanner(id);
846    if (null == resultScannerWrapper) {
847      String message = "scanner ID is invalid";
848      LOG.warn(message);
849      throw new IllegalArgument("scanner ID is invalid");
850    }
851
852    Result [] results;
853    try {
854      results = resultScannerWrapper.getScanner().next(nbRows);
855      if (null == results) {
856        return new ArrayList<>();
857      }
858    } catch (IOException e) {
859      LOG.warn(e.getMessage(), e);
860      throw getIOError(e);
861    }
862    return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
863  }
864
865  @Override
866  public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
867    return scannerGetList(id,1);
868  }
869
870  @Override
871  public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
872      Map<ByteBuffer, ByteBuffer> attributes)
873      throws IOError {
874
875    Table table = null;
876    try {
877      table = getTable(tableName);
878      Scan scan = new Scan();
879      addAttributes(scan, attributes);
880      if (tScan.isSetStartRow()) {
881        scan.withStartRow(tScan.getStartRow());
882      }
883      if (tScan.isSetStopRow()) {
884        scan.withStopRow(tScan.getStopRow());
885      }
886      if (tScan.isSetTimestamp()) {
887        scan.setTimeRange(0, tScan.getTimestamp());
888      }
889      if (tScan.isSetCaching()) {
890        scan.setCaching(tScan.getCaching());
891      }
892      if (tScan.isSetBatchSize()) {
893        scan.setBatch(tScan.getBatchSize());
894      }
895      if (tScan.isSetColumns() && !tScan.getColumns().isEmpty()) {
896        for(ByteBuffer column : tScan.getColumns()) {
897          byte [][] famQf = CellUtil.parseColumn(getBytes(column));
898          if(famQf.length == 1) {
899            scan.addFamily(famQf[0]);
900          } else {
901            scan.addColumn(famQf[0], famQf[1]);
902          }
903        }
904      }
905      if (tScan.isSetFilterString()) {
906        ParseFilter parseFilter = new ParseFilter();
907        scan.setFilter(
908            parseFilter.parseFilterString(tScan.getFilterString()));
909      }
910      if (tScan.isSetReversed()) {
911        scan.setReversed(tScan.isReversed());
912      }
913      if (tScan.isSetCacheBlocks()) {
914        scan.setCacheBlocks(tScan.isCacheBlocks());
915      }
916      return addScanner(table.getScanner(scan), tScan.sortColumns);
917    } catch (IOException e) {
918      LOG.warn(e.getMessage(), e);
919      throw getIOError(e);
920    } finally{
921      closeTable(table);
922    }
923  }
924
925  @Override
926  public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
927      List<ByteBuffer> columns,
928      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
929
930    Table table = null;
931    try {
932      table = getTable(tableName);
933      Scan scan = new Scan().withStartRow(getBytes(startRow));
934      addAttributes(scan, attributes);
935      if(columns != null && !columns.isEmpty()) {
936        for(ByteBuffer column : columns) {
937          byte [][] famQf = CellUtil.parseColumn(getBytes(column));
938          if(famQf.length == 1) {
939            scan.addFamily(famQf[0]);
940          } else {
941            scan.addColumn(famQf[0], famQf[1]);
942          }
943        }
944      }
945      return addScanner(table.getScanner(scan), false);
946    } catch (IOException e) {
947      LOG.warn(e.getMessage(), e);
948      throw getIOError(e);
949    } finally{
950      closeTable(table);
951    }
952  }
953
954  @Override
955  public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
956      ByteBuffer stopRow, List<ByteBuffer> columns,
957      Map<ByteBuffer, ByteBuffer> attributes)
958      throws IOError, TException {
959
960    Table table = null;
961    try {
962      table = getTable(tableName);
963      Scan scan = new Scan().withStartRow(getBytes(startRow)).withStopRow(getBytes(stopRow));
964      addAttributes(scan, attributes);
965      if(columns != null && !columns.isEmpty()) {
966        for(ByteBuffer column : columns) {
967          byte [][] famQf = CellUtil.parseColumn(getBytes(column));
968          if(famQf.length == 1) {
969            scan.addFamily(famQf[0]);
970          } else {
971            scan.addColumn(famQf[0], famQf[1]);
972          }
973        }
974      }
975      return addScanner(table.getScanner(scan), false);
976    } catch (IOException e) {
977      LOG.warn(e.getMessage(), e);
978      throw getIOError(e);
979    } finally{
980      closeTable(table);
981    }
982  }
983
984  @Override
985  public int scannerOpenWithPrefix(ByteBuffer tableName,
986      ByteBuffer startAndPrefix,
987      List<ByteBuffer> columns,
988      Map<ByteBuffer, ByteBuffer> attributes)
989      throws IOError, TException {
990
991    Table table = null;
992    try {
993      table = getTable(tableName);
994      Scan scan = new Scan().withStartRow(getBytes(startAndPrefix));
995      addAttributes(scan, attributes);
996      Filter f = new WhileMatchFilter(
997          new PrefixFilter(getBytes(startAndPrefix)));
998      scan.setFilter(f);
999      if (columns != null && !columns.isEmpty()) {
1000        for(ByteBuffer column : columns) {
1001          byte [][] famQf = CellUtil.parseColumn(getBytes(column));
1002          if(famQf.length == 1) {
1003            scan.addFamily(famQf[0]);
1004          } else {
1005            scan.addColumn(famQf[0], famQf[1]);
1006          }
1007        }
1008      }
1009      return addScanner(table.getScanner(scan), false);
1010    } catch (IOException e) {
1011      LOG.warn(e.getMessage(), e);
1012      throw getIOError(e);
1013    } finally{
1014      closeTable(table);
1015    }
1016  }
1017
1018  @Override
1019  public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
1020      List<ByteBuffer> columns, long timestamp,
1021      Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
1022
1023    Table table = null;
1024    try {
1025      table = getTable(tableName);
1026      Scan scan = new Scan().withStartRow(getBytes(startRow));
1027      addAttributes(scan, attributes);
1028      scan.setTimeRange(0, timestamp);
1029      if (columns != null && !columns.isEmpty()) {
1030        for (ByteBuffer column : columns) {
1031          byte [][] famQf = CellUtil.parseColumn(getBytes(column));
1032          if(famQf.length == 1) {
1033            scan.addFamily(famQf[0]);
1034          } else {
1035            scan.addColumn(famQf[0], famQf[1]);
1036          }
1037        }
1038      }
1039      return addScanner(table.getScanner(scan), false);
1040    } catch (IOException e) {
1041      LOG.warn(e.getMessage(), e);
1042      throw getIOError(e);
1043    } finally{
1044      closeTable(table);
1045    }
1046  }
1047
1048  @Override
1049  public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
1050      ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
1051      Map<ByteBuffer, ByteBuffer> attributes)
1052      throws IOError, TException {
1053
1054    Table table = null;
1055    try {
1056      table = getTable(tableName);
1057      Scan scan = new Scan().withStartRow(getBytes(startRow)).withStopRow(getBytes(stopRow));
1058      addAttributes(scan, attributes);
1059      scan.setTimeRange(0, timestamp);
1060      if (columns != null && !columns.isEmpty()) {
1061        for (ByteBuffer column : columns) {
1062          byte [][] famQf = CellUtil.parseColumn(getBytes(column));
1063          if(famQf.length == 1) {
1064            scan.addFamily(famQf[0]);
1065          } else {
1066            scan.addColumn(famQf[0], famQf[1]);
1067          }
1068        }
1069      }
1070      scan.setTimeRange(0, timestamp);
1071      return addScanner(table.getScanner(scan), false);
1072    } catch (IOException e) {
1073      LOG.warn(e.getMessage(), e);
1074      throw getIOError(e);
1075    } finally{
1076      closeTable(table);
1077    }
1078  }
1079
1080  @Override
1081  public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
1082      ByteBuffer tableName) throws IOError, TException {
1083
1084    Table table = null;
1085    try {
1086      TreeMap<ByteBuffer, ColumnDescriptor> columns = new TreeMap<>();
1087
1088      table = getTable(tableName);
1089      TableDescriptor desc = table.getDescriptor();
1090
1091      for (ColumnFamilyDescriptor e : desc.getColumnFamilies()) {
1092        ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
1093        columns.put(col.name, col);
1094      }
1095      return columns;
1096    } catch (IOException e) {
1097      LOG.warn(e.getMessage(), e);
1098      throw getIOError(e);
1099    } finally {
1100      closeTable(table);
1101    }
1102  }
1103
1104  private void closeTable(Table table) throws IOError {
1105    try{
1106      if(table != null){
1107        table.close();
1108      }
1109    } catch (IOException e){
1110      LOG.error(e.getMessage(), e);
1111      throw getIOError(e);
1112    }
1113  }
1114
1115  @Override
1116  public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
1117    try {
1118      byte[] row = getBytes(searchRow);
1119      Result startRowResult = getReverseScanResult(TableName.META_TABLE_NAME.getName(), row,
1120          HConstants.CATALOG_FAMILY);
1121
1122      if (startRowResult == null) {
1123        throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
1124            + Bytes.toStringBinary(row));
1125      }
1126
1127      // find region start and end keys
1128      RegionInfo regionInfo = CatalogFamilyFormat.getRegionInfo(startRowResult);
1129      if (regionInfo == null) {
1130        throw new IOException("RegionInfo REGIONINFO was null or " +
1131            " empty in Meta for row="
1132            + Bytes.toStringBinary(row));
1133      }
1134      TRegionInfo region = new TRegionInfo();
1135      region.setStartKey(regionInfo.getStartKey());
1136      region.setEndKey(regionInfo.getEndKey());
1137      region.id = regionInfo.getRegionId();
1138      region.setName(regionInfo.getRegionName());
1139      region.version = HREGION_VERSION; // version not used anymore, PB encoding used.
1140
1141      // find region assignment to server
1142      ServerName serverName = CatalogFamilyFormat.getServerName(startRowResult, 0);
1143      if (serverName != null) {
1144        region.setServerName(Bytes.toBytes(serverName.getHostname()));
1145        region.port = serverName.getPort();
1146      }
1147      return region;
1148    } catch (IOException e) {
1149      LOG.warn(e.getMessage(), e);
1150      throw getIOError(e);
1151    }
1152  }
1153
1154  private Result getReverseScanResult(byte[] tableName, byte[] row, byte[] family)
1155      throws IOException {
1156    Scan scan = new Scan().withStartRow(row);
1157    scan.setReversed(true);
1158    scan.addFamily(family);
1159    scan.withStartRow(row);
1160    try (Table table = getTable(tableName);
1161         ResultScanner scanner = table.getScanner(scan)) {
1162      return scanner.next();
1163    }
1164  }
1165
1166  @Override
1167  public void increment(TIncrement tincrement) throws IOError, TException {
1168
1169    if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
1170      throw new TException("Must supply a table and a row key; can't increment");
1171    }
1172
1173    if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1174      this.coalescer.queueIncrement(tincrement);
1175      return;
1176    }
1177
1178    Table table = null;
1179    try {
1180      table = getTable(tincrement.getTable());
1181      Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
1182      table.increment(inc);
1183    } catch (IOException e) {
1184      LOG.warn(e.getMessage(), e);
1185      throw getIOError(e);
1186    } finally{
1187      closeTable(table);
1188    }
1189  }
1190
1191  @Override
1192  public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
1193    if (conf.getBoolean(COALESCE_INC_KEY, false)) {
1194      this.coalescer.queueIncrements(tincrements);
1195      return;
1196    }
1197    for (TIncrement tinc : tincrements) {
1198      increment(tinc);
1199    }
1200  }
1201
1202  @Override
1203  public List<TCell> append(TAppend tappend) throws IOError, TException {
1204    if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
1205      throw new TException("Must supply a table and a row key; can't append");
1206    }
1207
1208    Table table = null;
1209    try {
1210      table = getTable(tappend.getTable());
1211      Append append = ThriftUtilities.appendFromThrift(tappend);
1212      Result result = table.append(append);
1213      return ThriftUtilities.cellFromHBase(result.rawCells());
1214    } catch (IOException e) {
1215      LOG.warn(e.getMessage(), e);
1216      throw getIOError(e);
1217    } finally{
1218      closeTable(table);
1219    }
1220  }
1221
1222  @Override
1223  public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
1224      ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
1225      IllegalArgument, TException {
1226    Put put;
1227    try {
1228      put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
1229      addAttributes(put, attributes);
1230
1231      byte[][] famAndQf = CellUtil.parseColumn(getBytes(mput.column));
1232      put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
1233          .setRow(put.getRow())
1234          .setFamily(famAndQf[0])
1235          .setQualifier(famAndQf[1])
1236          .setTimestamp(put.getTimestamp())
1237          .setType(Cell.Type.Put)
1238          .setValue(mput.value != null ? getBytes(mput.value)
1239              : HConstants.EMPTY_BYTE_ARRAY)
1240          .build());
1241      put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
1242    } catch (IOException | IllegalArgumentException e) {
1243      LOG.warn(e.getMessage(), e);
1244      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1245    }
1246
1247    Table table = null;
1248    try {
1249      table = getTable(tableName);
1250      byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
1251      Table.CheckAndMutateBuilder mutateBuilder =
1252          table.checkAndMutate(getBytes(row), famAndQf[0]).qualifier(famAndQf[1]);
1253      if (value != null) {
1254        return mutateBuilder.ifEquals(getBytes(value)).thenPut(put);
1255      } else {
1256        return mutateBuilder.ifNotExists().thenPut(put);
1257      }
1258    } catch (IOException e) {
1259      LOG.warn(e.getMessage(), e);
1260      throw getIOError(e);
1261    } catch (IllegalArgumentException e) {
1262      LOG.warn(e.getMessage(), e);
1263      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
1264    } finally {
1265      closeTable(table);
1266    }
1267  }
1268
1269  @Override
1270  public TThriftServerType getThriftServerType() {
1271    return TThriftServerType.ONE;
1272  }
1273
1274  @Override
1275  public String getClusterId() throws TException {
1276    return connectionCache.getClusterId();
1277  }
1278
1279  private static IOError getIOError(Throwable throwable) {
1280    IOError error = new IOErrorWithCause(throwable);
1281    error.setMessage(Throwables.getStackTraceAsString(throwable));
1282    return error;
1283  }
1284
1285  /**
1286   * Adds all the attributes into the Operation object
1287   */
1288  private static void addAttributes(OperationWithAttributes op,
1289      Map<ByteBuffer, ByteBuffer> attributes) {
1290    if (attributes == null || attributes.isEmpty()) {
1291      return;
1292    }
1293    for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
1294      String name = Bytes.toStringBinary(getBytes(entry.getKey()));
1295      byte[] value =  getBytes(entry.getValue());
1296      op.setAttribute(name, value);
1297    }
1298  }
1299
1300  protected static class ResultScannerWrapper {
1301
1302    private final ResultScanner scanner;
1303    private final boolean sortColumns;
1304    public ResultScannerWrapper(ResultScanner resultScanner,
1305        boolean sortResultColumns) {
1306      scanner = resultScanner;
1307      sortColumns = sortResultColumns;
1308    }
1309
1310    public ResultScanner getScanner() {
1311      return scanner;
1312    }
1313
1314    public boolean isColumnSorted() {
1315      return sortColumns;
1316    }
1317  }
1318
1319  public static class IOErrorWithCause extends IOError {
1320    private final Throwable cause;
1321    public IOErrorWithCause(Throwable cause) {
1322      this.cause = cause;
1323    }
1324
1325    @Override
1326    public synchronized Throwable getCause() {
1327      return cause;
1328    }
1329
1330    @Override
1331    public boolean equals(Object other) {
1332      if (super.equals(other) &&
1333          other instanceof IOErrorWithCause) {
1334        Throwable otherCause = ((IOErrorWithCause) other).getCause();
1335        if (this.getCause() != null) {
1336          return otherCause != null && this.getCause().equals(otherCause);
1337        } else {
1338          return otherCause == null;
1339        }
1340      }
1341      return false;
1342    }
1343
1344    @Override
1345    public int hashCode() {
1346      int result = super.hashCode();
1347      result = 31 * result + (cause != null ? cause.hashCode() : 0);
1348      return result;
1349    }
1350  }
1351
1352
1353}