I've been playing around with simple CLI-based clients, and this is what I've come up with to monitor the timeline for new posts. It works, but it seems pretty inefficient because it is polling-based. I've also tried wading through the bsky API docs, but nothing jumps out as an obvious, better alternative.
If this really is the right pattern, it might be nice to eventually have some of this logic bundled into the atproto client :)
```python
import asyncio
import textwrap
from html import escape
import os
from dotenv import load_dotenv
from datetime import datetime
import humanize
from atproto import AsyncClient

load_dotenv()

FETCH_NOTIFICATIONS_DELAY_SEC = 5


def process_post(post, seen_posts):
    if post.cid not in seen_posts:
        author = post.author.display_name or post.author.handle
        created_at = post.indexed_at
        timestamp = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
        age = humanize.naturaltime(datetime.now().astimezone() - timestamp)
        text = textwrap.fill(post.record.text, width=70)
        text = escape(text)
        print(f"\n\033[94m[{created_at}] ({age}) \033[92m{author}\033[0m ({post.author.did}):\n")
        print(text)
        seen_posts.add(post.cid)
        return True
    return False


async def main() -> None:
    client = AsyncClient()
    await client.login(os.getenv('BSKY_HANDLE'), os.getenv('BSKY_APP_PASSWORD'))
    print("Monitoring timeline for new posts...")
    seen_posts = set()
    cursor = None
    oldest = None
    while True:
        try:
            timeline = await client.get_timeline(limit=1, cursor=cursor)
            print(f"Got {len(timeline.feed)} new posts")
            if not timeline.feed and not seen_posts:
                # startup go till we get a post since muted posts count in the limit but aren't returned
                cursor = timeline.cursor
                continue
            for fv in timeline.feed:
                if process_post(fv.post, seen_posts):
                    cursor = timeline.cursor
                else:
                    cursor = None
                at = datetime.fromisoformat(fv.post.indexed_at.replace('Z', '+00:00'))
                if oldest is None or at < oldest:
                    oldest = at
                    cursor = None  # never page older
        except Exception as e:
            print(f"Error: {e}")
            cursor = None
            await asyncio.sleep(10)
            continue
        await asyncio.sleep(FETCH_NOTIFICATIONS_DELAY_SEC)


if __name__ == '__main__':
    asyncio.run(main())
```
Hi Jer! If the timeline is just a simple list of posts, filtered to the accounts you follow and sorted by timestamp, then it could be more efficient to re-implement that logic yourself: listen to firehose events for new posts and filter them manually by your follows to populate your timeline.
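The manual filtering step could look roughly like the sketch below. It is only an illustration under assumptions: `PostEvent`, `filter_by_following`, and `build_timeline` are hypothetical names, and `PostEvent` stands in for a firehose commit that has already been decoded. None of them are part of the atproto SDK. The set of followed DIDs would have to be fetched separately, and mutes and post hydration are not handled here.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, List, Set


@dataclass(frozen=True)
class PostEvent:
    """Hypothetical, already-decoded firehose event for a newly created post."""
    author_did: str  # DID of the repo that created the record
    created_at: str  # ISO 8601 timestamp; assumed to share one format and timezone
    text: str


def filter_by_following(events: Iterable[PostEvent], following: Set[str]) -> Iterator[PostEvent]:
    """Yield only the events authored by a DID in `following`."""
    for event in events:
        if event.author_did in following:
            yield event


def build_timeline(events: Iterable[PostEvent], following: Set[str]) -> List[PostEvent]:
    """Return posts from followed authors, newest first."""
    kept = filter_by_following(events, following)
    # Comparing ISO 8601 strings is only safe under the same-format assumption above.
    return sorted(kept, key=lambda e: e.created_at, reverse=True)
```

In a long-running client, `filter_by_following` would be applied to the live event stream instead of a finished list, so posts can be printed as they arrive.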
I started down the firehose path, but it got complicated faster than I was ready for. Right now getTimeline fully hydrates every post and automatically takes care of mutes (though not without some oddities in how the limit parameter is handled).
It would be ideal if the bsky app API's getTimeline endpoint accepted algo=chronological and a cursor, but I don't think it does (or I was doing it wrong).
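In the meantime, one way to keep the polling approach simple is to re-fetch the head of the timeline and diff it against what has already been shown. The helper below is a minimal sketch of that diffing step only. `unseen_oldest_first` is a hypothetical name, and it assumes the page of CIDs arrives newest first, which I have not confirmed for every timeline algorithm.

```python
from typing import Iterable, List, Set


def unseen_oldest_first(page_cids: Iterable[str], seen: Set[str]) -> List[str]:
    """Return CIDs from one polled page that are not in `seen`, oldest first.

    `page_cids` is assumed to be ordered newest first. `seen` is updated in
    place so that the next poll skips everything returned here.
    """
    # dict.fromkeys drops repeats within the page while keeping page order.
    fresh = [cid for cid in dict.fromkeys(page_cids) if cid not in seen]
    seen.update(fresh)
    return list(reversed(fresh))
```

Each poll would pass in the CIDs from `timeline.feed` and print only the posts whose CIDs come back. Note that `seen` grows without bound in this sketch.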