Apache Hive with MongoDB Integration

Apache Hive is a tool in the Apache Hadoop ecosystem that converts SQL-like queries into Hadoop jobs for data summarization, querying, and analysis. In this blog post we will see how data stored in MongoDB can be imported into a Hive table. The data in that Hive table is then processed, and the result is stored in another Hive table.
We will use 1-minute interval time-series stock prices for MSFT, where each interval carries the opening (first) price, high (max), low (min), and closing (last) price, and aggregate them into 5-minute intervals (called OHLC bars). The initial 1-minute interval data is stored in the MongoDB collection “minibars” and is processed in Hive via the MongoDB Hadoop Connector. The processed data is finally saved in the “five_minute_bars” Hive table.
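To make the aggregation concrete: for the bucket covering 09:30 through 09:34, the 5-minute bar takes its Open from the 09:30 bar, its High as the maximum High across the five bars, its Low as the minimum Low, and its Close from the 09:34 bar. The next bucket then starts at 09:35.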
First we need to set up the source data in MongoDB. Open File Browser in Hue and drag and drop the data file (data.csv) into it; Hue uploads the file to HDFS. Then copy it from HDFS to the local file system (mongoimport reads local files) using the following command:
$> hdfs dfs -get data.csv .
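mongoimport (used below) treats the first line of the file as the column names via its --headerline flag. Assuming the header matches the field names we will later see in MongoDB, data.csv should begin roughly like this, which you can verify with head:
$> head -2 data.csv
Symbol,Timestamp,Day,Open,High,Low,Close,Volume
MSFT,2009-08-24 09:30,24,24.41,24.42,24.31,24.31,683713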
Open a Linux terminal and start the MongoDB shell. Create a database “vs_marketdata” and, within it, a collection “minibars” to hold the input data of 1-minute interval stock prices.
$>mongo
>use vs_marketdata
>db.createCollection("minibars")
{ "ok" : 1 }
>show collections
minibars
Open another Linux terminal and load the data from the data.csv file into the MongoDB collection:
$>mongoimport data.csv --type csv --headerline --db vs_marketdata --collection minibars
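If the import needs to be re-run (for example after a partial load), mongoimport's --drop flag drops the collection before importing so that documents are not duplicated:
$>mongoimport --drop data.csv --type csv --headerline --db vs_marketdata --collection minibars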
On the MongoDB terminal, execute the following command to check that the data was successfully loaded into the “minibars” collection.
> db.minibars.findOne()
{
"_id" : ObjectId("5b65152dbd565df3e9da0197"),
"Symbol" : "MSFT",
"Timestamp" : "2009-08-24 09:30",
"Day" : 24,
"Open" : 24.41,
"High" : 24.42,
"Low" : 24.31,
"Close" : 24.31,
"Volume" : 683713
}
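As a further sanity check, count the imported documents (the exact number depends on your data file):
> db.minibars.count()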
Next, download the MongoDB Hadoop Connector jars (core and Hive) and the MongoDB Java driver jar. Their Maven repository pages are:
https://mvnrepository.com/artifact/org.mongodb.mongo-hadoop/mongo-hadoop-core/1.5.2
https://mvnrepository.com/artifact/org.mongodb.mongo-hadoop/mongo-hadoop-hive/1.5.2
https://mvnrepository.com/artifact/org.mongodb/mongo-java-driver/2.13.2
Copy the direct links to the jar files and download them with wget in the Linux terminal:
$>wget http://central.maven.org/maven2/org/mongodb/mongo-hadoop/mongo-hadoop-core/1.5.2/mongo-hadoop-core-1.5.2.jar
$>wget http://central.maven.org/maven2/org/mongodb/mongo-hadoop/mongo-hadoop-hive/1.5.2/mongo-hadoop-hive-1.5.2.jar
$>wget http://central.maven.org/maven2/org/mongodb/mongo-java-driver/2.13.2/mongo-java-driver-2.13.2.jar
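A quick listing confirms that all three jars landed in the current directory:
$>ls -l mongo-*.jar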
Start Hive from the Linux terminal.
$>hive
hive (default)>
Create a Hive database “vs_bar” and switch to it.
hive (default)> create database vs_bar;
OK
Time taken: 1.133 seconds
hive (default)> use vs_bar;
OK
Time taken: 5.045 seconds
hive (vs_bar)>
Add the downloaded jars to the classpath in the Hive shell.
hive (vs_bar)> add jar /home/vaibhavsharma806060/mongo-hadoop-core-1.5.2.jar;
Added [/home/vaibhavsharma806060/mongo-hadoop-core-1.5.2.jar] to class path
Added resources: [/home/vaibhavsharma806060/mongo-hadoop-core-1.5.2.jar]
hive (vs_bar)> add jar /home/vaibhavsharma806060/mongo-hadoop-hive-1.5.2.jar;
Added [/home/vaibhavsharma806060/mongo-hadoop-hive-1.5.2.jar] to class path
Added resources: [/home/vaibhavsharma806060/mongo-hadoop-hive-1.5.2.jar]
hive (vs_bar)> add jar /home/vaibhavsharma806060/mongo-java-driver-2.13.2.jar;
Added [/home/vaibhavsharma806060/mongo-java-driver-2.13.2.jar] to class path
Added resources: [/home/vaibhavsharma806060/mongo-java-driver-2.13.2.jar]
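Note that add jar only affects the current Hive session; after a restart the jars must be added again. As an alternative (a sketch, reusing the paths above), they can be supplied at startup via the Hive CLI's --auxpath option:
$>hive --auxpath /home/vaibhavsharma806060/mongo-hadoop-core-1.5.2.jar,/home/vaibhavsharma806060/mongo-hadoop-hive-1.5.2.jar,/home/vaibhavsharma806060/mongo-java-driver-2.13.2.jar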
Since we are using Timestamp (a reserved keyword) as a column name, we have to disable the reserved-keywords check:
hive (vs_bar)> set hive.support.sql11.reserved.keywords=false;
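Alternatively, on Hive versions where this property is not available, a reserved word can be escaped with backticks (`Timestamp`) wherever it appears in the DDL and queries below.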
Set the execution engine to MapReduce:
hive (vs_bar)> set hive.execution.engine=mr;
Create a new external Hive table “minute_bars” mapped to the “minibars” collection of the MongoDB database “vs_marketdata”. Because the table is external and uses the MongoStorageHandler, Hive reads the documents directly from MongoDB rather than copying them into HDFS.
hive (vs_bar)> CREATE EXTERNAL TABLE minute_bars (
  id STRING, Symbol STRING, Timestamp STRING, Day INT,
  Open DOUBLE, High DOUBLE, Low DOUBLE, Close DOUBLE, Volume INT)
STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler'
WITH SERDEPROPERTIES('mongo.columns.mapping'='{"id":"_id", "Symbol":"Symbol", "Timestamp":"Timestamp", "Day":"Day", "Open":"Open", "High":"High", "Low":"Low", "Close":"Close", "Volume":"Volume"}')
TBLPROPERTIES('mongo.uri'='mongodb://127.0.0.1:27017/vs_marketdata.minibars');
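To confirm that the table was registered with the expected schema, describe it (output omitted here):
hive (vs_bar)> describe minute_bars;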
Check that some data is visible through the table:
hive (vs_bar)> select * from minute_bars limit 5;
OK
5b65152dbd565df3e9da0197 MSFT 2009-08-24 09:30 24 24.41 24.42 24.31 24.31 683713
5b65152dbd565df3e9da0198 MSFT 2009-08-24 09:34 24 24.39 24.42 24.38 24.42 238711
5b65152dbd565df3e9da0199 MSFT 2009-08-24 09:35 24 24.41 24.42 24.38 24.38 203860
5b65152dbd565df3e9da019a MSFT 2009-08-24 09:36 24 24.39 24.44 24.38 24.44 300527
5b65152dbd565df3e9da019b MSFT 2009-08-24 09:37 24 24.44 24.45 24.43 24.45 190160
Time taken: 0.507 seconds, Fetched: 5 row(s)
Create a new table in Hive to hold the aggregates of five 1-minute periods, i.e. the OHLC (open-high-low-close) bars for each 5-minute interval.
hive (vs_bar)> CREATE TABLE five_minute_bars (id STRING, Symbol STRING, Timestamp STRING, Open DOUBLE, High DOUBLE, Low DOUBLE, Close DOUBLE);
OK
Time taken: 0.068 seconds
Create and execute an INSERT query that selects the data from the minute_bars table and loads the data, aggregated over 5-minute windows, into the five_minute_bars table.
hive (vs_bar)> INSERT INTO TABLE vs_bar.five_minute_bars
SELECT m.id, m.Symbol, m.OpenTime as Timestamp, m.Open, m.High, m.Low, m.Close
FROM
(SELECT id, Symbol,
  FIRST_VALUE(Timestamp) OVER (
    PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
    ORDER BY Timestamp) as OpenTime,
  LAST_VALUE(Timestamp) OVER (
    PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
    ORDER BY Timestamp) as CloseTime,
  FIRST_VALUE(Open) OVER (
    PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
    ORDER BY Timestamp) as Open,
  MAX(High) OVER (
    PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
    ORDER BY Timestamp) as High,
  MIN(Low) OVER (
    PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
    ORDER BY Timestamp) as Low,
  LAST_VALUE(Close) OVER (
    PARTITION BY floor(unix_timestamp(Timestamp, 'yyyy-MM-dd HH:mm')/(5*60))
    ORDER BY Timestamp) as Close
FROM vs_bar.minute_bars) as m
WHERE unix_timestamp(m.CloseTime, 'yyyy-MM-dd HH:mm') - unix_timestamp(m.OpenTime, 'yyyy-MM-dd HH:mm') = 60*4;
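A note on why the WHERE clause works: with an ORDER BY and no explicit window frame, Hive's window functions aggregate from the start of the partition up to the current row only. MAX, MIN, and LAST_VALUE therefore cover the whole 5-minute bucket only on the bucket's final row, and the filter CloseTime - OpenTime = 240 seconds (60*4 in the query) keeps exactly that one row per complete bucket. To eyeball the raw rows of a single bucket before aggregating, you can run something like this (timestamps taken from the sample data; plain string comparison works because the timestamps sort lexicographically):
hive (vs_bar)> SELECT Timestamp, Open, High, Low, Close FROM minute_bars
WHERE Timestamp >= '2009-08-24 09:30' AND Timestamp < '2009-08-24 09:35'
ORDER BY Timestamp;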
You should see output broadly similar to the following. In lab environments the job occasionally fails with a MongoDB IOException; if that happens, re-run the INSERT statement:
Query ID = vaibhavsharma806060_20180804033337_1b07e933-c482-436a-8426-c35245fcbb0d
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1527045214830_13302, Tracking URL = http://a.cloudxlab.com:8088/proxy/application_1527045214830_13302/
Kill Command = /usr/hdp/2.3.4.0-3485/hadoop/bin/hadoop job -kill job_1527045214830_13302
Hadoop job information for Stage-1: number of mappers: 4; number of reducers: 1
2018-08-04 03:33:44,630 Stage-1 map = 0%, reduce = 0%
2018-08-04 03:33:54,039 Stage-1 map = 25%, reduce = 0%, Cumulative CPU 10.25 sec
2018-08-04 03:34:04,459 Stage-1 map = 25%, reduce = 8%, Cumulative CPU 10.81 sec
2018-08-04 03:34:12,953 Stage-1 map = 50%, reduce = 8%, Cumulative CPU 18.95 sec
2018-08-04 03:34:13,978 Stage-1 map = 50%, reduce = 17%, Cumulative CPU 19.02 sec
2018-08-04 03:34:29,493 Stage-1 map = 75%, reduce = 17%, Cumulative CPU 29.11 sec
2018-08-04 03:34:32,563 Stage-1 map = 75%, reduce = 25%, Cumulative CPU 29.16 sec
2018-08-04 03:34:42,057 Stage-1 map = 100%, reduce = 25%, Cumulative CPU 38.78 sec
2018-08-04 03:34:44,108 Stage-1 map = 100%, reduce = 67%, Cumulative CPU 41.97 sec
2018-08-04 03:34:47,180 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 51.3 sec
MapReduce Total cumulative CPU time: 51 seconds 300 msec
Ended Job = job_1527045214830_13302
Loading data to table vs_bar.five_minute_bars
Table vs_bar.five_minute_bars stats: [numFiles=1, numRows=19464, totalSize=1372872, rawDataSize=1353408]
MapReduce Jobs Launched:
Stage-Stage-1: Map: 4 Reduce: 1 Cumulative CPU: 51.3 sec HDFS Read: 39978 HDFS Write: 1372959 SUCCESS
Total MapReduce CPU Time Spent: 51 seconds 300 msec
OK
Time taken: 70.54 seconds
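Each row in five_minute_bars is one 5-minute bar, so a quick count should match the numRows value reported in the table stats above:
hive (vs_bar)> select count(*) from five_minute_bars;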
Query the data in HiveQL to confirm that the 5-minute interval data has been inserted successfully.
hive (vs_bar)> select * from five_minute_bars limit 5;
OK
5b65152dbd565df3e9da0198 MSFT 2009-08-24 09:30 24.41 24.42 24.28 24.42
5b65152dbd565df3e9da01a6 MSFT 2009-08-24 09:35 24.41 24.49 24.38 24.48
5b65152dbd565df3e9da01a2 MSFT 2009-08-24 09:40 24.48 24.56 24.47 24.56
5b65152dbd565df3e9da01a7 MSFT 2009-08-24 09:45 24.56 24.6 24.49 24.51
5b65152dbd565df3e9da01ac MSFT 2009-08-24 09:50 24.5 24.52 24.47 24.48
Time taken: 0.106 seconds, Fetched: 5 row(s)
We have seen how, using the MongoDB Hadoop Connector, data stored in MongoDB can be loaded into Hive tables for processing.