Sunday, July 3, 2011

Hive Query execution - behind the scenes (alpha version)

Today, I am going to talk about Hive query execution.

Without any delay, let's kick-start the execution process. First, the user enters a query at the Hive CLI prompt. For the rest of this post, we will use this query: "select count(*) from hivetable_rcfile".

hive> select count(*) from hivetable_rcfile;
Query execution happens in a series of steps. First, a CliDriver object is created and the entered command is passed to its processCmd method.

CliDriver.processCmd(cmd);

processCmd method:

    if cmd is "exit" then
        System.exit(0)
    else if cmd is "source" {file_name} then
        processFile(file_name)
    else if cmd starts with "!"{command} then
        the shell command is executed
        // for example, !ls; lists the folders in the
        // current directory (from the hive prompt)
    else if cmd starts with "list" then
        the added resources are listed
    else
        get a Driver object
        Driver.run(cmd)
    end if
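
For example, each branch above corresponds to a different kind of input at the prompt (the /tmp/queries.hql path below is just a made-up placeholder):

hive> exit;                                    -- terminates the CLI
hive> source /tmp/queries.hql;                 -- runs the statements in that file
hive> !date;                                   -- runs a shell command
hive> list jars;                               -- lists resources added with ADD JAR
hive> select count(*) from hivetable_rcfile;   -- handed over to Driver.run()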

Driver.run("cmd"){
    Driver.compile("cmd");

    Once compilation is complete, try to acquire read and write locks on the
    target tables.
    Note: the Driver.acquireReadWriteLocks() method deals with the table
    locking mechanism

    Once locking is done, execute the query

    Setting hive.exec.parallel to true makes independent map-reduce tasks run
    in parallel; otherwise they run sequentially

}
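
To try the parallel execution bit from the prompt, the flag (and an optional cap on how many stages run at once) can be set before the query; our single-job count(*) has nothing to parallelize, but multi-stage queries do:

hive> set hive.exec.parallel=true;
hive> set hive.exec.parallel.thread.number=8;
hive> select count(*) from hivetable_rcfile;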

Driver.compile("cmd"){
    create ParseDriver object
    // retrieve context info from the configuration object
    ParseDriver.parse(cmd, context);

    After getting the AST and creating a BaseSemanticAnalyzer object, check
    the configuration for custom semantic analyzer hooks and invoke all of
    them.

    // analyze tree using BaseSemanticAnalyzer object
    BaseSemanticAnalyzer.analyze(tree, context)

    The SemanticAnalyzer class extends BaseSemanticAnalyzer
    // this class handles the analysis of SELECT queries
    calling...
    SemanticAnalyzer.analyzeInternal(tree)

    Validate the plan. Here we check whether the inputs and outputs are in the
    right protect mode for the query to execute

    create a QueryPlan object from the query string and the SemanticAnalyzer
    object
    /*
    Example of a query plan:

Map Reduce
    Alias -> Map Operator Tree:
        hivetable_rcfile
        TableScan
            alias: hivetable_rcfile
            GatherStats: false
        Select Operator
        Group By Operator
            aggregations:
            expr: count()
            bucketGroup: false
            mode: hash
            outputColumnNames: _col0
        Reduce Output Operator
            sort order:
            tag: -1
            value expressions:
            expr: _col0
            type: bigint
            Needs Tagging: false
        Path -> Alias:
            hdfs://localhost:9000/user/krishna/warehouse/hivetable.db/hivetable_rcfile/temp=1/custid=1001063160 [hivetable_rcfile]
            .
            .
        Path -> Partition:
            hdfs://localhost:9000/user/krishna/warehouse/hivetable.db/hivetable_rcfile/temp=1/custid=1001063160
            Partition
                base file name: custid=1001063160
                input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.RCFileOutputFormat
                partition values:
                custid 1001063160
                temp 1
                properties:
                    bucket_count -1
                    columns id1,id2,id3,id4,id5,id6
                    columns.types string:string:string:string:string:string
                    file.inputformat org.apache.hadoop.hive.ql.io.RCFileInputFormat
                    file.outputformat org.apache.hadoop.hive.ql.io.RCFileOutputFormat
                    location hdfs://localhost:9000/user/krishna/warehouse/hivetable.db/hivetable_rcfile/temp=1/custid=1001063160
                    name hivetable.hivetable_rcfile
                    numFiles 60
                    numPartitions 60
                    numRows 0
                    partition_columns temp/custid
                    serialization.ddl struct hivetable_rcfile { string id1, string id2, string id3, string id4, string id5, string id6}
                    serialization.format 1
                    serialization.lib org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe
                    totalSize 1359361866
                    transient_lastDdlTime 1309305474
                serde: org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe

                input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.RCFileOutputFormat
                properties:
                    bucket_count -1
                    columns id1,id2,id3,id4,id5,id6
                    columns.types string:string:string:string:string:string
                    file.inputformat org.apache.hadoop.hive.ql.io.RCFileInputFormat
                    location hdfs://localhost:9000/user/krishna/warehouse/hivetable.db/hivetable_rcfile
                    file.outputformat org.apache.hadoop.hive.ql.io.RCFileOutputFormat
                    name hivetable.hivetable_rcfile
                    numFiles 60
                    numPartitions 60
                    numRows 0
                    partition_columns temp/custid
                    serialization.ddl struct hivetable_rcfile { string id1, string id2, string id3, string id4, string id5, string id6}
                    serialization.format 1
                    serialization.lib org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe
                    totalSize 1359361866
                    transient_lastDdlTime 1309305474
                serde: org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe
                name: hivetable.hivetable_rcfile
                name: hivetable.hivetable_rcfile
    Reduce Operator Tree:
        Group By Operator
            aggregations:
            expr: count(VALUE._col0)
            bucketGroup: false
            mode: mergepartial
            outputColumnNames: _col0
        Select Operator
            expressions:
            expr: _col0
            type: bigint
            outputColumnNames: _col0
        File Output Operator
            compressed: false
            GlobalTableId: 0
            directory: hdfs://localhost:9000/tmp/hive-krishna/hive_2011-06-29_14-14-23_824_1404543156695888065/-ext-10001
            NumFilesPerFileSink: 1
            Stats Publishing Key Prefix: hdfs://localhost:9000/tmp/hive-krishna/hive_2011-06-29_14-14-23_824_1404543156695888065/-ext-10001/
            table:
            input format: org.apache.hadoop.mapred.TextInputFormat
            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
            properties:
                columns _col0
                columns.types bigint
                serialization.format 1
            TotalFiles: 1
            GatherStats: false
            MultiFileSpray: false
    */
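
    As an aside, you don't need to step through the code to see this plan:
    running EXPLAIN EXTENDED on the query prints the same operator trees (and
    the AST shown further below) without executing anything:

    hive> explain extended select count(*) from hivetable_rcfile;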

    If the plan already exists, that is, the query was serialized earlier, then
    de-serialize it and load the query information into the QueryPlan object

    // Fetch the output schema represented in Hive native types
    Driver.getSchema(semanticAnalyzer, conf)

    Serialize and de-serialize the plan. The query plan is saved in an XML
    format. Initialize the de-serialized plan.
}   

ParseDriver.parse(cmd, context){
    // the AST is created using the HiveParser object
    /*
    Example of an AST
    Query : select count(*) from hivetable_rcfile;

    ABSTRACT SYNTAX TREE:
      (TOK_QUERY
          (TOK_FROM
              (TOK_TABREF
                  (TOK_TABNAME hivetable_rcfile)
              )
          )
          (TOK_INSERT
              (TOK_DESTINATION
                  (TOK_DIR TOK_TMP_FILE)
              )
              (TOK_SELECT
                  (TOK_SELEXPR
                      (TOK_FUNCTIONSTAR count)
                  )
              )
          )
      )
      */
    return the AST
}
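
If you want to poke at this stage yourself, here is a minimal sketch that drives the parser directly. It is based on Hive's internal classes as they looked around the 0.7 codebase (ParseDriver, Context, ASTNode), so method names and signatures may differ in other releases:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ParseDriver;

public class ParseOnly {
    public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        // the Context carries per-query state such as scratch directories
        Context ctx = new Context(conf);

        ParseDriver pd = new ParseDriver();
        ASTNode tree = pd.parse("select count(*) from hivetable_rcfile", ctx);

        // prints a one-line form of the AST, much like the tree shown above
        System.out.println(tree.toStringTree());
    }
}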

SemanticAnalyzer.analyzeInternal(tree){
    // analyze starting from the child node of the AST
    SemanticAnalyzer.doPhase1(childNode, QB, context) // QB - Query Block

    doPhase1 extracts all the information about the query's clauses from the
    AST into the QB
}
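
Continuing the sketch above (same conf, ctx and tree variables), the analysis step can be driven the same way. Again, this is the 0.7-era internal API, and a SessionState has to be started first, otherwise the analyzer has no session to work with:

import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
import org.apache.hadoop.hive.ql.session.SessionState;

// start a session so the analyzer can reach the metastore, scratch dirs, etc.
SessionState.start(new SessionState(conf));

// picks SemanticAnalyzer for a SELECT, DDLSemanticAnalyzer for DDL, and so on
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);

// runs doPhase1, metadata lookup and plan generation as described above
sem.analyze(tree, ctx);

System.out.println("root tasks in the plan: " + sem.getRootTasks().size());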