This repository has been archived by the owner on May 2, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathmain.go
98 lines (83 loc) · 3.09 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package main
import (
"fmt"
"github.com/uswitch/bqshift/bigquery"
"github.com/uswitch/bqshift/redshift"
"gopkg.in/alecthomas/kingpin.v2"
"log"
"time"
)
// Command-line flags and positional arguments, registered with kingpin when
// the package is initialised. Every variable is a pointer; main dereferences
// them only after calling kingpin.Parse.
var (
	// Source-side configuration and credentials. The AWS keys and the Google
	// project may be supplied through the named environment variables.
	config    = kingpin.Flag("config", "Configuration file with S3 and Redshift credentials").Required().File()
	accessKey = kingpin.Flag("access-key", "AWS access key. Defaults to $AWS_ACCESS_KEY_ID").OverrideDefaultFromEnvar("AWS_ACCESS_KEY_ID").Required().String()
	// The help text previously ended in a truncated "$AWS_SECRET_"; it now
	// names the environment variable that is actually consulted.
	secretKey = kingpin.Flag("secret-access-key", "AWS secret access key. Defaults to $AWS_SECRET_ACCESS_KEY").OverrideDefaultFromEnvar("AWS_SECRET_ACCESS_KEY").Required().String()
	project   = kingpin.Flag("project", "Google Project ID").OverrideDefaultFromEnvar("GCLOUD_PROJECT").Required().String()

	// Destination behaviour.
	overwrite            = kingpin.Flag("overwrite", "Overwrite BigQuery table").Bool()
	usePartitionedTables = kingpin.Flag("partition", "Create time partitioned BigQuery tables.").Bool()

	// Row selection. partitionFromArgs only builds a partition when both
	// --date-expression and --date are supplied.
	dateExpression = kingpin.Flag("date-expression", "Redshift SQL expression to return row date. e.g. CAST(inserted as DATE)").String()
	dateFilter     = kingpin.Flag("date", "Date (YYYY-MM-DD) of partition to filter and load. e.g. 2016-09-30.").String()
	where          = kingpin.Flag("where", "Redshift WHERE clause. Cannot be used with --date/--date-expression. e.g.: CAST(inserted as DATE) < 2016-09-30").String()

	// Positional arguments, in the order they must appear on the command line.
	dataset          = kingpin.Arg("dataset", "Destination BigQuery dataset").Required().String()
	table            = kingpin.Arg("table", "Redshift table name").Required().String()
	destinationTable = kingpin.Arg("destination-table", "BigQuery table name. Defaults to Redshift table name").String()
)
// Build metadata. Nothing in this file assigns these, so they are presumably
// injected at link time (e.g. -ldflags "-X main.versionNumber=...") — TODO
// confirm against the build scripts.
var (
	versionNumber string
	sha           string
)

// version returns the string reported by --version: "DEVELOPMENT" when no
// version number is set, otherwise "<versionNumber> (<sha>)".
func version() string {
	switch versionNumber {
	case "":
		return "DEVELOPMENT"
	default:
		return fmt.Sprintf("%s (%s)", versionNumber, sha)
	}
}
// partitionFromArgs builds the date partition described by the
// --date-expression and --date flags.
//
// It returns (nil, nil) unless both flags are non-empty. When both are set,
// --date is parsed with the layout "2006-01-02"; a parse failure is returned
// unchanged, otherwise the result of redshift.NewDatePartition is returned.
func partitionFromArgs() (*redshift.DatePartition, error) {
	expr, day := *dateExpression, *dateFilter
	if expr == "" || day == "" {
		// A partition needs both the expression and the date to filter on.
		return nil, nil
	}

	parsed, err := time.Parse("2006-01-02", day)
	if err != nil {
		return nil, err
	}

	return redshift.NewDatePartition(expr, parsed), nil
}
// destinationTableID returns the BigQuery table name to load into: the
// optional destination-table argument when it is non-empty, otherwise the
// Redshift table name.
func destinationTableID() string {
	if name := *destinationTable; name != "" {
		return name
	}
	return *table
}
// main parses the command line, builds the shifter configuration from the
// AWS configuration file and flags, and runs a single shift of the Redshift
// table into the destination BigQuery table. Any failure is logged and
// terminates the process via log.Fatalln.
func main() {
	kingpin.Version(version())
	kingpin.Parse()

	// Validate flag combinations first: this only inspects parsed flags, so
	// bad input is reported before any further setup is attempted.
	partition, err := partitionFromArgs()
	if err != nil {
		log.Fatalln(err.Error())
	}
	if partition != nil && *where != "" {
		// The --where help text documents this combination as unsupported;
		// reject it explicitly instead of passing both filters downstream.
		log.Fatalln("--where cannot be used with --date/--date-expression")
	}

	awsConfig, err := redshift.ParseAWSConfiguration(*config)
	if err != nil {
		log.Fatalln("error parsing redshift configuration:", err.Error())
	}
	// Credentials come from flags/environment rather than the config file.
	awsConfig.S3.AccessKey = *accessKey
	awsConfig.S3.SecretKey = *secretKey

	// Named shiftConfig so it does not shadow the package-level config flag.
	shiftConfig := &Configuration{
		AWS:               awsConfig,
		OverwriteBigQuery: *overwrite,
		DayPartition:      *usePartitionedTables,
		WhereClause:       *where,
	}

	shifter, err := NewShifter(shiftConfig)
	if err != nil {
		log.Fatalln(err.Error())
	}

	bq := bigquery.NewTableReference(*project, *dataset, destinationTableID())
	if partition != nil {
		// Point the table reference at the partition's date filter.
		bq.DayPartition = &partition.DateFilter
	}

	if err := shifter.Run(*table, partition, bq); err != nil {
		log.Fatalln(err.Error())
	}

	log.Println("finished")
}