001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertTrue;
022
023import com.google.protobuf.Message;
024import java.io.IOException;
025import java.util.ArrayList;
026import java.util.Arrays;
027import java.util.Collection;
028import java.util.Collections;
029import java.util.HashSet;
030import java.util.List;
031import java.util.Set;
032import java.util.concurrent.CountDownLatch;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.atomic.AtomicLong;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.Cell;
037import org.apache.hadoop.hbase.CellUtil;
038import org.apache.hadoop.hbase.HBaseClassTestRule;
039import org.apache.hadoop.hbase.HBaseTestingUtility;
040import org.apache.hadoop.hbase.HConstants;
041import org.apache.hadoop.hbase.KeyValue;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.client.Delete;
044import org.apache.hadoop.hbase.client.Get;
045import org.apache.hadoop.hbase.client.IsolationLevel;
046import org.apache.hadoop.hbase.client.Mutation;
047import org.apache.hadoop.hbase.client.Put;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.coprocessor.RowProcessorClient;
051import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos;
052import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorRequest;
053import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.FriendsOfFriendsProcessorResponse;
054import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorRequest;
055import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.IncCounterProcessorResponse;
056import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorRequest;
057import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.RowSwapProcessorResponse;
058import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorRequest;
059import org.apache.hadoop.hbase.coprocessor.protobuf.generated.IncrementCounterProcessorTestProtos.TimeoutProcessorResponse;
060import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
061import org.apache.hadoop.hbase.ipc.RpcScheduler;
062import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessRequest;
063import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessResponse;
064import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService;
065import org.apache.hadoop.hbase.regionserver.BaseRowProcessor;
066import org.apache.hadoop.hbase.regionserver.HRegion;
067import org.apache.hadoop.hbase.regionserver.InternalScanner;
068import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
069import org.apache.hadoop.hbase.testclassification.MediumTests;
070import org.apache.hadoop.hbase.util.ByteStringer;
071import org.apache.hadoop.hbase.util.Bytes;
072import org.apache.hadoop.hbase.wal.WALEdit;
073import org.junit.AfterClass;
074import org.junit.BeforeClass;
075import org.junit.ClassRule;
076import org.junit.Test;
077import org.junit.experimental.categories.Category;
078import org.slf4j.Logger;
079import org.slf4j.LoggerFactory;
080
081/**
082 * Verifies ProcessEndpoint works.
083 * The tested RowProcessor performs two scans and a read-modify-write.
084 */
085@Category({CoprocessorTests.class, MediumTests.class})
086public class TestRowProcessorEndpoint {
087  @ClassRule
088  public static final HBaseClassTestRule CLASS_RULE =
089      HBaseClassTestRule.forClass(TestRowProcessorEndpoint.class);
090
091  private static final Logger LOG = LoggerFactory.getLogger(TestRowProcessorEndpoint.class);
092
093  private static final TableName TABLE = TableName.valueOf("testtable");
094  private final static byte[] ROW = Bytes.toBytes("testrow");
095  private final static byte[] ROW2 = Bytes.toBytes("testrow2");
096  private final static byte[] FAM = Bytes.toBytes("friendlist");
097
098  // Column names
099  private final static byte[] A = Bytes.toBytes("a");
100  private final static byte[] B = Bytes.toBytes("b");
101  private final static byte[] C = Bytes.toBytes("c");
102  private final static byte[] D = Bytes.toBytes("d");
103  private final static byte[] E = Bytes.toBytes("e");
104  private final static byte[] F = Bytes.toBytes("f");
105  private final static byte[] G = Bytes.toBytes("g");
106  private final static byte[] COUNTER = Bytes.toBytes("counter");
107  private final static AtomicLong myTimer = new AtomicLong(0);
108  private final AtomicInteger failures = new AtomicInteger(0);
109
110  private static HBaseTestingUtility util = new HBaseTestingUtility();
111  private static volatile int expectedCounter = 0;
112  private static int rowSize, row2Size;
113
114  private volatile static Table table = null;
115  private volatile static boolean swapped = false;
116  private volatile CountDownLatch startSignal;
117  private volatile CountDownLatch doneSignal;
118
119  @BeforeClass
120  public static void setupBeforeClass() throws Exception {
121    Configuration conf = util.getConfiguration();
122    conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
123        RowProcessorEndpoint.class.getName());
124    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
125    conf.setLong("hbase.hregion.row.processor.timeout", 1000L);
126    conf.setLong(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 2048);
127    util.startMiniCluster();
128  }
129
130  @AfterClass
131  public static void tearDownAfterClass() throws Exception {
132    util.shutdownMiniCluster();
133  }
134
135  public void prepareTestData() throws Exception {
136    try {
137      util.getAdmin().disableTable(TABLE);
138      util.getAdmin().deleteTable(TABLE);
139    } catch (Exception e) {
140      // ignore table not found
141    }
142    table = util.createTable(TABLE, FAM);
143    {
144      Put put = new Put(ROW);
145      put.addColumn(FAM, A, Bytes.add(B, C));    // B, C are friends of A
146      put.addColumn(FAM, B, Bytes.add(D, E, F)); // D, E, F are friends of B
147      put.addColumn(FAM, C, G);                  // G is a friend of C
148      table.put(put);
149      rowSize = put.size();
150    }
151    Put put = new Put(ROW2);
152    put.addColumn(FAM, D, E);
153    put.addColumn(FAM, F, G);
154    table.put(put);
155    row2Size = put.size();
156  }
157
158  @Test
159  public void testDoubleScan() throws Throwable {
160    prepareTestData();
161
162    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
163    RowProcessorEndpoint.FriendsOfFriendsProcessor processor =
164        new RowProcessorEndpoint.FriendsOfFriendsProcessor(ROW, A);
165    RowProcessorService.BlockingInterface service =
166        RowProcessorService.newBlockingStub(channel);
167    ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
168    ProcessResponse protoResult = service.process(null, request);
169    FriendsOfFriendsProcessorResponse response =
170        FriendsOfFriendsProcessorResponse.parseFrom(protoResult.getRowProcessorResult());
171    Set<String> result = new HashSet<>();
172    result.addAll(response.getResultList());
173    Set<String> expected = new HashSet<>(Arrays.asList(new String[]{"d", "e", "f", "g"}));
174    Get get = new Get(ROW);
175    LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).listCells()));
176    assertEquals(expected, result);
177  }
178
179  @Test
180  public void testReadModifyWrite() throws Throwable {
181    prepareTestData();
182    failures.set(0);
183    int numThreads = 100;
184    concurrentExec(new IncrementRunner(), numThreads);
185    Get get = new Get(ROW);
186    LOG.debug("row keyvalues:" + stringifyKvs(table.get(get).listCells()));
187    int finalCounter = incrementCounter(table);
188    int failureNumber = failures.get();
189    if (failureNumber > 0) {
190      LOG.debug("We failed " + failureNumber + " times during test");
191    }
192    assertEquals(numThreads + 1 - failureNumber, finalCounter);
193  }
194
195  class IncrementRunner implements Runnable {
196    @Override
197    public void run() {
198      try {
199        incrementCounter(table);
200      } catch (Throwable e) {
201        failures.incrementAndGet();
202        e.printStackTrace();
203      }
204    }
205  }
206
207  private int incrementCounter(Table table) throws Throwable {
208    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
209    RowProcessorEndpoint.IncrementCounterProcessor processor =
210        new RowProcessorEndpoint.IncrementCounterProcessor(ROW);
211    RowProcessorService.BlockingInterface service =
212        RowProcessorService.newBlockingStub(channel);
213    ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
214    ProcessResponse protoResult = service.process(null, request);
215    IncCounterProcessorResponse response = IncCounterProcessorResponse
216        .parseFrom(protoResult.getRowProcessorResult());
217    Integer result = response.getResponse();
218    return result;
219  }
220
221  private void concurrentExec(final Runnable task, final int numThreads) throws Throwable {
222    startSignal = new CountDownLatch(numThreads);
223    doneSignal = new CountDownLatch(numThreads);
224    for (int i = 0; i < numThreads; ++i) {
225      new Thread(new Runnable() {
226        @Override
227        public void run() {
228          try {
229            startSignal.countDown();
230            startSignal.await();
231            task.run();
232          } catch (Throwable e) {
233            failures.incrementAndGet();
234            e.printStackTrace();
235          }
236          doneSignal.countDown();
237        }
238      }).start();
239    }
240    doneSignal.await();
241  }
242
243  @Test
244  public void testMultipleRows() throws Throwable {
245    prepareTestData();
246    failures.set(0);
247    int numThreads = 100;
248    concurrentExec(new SwapRowsRunner(), numThreads);
249    LOG.debug("row keyvalues:" +
250              stringifyKvs(table.get(new Get(ROW)).listCells()));
251    LOG.debug("row2 keyvalues:" +
252              stringifyKvs(table.get(new Get(ROW2)).listCells()));
253    int failureNumber = failures.get();
254    if (failureNumber > 0) {
255      LOG.debug("We failed " + failureNumber + " times during test");
256    }
257    if (!swapped) {
258      assertEquals(rowSize, table.get(new Get(ROW)).listCells().size());
259      assertEquals(row2Size, table.get(new Get(ROW2)).listCells().size());
260    } else {
261      assertEquals(rowSize, table.get(new Get(ROW2)).listCells().size());
262      assertEquals(row2Size, table.get(new Get(ROW)).listCells().size());
263    }
264  }
265
266  class SwapRowsRunner implements Runnable {
267    @Override
268    public void run() {
269      try {
270        swapRows(table);
271      } catch (Throwable e) {
272        failures.incrementAndGet();
273        e.printStackTrace();
274      }
275    }
276  }
277
278  private void swapRows(Table table) throws Throwable {
279    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
280    RowProcessorEndpoint.RowSwapProcessor processor =
281        new RowProcessorEndpoint.RowSwapProcessor(ROW, ROW2);
282    RowProcessorService.BlockingInterface service =
283        RowProcessorService.newBlockingStub(channel);
284    ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
285    service.process(null, request);
286  }
287
288  @Test
289  public void testTimeout() throws Throwable {
290    prepareTestData();
291    CoprocessorRpcChannel channel = table.coprocessorService(ROW);
292    RowProcessorEndpoint.TimeoutProcessor processor =
293        new RowProcessorEndpoint.TimeoutProcessor(ROW);
294    RowProcessorService.BlockingInterface service =
295        RowProcessorService.newBlockingStub(channel);
296    ProcessRequest request = RowProcessorClient.getRowProcessorPB(processor);
297    boolean exceptionCaught = false;
298    try {
299      service.process(null, request);
300    } catch (Exception e) {
301      exceptionCaught = true;
302    }
303    assertTrue(exceptionCaught);
304  }
305
  /**
   * This class defines four RowProcessors:
   * IncrementCounterProcessor, FriendsOfFriendsProcessor,
   * RowSwapProcessor and TimeoutProcessor.
   *
   * We define the RowProcessors as inner classes of the endpoint
   * so that they can be loaded with the endpoint on the coprocessor.
   */
313  public static class RowProcessorEndpoint<S extends Message,T extends Message>
314          extends BaseRowProcessorEndpoint<S,T> {
315    public static class IncrementCounterProcessor extends
316            BaseRowProcessor<IncrementCounterProcessorTestProtos.IncCounterProcessorRequest,
317                    IncrementCounterProcessorTestProtos.IncCounterProcessorResponse> {
318      int counter = 0;
319      byte[] row = new byte[0];
320
321      /**
322       * Empty constructor for Writable
323       */
324      IncrementCounterProcessor() {
325      }
326
327      IncrementCounterProcessor(byte[] row) {
328        this.row = row;
329      }
330
331      @Override
332      public Collection<byte[]> getRowsToLock() {
333        return Collections.singleton(row);
334      }
335
336      @Override
337      public IncCounterProcessorResponse getResult() {
338        IncCounterProcessorResponse.Builder i = IncCounterProcessorResponse.newBuilder();
339        i.setResponse(counter);
340        return i.build();
341      }
342
343      @Override
344      public boolean readOnly() {
345        return false;
346      }
347
348      @Override
349      public void process(long now, HRegion region,
350          List<Mutation> mutations, WALEdit walEdit) throws IOException {
351        // Scan current counter
352        List<Cell> kvs = new ArrayList<>();
353        Scan scan = new Scan(row, row);
354        scan.addColumn(FAM, COUNTER);
355        doScan(region, scan, kvs);
356        counter = kvs.isEmpty() ? 0 :
357          Bytes.toInt(CellUtil.cloneValue(kvs.iterator().next()));
358
359        // Assert counter value
360        assertEquals(expectedCounter, counter);
361
362        // Increment counter and send it to both memstore and wal edit
363        counter += 1;
364        expectedCounter += 1;
365
366
367        Put p = new Put(row);
368        KeyValue kv =
369            new KeyValue(row, FAM, COUNTER, now, Bytes.toBytes(counter));
370        p.add(kv);
371        mutations.add(p);
372        walEdit.add(kv);
373
374        // We can also inject some meta data to the walEdit
375        KeyValue metaKv = new KeyValue(
376            row, WALEdit.METAFAMILY,
377            Bytes.toBytes("I just increment counter"),
378            Bytes.toBytes(counter));
379        walEdit.add(metaKv);
380      }
381
382      @Override
383      public IncCounterProcessorRequest getRequestData() throws IOException {
384        IncCounterProcessorRequest.Builder builder = IncCounterProcessorRequest.newBuilder();
385        builder.setCounter(counter);
386        builder.setRow(ByteStringer.wrap(row));
387        return builder.build();
388      }
389
390      @Override
391      public void initialize(IncCounterProcessorRequest msg) {
392        this.row = msg.getRow().toByteArray();
393        this.counter = msg.getCounter();
394      }
395    }
396
397    public static class FriendsOfFriendsProcessor extends
398            BaseRowProcessor<FriendsOfFriendsProcessorRequest, FriendsOfFriendsProcessorResponse> {
399      byte[] row = null;
400      byte[] person = null;
401      final Set<String> result = new HashSet<>();
402
403      /**
404       * Empty constructor for Writable
405       */
406      FriendsOfFriendsProcessor() {
407      }
408
409      FriendsOfFriendsProcessor(byte[] row, byte[] person) {
410        this.row = row;
411        this.person = person;
412      }
413
414      @Override
415      public Collection<byte[]> getRowsToLock() {
416        return Collections.singleton(row);
417      }
418
419      @Override
420      public FriendsOfFriendsProcessorResponse getResult() {
421        FriendsOfFriendsProcessorResponse.Builder builder =
422            FriendsOfFriendsProcessorResponse.newBuilder();
423        builder.addAllResult(result);
424        return builder.build();
425      }
426
427      @Override
428      public boolean readOnly() {
429        return true;
430      }
431
432      @Override
433      public void process(long now, HRegion region,
434          List<Mutation> mutations, WALEdit walEdit) throws IOException {
435        List<Cell> kvs = new ArrayList<>();
436        { // First scan to get friends of the person
437          Scan scan = new Scan(row, row);
438          scan.addColumn(FAM, person);
439          doScan(region, scan, kvs);
440        }
441
442        // Second scan to get friends of friends
443        Scan scan = new Scan(row, row);
444        for (Cell kv : kvs) {
445          byte[] friends = CellUtil.cloneValue(kv);
446          for (byte f : friends) {
447            scan.addColumn(FAM, new byte[]{f});
448          }
449        }
450        doScan(region, scan, kvs);
451
452        // Collect result
453        result.clear();
454        for (Cell kv : kvs) {
455          for (byte b : CellUtil.cloneValue(kv)) {
456            result.add((char)b + "");
457          }
458        }
459      }
460
461      @Override
462      public FriendsOfFriendsProcessorRequest getRequestData() throws IOException {
463        FriendsOfFriendsProcessorRequest.Builder builder =
464            FriendsOfFriendsProcessorRequest.newBuilder();
465        builder.setPerson(ByteStringer.wrap(person));
466        builder.setRow(ByteStringer.wrap(row));
467        builder.addAllResult(result);
468        FriendsOfFriendsProcessorRequest f = builder.build();
469        return f;
470      }
471
472      @Override
473      public void initialize(FriendsOfFriendsProcessorRequest request)
474          throws IOException {
475        this.person = request.getPerson().toByteArray();
476        this.row = request.getRow().toByteArray();
477        result.clear();
478        result.addAll(request.getResultList());
479      }
480    }
481
482    public static class RowSwapProcessor extends
483            BaseRowProcessor<RowSwapProcessorRequest, RowSwapProcessorResponse> {
484      byte[] row1 = new byte[0];
485      byte[] row2 = new byte[0];
486
487      /**
488       * Empty constructor for Writable
489       */
490      RowSwapProcessor() {
491      }
492
493      RowSwapProcessor(byte[] row1, byte[] row2) {
494        this.row1 = row1;
495        this.row2 = row2;
496      }
497
498      @Override
499      public Collection<byte[]> getRowsToLock() {
500        List<byte[]> rows = new ArrayList<>(2);
501        rows.add(row1);
502        rows.add(row2);
503        return rows;
504      }
505
506      @Override
507      public boolean readOnly() {
508        return false;
509      }
510
511      @Override
512      public RowSwapProcessorResponse getResult() {
513        return RowSwapProcessorResponse.getDefaultInstance();
514      }
515
516      @Override
517      public void process(long now, HRegion region,
518          List<Mutation> mutations, WALEdit walEdit) throws IOException {
519
520        // Override the time to avoid race-condition in the unit test caused by
521        // inacurate timer on some machines
522        now = myTimer.getAndIncrement();
523
524        // Scan both rows
525        List<Cell> kvs1 = new ArrayList<>();
526        List<Cell> kvs2 = new ArrayList<>();
527        doScan(region, new Scan(row1, row1), kvs1);
528        doScan(region, new Scan(row2, row2), kvs2);
529
530        // Assert swapped
531        if (swapped) {
532          assertEquals(rowSize, kvs2.size());
533          assertEquals(row2Size, kvs1.size());
534        } else {
535          assertEquals(rowSize, kvs1.size());
536          assertEquals(row2Size, kvs2.size());
537        }
538        swapped = !swapped;
539
540        // Add and delete keyvalues
541        List<List<Cell>> kvs = new ArrayList<>(2);
542        kvs.add(kvs1);
543        kvs.add(kvs2);
544        byte[][] rows = new byte[][]{row1, row2};
545        for (int i = 0; i < kvs.size(); ++i) {
546          for (Cell kv : kvs.get(i)) {
547            // Delete from the current row and add to the other row
548            Delete d = new Delete(rows[i]);
549            KeyValue kvDelete =
550                new KeyValue(rows[i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
551                    kv.getTimestamp(), KeyValue.Type.Delete);
552            d.add(kvDelete);
553            Put p = new Put(rows[1 - i]);
554            KeyValue kvAdd =
555                new KeyValue(rows[1 - i], CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
556                    now, CellUtil.cloneValue(kv));
557            p.add(kvAdd);
558            mutations.add(d);
559            walEdit.add(kvDelete);
560            mutations.add(p);
561            walEdit.add(kvAdd);
562          }
563        }
564      }
565
566      @Override
567      public String getName() {
568        return "swap";
569      }
570
571      @Override
572      public RowSwapProcessorRequest getRequestData() throws IOException {
573        RowSwapProcessorRequest.Builder builder = RowSwapProcessorRequest.newBuilder();
574        builder.setRow1(ByteStringer.wrap(row1));
575        builder.setRow2(ByteStringer.wrap(row2));
576        return builder.build();
577      }
578
579      @Override
580      public void initialize(RowSwapProcessorRequest msg) {
581        this.row1 = msg.getRow1().toByteArray();
582        this.row2 = msg.getRow2().toByteArray();
583      }
584    }
585
586    public static class TimeoutProcessor extends
587            BaseRowProcessor<TimeoutProcessorRequest, TimeoutProcessorResponse> {
588      byte[] row = new byte[0];
589
590      /**
591       * Empty constructor for Writable
592       */
593      public TimeoutProcessor() {
594      }
595
596      public TimeoutProcessor(byte[] row) {
597        this.row = row;
598      }
599
600      public Collection<byte[]> getRowsToLock() {
601        return Collections.singleton(row);
602      }
603
604      @Override
605      public TimeoutProcessorResponse getResult() {
606        return TimeoutProcessorResponse.getDefaultInstance();
607      }
608
609      @Override
610      public void process(long now, HRegion region,
611          List<Mutation> mutations, WALEdit walEdit) throws IOException {
612        try {
613          // Sleep for a long time so it timeout
614          Thread.sleep(100 * 1000L);
615        } catch (Exception e) {
616          throw new IOException(e);
617        }
618      }
619
620      @Override
621      public boolean readOnly() {
622        return true;
623      }
624
625      @Override
626      public String getName() {
627        return "timeout";
628      }
629
630      @Override
631      public TimeoutProcessorRequest getRequestData() throws IOException {
632        TimeoutProcessorRequest.Builder builder = TimeoutProcessorRequest.newBuilder();
633        builder.setRow(ByteStringer.wrap(row));
634        return builder.build();
635      }
636
637      @Override
638      public void initialize(TimeoutProcessorRequest msg) throws IOException {
639        this.row = msg.getRow().toByteArray();
640      }
641    }
642
643    public static void doScan(HRegion region, Scan scan, List<Cell> result) throws IOException {
644      InternalScanner scanner = null;
645      try {
646        scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
647        scanner = region.getScanner(scan);
648        result.clear();
649        scanner.next(result);
650      } finally {
651        if (scanner != null) {
652          scanner.close();
653        }
654      }
655    }
656  }
657
658  static String stringifyKvs(Collection<Cell> kvs) {
659    StringBuilder out = new StringBuilder();
660    out.append("[");
661    if (kvs != null) {
662      for (Cell kv : kvs) {
663        byte[] col = CellUtil.cloneQualifier(kv);
664        byte[] val = CellUtil.cloneValue(kv);
665        if (Bytes.equals(col, COUNTER)) {
666          out.append(Bytes.toStringBinary(col) + ":" +
667                     Bytes.toInt(val) + " ");
668        } else {
669          out.append(Bytes.toStringBinary(col) + ":" +
670                     Bytes.toStringBinary(val) + " ");
671        }
672      }
673    }
674    out.append("]");
675    return out.toString();
676  }
677}