pyods/pyods/cli.py

#!/usr/bin/env python3.5
import os
import argparse
import asyncio
import logging
from fnmatch import fnmatch
from time import sleep
from bs4 import BeautifulSoup
from urllib.parse import urljoin, unquote, urlsplit, urlunsplit
from collections import namedtuple
from requests import Session
from concurrent.futures import ThreadPoolExecutor
from contextlib import closing


ScrapeConfig = namedtuple("ScrapeConfig", "output loop executor base_url visited futures semaphore delay exclude")
"""
output: base dest dir to put downloaded files
loop: asyncio loop object
executor: large file download threadpool executor
base_url: remote open dir base url to scan
visited: list of urls already visiting/visited
futures: list of (future, url) tuples for in-flight downloads
semaphore: limits the number of simultaneous downloads
delay: seconds to wait between requests
exclude: fnmatch patterns; matching paths are skipped
"""


class AlreadyDownloadedException(Exception):
    pass


http = Session()


def clean_url(url):
    """
    Run a url through urlsplit and urlunsplit, ensuring uniform url format
    """
    return urlunsplit(urlsplit(url))
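
# For example (illustrative, not from the project's docs):
#   clean_url("HTTP://example.com/a#") -> "http://example.com/a"
# since the urlsplit/urlunsplit round trip lowercases the scheme and drops an empty fragment.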


def stream_url(url, **kwargs):
    """
    Return a requests Response object for the given URL
    """
    return http.get(url, stream=True, **kwargs)


def get_links(content):
    """
    Parse and return hrefs for all links on a given page

    :param content: html body to scan
    """
    doc = BeautifulSoup(content, "html.parser")
    for link in doc.find_all("a"):
        href = link.attrs.get('href', None)
        if href:
            yield href
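
# For example (illustrative): list(get_links('<a href="docs/">docs</a>')) == ["docs/"]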


def stream_to_file(response, url, options):
    # Map the remote url onto a path under the output dir, un-escaping %xx sequences
    url_suffix = unquote(url[len(options.base_url):])
    local_path = os.path.normpath(os.path.join(options.output, url_suffix))
    if not local_path.startswith(options.output):
        raise Exception("Aborted: directory traversal detected!")
    try:
        for pattern in options.exclude:
            if fnmatch(url_suffix, pattern):
                logging.info("Excluded: '%s' on pattern '%s'", url_suffix, pattern)
                raise AlreadyDownloadedException("Excluded")
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        mode = "wb"
        if os.path.exists(local_path):
            response.close()
            # Local file exists, restart request with range
            fsize = os.path.getsize(local_path)
            remote_size = int(response.headers.get("Content-length"))
            if fsize == remote_size:
                raise AlreadyDownloadedException("Already downloaded")
            logging.info("%s already exists, restarting request with range %s-%s", local_path, fsize, remote_size)
            if options.delay:
                sleep(options.delay)
            logging.warning("Downloading %s to %s", url, local_path)
            response = stream_url(url, headers={"Range": "bytes={}-{}".format(fsize, remote_size)})
            response.raise_for_status()  # TODO: clobber file and restart w/ no range header if range not satisfiable
            mode = "ab"  # append the ranged response instead of truncating the partial file
        with open(local_path, mode) as f:
            for chunk in response.iter_content(chunk_size=256 * 1024):
                f.write(chunk)
    finally:
        # Always free the download slot and close the connection, even on failure
        options.semaphore.release()
        try:
            response.close()
        except Exception:
            pass
    return (url, local_path)
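
# For example (illustrative): with base_url "http://host/pub/" and output "out", the url
# "http://host/pub/docs/a%20b.txt" is saved to "out/docs/a b.txt", resumed if a partial copy exists.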


async def scrape_url(url, options, skip=False):
    """
    - Request the URL
    - If HTML, parse and recurse
    - If other, download
    """
    options.visited.append(url)
    g = await options.loop.run_in_executor(None, stream_url, url)
    if g.status_code != 200:
        logging.error("Fetch failed, code was %s", g.status_code)
        return
    content_type = g.headers.get("Content-Type", "")
    if "text/html" in content_type:
        # HTML page, parse it
        for link in get_links(g.text):
            link_dest = clean_url(urljoin(url, link))
            if not link_dest.startswith(options.base_url):
                # Link leads outside the base_url, skip it
                continue
            if link_dest not in options.visited:
                # link is valid and not seen before, visit it
                logging.info("Visiting %s", link_dest)
                await scrape_url(link_dest, options)
        g.close()
    else:
        # Actual file, download it on the threadpool
        # await download_file(g, url, options)
        await options.semaphore.acquire()
        options.futures.append((options.executor.submit(stream_to_file, g, url, options), url, ))
        # Purge completed futures, reporting their results
        for item in options.futures[:]:
            future, finished_url = item
            if future.done():
                options.futures.remove(item)
                exc = future.exception()
                if exc:
                    if type(exc) is AlreadyDownloadedException:
                        logging.info("ALREADY COMPLETE: url: %s", finished_url)
                    else:
                        logging.error("FAILED: %s: %s", finished_url, exc)
                else:
                    logging.warning("COMPLETED downloading url %s to %s", *future.result())


def main():
    parser = argparse.ArgumentParser(description="Open directory scraper")
    parser.add_argument('-u', '--url', help="url to scrape")
    parser.add_argument('-o', '--output-dir', help="destination for downloaded files")
    parser.add_argument('-p', '--parallel', type=int, default=5, help="number of downloads to execute in parallel")
    parser.add_argument('-c', '--clobber', action="store_true", help="clobber existing files instead of resuming")
    parser.add_argument('-d', '--delay', type=int, default=0, help="delay in seconds between requests")
    parser.add_argument('-e', '--exclude', default=[], nargs="+", help="exclude patterns")
    parser.add_argument('-f', '--exclude-from', help="file containing exclude patterns, one per line")
    parser.add_argument('-v', '--verbose', action="store_true", help="enable verbose (debug) logging")
    args = parser.parse_args()

    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING,
                        format="%(asctime)-15s %(levelname)-8s %(filename)s:%(lineno)d %(message)s")
    logging.debug("cli args: %s", args)

    excludes = list(args.exclude)
    if args.exclude_from:
        with open(args.exclude_from) as f:
            excludes += [line.strip() for line in f.readlines() if line.strip()]

    with ThreadPoolExecutor(max_workers=args.parallel) as executor:
        with closing(asyncio.get_event_loop()) as loop:
            loop.set_debug(True)
            loop.set_default_executor(executor)
            base_url = clean_url(args.url)
            config = ScrapeConfig(os.path.normpath(args.output_dir),
                                  loop,
                                  executor,
                                  base_url,
                                  [],  # visited urls
                                  [],  # futures
                                  asyncio.Semaphore(value=args.parallel),
                                  args.delay,
                                  excludes)
            downloader = asyncio.ensure_future(scrape_url(base_url, config), loop=loop)
            try:
                loop.run_until_complete(downloader)
            finally:
                logging.debug("Escaped main loop")


if __name__ == '__main__':
    main()
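
# A typical invocation might look like the following (URL and paths are illustrative, not taken
# from the project's docs): scrape an open directory with four parallel downloads, a one second
# delay, skipping iso images, with verbose logging:
#
#   python3 cli.py -u http://example.com/pub/ -o ./downloads -p 4 -d 1 -e "*.iso" -v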