Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new video_service_usage advanced example #436

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions examples/advanced_usage/video_service_usage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import random
import string
from pathlib import Path
from time import sleep, time

from atproto import Client, models
from atproto.exceptions import RequestErrorBase
from atproto_client.models.dot_dict import DotDict
from atproto_client.models.utils import get_or_create

# Comment from the author (@MarshalX):
# I did not expect that using the video service would be so complicated.
# It is out of atproto scope sometimes. For example, GET params are not described in the lexicon at all...
# The "Video processed successfully" responses with 4xx status codes are a bit strange too.
# Random filename generation on the client side, etc.


_FINISHED_STATES = ['JOB_STATE_COMPLETED', 'JOB_STATE_FAILED']


def _generate_random_filename(file_path: str) -> str:
# Get the file extension
file_extension = Path(file_path).suffix

# Generate a random filename
random_name = ''.join(random.choices(string.ascii_letters + string.digits, k=10)) # noqa: S311

# Combine the random name with the same extension
return f'{random_name}{file_extension}'


def main() -> None:
client = Client() # create a default client to get service tokens
client.login('my-handle', 'my-password')

# replace it with your own data
post_text = 'Post with video from Python'
video_path = 'video.mp4'
video_alt = 'Text version of the video (ALT)'
video_upload_timeout_sec = 60

with open(video_path, 'rb') as f:
video_data = f.read()

video_client = Client('https://video.bsky.app') # create a client for video service

print('Getting upload limits...')
upload_limits_service_token = client.com.atproto.server.get_service_auth(
models.ComAtprotoServerGetServiceAuth.Params(
aud='did:web:video.bsky.app',
lxm='app.bsky.video.getUploadLimits',
)
).token
video_client._set_auth_headers(upload_limits_service_token) # set token to get upload limits

upload_limits = video_client.app.bsky.video.get_upload_limits()
print('Video upload limits:', upload_limits)

print('Uploading video...')
upload_video_service_token = client.com.atproto.server.get_service_auth(
models.ComAtprotoServerGetServiceAuth.Params(
aud=client._access_jwt_payload.aud,
lxm='com.atproto.repo.uploadBlob',
)
).token
video_client._set_auth_headers(upload_video_service_token) # set token to upload videos

try:
video_upload_params = DotDict({'did': client.me.did, 'name': _generate_random_filename(video_path)})
job_status = video_client.app.bsky.video.upload_video(
video_data, params=video_upload_params, timeout=video_upload_timeout_sec
).job_status
print('Video processing job has been started:', job_status)

# wait until the video processing job is finished
while job_status.state not in _FINISHED_STATES:
job_status = video_client.app.bsky.video.get_job_status(
models.AppBskyVideoGetJobStatus.Params(job_id=job_status.job_id)
).job_status
print(f'[{time()}] Job state: {job_status.state}; progress: {job_status.progress}%')
sleep(1)
except RequestErrorBase as err:
if err.response is None or not isinstance(err.response.content, DotDict):
# if the response is not a job status, then raise the error
raise err

# for example, it returns 409 status code if the video already processed (cached)
job_status = get_or_create(err.response.content.to_dict(), models.AppBskyVideoDefs.JobStatus)
print('Video processing caught:', job_status)

# but somehow it does not return enriched job status... so we need to refetch proper job status again...
if job_status.state in _FINISHED_STATES:
job_status = video_client.app.bsky.video.get_job_status(
models.AppBskyVideoGetJobStatus.Params(job_id=job_status.job_id)
).job_status

print('Full job status:', job_status)

blob = job_status.blob
if not blob:
print('Video processing failed (blob was not created):', job_status.error)
return

print(job_status.message)

embed_video = models.AppBskyEmbedVideo.Main(video=blob, alt=video_alt)
created_post = client.send_post(post_text, embed=embed_video)
print('Post with video has been created:', created_post.uri)


if __name__ == '__main__':
main()
Loading