Skip to content

Commit

Permalink
Merge InstaPage branch
Browse files Browse the repository at this point in the history
Added new ```InstaPage``` and ```Hashtag``` classes
* Hashtag class wraps the data from a scraped Instagram hashtag
* InstaPage is an abstract class representing any scrapable Instagram page; InstaUser and Hashtag are subclasses

Added new methods to ```InstaClient```
* ```scrape()``` can be used to scrape an Instagram page
* ```get_user()``` and ```get_hashtag()``` scrape a profile or hashtag
* ```get_username()``` retrieves the username for a given user ID

Updated docs, docstrings, variables and method names to reflect the changes
* Notable change is that ```user_map``` is now ```page_map```
  • Loading branch information
TDKorn committed May 9, 2023
1 parent 1dd425f commit 83cc540
Show file tree
Hide file tree
Showing 21 changed files with 592 additions and 457 deletions.
2 changes: 1 addition & 1 deletion InstaTweet/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from .db import DBConnection
# API Interaction/Wrapper Classes
from .instapost import InstaPost
from .instauser import InstaUser
from .instapage import InstaPage, InstaUser, Hashtag
from .instaclient import InstaClient, USER_AGENT
from .tweetclient import TweetClient
# User Interface Classes
Expand Down
77 changes: 64 additions & 13 deletions InstaTweet/instaclient.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import os
import requests
from requests import Response
from typing import Type, Union, Optional, Dict
from json.decoder import JSONDecodeError
from . import InstaUser, InstaPost
from . import InstaPage, InstaUser, InstaPost, Hashtag


#: Default ``User-Agent`` header value; used by :class:`InstaClient` when no ``user_agent`` is provided
USER_AGENT = "Mozilla/5.0 (Linux; Android 9; GM1903 Build/PKQ1.190110.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/75.0.3770.143 Mobile Safari/537.36 Instagram 103.1.0.15.119 Android (28/9; 420dpi; 1080x2260; OnePlus; GM1903; OnePlus7; qcom; sv_SE; 164094539)"
Expand All @@ -17,7 +19,7 @@ class InstaClient:

DOWNLOAD_DIR = os.path.abspath('downloads') #: [*Optional*] -- Directory to temporarily download media to

def __init__(self, session_id: str, user_agent: str = USER_AGENT, proxies: dict = None):
def __init__(self, session_id: str, user_agent: str = USER_AGENT, proxies: Optional[Dict] = None):
"""Initialize an :class:`~InstaClient` with an Instagram sessionid cookie (at minimum)
.. note:: As of v2.0.0b13, the endpoint used by :meth:`~get_user` seems to require a specific :attr:`~USER_AGENT`
Expand All @@ -31,7 +33,7 @@ def __init__(self, session_id: str, user_agent: str = USER_AGENT, proxies: dict
raise TypeError('session_id must be a string')

self.session_id = session_id
self.user_agent = user_agent # Hardcoded one works for now...
self.user_agent = user_agent
self.proxies = proxies

if not os.path.exists(InstaClient.DOWNLOAD_DIR):
Expand All @@ -48,32 +50,81 @@ def request(self, url: str) -> requests.Response:
proxies=self.proxies
)

def scrape(self, page: str) -> InstaPage:
    """Scrape an Instagram page and wrap the response data

    Dispatches to :meth:`get_hashtag` when ``page`` begins with ``#``,
    otherwise to :meth:`get_user`

    :param page: an Instagram hashtag (prefixed with ``#``) or username
    :returns: an :class:`~.InstaUser` or :class:`~.Hashtag`
    :raises TypeError: if ``page`` is not a string
    """
    if not isinstance(page, str):
        raise TypeError(f"`page` must be of type {str}")

    scraper = self.get_hashtag if page.startswith("#") else self.get_user
    return scraper(page)

def get_hashtag(self, tag: str, max_id: str = '') -> Hashtag:
    """Scrape an Instagram hashtag and wrap the response with :class:`~.Hashtag`

    :param tag: the hashtag to scrape (with or without a ``#``)
    :param max_id: the end cursor (empty by default)
    :returns: the response data, validated and wrapped by :meth:`_wrap`
    """
    bare_tag = tag.lstrip("#")
    url = (
        'https://www.instagram.com/explore/tags/'
        f'{bare_tag}/?__a=1&max_id={max_id}&__d=dis'
    )
    return self._wrap(bare_tag, self.request(url), Hashtag)

def get_user(self, username: str) -> InstaUser:
    """Scrape an Instagram user's profile and wrap the response with :class:`~.InstaUser`

    :param username: the username of the IG user to scrape; leading ``@`` characters are stripped
    :returns: the response data, validated and wrapped by :meth:`_wrap`
    """
    handle = username.lstrip('@')
    url = f"https://i.instagram.com/api/v1/users/web_profile_info/?username={handle}"
    return self._wrap(handle, self.request(url), InstaUser)

def _wrap(self, page: str, response: Response, Wrapper: Type[InstaPage]) -> InstaPage:
    """Validate and wrap the API response from an Instagram page

    :param page: name of the scraped page (a username or hashtag, without its prefix)
    :param response: the response received for the page request
    :param Wrapper: the :class:`~.InstaPage` subclass to wrap the response data with
    :returns: ``Wrapper`` initialized with the response JSON and this client
    :raises RuntimeError: if the request failed, or the response body isn't valid JSON
    """
    label = f'user @{page}' if Wrapper is InstaUser else f'hashtag #{page}'

    if not response.ok:
        # Prefer the JSON error payload; fall back to the HTTP reason phrase
        try:
            error = response.json()
        except ValueError:
            error = response.reason
        raise RuntimeError(
            f'Failed to scrape Instagram {label}\n'
            f'Response: [{response.status_code}] -- {error}'
        )

    # Catch ValueError rather than json's JSONDecodeError: depending on the
    # installed JSON backend, requests may raise a different decode error
    # class, but every variant subclasses ValueError
    try:
        data = response.json()
    except ValueError as e:
        raise RuntimeError(f'Unable to scrape Instagram {label}') from e

    # Built outside the try block so errors raised by the wrapper itself
    # are never mistaken for a JSON decoding failure
    return Wrapper(data, self)

def download_post(self, post: InstaPost, filepath: str = None) -> bool:
def get_username(self, user_id: Union[int, str]) -> str:
    """Look up the Instagram username belonging to the provided ``user_id``

    .. tip:: Use this with :meth:`get_user` to scrape by ``user_id``::

        >> user_id = 51276430399
        >> username = insta.get_username(user_id)
        >> user = insta.get_user(username)
        >> print(user.posts[0])
        Post 2981866202934977614 by @dailykittenig on 2022-11-29 01:44:37

    :param user_id: the id of the Instagram user to retrieve the username of
    :returns: the username, or an empty string if the response doesn't contain one
    :raises RuntimeError: if the request was unsuccessful
    """
    response = self.request(f"https://i.instagram.com/api/v1/users/{user_id}/info")
    if not response.ok:
        raise RuntimeError(f"Failed to retrieve info for Instagram user with id {user_id}")

    user_info = response.json().get('user', {})
    return user_info.get('username', '')

def download_post(self, post: InstaPost, filepath: Optional[str] = None) -> bool:
"""Downloads the media from an Instagram post
:param post: the :class:`~.InstaPost` of the post to download
Expand All @@ -98,11 +149,11 @@ def download_post(self, post: InstaPost, filepath: str = None) -> bool:
return True

@property
def headers(self) -> Dict:
    """Headers to use in :meth:`~.request`; contains only the ``User-Agent``"""
    agent = self.user_agent
    return {'User-Agent': agent}

@property
def cookies(self) -> Dict:
    """Cookies to use in :meth:`~.request`; contains only the ``sessionid``"""
    return dict(sessionid=self.session_id)
190 changes: 190 additions & 0 deletions InstaTweet/instapage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from functools import cached_property
from typing import Dict, Optional, TYPE_CHECKING, List
from . import InstaPost

if TYPE_CHECKING:
from . import InstaClient


class InstaPage(ABC):

    """Abstract wrapper class for wrapping API responses from Instagram pages"""

    def __init__(self, data: Dict, client: Optional[InstaClient] = None):
        """Initialize an :class:`InstaPage`

        Used to wrap responses from endpoints that contain Instagram post data,
        like Instagram user profiles and Instagram hashtag searches

        :param data: the API response JSON to use as source data
        :param client: the :class:`~.InstaClient` to use; required for :meth:`~.get_more_posts`
        """
        self.data = data
        self.client = client
        self._posts = []  # Filled lazily by the ``posts`` property

    @abstractmethod
    def __str__(self) -> str:
        pass

    @property
    @abstractmethod
    def name(self) -> str:
        """Name of the Instagram page"""
        pass

    @property
    @abstractmethod
    def page_data(self) -> Dict:
        """Data about the Instagram page itself"""
        pass

    @property
    @abstractmethod
    def media_data(self) -> Dict:
        """Data about posts on the Instagram page"""
        pass

    @property
    def id(self) -> int:
        """ID of the Instagram page, or ``-1`` if the page data has no ``id``"""
        return int(self.page_data.get('id', -1))

    @property
    def posts(self) -> List[InstaPost]:
        """Posts that have been scraped from the Instagram page

        To retrieve the next page of posts, call :meth:`get_more_posts`

        :returns: the page's posts as :class:`~.InstaPost` objects
        """
        if not self._posts:
            if edges := self.media_data.get('edges'):
                self._posts = [InstaPost(edge['node'], self.client) for edge in edges]
        return self._posts

    def get_more_posts(self) -> bool:
        """Requests the next page of posts from the :class:`InstaPage`

        If the page :attr:`~.has_more_posts`, they'll be added to the :attr:`~.posts` list

        :returns: ``True`` if the request was successful, otherwise ``False``
        :raises AttributeError: if the page was created without an :class:`~.InstaClient`
        """
        if not self.client:
            raise AttributeError("Must provide an InstaClient to scrape with")

        if not self.has_more_posts:
            print("All posts have already been scraped")
            return False

        if not (next_page := self._get_next_page()):
            print("Unable to retrieve the next page of posts")
            return False

        # Extend through the ``posts`` property (not ``_posts`` directly) so the
        # first page is loaded before new posts are appended; otherwise calling
        # this before ``posts`` was ever read would drop the first page for good
        self.posts.extend(next_page.posts)
        self.media_page_info.update(next_page.media_page_info)
        return True

    @abstractmethod
    def _get_next_page(self) -> Optional[InstaPage]:
        """Makes the request for the next page of posts; wraps the response if successful"""
        pass

    @property
    def has_more_posts(self) -> bool:
        """Returns ``True`` if more posts can be scraped using :meth:`~.get_more_posts`"""
        # Coerce so a missing/null ``has_next_page`` yields ``False`` rather than ``None``
        return bool(self.media_page_info.get('has_next_page'))

    @property
    def end_cursor(self) -> str:
        """Cursor used in request by :meth:`~.get_more_posts`, without ``=`` padding"""
        # ``or ''`` also covers an explicit null cursor, which ``.get(key, '')`` would not
        return (self.media_page_info.get('end_cursor') or '').strip('=')

    @property
    def media_page_info(self) -> Dict:
        """The ``page_info`` entry of :attr:`media_data`, or an empty dict if there is none"""
        return self.media_data.get('page_info', {})


class InstaUser(InstaPage):

    """API response wrapper for an Instagram user's profile"""

    def __init__(self, data: Dict, client: Optional[InstaClient] = None):
        """Initialize an :class:`InstaUser`

        :param data: the API response from :meth:`~.get_user`
        :param client: the :class:`~.InstaClient` to use
        """
        super().__init__(data, client)

    def __str__(self) -> str:
        return f"Instagram User: @{self.name}"

    @property
    def name(self) -> str:
        """Username of the Instagram user, read from the ``username`` key of :attr:`page_data`"""
        return self.page_data.get('username')

    @property
    def page_data(self) -> Dict:
        """The ``data.user`` section of the response, or an empty dict if it's missing"""
        return self.data.get('data', {}).get('user', {})

    @property
    def media_data(self) -> Dict:
        """The ``edge_owner_to_timeline_media`` section of :attr:`page_data`"""
        return self.page_data.get('edge_owner_to_timeline_media', {'edges': []})

    def _get_next_page(self) -> Optional[InstaPage]:
        """Requests the next page of the user's posts

        :returns: the wrapped response, or ``None`` if the request was unsuccessful
        :raises RuntimeError: if the response body can't be decoded as JSON
        """
        # URL-encoded ``variables`` query; the ``==`` padding stripped by
        # ``end_cursor`` is re-appended here as ``%3D%3D``
        endpoint = (
            'https://www.instagram.com/graphql/query/?query_hash=8c2a529969ee035a5063f2fc8602a0fd'
            f'&variables=%7B%22id%22%3A%22{self.id}%22%2C%22first%22%3A12%2C%22'
            f'after%22%3A%22{self.end_cursor}%3D%3D%22%7D'
        )
        response = self.client.request(endpoint)
        if not response.ok:
            return None

        # Only JSON decoding is guarded; decode errors are ValueError subclasses
        try:
            data = response.json()
        except ValueError as e:
            raise RuntimeError('Failed to get more posts') from e

        # Hand the client to the next page so the posts it wraps keep a usable
        # client instead of being created with ``client=None``
        return InstaUser(data, self.client)


class Hashtag(InstaPage):

    """API response wrapper for an Instagram hashtag"""

    def __init__(self, data: Dict, client: Optional[InstaClient] = None):
        """Initialize a :class:`Hashtag`

        Only the ``graphql.hashtag`` section of the response is kept as source data

        :param data: the API response from :meth:`~.get_hashtag`
        :param client: the :class:`~.InstaClient` to use
        :raises ValueError: if ``data`` has no ``graphql.hashtag`` section
        """
        hashtag_data = data.get('graphql', {}).get('hashtag')
        if hashtag_data is None:
            raise ValueError("Hashtag response data is missing")

        super().__init__(hashtag_data, client)
        # Not read anywhere in this class (``top_posts`` caches itself); kept
        # so the instance's attributes are unchanged
        self._top_posts = []

    def __str__(self) -> str:
        return f"Instagram Hashtag: {self.name}"

    @property
    def name(self) -> str:
        """Name of the hashtag, prefixed with ``#``"""
        # Default to '' so a response without ``name`` doesn't raise TypeError
        return "#" + self.page_data.get('name', '')

    @property
    def page_data(self) -> Dict:
        """The hashtag data extracted from the response in :meth:`__init__`"""
        return self.data

    @property
    def media_data(self) -> Dict:
        """The ``edge_hashtag_to_media`` section of :attr:`page_data`"""
        return self.page_data.get('edge_hashtag_to_media', {'count': 0, 'edges': []})

    @cached_property
    def top_posts(self) -> List[InstaPost]:
        """Posts from :attr:`top_media_data`, wrapped as :class:`~.InstaPost` objects

        Computed on first access and cached afterwards
        """
        edges = self.top_media_data.get('edges', [])
        return [InstaPost(edge['node'], self.client) for edge in edges]

    @property
    def top_media_data(self) -> Dict:
        """The ``edge_hashtag_to_top_posts`` section of :attr:`page_data`"""
        return self.page_data.get("edge_hashtag_to_top_posts", {"edges": []})

    def _get_next_page(self) -> Optional[InstaPage]:
        """Requests the next page of hashtag posts, using :attr:`end_cursor` as the ``max_id``"""
        return self.client.get_hashtag(self.name, max_id=self.end_cursor)
Loading

0 comments on commit 83cc540

Please sign in to comment.