HCatalog Reader Writer

HCatalog offers a data transfer API for parallel input and output that does not require MapReduce. The API provides a way to read data from, and write data to, a Hadoop cluster.

The data transfer API uses a basic storage abstraction of tables and rows, and it is designed specifically for integrating external systems with Hadoop. The API is built around three classes:

  • HCatReader – reads data from a Hadoop cluster
  • HCatWriter – writes data into a Hadoop cluster
  • DataTransferFactory – generates reader and writer instances

HCatReader -

HCatReader is an abstract class internal to HCatalog. It abstracts away the details of the underlying system from which records are read. Reading happens in two steps: the first step takes place on the master node of the external system, and the second step runs in parallel on multiple slave nodes.

To read records, we first need to define a "ReadEntity." You create it using ReadEntity.Builder, where you specify details such as the database name, table name, partition, and filter string.

Example:

ReadEntity.Builder builder = new ReadEntity.Builder();
ReadEntity entity = builder.withDatabase("stddb")
.withTable("stdtbl").build();

The code snippet defines a ReadEntity object called "entity," which represents a table named "stdtbl" in a database called "stddb." This object can be used to read all the rows from this table.

Note! The above table must exist in HCatalog prior to the start of this operation.
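
If you only want a subset of the table, the builder also accepts a partition map and a filter string, as mentioned above. A minimal sketch (the partition column "year" and its value are assumptions for illustration; substitute your own partition columns):

ReadEntity.Builder builder = new ReadEntity.Builder();
ReadEntity filtered = builder.withDatabase("stddb")
.withTable("stdtbl")
.withFilter("year = '2023'")   // hypothetical partition filter
.build();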

After defining a ReadEntity, you get an instance of HCatReader using the ReadEntity and the cluster configuration (a Map<String,String> of Hadoop/Hive configuration properties), as shown below -

HCatReader reader = DataTransferFactory
.getHCatReader(entity, config);

The next step is to get a ReaderContext from reader as follows -

ReaderContext cntxt = reader.prepareRead();

All of the above steps happen on the master node. The master node then serializes the ReaderContext object and sends it to all the slave nodes. Slave nodes use the reader context to read data.
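
On each slave node, the serialized ReaderContext is used to obtain a reader for one split and to iterate over its records. A minimal sketch, mirroring the slaveRead method in the full example further below (splitNumber is a placeholder for the split number assigned to that slave):

HCatReader reader = DataTransferFactory
.getHCatReader(cntxt, splitNumber);
Iterator<HCatRecord> recordItr = reader.read();
while (recordItr.hasNext()) {
    HCatRecord record = recordItr.next();
    // process the record here
}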

HCatWriter -

HCatWriter is part of HCatalog and is used to write data from external systems to HCatalog. Do not try to create an instance of HCatWriter directly. Instead, use the DataTransferFactory.

Writing data happens in two steps. The first step takes place on the master node, and the second step runs in parallel on slave nodes. You write data using a "WriteEntity," which you build in much the same way as a ReadEntity.

Example:

WriteEntity.Builder builder = new WriteEntity.Builder();
WriteEntity entity = builder.withDatabase("stddb")
.withTable("stdtbl").build();

The above code creates a WriteEntity object ("entity") which can be used to write into a table named "stdtbl" in the database "stddb". After creating a WriteEntity, the next step is to get a WriterContext -

HCatWriter writer = DataTransferFactory
.getHCatWriter(entity, config);
WriterContext info = writer.prepareWrite();

All of the above steps happen on the master node. The master node then serializes the WriterContext object and makes it available to all the slaves. On the slave nodes, you need to obtain an HCatWriter from the deserialized WriterContext (received here as context), as shown below -

HCatWriter writer = DataTransferFactory
.getHCatWriter(context);

The writer then takes an iterator of HCatRecord objects as the argument to its write method -

writer.write(hCatRecordItr);
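
Once all slave nodes have finished writing, the master node must commit the write to make the data visible, or abort it to clean up after a failure. A minimal sketch using the master-side writer and the WriterContext obtained above (writeSucceeded is a placeholder for your own success check); the same commit/abort calls appear in the full example below:

// back on the master node
if (writeSucceeded) {
    writer.commit(info);
} else {
    writer.abort(info);
}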

Full Example:

Below is a complete walk-through that shows how to use HCatalog's HCatWriter and HCatReader with a student_details Hive table, followed by a fuller end-to-end JUnit test class that exercises both APIs against a small demo table.

Step-1: Set up the table

CREATE TABLE IF NOT EXISTS student_details (
    student_id   INT,
    student_name STRING,
    age          INT,
    grade        STRING
)
STORED AS RCFILE;

Step-2: Add the Maven (or Gradle) dependencies

<!-- pom.xml excerpt -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.3</version>   <!-- match your Hive version -->
</dependency>
<dependency>
    <groupId>org.apache.hive.hcatalog</groupId>
    <artifactId>hive-hcatalog-core</artifactId>
    <version>3.1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hive.hcatalog</groupId>
    <artifactId>hive-hcatalog-streaming</artifactId>
    <version>3.1.3</version>
</dependency>

Make sure the Hive JAR versions match whatever is installed on your cluster.

Step-3: Write rows with HCatWriter
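
A minimal, self-contained sketch of the write path for the student_details table from Step-1 is shown below. For simplicity it runs the master-side and slave-side steps in one JVM; in a real deployment the WriterContext would be serialized and shipped to the slave nodes. The class name, the metastore URI, and the use of the default database are assumptions for illustration; adjust them for your cluster.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
import org.apache.hive.hcatalog.data.transfer.HCatWriter;
import org.apache.hive.hcatalog.data.transfer.WriteEntity;
import org.apache.hive.hcatalog.data.transfer.WriterContext;

public class StudentDetailsWriter {

    public static void main(String[] args) throws Exception {
        // Assumed cluster configuration; point this at your metastore.
        Map<String, String> config = new HashMap<>();
        config.put("hive.metastore.uris", "thrift://localhost:9083");

        // Master side: describe the target table and prepare the write.
        WriteEntity entity = new WriteEntity.Builder()
                .withDatabase("default")          // assumed database
                .withTable("student_details")
                .build();
        HCatWriter masterWriter = DataTransferFactory.getHCatWriter(entity, config);
        WriterContext context = masterWriter.prepareWrite();

        // Slave side (same JVM here): write a few demo rows that match
        // the (student_id, student_name, age, grade) schema.
        List<HCatRecord> rows = new ArrayList<>();
        rows.add(new DefaultHCatRecord(Arrays.<Object>asList(1, "Asha", 21, "A")));
        rows.add(new DefaultHCatRecord(Arrays.<Object>asList(2, "Ravi", 22, "B")));
        HCatWriter slaveWriter = DataTransferFactory.getHCatWriter(context);
        slaveWriter.write(rows.iterator());

        // Master side: commit so the written rows become visible.
        masterWriter.commit(context);
        System.out.println("Wrote " + rows.size() + " rows to student_details");
    }
}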

Step-4: Read rows with HCatReader
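
A matching sketch of the read path for student_details, again running the master-side and slave-side steps in one JVM. The metastore URI and database name are the same assumptions as in Step-3.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
import org.apache.hive.hcatalog.data.transfer.HCatReader;
import org.apache.hive.hcatalog.data.transfer.ReadEntity;
import org.apache.hive.hcatalog.data.transfer.ReaderContext;

public class StudentDetailsReader {

    public static void main(String[] args) throws Exception {
        // Assumed cluster configuration; point this at your metastore.
        Map<String, String> config = new HashMap<>();
        config.put("hive.metastore.uris", "thrift://localhost:9083");

        // Master side: describe the source table and prepare the read.
        ReadEntity entity = new ReadEntity.Builder()
                .withDatabase("default")          // assumed database
                .withTable("student_details")
                .build();
        HCatReader masterReader = DataTransferFactory.getHCatReader(entity, config);
        ReaderContext context = masterReader.prepareRead();

        // Slave side (same JVM here): read every split sequentially.
        for (int split = 0; split < context.numSplits(); split++) {
            HCatReader splitReader = DataTransferFactory.getHCatReader(context, split);
            Iterator<HCatRecord> records = splitReader.read();
            while (records.hasNext()) {
                HCatRecord record = records.next();
                System.out.println(record.get(0) + "\t" + record.get(1)
                        + "\t" + record.get(2) + "\t" + record.get(3));
            }
        }
    }
}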

Step-5: Full example class: TestReaderWriter.java

The class below is a JUnit test modeled on Hive's own HCatalog reader/writer test. It extends HCatBaseTest (which supplies the driver and hiveConf fields used here) and, unlike Steps 1-4, it creates and uses its own two-column demo table, mytbl, rather than student_details.

import java.io.*;
import java.util.*;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.transfer.*;
import org.apache.hive.hcatalog.mapreduce.HCatBaseTest;
import org.junit.Assert;
import org.junit.Test;

public class TestReaderWriter extends HCatBaseTest {

  /** Convenience method that fabricates one demo record. */
  private static HCatRecord rec(int i){
      return new DefaultHCatRecord(Arrays.asList("Row #"+i, i));
  }

  /** Master-side: set up a WriteEntity and get the initial WriterContext. */
  private WriterContext masterPrepareWrite(Map<String,String> conf)
      throws HCatException {
      WriteEntity we = new WriteEntity.Builder()
              .withTable("mytbl")      // use your DB + table here!
              .build();
      HCatWriter w = DataTransferFactory.getHCatWriter(we, conf);
      return w.prepareWrite();    // happens exactly once on master
  }

  /** Slave-side: actually write the records. */
  private void slaveWrite(WriterContext ctx) throws HCatException{
      HCatWriter w = DataTransferFactory.getHCatWriter(ctx);
      // create a tiny iterator of demo rows
      Iterator<HCatRecord> it = new Iterator<HCatRecord>() {
          private int i=1, max=100;
          public boolean hasNext(){ return i<=max; }
          public HCatRecord next(){ return rec(i++); }
      };
      w.write(it);    // pushes all rows into HCatalog
  }

  /** Master-side: commit or abort metadata. */
  private void masterFinishWrite(Map<String,String> conf,
                   WriterContext ctx, boolean ok) throws IOException{
      WriteEntity we = new WriteEntity.Builder().withTable("mytbl").build();
      HCatWriter w = DataTransferFactory.getHCatWriter(we, conf);
      if(ok)  w.commit(ctx);  else  w.abort(ctx);
  }

  /** Master-side: prepare to read splits. */
  private ReaderContext masterPrepareRead(Map<String,String> conf)
      throws HCatException {
      ReadEntity re = new ReadEntity.Builder().withTable("mytbl").build();
      HCatReader r  = DataTransferFactory.getHCatReader(re, conf);
      return r.prepareRead();          // split metadata sent to slaves
  }

  /** Slave-side: consume one split and assert its contents. */
  private void slaveRead(ReaderContext ctx, int split) throws HCatException{
      HCatReader r = DataTransferFactory.getHCatReader(ctx, split);
      Iterator<HCatRecord> it = r.read();
      int expected=1;
      while(it.hasNext()){
          HCatRecord row = it.next();
          Assert.assertEquals("Row #"+expected, row.get(0));
          Assert.assertEquals(expected,      row.get(1));
          expected++;
      }
  }

  @Test
  public void integrationTest() throws Exception {

      // ── DEMO ONLY: create table in the Hive metastore ──
      driver.run("DROP TABLE IF EXISTS mytbl");
      driver.run("CREATE TABLE mytbl (a STRING, b INT)");

      // gather hiveConf into a plain Map<String,String>,
      // because DataTransferFactory expects one
      Map<String,String> cfg = new HashMap<>();
      hiveConf.iterator().forEachRemaining(
          e -> cfg.put(e.getKey(), e.getValue()));

      // ── WRITE PHASE ──
      WriterContext wCtx = masterPrepareWrite(cfg);
      // pretend to serialize/ship the context
      slaveWrite(wCtx);
      masterFinishWrite(cfg, wCtx, true);

      // ── READ PHASE ──
      ReaderContext rCtx = masterPrepareRead(cfg);
      for(int i=0;i<rCtx.numSplits();i++){
          slaveRead(rCtx,i);
      }
  }
}

Step-6: How to Run

# Compile (example classpath; tweak for your environment).
# HCatBaseTest is a test-support class (shipped in the hive-hcatalog-core
# tests jar), so include that jar and the JUnit jars on the classpath as well.
javac -classpath "<hadoop_jars>:<hive_jars>:<hcatalog_jars>:<junit_jars>" TestReaderWriter.java

# Execute (the class is a JUnit test, so run it through the JUnit runner)
java -classpath ".:<hadoop_jars>:<hive_jars>:<hcatalog_jars>:<junit_jars>" org.junit.runner.JUnitCore TestReaderWriter