Skip to content
This repository was archived by the owner on Jan 11, 2022. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 95 additions & 49 deletions instagram_scraper.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

umm... why?

import argparse
import csv
import os
Expand All @@ -17,88 +18,132 @@
'username': re.compile('(?:@)([A-Za-z0-9_](?:(?:[A-Za-z0-9_]|(?:\.(?!\.))){0,28}(?:[A-Za-z0-9_]))?)'),
}

# In case Instagram switches it up on us
IMG_XPATH = '//img[@alt]'

def send_scrape_request(insta_url: str, total_count: int=50, existing: set=None, short_circuit: bool=False):
    """
    Scrape and yield recently tagged instagram photos.

    Yields url, caption, hashtags, and mentions for the provided insta url.

    :param insta_url:
        Instagram url to scrape.
    :param total_count:
        Total amount of images to scrape.
    :param existing:
        URLs to skip.
    :param short_circuit:
        Whether or not to short-circuit the total_count loop when the
        page holds fewer photos than requested.
    """
    session = HTMLSession()
    req = session.get(insta_url)

    # Seed the seen-set with already-downloaded URLs so they are skipped.
    imgs = set(existing) if existing else set()
    count = 0
    page = 0

    while count < total_count:
        # Each pass scrolls one page further to load more images.
        req.html.render(scrolldown=page)
        images = req.html.xpath(IMG_XPATH)
        page += 1
        for image in images:
            if count >= total_count:
                break
            try:
                url, caption = image.attrs['src'], image.attrs['alt']
            except KeyError as e:
                # Element lacks src/alt; skip it rather than fall through
                # with unbound (or stale, from a prior iteration) values.
                print(e)
                continue
            if url in imgs:
                # Short-circuit if the page has fewer photos than the
                # amount still needed (e.g. a user with few posts), so we
                # do not loop forever re-reading the same page.
                if short_circuit and len(images) + count < total_count:
                    total_count = 0
                    break
                continue
            imgs.add(url)
            hashtags = set(REGEXES['hashtag'].findall(caption))
            mentions = set(REGEXES['username'].findall(caption))
            count += 1
            yield url, caption, hashtags, mentions



def scrape_instagram(target: str, total_count: int=50, existing: set=None, mode: str='tags'):
    """
    Build the url for a target, set short_circuit based on the mode, and
    delegate the actual scraping to send_scrape_request.

    :param target:
        Tag or username to be scraped.
    :param total_count:
        Total number of images to be scraped.
    :param existing:
        URLs to skip.
    :param mode:
        Two options: 'tags' or 'users'. Determines whether we are
        scraping users or tags.
    :raises NotImplementedError:
        If mode is neither 'tags' nor 'users'.
    """
    if mode == 'users':
        # A user page may hold fewer photos than requested, so allow the
        # scrape loop to bail out early.
        short_circuit = True
        url = f'https://www.instagram.com/{target}'
    elif mode == 'tags':
        short_circuit = False
        url = f'https://www.instagram.com/explore/tags/{target}'
    else:
        raise NotImplementedError(f'Unknown mode: {mode!r}')

    yield from send_scrape_request(url, total_count=total_count, existing=existing, short_circuit=short_circuit)

def main(tags: List[str], users: List[str], total_count: int=50, should_continue: bool=False):
    """
    Scrape user and hashtag images from Instagram and save them to disk.

    :param tags:
        List of tags to be scraped.
    :param users:
        List of users to be scraped.
    :param total_count:
        Total number of images to be scraped.
    :param should_continue:
        Flag for whether or not we should read from disk and skip
        existing URLs.
    """
    def _process_target(target: str, total_count: int, existing_links: set, start: int, mode: str='tags'):
        # Download images for a single tag/user and record their metadata
        # in data/{target}/data.csv.
        # NOTE(review): consider mode-specific directories (e.g.
        # data/{mode}/{target}) so images scraped for user "instagram"
        # cannot be mistaken for images scraped for the "instagram" tag.
        os.makedirs(f'data/{target}', exist_ok=True)
        with open(f'data/{target}/data.csv', 'a' if existing_links else 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile, delimiter=',')
            for count, (url, caption, hashtags, mentions) in enumerate(scrape_instagram(
                    target, total_count, existing_links, mode), start):
                # 1-based name used for BOTH the file on disk and the CSV
                # row, so the two can never disagree.
                file_index = count + 1
                try:
                    req = requests.get(url)
                    with open(f'data/{target}/{file_index}.jpg', 'wb') as img:
                        img.write(req.content)
                except Exception:
                    print(f'An error occured while downloading {url}')
                else:
                    writer.writerow([
                        f'{file_index}.jpg',
                        url,
                        caption.replace('\n', '\\n'),
                        ', '.join(hashtags),
                        ', '.join(mentions)
                    ])
                    print(f'[{target}] downloaded {url} as {file_index}.jpg in data/{target}')

    targets = {'tags': tags, 'users': users}
    for mode, target_list in targets.items():
        for target in target_list:
            existing_links = set()
            start = 0

            # Resume support: remember already-downloaded URLs (column 1
            # of the CSV) and continue numbering after the last row.
            if os.path.exists(f'data/{target}/data.csv') and should_continue:
                with open(f'data/{target}/data.csv', newline='', encoding='utf-8') as csvfile:
                    reader = csv.reader(csvfile)
                    for i, row in enumerate(reader):
                        existing_links.add(row[1])
                        start = i + 1
            _process_target(target, total_count, existing_links, start, mode=mode)

if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--tags', '-t', nargs='+',
parser.add_argument('--users', '-u', default=[], nargs='+', help='Users to scrape images from')
parser.add_argument('--tags', '-t', default=[], nargs='+',
help='Tags to scrape images from')
parser.add_argument('--count', '-c', type=int, default=50,
help='Total number of images to scrape for each given '
Expand All @@ -108,6 +153,7 @@ def _single_tag_processing(tag, total_count, existing_links, start):
help='See existing data, and do not parse those again, '
'and append to the data file, instead of a rewrite')
args = parser.parse_args()
assert args.tags, "Enter tags to scrape! Use --tags option, see help."

assert (len(args.tags) >= 1) or (len(args.users) >= 1), "Enter tags or users to scrape! Use --tags or --users option, see help."
assert args.count, "Enter total number of images to scrape using --count option, see help."
main(args.tags, args.count, args.cont)
main(args.tags, args.users, args.count, args.cont)