DMSCDC_LoadInitial.py
import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import lit
# Job parameters: the source bucket/prefix where DMS landed the initial
# LOAD files, the target folder under out_path, and an optional
# comma-separated list of partition columns ("null" disables partitioning).
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'bucket',
    'prefix',
    'folder',
    'out_path',
    'partitionKey'])
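
# A minimal sketch of how these parameters might be supplied when starting
# the job with boto3 (illustrative only; the job name and argument values
# below are assumptions, not part of this script):
#
#   import boto3
#   boto3.client('glue').start_job_run(
#       JobName='DMSCDC_LoadInitial',
#       Arguments={
#           '--bucket': 'my-dms-bucket',
#           '--prefix': 'raw/schema/table',
#           '--folder': 'table',
#           '--out_path': 'my-lake-bucket/lake/',
#           '--partitionKey': 'null'})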
# Standard Glue setup: create the Spark and Glue contexts and initialize
# the job so Glue can track this run.
sparkContext = SparkContext.getOrCreate()
glueContext = GlueContext(sparkContext)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
s3_inputpath = 's3://' + args['bucket'] + '/' + args['prefix']
# No separator is inserted here, so out_path is expected to end with '/'.
s3_outputpath = 's3://' + args['out_path'] + args['folder']

# Read the DMS full-load files (named LOAD*.parquet) and tag every row
# with Op = "I" (insert), mirroring the Op column DMS adds to CDC files.
df = spark.read.parquet(s3_inputpath + "/LOAD*.parquet").withColumn("Op", lit("I"))
partition_keys = args['partitionKey']
if partition_keys != "null":
    # Repartition on the key columns, using one Spark partition per
    # distinct key combination, so that partitionBy writes only a few
    # files into each partition directory.
    partitionKeys = partition_keys.split(",")
    partitionCount = df.select(partitionKeys).distinct().count()
    (df.repartition(partitionCount, partitionKeys)
        .write.mode('overwrite')
        .partitionBy(partitionKeys)
        .parquet(s3_outputpath))
else:
    # No partition key supplied: write the dataset unpartitioned.
    df.write.mode('overwrite').parquet(s3_outputpath)
job.commit()
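
# Illustrative read-back check (an assumption, not part of the original
# job): the written dataset should load back with the partition columns
# reconstructed from the output directory layout.
#
#   check = spark.read.parquet(s3_outputpath)
#   check.printSchema()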