Importing Postgres Data Into Hadoop HDFS
Recently I’ve been getting a crash course in big data. I ran into an issue where the analytics that used a certain table on our Postgres database had grown so large that they would not run. A single query would grind a server with 8 GB of memory to a near halt. While I had gone through several rounds of optimization, it still was not performant enough at the scale we needed, and in fact the IO load caused havoc with our host. We’re talking 50 million rows or more that need to be analyzed in a scalable way.
I’m sure Hadoop needs no introduction for most, but the great benefit is that it gives you a scalable way to take massive amounts of data and answer the questions you have about it.
How do you get existing data into Hadoop?
I had a ton of data sitting on a Postgres server that I needed to get into HDFS. I needed a way to dump the data into a usable format so that I could work with it properly in Hadoop. Luckily there is an Apache project called Sqoop that does exactly this (yes, Sqoop Hadoop is just an amazing combination of words).
Running Sqoop Jobs
Sqoop requires that you have Java and Hadoop installed on the machine that is going to run the job. The great news is that it doesn’t have to be the same machine as your database server. You can run it from any machine, though you will typically run it from a Hadoop cluster. That means it doesn’t matter where your database lives or whether your host allows you to install your own packages. You also don’t have to dump the data to the machine that the Sqoop job runs on. In my case I wanted all my data in S3 as a super portable replacement for HDFS.
Unfortunately, it took a lot of trial and error to get it working. Make sure that you install the proper JDBC driver, either by putting it on your Java classpath or by dropping it into Sqoop’s lib directory (usually somewhere like /usr/lib/sqoop/lib).
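For reference, here’s roughly what that looks like. The paths below are assumptions; adjust them to wherever Sqoop and the PostgreSQL JDBC jar actually live on your machine:

# copy the PostgreSQL JDBC driver (downloaded from jdbc.postgresql.org) into Sqoop's lib dir
cp postgresql-*.jar /usr/lib/sqoop/lib/
# or, if you'd rather not touch the Sqoop install, put it on the classpath instead
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/path/to/postgresql-jdbc.jar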
Here’s a local example using Postgres:
sqoop import --connect jdbc:postgresql://localhost:5432/mydb \
  --table my_table \
  --username myusername \
  --password mypassword \
  --direct
This will dump the table my_table from a local Postgres server. The --direct flag is a speedup that’s unique to the Postgres and MySQL connectors for Sqoop.
To dump data directly to S3:
sqoop import --connect jdbc:postgresql://localhost:5432/mydb \
  --username myusername \
  --password mypassword \
  --table my_table \
  --target-dir s3n://MYS3APIKEY:MYS3SECRETKEY@bucketname/folder/ \
  --hive-drop-import-delims \
  --escaped-by "\\"
Some notes to help your blood pressure:
- Make sure your bucket name doesn’t have an underscore in it. For some reason I wasn’t able to get it to work if there was an underscore in the bucket name.
- Make sure you use a trailing slash on the target directory. This should help your blood pressure.
- --direct doesn’t work with --escaped-by
- If you have freeform text fields in your data that contain \n characters, they will fuck up your Hadoop job if you try to read records by splitting on newlines. Use --hive-drop-import-delims to strip them out. Despite the name this has nothing to do with Hive here; it simply drops \n, \r, and \01 characters from string fields so your output stays one record per line.
- --escaped-by should always be used if you have freeform text in your data. It escapes delimiter characters (such as a comma) that appear inside field values, so when you run a job you can split records into fields without being tripped up by delimiters that are part of the data. See the example below.
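To make that concrete, here’s roughly what the difference looks like (the row contents are made up):

# without --escaped-by, a text field containing the delimiter bleeds into an extra column:
42,Hello, world
# with --escaped-by "\\", the embedded comma is escaped, so a parser that honors the escape recovers two fields:
42,Hello\, world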
What happens if we need to get updated data off of our database? HDFS doesn’t really do an “update” to existing data, but we can append all the new rows added past a certain point. To do that we can use the --incremental argument.
sqoop import --connect jdbc:postgresql://localhost:5432/mydb \
  --username myusername \
  --password mypassword \
  --table my_table \
  --incremental append \
  --check-column id \
  --hive-drop-import-delims \
  --escaped-by "\\"
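If you run it like this, without a saved job (covered below), you have to tell Sqoop yourself where the last import stopped using --last-value. Here the 12345 is just an assumed example id:

sqoop import --connect jdbc:postgresql://localhost:5432/mydb \
  --username myusername \
  --password mypassword \
  --table my_table \
  --incremental append \
  --check-column id \
  --last-value 12345 \
  --hive-drop-import-delims \
  --escaped-by "\\"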
Append Data To S3 Using Sqoop
I wanted to keep my data independent of the cluster that would run the job. This gives you the flexibility to run your jobs wherever you like, on your own cluster or on something like Elastic MapReduce. You have the freedom to build and destroy clusters on a whim because there is no persistent data to lose.
There’s no point in appending if you don’t know where the last append left off (the last row that was dumped from the database). Sqoop has a “job” command that stores the last id (or date) of the rows it dumped, and on the next run it queries only the rows after that point. Very cool!
Unfortunately (again), I couldn’t get this to work by dumping directly to S3 with the --incremental argument, so I had to use a workaround: first run the import, then use distcp to copy the data to S3. For some reason, incremental appends using S3 as the --warehouse-dir do not work, even when specifying a -fs argument.
Hopefully this saves you as much time as I spent figuring it out.
First save the job:
sqoop job --create myjob -- import \
  --connect jdbc:postgresql://myaddress:12345/dbname \
  --table mytable \
  --hive-drop-import-delims \
  --escaped-by "\\" \
  --username myusername \
  --password mypassword \
  --incremental append \
  --check-column id \
  --split-by id
Then run it:
sqoop job --exec myjob
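After it runs, you can sanity-check what Sqoop recorded by listing and inspecting saved jobs (the property names in the output differ a bit between Sqoop versions):

# list saved jobs and show the stored parameters for one, including the last value seen for --check-column
sqoop job --list
sqoop job --show myjob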
Then copy it in a parallelized way to S3:
hadoop distcp mytable s3n://MY_S3_KEY:MY_S3_SECRET@mybucket/folder/
Boom, data on S3 with no dependency on your cluster living another day! I should note that you will want to use a centralized metastore for saving the last row processed by each append. See the Sqoop docs for more info.
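As a rough sketch of what that can look like (assuming the default HSQLDB-backed metastore; metastore-host is a placeholder, and you should double-check the Sqoop docs for your version): run a shared metastore on a long-lived box, point your clients at it, and create or run jobs with --meta-connect so every cluster reads and writes the same last-value bookkeeping.

# on a long-lived machine, start the shared metastore (listens on port 16000 by default)
sqoop metastore &

# in conf/sqoop-site.xml on each client, point at it:
# <property>
#   <name>sqoop.metastore.client.autoconnect.url</name>
#   <value>jdbc:hsqldb:hsql://metastore-host:16000/sqoop</value>
# </property>

# or pass the metastore explicitly when running the saved job
sqoop job --meta-connect jdbc:hsqldb:hsql://metastore-host:16000/sqoop --exec myjob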