Support range resuming

dave 2017-04-25 23:17:33 -07:00
parent 006dad5ace
commit 682694e7c7
2 changed files with 56 additions and 13 deletions


@@ -4,6 +4,7 @@ import os
 import argparse
 import asyncio
 import logging
+import traceback
 from bs4 import BeautifulSoup
 from urllib.parse import urljoin, unquote, urlsplit, urlunsplit
 from collections import namedtuple
@@ -12,7 +13,7 @@ from concurrent.futures import ThreadPoolExecutor
 from contextlib import closing

-ScrapeConfig = namedtuple("ScrapeConfig", "output loop executor base_url visited semaphore")
+ScrapeConfig = namedtuple("ScrapeConfig", "output loop executor base_url visited futures semaphore")
 """
 output: base dest dir to put downloaded files
 loop: asyncio loop object
@@ -31,11 +32,11 @@ def clean_url(url):
     return urlunsplit(urlsplit(url))

-def stream_url(url):
+def stream_url(url, **kwargs):
     """
     Return a request's Response object for the given URL
     """
-    return http.get(url, stream=True)
+    return http.get(url, stream=True, **kwargs)

 def get_links(content):
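The **kwargs passthrough above is what lets the resume path later in this commit inject a Range header without touching stream_url's other call sites. A minimal usage sketch (the module-level requests session named http is assumed from the surrounding file; the URL is made up):

    # Plain streaming request, behavior unchanged:
    response = stream_url("http://example.com/big.iso")

    # Resume from byte 1024 by passing headers straight through to requests:
    response = stream_url("http://example.com/big.iso",
                          headers={"Range": "bytes=1024-"})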
@@ -50,25 +51,52 @@ def get_links(content):
         yield href

-def stream_to_file(response, url, semaphore):
-    print("STREAMING {}".format(url))
+# Stream to disk
+def stream_to_file(response, url, options):
+    url_suffix = url[len(options.base_url):]
+    local_path = os.path.normpath(os.path.join(options.output, unquote(url_suffix)))
+    if not local_path.startswith(options.output):
+        raise Exception("Aborted: directory traversal detected!")
+    logging.info("Downloading {} to {}".format(url, local_path))
+    os.makedirs(os.path.dirname(local_path), exist_ok=True)
     try:
-        with open("/dev/null", "wb") as f:
+        if os.path.exists(local_path):
+            response.close()
+            # Local file exists, restart request with range
+            fsize = os.path.getsize(local_path)
+            remote_size = int(response.headers.get("Content-length"))
+            if fsize == remote_size:
+                raise Exception("Already downloaded")
+            logging.info("{} already exists, restarting request with range {}-{}".format(
+                local_path, fsize, remote_size))
+            response = stream_url(url, headers={"Range": "bytes={}-{}".format(fsize, remote_size)})
+            response.raise_for_status()  # TODO: clobber file and restart w/ no range header if range not satisfiable
+        with open(local_path, "ab") as f:  # append mode: "wb" would truncate the partial file the range resumes
             for chunk in response.iter_content(chunk_size=256 * 1024):
                 f.write(chunk)
+    except:
+        traceback.print_exc()
+        raise
     finally:
-        semaphore.release()
-        response.close()
+        options.semaphore.release()
+        try:
+            response.close()
+        except:
+            pass
+    return (url, local_path)

-async def scrape_url(url, options):
+async def scrape_url(url, options, skip=False):
     """
     - Request the URL
     - If HTML, parse and recurse
     - If other, download
     """
     options.visited.append(url)
     g = await options.loop.run_in_executor(None, stream_url, url)
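A minimal sketch of the resume handshake this hunk implements, separated from the scraper's plumbing. resume_download is a hypothetical helper, not part of the commit; it also covers the TODO above by clobbering and refetching when the server answers 416 (range not satisfiable), and only appends when the server honors the range with a 206:

    import os
    import requests

    def resume_download(url, local_path, chunk_size=256 * 1024):
        # Ask the server to skip the bytes already on disk.
        offset = os.path.getsize(local_path) if os.path.exists(local_path) else 0
        headers = {"Range": "bytes={}-".format(offset)} if offset else {}
        response = requests.get(url, stream=True, headers=headers)
        if response.status_code == 416:
            # Range not satisfiable (the TODO case): clobber, refetch whole file.
            os.remove(local_path)
            response = requests.get(url, stream=True)
        response.raise_for_status()
        # 206 Partial Content: the range was honored, append to the partial file.
        # Plain 200: the server ignored the range, rewrite from scratch.
        mode = "ab" if response.status_code == 206 else "wb"
        with open(local_path, mode) as f:
            for chunk in response.iter_content(chunk_size=chunk_size):
                f.write(chunk)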
@@ -94,7 +122,17 @@ async def scrape_url(url, options):
             # Actual file, download it
             # await download_file(g, url, options)
             options.semaphore.acquire()
-            options.executor.submit(stream_to_file, g, url, options.semaphore)
+            options.futures.append((options.executor.submit(stream_to_file, g, url, options), url, ))
+    # Purge completed futures
+    for item in options.futures[:]:
+        future, url = item
+        if future.done():
+            options.futures.remove(item)
+            exc = future.exception()
+            if exc:
+                logging.error("FAILED: %s: %s", url, exc)
+            else:
+                logging.info("COMPLETED downloading url %s to %s", *future.result())

 def main():
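The purge loop above only runs on the next call to scrape_url, so a failed download can sit unreported until then. The same bookkeeping can be done push-style with concurrent.futures' add_done_callback; a sketch (submit_tracked is a hypothetical name, not in the commit):

    import logging
    from concurrent.futures import ThreadPoolExecutor

    def submit_tracked(executor, fn, url, *args):
        future = executor.submit(fn, *args)

        def on_done(future):
            # Runs as soon as fn returns or raises, in the worker thread.
            exc = future.exception()
            if exc:
                logging.error("FAILED: %s: %s", url, exc)
            else:
                logging.info("COMPLETED downloading url %s to %s", *future.result())

        future.add_done_callback(on_done)
        return future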
@@ -105,6 +143,8 @@ def main():
     parser.add_argument('-u', '--url', help="url to scrape")
     parser.add_argument('-o', '--output-dir', help="destination for downloaded files")
     parser.add_argument('-p', '--parallel', type=int, default=5, help="number of downloads to execute in parallel")
+    parser.add_argument('-c', '--clobber', action="store_true", help="clobber existing files instead of resuming")
     args = parser.parse_args()

     logging.info("cli args: %s", args)
@@ -120,7 +160,8 @@ def main():
         loop,
         executor,
         base_url,
-        [],
+        [],  # visited urls
+        [],  # futures
         asyncio.Semaphore(value=args.parallel))
     downloader = asyncio.ensure_future(scrape_url(base_url, config), loop=loop)
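As ScrapeConfig grows fields, positional construction like the above is easy to misorder; namedtuples also accept keyword arguments, so an equivalent, more explicit call would be (a sketch using the field names from the namedtuple hunk; the output value is assumed):

    config = ScrapeConfig(output=args.output_dir,
                          loop=loop,
                          executor=executor,
                          base_url=base_url,
                          visited=[],
                          futures=[],
                          semaphore=asyncio.Semaphore(value=args.parallel))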

requirements.txt Normal file

@@ -0,0 +1,2 @@
+beautifulsoup4==4.5.3
+requests==2.13.0
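The new pins install in the usual way:

    pip install -r requirements.txt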