Feature: Google Drive - Presentation to Markdown / Audio|Video Transcribed (10MB Limit) #3075

Closed
wants to merge 34 commits

34 commits
7995e86
[8.15] Replace task.name with task.get_name() (#2690) (#2692)
github-actions[bot] Jul 9, 2024
c989ca9
[8.15] [PostgreSQL] Add new field id_columns in sync rules to generat…
github-actions[bot] Jul 10, 2024
1b1a3ff
[8.15] Bump certifi from 2023.7.22 to 2024.7.4 in /requirements (#269…
github-actions[bot] Jul 11, 2024
716dbe0
[8.15] [DOCS] Update connectors overview table for 8.15 (#2712) (#2713)
github-actions[bot] Jul 17, 2024
0100063
[8.15] Cancel and fail downloads if exception happened (#2710) (#2715)
github-actions[bot] Jul 17, 2024
02fed56
[8.15] Add _id to failed bulk logs (#2735) (#2736)
navarone-feekery Aug 6, 2024
5c38529
[8.15] Pin docker.elastic.co/wolfi/python Docker tag to 609e3fd (#272…
github-actions[bot] Aug 7, 2024
4e04344
Bump version to 8.15.0.1 (#2741)
efegurkan Aug 8, 2024
65c121f
[8.15] Bump msal to 1.30.0 (#2740) (#2742)
github-actions[bot] Aug 8, 2024
0817982
[8.15] [MSSQL] Fix serialization to parse time object (#2717) (#2737)
github-actions[bot] Aug 9, 2024
4af9638
[8.15][Github] Fix Rate limit error #2711 (#2746)
moxarth-rathod Aug 13, 2024
488883b
[8.15] bump aiohttp for dependabot (#2755) (#2756)
github-actions[bot] Aug 19, 2024
b7aa9dd
Update VERSION to 8.15.1.0 for 8.15.1 FF (#2788)
seanstory Aug 29, 2024
e35de56
bump version after 8.15.1 release (#2803)
seanstory Sep 5, 2024
712f21b
[8.15] Forcefully downgrade pyasn1 to 0.6.0 (#2817) (#2820)
github-actions[bot] Sep 13, 2024
ebd5a43
Bump 8.15 Branch Version to 8.15.2.0 (#2842)
markjhoy Sep 19, 2024
3ee23f1
[8.15][Network Drive] fix duplicate ids in access control sync #2832 …
moxarth-rathod Sep 25, 2024
f5a8831
Bump 8.15 Branch to 8.15.3.0 (#2854)
markjhoy Sep 26, 2024
16a683e
[8.15] Unify smbprotocol deps and bump to latest (#2852) (#2856)
github-actions[bot] Sep 30, 2024
666c2e6
[8.15] include full stack trace in concurrent exception (#2869) (#2871)
github-actions[bot] Oct 8, 2024
4119c26
Bump 8.15 branch to 8.15.4.0 (#2874)
TattdCodeMonkey Oct 11, 2024
6fccd85
bump 8.15 to 8.15.3.1 for release (#2897)
TattdCodeMonkey Oct 17, 2024
652eb69
[8.15] Properly display stacktrace for errors in extractor/sink tasks…
github-actions[bot] Nov 4, 2024
825c987
Bump VERSION to 8.15.4.0 (#2949)
artem-shelkovnikov Nov 12, 2024
01a7225
Bump to 8.15.4.1 (#2952)
jedrazb Nov 12, 2024
8926670
Remove MSSQL nightly functional tests from 8.15 for aarch64 (#2948)
artem-shelkovnikov Nov 13, 2024
67a575c
[8.15] [outlook] Filter users to only include active ones (#2967) (#2…
github-actions[bot] Nov 21, 2024
b235087
bump version to 8.15.5.0 (#2999)
joemcelroy Nov 21, 2024
fd1ddd2
[8.15] Change dto for page blog documents to access fields in safe ma…
artem-shelkovnikov Nov 22, 2024
66549ff
bump to 8.15.5.1 (#3003)
joemcelroy Nov 26, 2024
cc7513e
Google Drive Custom
that-dom Dec 23, 2024
c729ff8
Removed try/exception to support @retryable
that-dom Dec 23, 2024
ba700c7
Updated dockerfile for security
that-dom Dec 23, 2024
a889357
OpenAI lib and black formatting
that-dom Dec 23, 2024
2 changes: 2 additions & 0 deletions .buildkite/nightly_steps.yml
@@ -57,6 +57,8 @@ steps:
- label: "🔨 Microsoft SQL"
command:
- ".buildkite/run_nigthly.sh mssql"
env:
SKIP_AARCH64: "true"
artifact_paths:
- "perf8-report-*/**/*"
- label: "🔨 Jira"
2 changes: 1 addition & 1 deletion Dockerfile.ftest.wolfi
@@ -1,4 +1,4 @@
FROM docker.elastic.co/wolfi/python:3.10-dev
FROM docker.elastic.co/wolfi/python:3.10-dev@sha256:609e3fdb2c2a219941a6ea1e81f935899d05a787af634e4d47a3a3ec9a6d3379
USER root
COPY . /connectors
WORKDIR /connectors
2 changes: 1 addition & 1 deletion connectors/VERSION
@@ -1 +1 @@
8.15.0.0
8.15.5.1
26 changes: 19 additions & 7 deletions connectors/es/sink.py
@@ -197,7 +197,9 @@ async def _bulk_api_call():
for item in res["items"]:
for op, data in item.items():
if "error" in data:
self._logger.error(f"operation {op} failed, {data['error']}")
self._logger.error(
f"operation {op} failed for doc {data['_id']}, {data['error']}"
)

self._populate_stats(stats, res)

@@ -598,8 +600,18 @@ async def get_docs(self, generator, skip_unchanged_documents=False):
"doc": doc,
}
)
# We try raising every loop to not miss a moment when
# too many errors happened when downloading
lazy_downloads.raise_any_exception()

await asyncio.sleep(0)

# Sit and wait until an error happens
await lazy_downloads.join(raise_on_error=True)
except Exception as ex:
self._logger.error(f"Extractor failed with an error: {ex}")
lazy_downloads.cancel()
raise
finally:
# wait for all downloads to be finished
await lazy_downloads.join()
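The `get_docs` change above calls `raise_any_exception()` on every loop iteration so a failed download surfaces immediately, then cancels and drains the pool. A minimal sketch of that pattern, using a hypothetical `LazyTaskPool` stand-in rather than the connector's actual pool class:

```python
import asyncio


class LazyTaskPool:
    """Hypothetical stand-in for the connector's concurrent-download pool."""

    def __init__(self):
        self._tasks = []

    def put(self, coro):
        self._tasks.append(asyncio.ensure_future(coro))

    def raise_any_exception(self):
        # Surface the first failed task immediately instead of
        # waiting for every download to finish.
        for task in self._tasks:
            if task.done() and not task.cancelled() and task.exception():
                raise task.exception()

    def cancel(self):
        for task in self._tasks:
            task.cancel()

    async def join(self):
        # Drain remaining tasks; errors were already reported
        # via raise_any_exception(), so swallow them here.
        await asyncio.gather(*self._tasks, return_exceptions=True)


async def demo():
    pool = LazyTaskPool()

    async def failing_download():
        raise ValueError("download failed")

    pool.put(failing_download())
    await asyncio.sleep(0)  # let the task run and fail
    try:
        pool.raise_any_exception()
        return "no error"
    except ValueError:
        pool.cancel()
        return "raised early"
    finally:
        await pool.join()
```

Checking each iteration keeps the extractor from queueing thousands more downloads after the first failure.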
@@ -883,9 +895,7 @@ def _extractor_task_running(self):

async def cancel(self):
if self._sink_task_running():
self._logger.info(
f"Cancling the Sink task: {self._sink_task.name}" # pyright: ignore
)
self._logger.info(f"Canceling the Sink task: {self._sink_task.get_name()}")
self._sink_task.cancel()
else:
self._logger.debug(
@@ -894,7 +904,7 @@ async def cancel(self):

if self._extractor_task_running():
self._logger.info(
f"Canceling the Extractor task: {self._extractor_task.name}" # pyright: ignore
f"Canceling the Extractor task: {self._extractor_task.get_name()}"
)
self._extractor_task.cancel()
else:
@@ -1042,13 +1052,15 @@ async def async_bulk(
def sink_task_callback(self, task):
if task.exception():
self._logger.error(
f"Encountered an error in the sync's {type(self._sink).__name__}: {task.get_name()}: {task.exception()}",
f"Encountered an error in the sync's {type(self._sink).__name__}: {task.get_name()}",
exc_info=task.exception(),
)
self.error = task.exception()

def extractor_task_callback(self, task):
if task.exception():
self._logger.error(
f"Encountered an error in the sync's {type(self._extractor).__name__}: {task.get_name()}: {task.exception()}",
f"Encountered an error in the sync's {type(self._extractor).__name__}: {task.get_name()}",
exc_info=task.exception(),
)
self.error = task.exception()
4 changes: 2 additions & 2 deletions connectors/source.py
@@ -10,7 +10,7 @@
import importlib
import re
from contextlib import asynccontextmanager
from datetime import date, datetime
from datetime import date, datetime, time
from decimal import Decimal
from enum import Enum
from functools import cache
@@ -670,7 +670,7 @@ def _serialize(value):
elif isinstance(value, dict):
for key, svalue in value.items():
value[key] = _serialize(svalue)
elif isinstance(value, (datetime, date)):
elif isinstance(value, (datetime, date, time)):
value = value.isoformat()
elif isinstance(value, Decimal128):
value = value.to_decimal()
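The `source.py` hunk adds `datetime.time` to the ISO-serialized types (the MSSQL `time` fix from commit 0817982). A reduced, non-mutating sketch of the serializer branch — the real `_serialize` also handles Mongo's `Decimal128` and mutates containers in place; the plain-`Decimal`-to-float choice here is this sketch's own:

```python
from datetime import date, datetime, time
from decimal import Decimal


def serialize_value(value):
    """Reduced sketch of connectors.source._serialize: make values JSON-safe."""
    if isinstance(value, (list, tuple)):
        return [serialize_value(item) for item in value]
    if isinstance(value, dict):
        return {key: serialize_value(item) for key, item in value.items()}
    if isinstance(value, (datetime, date, time)):
        # Before this PR's base branch fix, bare time objects fell
        # through and broke JSON encoding of MSSQL rows.
        return value.isoformat()
    if isinstance(value, Decimal):
        return float(value)
    return value
```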
16 changes: 11 additions & 5 deletions connectors/sources/confluence.py
@@ -895,14 +895,20 @@ async def fetch_documents(self, api_query):
doc = {
"_id": str(document["id"]),
"type": document["type"],
"_timestamp": document["history"]["lastUpdated"]["when"],
"_timestamp": nested_get_from_dict(
document, ["history", "lastUpdated", "when"]
),
"title": document.get("title"),
"ancestors": ancestor_title,
"space": document["space"]["name"],
"body": document["body"]["storage"]["value"],
"space": nested_get_from_dict(document, ["space", "name"]),
"body": nested_get_from_dict(document, ["body", "storage", "value"]),
"url": document_url,
"author": document["history"]["createdBy"][self.authorkey],
"createdDate": document["history"]["createdDate"],
"author": nested_get_from_dict(
document, ["history", "createdBy", self.authorkey]
),
"createdDate": nested_get_from_dict(
document, ["history", "createdDate"]
),
}
if self.confluence_client.index_labels:
doc["labels"] = document["labels"]
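The Confluence hunk swaps direct indexing for `nested_get_from_dict`, so a document missing `history`, `space`, or `body` metadata yields `None` instead of raising `KeyError`. A minimal illustrative version of such a helper (the connector imports its own from the repo's utilities):

```python
def nested_get_from_dict(obj, keys, default=None):
    """Walk nested dicts, returning default when any key is missing."""
    for key in keys:
        if not isinstance(obj, dict) or key not in obj:
            return default
        obj = obj[key]
    return obj


# A Confluence-style document with partial metadata:
document = {"history": {"lastUpdated": {"when": "2024-07-09T12:00:00Z"}}}
timestamp = nested_get_from_dict(document, ["history", "lastUpdated", "when"])
body = nested_get_from_dict(document, ["body", "storage", "value"])  # missing -> None
```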
87 changes: 62 additions & 25 deletions connectors/sources/github.py
@@ -12,7 +12,7 @@
import aiohttp
import fastjsonschema
from aiohttp.client_exceptions import ClientResponseError
from gidgethub import RateLimitExceeded, sansio
from gidgethub import QueryError, RateLimitExceeded, sansio
from gidgethub.abc import (
BadGraphQLRequest,
GraphQLAuthorizationFailure,
@@ -52,6 +52,7 @@
RETRIES = 3
RETRY_INTERVAL = 2
FORBIDDEN = 403
UNAUTHORIZED = 401
NODE_SIZE = 100
REVIEWS_COUNT = 45

@@ -673,14 +674,12 @@ def __init__(
def set_logger(self, logger_):
self._logger = logger_

def get_rate_limit_encountered(self, status_code, message):
return status_code == FORBIDDEN and "rate limit" in str(message).lower()
def get_rate_limit_encountered(self, status_code, rate_limit_remaining):
return status_code == FORBIDDEN and not int(rate_limit_remaining)
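The rewritten check relies on GitHub's `X-RateLimit-Remaining` response header instead of string-matching the error message — a 403 with zero remaining requests is throttling, not a permissions problem. Sketched standalone:

```python
FORBIDDEN = 403


def rate_limit_encountered(status_code, rate_limit_remaining):
    """True when a 403 is caused by rate limiting rather than missing scopes.

    rate_limit_remaining is the X-RateLimit-Remaining header value (a string).
    """
    return status_code == FORBIDDEN and not int(rate_limit_remaining)
```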

async def _get_retry_after(self, resource_type):
current_time = time.time()
response = await self._get_client.getitem(
"/rate_limit", oauth_token=self._access_token()
)
response = await self.get_github_item("/rate_limit")
reset = nested_get_from_dict(
response, ["resources", resource_type, "reset"], default=current_time
)
@@ -725,6 +724,8 @@ async def _update_installation_access_token(self):
private_key=self.private_key,
)
self._installation_access_token = access_token_response["token"]
except RateLimitExceeded:
await self._put_to_sleep("core")
except Exception:
self._logger.exception(
f"Failed to get access token for installation {self._installation_id}.",
@@ -789,11 +790,20 @@ async def graphql(self, query, variables=None):
msg = "Your Github token is either expired or revoked. Please check again."
raise UnauthorizedException(msg) from exception
except BadGraphQLRequest as exception:
if self.get_rate_limit_encountered(exception.status_code, exception):
await self._put_to_sleep(resource_type="graphql")
elif exception.status == FORBIDDEN:
if exception.status_code == FORBIDDEN:
msg = f"Provided GitHub token does not have the necessary permissions to perform the request for the URL: {url} and query: {query}."
raise ForbiddenException(msg) from exception
else:
raise
except QueryError as exception:
for error in exception.response.get("errors"):
if (
error.get("type").lower() == "rate_limited"
and "api rate limit exceeded" in error.get("message").lower()
):
await self._put_to_sleep(resource_type="graphql")
msg = f"Error while executing query. Exception: {exception.response.get('errors')}"
raise Exception(msg) from exception
except Exception:
raise
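GraphQL rate limiting arrives as a `QueryError` whose response payload carries a list of error objects, so the new handler scans that list. The detection step, sketched as a pure function with defensive `.get()` defaults (an assumption of this sketch; the diff above indexes the fields directly):

```python
def is_graphql_rate_limited(error_payload):
    """True if any GraphQL error in the payload is a RATE_LIMITED error."""
    for error in error_payload.get("errors", []):
        if (
            error.get("type", "").lower() == "rate_limited"
            and "api rate limit exceeded" in error.get("message", "").lower()
        ):
            return True
    return False
```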

@@ -819,7 +829,7 @@ async def get_github_item(self, resource):
url=resource, oauth_token=self._access_token()
)
except ClientResponseError as exception:
if exception.status == 401:
if exception.status == UNAUTHORIZED:
if self.auth_method == GITHUB_APP:
self._logger.debug(
f"The access token for installation #{self._installation_id} expired, Regenerating a new token."
@@ -828,6 +838,10 @@
raise
msg = "Your Github token is either expired or revoked. Please check again."
raise UnauthorizedException(msg) from exception
elif self.get_rate_limit_encountered(
exception.status, exception.headers.get("X-RateLimit-Remaining")
):
await self._put_to_sleep(resource_type="core")
elif exception.status == FORBIDDEN:
msg = f"Provided GitHub token does not have the necessary permissions to perform the request for the URL: {resource}."
raise ForbiddenException(msg) from exception
@@ -861,20 +875,44 @@ async def paginated_api_call(self, query, variables, keys):
def get_repo_details(self, repo_name):
return repo_name.split("/")

@retryable(
retries=RETRIES,
interval=RETRY_INTERVAL,
strategy=RetryStrategy.EXPONENTIAL_BACKOFF,
skipped_exceptions=UnauthorizedException,
)
async def get_personal_access_token_scopes(self):
request_headers = sansio.create_headers(
self._get_client.requester,
accept=sansio.accept_format(),
oauth_token=self._access_token(),
)
_, headers, _ = await self._get_client._request(
"HEAD", self.base_url, request_headers
)
scopes = headers.get("X-OAuth-Scopes")
if not scopes or not scopes.strip():
self._logger.warning(f"Couldn't find 'X-OAuth-Scopes' in headers {headers}")
return set()
return {scope.strip() for scope in scopes.split(",")}
try:
request_headers = sansio.create_headers(
self._get_client.requester,
accept=sansio.accept_format(),
oauth_token=self._access_token(),
)
url = f"{self.base_url}/graphql"
_, headers, _ = await self._get_client._request(
"HEAD", url, request_headers
)
scopes = headers.get("X-OAuth-Scopes")
if not scopes or not scopes.strip():
self._logger.warning(
f"Couldn't find 'X-OAuth-Scopes' in headers {headers}"
)
return set()
return {scope.strip() for scope in scopes.split(",")}
except ClientResponseError as exception:
if exception.status == FORBIDDEN:
if self.get_rate_limit_encountered(
exception.status, exception.headers.get("X-RateLimit-Remaining")
):
await self._put_to_sleep("graphql")
else:
msg = f"Provided GitHub token does not have the necessary permissions to perform the request for the URL: {self.base_url}."
raise ForbiddenException(msg) from exception
elif exception.status == UNAUTHORIZED:
msg = "Your Github token is either expired or revoked. Please check again."
raise UnauthorizedException(msg) from exception
else:
raise
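Aside from the new error handling, `get_personal_access_token_scopes` reduces to parsing the `X-OAuth-Scopes` response header into a set. That parsing step in isolation:

```python
def parse_oauth_scopes(headers):
    """Parse GitHub's X-OAuth-Scopes response header into a set of scope names."""
    scopes = headers.get("X-OAuth-Scopes")
    if not scopes or not scopes.strip():
        # Header absent or blank: token exposes no classic scopes.
        return set()
    return {scope.strip() for scope in scopes.split(",")}
```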

@retryable(
retries=RETRIES,
@@ -1279,7 +1317,6 @@ async def _get_invalid_repos_for_github_app(self):
self.configured_repos,
)
)

for full_repo_name in self.configured_repos:
if full_repo_name in invalid_repos:
continue
@@ -1432,7 +1469,7 @@ async def _validate_personal_access_token_scopes(self):
if self.configuration["auth_method"] != PERSONAL_ACCESS_TOKEN:
return

scopes = await self.github_client.get_personal_access_token_scopes()
scopes = await self.github_client.get_personal_access_token_scopes() or set()
required_scopes = {"repo", "user", "read:org"}

for scope in ["write:org", "admin:org"]:
37 changes: 37 additions & 0 deletions connectors/sources/google.py
@@ -5,10 +5,12 @@
#
import json
import os
import urllib.parse
from enum import Enum

from aiogoogle import Aiogoogle, AuthError, HTTPError
from aiogoogle.auth.creds import ServiceAccountCreds
from aiogoogle.models import Request
from aiogoogle.sessions.aiohttp_session import AiohttpSession

from connectors.logger import logger
@@ -192,6 +194,41 @@

return await anext(self._execute_api_call(resource, method, _call_api, kwargs))

async def api_call_custom(self, api_service, url_path, params=None, method="GET"):
"""
Make a custom API call to any Google API endpoint by specifying the
service name, URL path, and query parameters.

Args:
api_service (str): The Google service to use (e.g., "slides" or "drive").
url_path (str): The specific path for the API call (e.g., "/presentations/.../thumbnail").
params (dict): Optional dictionary of query parameters.
method (str): HTTP method for the request. Default is "GET".

Returns:
dict: The response from the Google API.

Raises:
HTTPError: If the API request fails.
"""
# Construct the full URL with the specified Google service
base_url = f"https://{api_service}.googleapis.com/v1"
full_url = f"{base_url}{url_path}"

if params:
full_url += f"?{urllib.parse.urlencode(params)}"

# Define the request
request = Request(method=method, url=full_url)

# Perform the request
async with Aiogoogle(
service_account_creds=self.service_account_credentials
) as google_client:
response = await google_client.as_service_account(request)

return response
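`api_call_custom` builds its request URL from the service name, path, and urlencoded params before handing it to `aiogoogle`. The URL construction on its own (the presentation ID below is a made-up placeholder, and note the method hardcodes the `v1` API version as the new code does):

```python
import urllib.parse


def build_google_api_url(api_service, url_path, params=None):
    """Build a v1 Google API URL the way api_call_custom does."""
    full_url = f"https://{api_service}.googleapis.com/v1{url_path}"
    if params:
        full_url += f"?{urllib.parse.urlencode(params)}"
    return full_url


# e.g. a Slides thumbnail-style request with a hypothetical presentation ID:
url = build_google_api_url("slides", "/presentations/abc123", {"fields": "slides"})
```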

async def _execute_api_call(self, resource, method, call_api_func, kwargs):
"""Execute the API call with common try/except logic.
Args: