+# Gopkg.toml example
+# Refer to https://golang.github.io/dep/docs/Gopkg.toml.html
+# for detailed Gopkg.toml documentation.
+# required = ["github.com/user/thing/cmd/thing"]
+# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
+# [[constraint]]
+# name = "github.com/user/project"
+# version = "1.0.0"
+# [[constraint]]
+# name = "github.com/user/project2"
+# branch = "dev"
+# source = "github.com/myfork/project2"
+# [[override]]
+# name = "github.com/x/y"
+# version = "2.4.0"
+# [prune]
+# non-go = false
+# go-tests = true
+# unused-packages = true
+ go-tests = true
+ unused-packages = true
+# pg2bq
+Load Terabytes of Data From Postgres Into BigQuery
+Work in progress.
+Inspired by: [ETL bash script](https://medium.com/radio-africa-techblog/loading-terabytes-of-data-from-postgres-into-bigquery-ec4ba8978cc9)
+package adapters
+import (
+ "context"
+ "github.com/masterxavierfox/goetl-postgres-bigquery/config"
+ log "github.com/sirupsen/logrus"
+ "google.golang.org/api/option"
+ "os"
+ "cloud.google.com/go/bigquery"
+func bQUpload(datasetID, tableID, filename string) error{
+ config.LoadEnv()
+ //creds, err := ioutil.ReadFile(credsFile)
+ //if err != nil {
+ // log.Fatalf("Could note parse config file.")
+ //}
+ log.Info(":::: ========== BIG QUERY UPLOAD STARTED ========= ::::")
+ log.Info("\n -::Dataset ID | ",datasetID,"\n -::Table ID | ",tableID,"\n -::CSV File | ",filename,"\n -::PROJECT ID | ",config.BQ_PROJECT_ID,"\n")
+ ctx := context.Background()
+ client, err := bigquery.NewClient(ctx, config.BQ_PROJECT_ID,option.WithCredentialsFile(credsFile))
+ if err != nil {
+ return err
+ }
+ // [START big-query_load_from_file]
+ f, err := os.Open(filename)
+ if err != nil {
+ return err
+ }
+ source := bigquery.NewReaderSource(f)
+ source.AutoDetect = true // Allow BigQuery to determine schema.
+ source.SkipLeadingRows = 1 // CSV has a single header line.
+ loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(source)
+ job, err := loader.Run(ctx)
+ if err != nil {
+ return err
+ }
+ status, err := job.Wait(ctx)
+ if err != nil {
+ return err
+ }
+ if err := status.Err(); err != nil {
+ return err
+ }
+ // [END bigquery_load_from_file]
+ DeleteArchive(filename)
+ return nil
+//package adapters
+//import (
+// "fmt"
+// "bytes"
+// "strings"
+// "github.com/go-pg/pg"
+// "github.com/go-pg/pg/orm"
+//func getTable(){
+// db := pg.Connect(&pg.Options{
+// User: "postgres",
+// Password: "",
+// Database: "",
+// })
+// defer db.Close()
+// _, err := pgdb.Exec(`CREATE TEMP TABLE words(word text, len int)`)
+// panicIf(err)
+// r := strings.NewReader("hello,5\nfoo,3\n")
+// _, err = pgdb.CopyFrom(r, `COPY words FROM STDIN WITH CSV`)
+// panicIf(err)
+// var buf bytes.Buffer
+// _, err = pgdb.CopyTo(&buf, `COPY words TO STDOUT WITH CSV`)
+// panicIf(err)
+// fmt.Println(buf.String())
\ No newline at end of file
+package adapters
+import (
+ "github.com/masterxavierfox/goetl-postgres-bigquery/config"
+ log "github.com/sirupsen/logrus"
+ "os"
+ "os/exec"
+ "time"
+//Runs ETL job decider for daemon or non-daemon mode
+func EtlJobDecider(daemon bool,table,columnSelect, startDate,endDate string) {
+ switch daemon{
+ case true:
+ pgEtlDaemonJob()
+ case false:
+ pgEtlManualJob(table,columnSelect,startDate,endDate)
+ default:
+ log.Fatalf("ERROR: Please check your configs.")
+ }
+//Runs ETL job for non-daemon mode
+func pgEtlManualJob (table,columnSelect, startDate,endDate string){
+ config.CheckCmdExists("psql")
+ config.LoadEnv()
+ //Pg options
+ tblExport := "'"+table+"-"+startDate+".csv'"
+ dayStart := "'"+startDate+"'"
+ dayEnd := "'"+endDate+"'"
+ //Cleanup workspace
+ DeleteArchive(""+table+"-"+startDate+".csv.gz")
+ log.Info("MANUAL WORKFLOW STARTED :::::: Downloading table: "+ table +":" + startDate + "...")
+ cmd := exec.Command("psql","-c", "\\copy (select "+columnSelect+" from "+table+" where created_at >= "+dayStart+" and created_at < "+dayEnd+") TO "+tblExport+" WITH CSV HEADER")
+ out, err := cmd.CombinedOutput()
+ if err != nil {
+ log.Error("PSQL COMMAND: failed with %s\n", err)
+ }
+ log.Info("PSQL STATUS: ", string(out))
+ tblExportArc := ""+table+"-"+startDate+".csv"
+ log.Info(":::: ========== CREATING ARCHIVE ========= ::::")
+ config.CmdEngine("gzip",tblExportArc)
+ err = bQUpload(config.BQ_DATASET_ID, config.BQ_TABLE_ID, tblExportArc+".gz")
+ if err != nil {
+ log.Fatal(err)
+ }
+//Runs ETL job for daemon mode
+func pgEtlDaemonJob (){
+ config.LoadEnv()
+ var tables = []string{os.Getenv("PGTABLES")}
+ //for i := 0; i < len(tables); i++ {
+ for i := range tables {
+ //Get start sync date
+ syncStartDate := pgdaemonDatesearch("MIN",tables[i],"created_at","START")
+ //Get start sync date
+ syncEndDate := pgdaemonDatesearch("MAX",tables[i],"created_at","END")
+ //Get Last job time
+ //syncTime := config.RedKeep(tables[i],syncEndDate)
+ startTime := time.Now().String()
+ //Parse date to check if its correct
+ log.Info("| PG :: START-SYNC-DATE | record: ",syncStartDate)
+ log.Info("| PG :: END-SYNC-DATE | record: ",syncEndDate)
+ //time, err := time.Parse(time.RFC3339,syncTime);
+ //if err != nil {
+ // log.Fatal("| DATE-FORMAT-ERROR | we have: ",time)
+ //}else {
+ log.Info("| PG ::: JOB START TIME: "+ startTime[0:19] +"| SYNC-TABLE |: "+tables[i] )//+" | REDIS: "+config.RedKeepPing())
+ //Dump and pump
+ pgUploadday(syncStartDate,syncEndDate,tables[i],startTime)
+ // }
+ ////Check if database has been synced or not
+ //switch state{
+ //
+ //case "create":
+ // log.Info("| NEW-SYNC-DATE | record: ",syncTime)
+ // time, err := time.Parse(time.RFC3339,syncTime);
+ // if err != nil {
+ // log.Fatal("| DATE-FORMAT-ERROR | we have: ",time)
+ // }else{
+ // log.Info("| SYNC-DATE |: "+ syncTime +"REDIS |:"+config.RedKeepPing())
+ // //Dump and pump
+ // pgUploadday(syncStartDate,syncEndDate,tables[i])
+ // return syncStartDate
+ // }
+ //case "update":
+ // //Get start sync date
+ // upSyncStartDate := pgdaemonDatesearch("MIN",tables[i],"updated_at")
+ //
+ // //Get start sync date
+ // upSyncEndDate := pgdaemonDatesearch("MAX",tables[i],"updated_at")
+ //
+ //
+ // upSyncTime ,state:= config.RedkeepUpdate(tables[i],upSyncEndDate)
+ //
+ // log.Info("| SYNC-DATE |: Bypassing Redis. | REDIS |:"+config.RedKeepPing())
+ //
+ // if syncTime != "" {
+ // log.Info("| SYNC-DATE | record: ",syncTime)
+ // //Dump and pump
+ // pgUploadday(syncStartDate,syncEndDate,tables[i])
+ // return syncTime
+ // }else{
+ // log.Info("| SYNC-DATE |: Bypassing Redis. | REDIS |:"+config.RedKeepPing())
+ // //Dump and pump
+ // pgUploadday(syncEndDate,upSyncEndDate,tables[i])
+ // return syncStartDate
+ // }
+ //
+ //
+ //default:
+ // log.Info("All Synced up baby")
+ //
+ //
+ //}
+ }
+//initiate big query upload and postgres db dump.
+func pgUploadday (syncStart,syncEnd,table,jobtime string){
+ config.LoadEnv()
+ tblExport := table+"-"+syncStart[0:10]+".csv"
+ firstDate :=syncStart[0:10]
+ secondDate := syncEnd[0:10]
+ dbjobTime := table+"-"+jobtime[0:10]+".csv"
+ //Cleanup workspace
+ //DeleteArchive(tblExport+".gz")
+ //Dump database to CSV
+ log.Info("DAEMON WORKFLOW STARTED ::::::\n Downloading table: "+ table +".\n From - " + syncStart[0:10] + ".\n To - "+syncEnd[0:10]+".\n Job Time: "+jobtime)
+ //cmd := exec.Command("psql", "-c", "\\copy (select "+config.COLUMN_SELECT+" from "+table+" where "+column+" >= "+syncStart[0:10]+" and "+column+" <= "+syncEnd[0:10]+") TO "+table+"-"+syncStart[0:10]+".csv WITH CSV HEADER")
+ cmd := exec.Command("psql","-c", "\\copy (select "+config.COLUMN_SELECT+" from "+table+" where created_at >= '"+firstDate+"' and created_at <= '"+secondDate+"') TO "+dbjobTime+" WITH CSV HEADER")
+ out, err := cmd.CombinedOutput()
+ if err != nil {
+ log.Errorf("ERROR:\n%s\n", string(out))
+ log.Fatal("PSQL: failed with %s\n", err,cmd)
+ }
+ config.CmdEngine("gzip",tblExport)
+ //err = bQUpload(config.BQ_DATASET_ID, config.DB_NAME+"_"+table, ""+table+"-"+syncStart+".csv.gz")
+ //if err != nil {
+ // log.Fatal(err)
+ //}
+ //Delete archive
+ config.CmdEngine("rm",tblExport+".gz")
+//Search database for the max or min date
+func pgdaemonDatesearch (c,table string,column string,stage string) string{
+ //Get date record from database
+ cmd := exec.Command("psql", "-qAt","-c", "select "+c+"("+column+") from public."+table)
+ out, err := cmd.CombinedOutput()
+ if err != nil {
+ log.Errorf("PSQL: failed with %s\n", err)
+ }
+ log.Info(stage+" : RECORD DATE: ", string(out[0:19]))
+ date := string(out[0:19])
+ return date
+//Delete database archive
+func DeleteArchive (tblExportArchive string){
+ //Delete archive
+ log.Info(":::: ========== DELETING ARCHIVE ========= ::::")
+ config.CmdEngine("rm",tblExportArchive)
+ //config.NewCommander("rm",tblExportArchive)
+function upload_day {
+ table=$1
+ sel=$2
+ day=$3
+ next_day=$(date -d "$day+1 days" +%Y-%m-%d)
+ bq_suffix=$(date -d "$day" +%Y%m%d)
+ echo "Uploading $table: $day..."
+ psql -c "\\copy (select $sel from $table where created_at >= '$day' and created_at < '$next_day') TO '$table-$day.csv' WITH CSV HEADER"
+ gzip $table-$day.csv
+ bq load --allow_quoted_newlines --project_id --replace --source_format=CSV --autodetect --max_bad_records 100 .$table$bq_suffix $table-$day.csv.gz
+ rm $table-$day.csv.gz
+function upload_table {
+ t=$1
+ s=$2
+ start_date=$3
+ end_date=$4
+ while [ "$start_date" != "$end_date" ]; do
+ upload_day "$t" "$s" "$start_date"
+ start_date=$(date -d "$start_date+1 days" +%Y-%m-%d)
+ done
+upload_table "$1" '*' "$2" "$3"
+#//Runs ETL job for non-daemon mode
+#func pgEtlManualJob (table,columnSelect, startDate,endDate string){
+# config.CheckCmdExists("psql")
+# config.LoadEnv()
+# //Pg options
+# tblExport := "'"+table+"-"+startDate+".csv'"
+# dayStart := "'"+startDate+"'"
+# dayEnd := "'"+endDate+"'"
+# log.Info("MANUAL WORKFLOW STARTED :::::: Downloading table: "+ table +":" + startDate + "...")
+# cmd := exec.Command("psql","-c", "\\copy (select "+columnSelect+" from "+table+" where created_at >= "+dayStart+" and created_at < "+dayEnd+") TO "+tblExport+" WITH CSV HEADER")
+# out, err := cmd.CombinedOutput()
+# if err != nil {
+# log.Error("PSQL COMMAND: failed with %s\n", err)
+# }
+# log.Fatalf("ERROR:\n%s\n", string(out))
+package config
+import (
+ "fmt"
+ "github.com/joho/godotenv"
+ log "github.com/sirupsen/logrus"
+ "os"
+ "os/exec"
+ "time"
+ "github.com/go-redis/redis"
+//ASCII Character generator
+// http://patorjk.com/software/taag/#p=display&f=ANSI%20Shadow&t=PG2BQ
+const PG2BQ_ASCII_LOGO = `
+█▀▀█ █▀▀▀ ░ ░ █▀█ ░ ░ █▀▀▄ █▀▀█
+█░░█ █░▀█ ▀ ▀ ░▄▀ ▀ ▀ █▀▀▄ █░░█
+█▀▀▀ ▀▀▀▀ ░ ░ █▄▄ ░ ░ ▀▀▀░ ▀▀▀█
+var (
+ BQ_PROJECT_ID string = os.Getenv("BQ_PROJECT_ID")
+ BQ_DATASET_ID string = os.Getenv("BQ_DATASET_ID")
+ BQ_TABLE_ID string = os.Getenv("BQ_TABLE_ID")
+ DB_HOST string = os.Getenv("PGHOST")
+ DB_PORT string = os.Getenv("PGPORT")
+ DB_USER string = os.Getenv("PGUSER")
+ DB_PASS string = os.Getenv("PGPASSWORD")
+ DB_NAME string = os.Getenv("PGDATABASE")
+ COLUMN_SELECT string = os.Getenv("PGCOLUMNS")
+ START_DAY string = os.Getenv("START_DAY")
+ END_DAY string = os.Getenv("END_DAY")
+ REDIS_ADDR string = os.Getenv("REDIS_ADDR")
+ REDIS_PASS string = os.Getenv("REDIS_PASS")
+ REDIS_PREFIX string = os.Getenv("REDIS_PREFIX")
+//Load up environment variables
+func LoadEnv(){
+ err := godotenv.Load()
+ if err != nil {
+ log.Fatal("Error loading .env file")
+ os.Exit(1)
+ }
+ BQ_TABLE_ID = os.Getenv("BQ_TABLE_ID")
+ DB_HOST = os.Getenv("PGHOST")
+ DB_PORT = os.Getenv("PGPORT")
+ DB_USER = os.Getenv("PGUSER")
+ DB_PASS = os.Getenv("PGPASSWORD")
+ DB_NAME = os.Getenv("PGDATABASE")
+ START_DAY = os.Getenv("START_DAY")
+ END_DAY = os.Getenv("END_DAY")
+ REDIS_ADDR = os.Getenv("REDIS_ADDR")
+ REDIS_PASS = os.Getenv("REDIS_PASS")
+//Verify executable command exists
+func CheckCmdExists(cmd string) {
+ path, err := exec.LookPath(cmd)
+ if err != nil {
+ log.Printf("::: OOPS ::: we didn't find "+cmd+" executable, kindly check.\n")
+ } else {
+ log.Printf("::: INIT OK ::: '%s'\n", path)
+ }
+//Generic command engine with output.
+func CmdEngine (c string,args string) {
+ LoadEnv()
+ log.Info("Executing...: "+ c +".")
+ cmd := exec.Command(c, args)
+ out, err := cmd.CombinedOutput()
+ if err != nil {
+ log.Errorf(c+" status :\n%s\n", string(out))
+ log.Fatalf("ERROR: "+c+" "+args+" failed with \n", err)
+ }
+//func NewCommander (c string,args string,args2 string){
+// // Disable output buffering, enable streaming
+// cmdOptions := cmd.Options{
+// Buffered: false,
+// Streaming: true,
+// }
+// // Create Cmd with options
+// envCmd := cmd.NewCmdOptions(cmdOptions, c,args,args2)
+// // Print STDOUT and STDERR lines streaming from Cmd
+// go func() {
+// for {
+// select {
+// case line := <-envCmd.Stdout:
+// log.Info(line)
+// case line := <-envCmd.Stderr:
+// log.Info(os.Stderr, line)
+// }
+// }
+// }()
+// // Run and wait for Cmd to return, discard Status
+// <-envCmd.Start()
+// // Cmd has finished but wait for goroutine to print all lines
+// for len(envCmd.Stdout) > 0 || len(envCmd.Stderr) > 0 {
+// time.Sleep(10 * time.Millisecond)
+// }
+func RedKeep(tableName string, lastSync string) (string){
+ Rclient := redis.NewClient(&redis.Options{
+ Password: REDIS_PASS,
+ DB: 0, // use default DB
+ })
+ redkey := REDIS_PREFIX+tableName
+ createState := "create"
+ syncBookmark := REDIS_PREFIX+tableName+"sync"
+ time := time.Now().String()
+ syncJobTime := time[0:19]
+ bookTime, err := Rclient.Get(syncBookmark).Result()
+ switch{
+ case err == redis.Nil:
+ log.Info("| REDKEEP BOOKMARK |: 404 - Bookmark "+syncBookmark+" not done..."+bookTime)
+ //Search and save
+ syncTime, err := Rclient.Get(redkey+"-"+createState).Result()
+ if err == redis.Nil {
+ log.Info("| REDKEEP BOOKMARK |: Table "+redkey+"-"+createState+" time sync does not exist..saving")
+ //Save Table record with state
+ Rerr := Rclient.Set(redkey+"-"+createState, lastSync, 0).Err()
+ if Rerr != nil {
+ PanicAndRecover("Chale We Hot! we could not save the track to redis.")
+ log.Fatal(Rerr)
+ return "nil"
+ }
+ savedTabletime, _:= Rclient.Get(redkey+"-"+createState).Result()
+ _= Rclient.Set(syncBookmark, syncJobTime, 0).Err()
+ return savedTabletime
+ } else if err != nil {
+ log.Fatal("| REDKEEP |:Something Went wrong - Check Redis.|\n "+REDIS_ADDR+" |\n "+REDIS_PASS+" |\n ",err)
+ return "nil"
+ } else {
+ jobTime,_ := Rclient.Get(syncBookmark).Result()
+ //redkeepUpdate(syncBookmark,syncJobTime[0:10])
+ log.Info("| REDKEEP BOOKMARK |: Time sync bookmark Found | ID: "+ syncBookmark +"| JOB TIME: "+syncJobTime +"::::|Last Record TIME: "+ syncTime)
+ return jobTime
+ }
+ case err != nil:
+ log.Fatal("| REDKEEP |:Something Went wrong - Check Redis.|\n "+REDIS_ADDR+" |\n "+REDIS_PASS+" |\n ",err)
+ return "nil"
+ default:
+ jobTime,err:= Rclient.Get(syncBookmark).Result()
+ if err != nil {
+ PanicAndRecover("Chale We Hot! we could not save the track to redis.")
+ log.Fatal(err)
+ return "nil"
+ }
+ log.Info("| REDKEEP UPDATE|: Time sync bookmark Found | ID: "+ syncBookmark +"| JOB TIME: "+fmt.Sprint(syncJobTime) )
+ redkeepUpdate(syncBookmark,syncJobTime)
+ return jobTime
+ }
+func redkeepUpdate (key string,value string){
+ Rclient := redis.NewClient(&redis.Options{
+ Password: REDIS_PASS,
+ DB: 0, // use default DB
+ })
+ log.Info("| REDKEEP UPDATE| KEY: "+ key +" | JOB TIME - "+value)
+ //Save Table record with state
+ Rerr := Rclient.Set(key, value, 0).Err()
+ if Rerr != nil {
+ PanicAndRecover("Chale We Hot! we could not save the track to redis.")
+ log.Fatal(Rerr)
+ //return "nil"
+ }
+ log.Info("| REDKEEP UPDATE | JOB TIME UPDATED TO :"+ value)
+ //savedTable, _:= Rclient.Get(redkey+"-"+updateState).Result()
+ //return savedTable,updateState
+func RedKeepPing() string{
+ Rclient := redis.NewClient(&redis.Options{
+ Password: REDIS_PASS,
+ DB: 0, // use default DB
+ })
+ pong, err := Rclient.Ping().Result()
+ if err != nil{
+ log.Warn("| REDKEEP-PING |: We cannot reach the RedKeep, check your redis connection.")
+ } else {
+ return pong
+ }
+ return ""
+func PanicAndRecover(message string) string{
+ defer func() {
+ if err := recover(); err != nil {
+ fmt.Println(err)
+ }
+ }()
+ panic(message)
+ dialect: postgres
+ database: goetl-postgres-bigquery_development
+ user: postgres
+ password: postgres
+ host:
+ pool: 5
+ url: {{envOr "TEST_DATABASE_URL" "postgres://postgres:postgres@"}}
+ url: {{envOr "DATABASE_URL" "postgres://postgres:postgres@"}}
\ No newline at end of file
+package main
+import (
+ "flag"
+ "fmt"
+ "github.com/masterxavierfox/goetl-postgres-bigquery/adapters"
+ "github.com/masterxavierfox/goetl-postgres-bigquery/config"
+ "os"
+ log "github.com/sirupsen/logrus"
+func main() {
+ //Load up environment variables
+ config.LoadEnv()
+ //Get parameters from cmd input
+ var (
+ tableName string
+ columnSelection string
+ startDate string
+ endDate string
+ daemon bool
+ )
+ //Runs flags for non-daemon mode
+ flag.StringVar(&tableName, "table", os.Getenv("TABLE_NAME"), "Usage: enter database table name to use e.g: -table billing.")
+ flag.StringVar(&columnSelection, "column", "*", "Usage: enter columns to select. e.g: -column * or -column created_at")
+ flag.BoolVar(&daemon,"daemon",false,"Usage: run as daemon. e.g: -daemon true ")
+ //Runs flags for daemon mode
+ flag.StringVar(&startDate, "start", os.Getenv("START_EXPORT_DATE"), "Usage: enter export start date. e.g: -start 2019-01-01 ")
+ flag.StringVar(&endDate, "end", os.Getenv("END_EXPORT_DATE"), "Usage: enter export end date. e.g: -end 2019-01-01 ")
+ //Parse the flags
+ flag.Parse()
+ //Check the flags
+ switch{
+ case daemon == true && startDate != "" && endDate != "" && tableName != "":
+ log.Println("::: DAEMON JOB ::: ==================================== STARTING RAD PG-TO-BIGQUERY ETL COMMANDER ========================================")
+ adapters.EtlJobDecider(daemon,tableName,columnSelection,startDate,endDate)
+ case daemon != true && tableName != "" && columnSelection != "" && startDate != "":
+ log.Info("::: MANUAL JOB ::: ==================================== STARTING RAD PG-TO-BIGQUERY ETL COMMANDER ========================================: \n -::Daemon | ",daemon,"\n -::Table Name | ",tableName,"\n -::Columns Selected | ",columnSelection,"\n -::Date Start | ",startDate,"\n -::Date End | ",endDate,"\n -::Trailing | ",flag.Args())
+ adapters.EtlJobDecider(daemon,tableName,columnSelection,startDate,endDate)
+ default:
+ fmt.Println(config.PG2BQ_ASCII_LOGO)
+ fmt.Println("=============== Eish! mbaba!, Please give us some flags see below: ===============")
+ flag.PrintDefaults()
+ os.Exit(1)
+ }