# sharepointFilesToAzureDataLake
import sys
import logging
from office365.runtime.auth.authentication_context import AuthenticationContext
from office365.sharepoint.client_context import ClientContext
from office365.sharepoint.files.file import File
from azure.storage.filedatalake import DataLakeFileClient, DataLakeServiceClient
from notebookutils import mssparkutils
from datetime import date, datetime
# These values are overwritten by the parameters set in the pipeline. They are defined here so you can test directly in the notebook.
Environment = 'dev'
LinkedService_name = 'YourLinkedServiceNameinSynapse'
storage_account = 'StorageAccount-{}'.format(Environment)
account_url = 'https://{}.dfs.core.windows.net'.format(storage_account)
container_name = '{YourContainerNameFromDataLake}'
directory_name = '/Folder/Subfolder/SubSubFolder/WhereYouWantYourFiles'
file_name = 'LastModified.csv'
directory_name_LastModified_csv = 'Folder/Subfolder/SubSubfolder/WhereYourFileisToControlTheIncrementalLoad'
site_url = 'https://{Yourcompany}.sharepoint.com/sites/Something/Something'
list_name = '/sites/Something/SomethingMore/TheSiteWhereYourFilesAre'
# Create a Linked Service in Synapse and put its name in the parameters above.
# The names on the right-hand side are the names of your secrets in Key Vault.
# The name on the left-hand side is the name of the Linked Service you created for your Key Vault instance/resource.
# mssparkutils is a Spark feature; you can read from Key Vault another way if you prefer (this method requires a Spark engine).
# A non-Spark alternative is sketched in the commented example below the secret lookups.
# We use the managed identity of the Linked Service in Synapse to authenticate to Key Vault.
client_id = mssparkutils.credentials.getSecretWithLS(LinkedService_name,'ClientId')
client_secret = mssparkutils.credentials.getSecretWithLS(LinkedService_name,'ClientSecret')
tenant_id = mssparkutils.credentials.getSecretWithLS(LinkedService_name,'TenantId')
access_key = mssparkutils.credentials.getSecretWithLS(LinkedService_name,'DatalakeAccessKey')
connection_string = mssparkutils.credentials.getSecretWithLS(LinkedService_name,'ConnectionString')
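# A minimal alternative sketch if you do not have a Spark engine, using the azure-identity and
# azure-keyvault-secrets packages (the vault URL below is a placeholder, not part of this project):
#
# from azure.identity import DefaultAzureCredential
# from azure.keyvault.secrets import SecretClient
#
# secret_client = SecretClient(vault_url='https://<your-key-vault>.vault.azure.net', credential=DefaultAzureCredential())
# client_id = secret_client.get_secret('ClientId').value
# client_secret = secret_client.get_secret('ClientSecret').value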
# Authenticate to the Data Lake and connect to the path set in the parameter section
try:
    logging.info('Connecting to Data Lake: {}'.format(account_url))
    print('Connecting to Data Lake: {}'.format(account_url))
    service_client = DataLakeServiceClient(account_url=account_url, credential=access_key)
    file_system_client = service_client.get_file_system_client(file_system=container_name)
    directory_client = file_system_client.get_directory_client(directory_name)
    logging.info('Connected to Data Lake: {} {} {}'.format(account_url, container_name, directory_name))
    print('Connected to Data Lake: {}'.format(account_url))
except Exception as e:
    print("Error occurred: ", e)
    sys.stdout.flush()
    logging.error("Error occurred: {}".format(str(e)))
try:
    # Authenticate to SharePoint
    logging.info('Connecting to SharePoint: {}{}'.format(site_url, list_name))
    print('Connecting to SharePoint: {}{}'.format(site_url, list_name))
    sys.stdout.flush()
    ctx_auth = AuthenticationContext(url=site_url)
    ctx_auth.acquire_token_for_app(client_id, client_secret)
    ctx = ClientContext(site_url, ctx_auth)
    web = ctx.web
    ctx.load(web)
    ctx.execute_query()
    logging.info('Connected to SharePoint: {}'.format(web.properties['Title']))
    print('Connected to SharePoint: ', web.properties['Title'], flush=True)
    sys.stdout.flush()
except Exception as e:
    logging.error("Error occurred: {}".format(str(e)))
    print("Error occurred: ", e)
    sys.stdout.flush()
### Incremental load: only fetch the newest files ###
# A small CSV holds the last-modified datetime from the previous run. It is read here, and at the
# end of the entire script, only when the run succeeds, a new timestamp is written back.
# If the CSV does not exist yet (for example on the first run), the except branch falls back to the
# hardcoded date below; on later runs the except branch is not reached.
# Read the CSV with the LastModified datetime.
try:
    directory_client_lastModfied = file_system_client.get_directory_client(directory_name_LastModified_csv)
    last_modified_date_file = directory_client_lastModfied.get_file_client(file_name)
    read_csv = last_modified_date_file.download_file().readall().decode('utf-8')
except Exception:
    # First run, or the CSV could not be read: fall back to a hardcoded start date.
    read_csv = '2000-01-01T11:45:15Z'
# Whichever branch ran above, the date now in read_csv is used; the variable "last_modified_datetime"
# below is what filters the SharePoint list in the next step.
last_modified_datetime = datetime.fromisoformat(read_csv[:-1])
print(last_modified_datetime)
# Filter the SharePoint list based on the LastModified datetime we just read
logging.info('Getting Sharepoint files...')
folder = ctx.web.get_folder_by_server_relative_url(list_name)
# Filter the SharePoint list on the LastModified datetime read in the previous step
files = folder.files.filter("Name ne null and TimeLastModified ge datetime'{}'".format(last_modified_datetime.strftime('%Y-%m-%dT%H:%M:%SZ'))).top(5000).order_by('TimeLastModified desc')
ctx.load(files)
ctx.execute_query()
logging.info("Number of files: {}".format(len(files)))
print("Number of files to get in Sharepoint: {}".format(len(files)), flush=True)
sys.stdout.flush()
# for file in files:  # Uncomment to print the files' last-modified timestamps and check that the right files are picked up
#     print(file.properties["TimeLastModified"])
# Exit if there are no new files in the SharePoint list
if len(files) == 0:
    print('No new files in Sharepoint. The job is stopped')
    sys.stdout.flush()
    # This runs in a notebook with Spark features; in plain Python use sys.exit() or similar (see the sketch after this block)
    mssparkutils.session.stop()
else:
    pass
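# A minimal sketch of the same early exit for a plain Python script rather than a Spark notebook
# (an assumption; not part of the original job):
#
# if len(files) == 0:
#     sys.exit('No new files in Sharepoint. The job is stopped')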
today = date.today()
folder_name = today.strftime("%Y-%m-%d") # Format as "YYYY-MM-DD"
print(folder_name)
current_date = datetime.now()
year = str(current_date.year)
month = current_date.strftime("%B")
day = today.strftime("%Y-%m-%d")
print(year,month,day)
folder_path = f"{directory_name}/{year}/{month}/{day}"
directory_client = file_system_client.get_directory_client(folder_path)
if not directory_client.exists():
    directory_client.create_directory()
try:
    file_counter = 0
    # Loop over the files fetched in the previous step and upload them to the data lake
    logging.info('Starting writing to data lake: {} {} {}'.format(storage_account, container_name, directory_name))
    for file in files:
        # Fetch the files one by one from the SharePoint folder
        sharepoint_file = list_name + '/' + file.properties["Name"]
        print(sharepoint_file)
        sys.stdout.flush()
        file_response = File.open_binary(ctx, sharepoint_file)
        # Upload to the data lake
        response_content = file_response.content
        file_path = f"{folder_path}/{file.properties['Name']}"
        file_client = DataLakeFileClient.from_connection_string(connection_string, file_system_name=container_name, file_path=file_path)
        file_client.upload_data(response_content, overwrite=True)
        file_counter += 1
        logging.info("Processed {} out of {} files.".format(file_counter, len(files)))
        print("Processed {} out of {} files.".format(file_counter, len(files)), flush=True)
        sys.stdout.flush()
except Exception as e:
    logging.error("Error occurred: {}".format(str(e)))
    mssparkutils.session.stop()
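# A possible alternative (a sketch, assuming the file_system_client from the connection step above):
# reuse the already-authenticated client instead of building a new DataLakeFileClient from the
# connection string for every file:
#
# file_client = file_system_client.get_file_client(file_path)
# file_client.upload_data(response_content, overwrite=True)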
# If the number of files written to the data lake equals the total number of files fetched from SharePoint
if file_counter == len(files):
    newest_file = files.top(1)
    time_modified = newest_file[0].properties['TimeLastModified']
    data = time_modified.encode('utf-8')
    # Overwrite the CSV with the newest last-modified date fetched from the source system (SharePoint)
    last_modified_date_file.upload_data(data, overwrite=True)
    print("Processed {} out of {} files.".format(file_counter, len(files)), flush=True)
    print('New last modified date in {}: {}'.format(file_name, time_modified))
    sys.stdout.flush()
else:
    print('ERROR: Only processed {} out of {} files. Start the job again.'.format(file_counter, len(files)))
    sys.stdout.flush()