streamrecord/libs/recordTick.py

210 lines
6.5 KiB
Python

#!/usr/bin/env python3
from threading import Thread
import time
import datetime
from sched import scheduler
import cherrypy
import sys
import subprocess
import os
import os.path
class recordTick:
    """Start and stop stream recorders according to the schedule in the database.

    ``tick()`` compares the current hour/minute with the rows of the
    ``"times"`` table and starts or stops one ``recordThread`` per stream.
    It is presumably invoked once per minute by an external scheduler
    (the caller is not visible in this file); ``timeToNextMinute()`` tells
    that caller how long to wait until the next minute boundary.
    """

    # Names of the per-weekday flag columns in the "times" table, indexed by
    # datetime.weekday() (Monday == 0).
    dayColumns = ("m", "t", "w", "r", "f", "sa", "su")

    def __init__(self, database):
        """Remember the database handle and start with no active recorders.

        ``database`` must provide ``execute(sql, params)`` returning an
        indexable, iterable collection of mapping-like rows, which is how it
        is used by the methods below.
        """
        # Database handle (the original comment calls it a sqlite3 reference)
        self.db = database
        # Active downloader threads, keyed by stream id
        self.threads = {}

    def tick(self):
        """Start every recording scheduled to begin now and stop every one due to end now."""
        # Read the clock once so the weekday column and the hour/minute can
        # never disagree when the tick lands exactly on midnight.
        now = datetime.datetime.now()
        day = self.dayColumns[now.weekday()]

        # The column name comes from the fixed tuple above, never from user
        # input, so splicing it into the SQL text is safe.
        startQuery = (
            'SELECT * FROM "times" JOIN "streams" ON "streams"."id" = "times"."streamid" '
            'where "starthour"=? AND "startmin"=? AND "' + day + '"=1 AND "status"=0'
        )
        for startTime in self.db.execute(startQuery, (now.hour, now.minute)):
            # Start each downloader
            self.startStream(startTime["streamid"])

        # End times are matched on hour/minute only (no weekday filter);
        # endStream() ignores streams that are not currently recording.
        endQuery = 'SELECT * FROM "times" where "endhour"=? AND "endmin"=?'
        for endTime in self.db.execute(endQuery, (now.hour, now.minute)):
            # Terminate each downloader
            self.endStream(endTime["streamid"])

    def startStream(self, id):
        """Start a recorder for stream ``id`` unless one is already running.

        An unknown stream id is reported and ignored instead of raising
        ``IndexError``.
        """
        rows = self.db.execute('SELECT * FROM "streams" WHERE "id"=? ;', (id,))
        try:
            stream = rows[0]
        except IndexError:
            print("No stream with id %s; nothing to start" % id)
            return
        # Only one downloader per stream
        if stream["id"] not in self.threads:
            # The recording thread starts itself from its constructor
            self.threads[stream["id"]] = recordThread(stream["url"], stream["directory"])

    def endStream(self, id):
        """Ask the recorder for stream ``id`` to finish and forget about it."""
        if id in self.threads:
            # Tell the downloader to finish; post-processing continues in
            # the thread after it has been dropped from the table.
            self.threads[id].cancel()
            del self.threads[id]

    def streamStatus(self, id):
        """Return the recorder's ``status`` value, or -1 if no recorder is tracked for ``id``."""
        if id not in self.threads:
            return -1
        return self.threads[id].status

    def getSelf(self):
        """Return this instance."""
        return self

    def timeToNextMinute(self):
        """Return the seconds left until the next minute starts.

        The value is a number of seconds truncated to millisecond
        precision, in the range (0, 60].
        """
        now = datetime.datetime.now()
        # Step into the next minute, then drop the seconds and microseconds
        nextMinute = (now + datetime.timedelta(minutes=1)).replace(second=0, microsecond=0)
        wait = nextMinute - now
        return wait.seconds + int(wait.microseconds / 1000) / 1000
class recordThread(Thread):
    """Worker thread that records one stream and post-processes the result.

    The thread starts itself from ``__init__``. ``run()`` downloads the
    stream with ffmpeg into ``files/temp/<directory>`` (restarting ffmpeg
    whenever it exits, until ``cancel()`` is called), merges the pieces with
    mkvmerge, transcodes the merged file to mp3 and moves it to
    ``files/output/<directory>``.

    ``status`` values set by this class: 1 created, 2 downloading,
    3 merging, 4 transcoding, 5 cleaning up, 0 finished.
    """

    # Seconds kill() waits before each escalation step.
    killGrace = 3

    def __init__(self, url, directory):
        """Store the stream parameters and start the thread immediately."""
        Thread.__init__(self)
        # Status
        self.status = 1
        # URL to download
        self.url = url
        # Directory name to use
        self.directory = directory
        # True means the downloader keeps alive on failure
        self.running = True
        # Start time of the recording
        self.startdate = None
        # Child process handles. They must exist before start() so that
        # cancel()/kill() can be called at any time without AttributeError.
        self.proc = None
        self.mergeproc = None
        self.transcodeproc = None
        self.start()

    def _tempDir(self):
        """Return the temp directory path used for this stream's pieces."""
        return "files/temp/%s" % self.directory

    def _listTemp(self):
        """Return the file names in the temp directory, or [] if it does not exist."""
        try:
            return os.listdir(self._tempDir())
        except FileNotFoundError:
            return []

    @staticmethod
    def _pieceOrder(name):
        """Sort key that orders ``<stamp>_<n>.mp3`` pieces by numeric ``n``.

        Plain string sorting would put ``_10`` before ``_2``. Names that do
        not follow the pattern sort after all pieces, by name.
        """
        stem = os.path.splitext(name)[0]
        prefix, sep, num = stem.rpartition("_")
        if sep and num.isdecimal():
            return (0, prefix, int(num), name)
        return (1, name, 0, name)

    def run(self):
        """Run the whole pipeline: download, merge, transcode, clean up."""
        print("%s starting downloader for %s" % (datetime.datetime.now(), self.url))
        # Download the stream to temp file(s)
        self.status = 2
        self.clearTempDir()
        self.downloadStream()
        # Combine files into 1 audio file
        self.status = 3
        if not self._listTemp():
            # Nothing was written (e.g. every ffmpeg run failed); there is
            # nothing to merge, transcode or move.
            print("%s nothing recorded for %s" % (datetime.datetime.now(), self.url))
            self.status = 0
            return
        self.mergeStream()
        # Encode to mp3
        self.status = 4
        self.transcodeStream()
        # Delete temp files, move recording to save directory
        self.status = 5
        self.cleanup()
        print("%s finished downloader for %s" % (datetime.datetime.now(), self.url))
        self.status = 0

    def clearTempDir(self):
        """Delete every file in the temp directory; a missing directory is fine."""
        tempDir = self._tempDir()
        for f in self._listTemp():
            os.unlink("%s/%s" % (tempDir, f))

    def downloadStream(self):
        """Record the stream with ffmpeg, restarting it until cancelled.

        Each ffmpeg run writes a new piece named ``<epoch>_<n>.mp3`` in the
        temp directory, where ``<epoch>`` is fixed for the whole recording
        and ``<n>`` is the first unused number.
        """
        self.startdate = datetime.datetime.now()
        recdate = str(int(time.time()))
        tempDir = self._tempDir()
        # As long as we're supposed to keep retrying
        while self.running:
            # Create the temp dir for this stream
            os.makedirs(tempDir, exist_ok=True)
            # If there are already files, we're resuming. Take the next available number
            recNum = 0
            while os.path.exists("%s/%s_%s.mp3" % (tempDir, recdate, recNum)):
                recNum += 1
            # Filename is something like files/temp/stream-name/1700000000_0.mp3
            fileName = "%s/%s_%s.mp3" % (tempDir, recdate, recNum)
            args_libav = [
                os.environ.get("CONVERT_PROGRAM") or "/usr/bin/ffmpeg",
                '-loglevel',
                'error',
                '-i',
                self.url,
                '-ab',
                '128k',
                fileName,
            ]
            self.proc = subprocess.Popen(args_libav, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            # Blocks until ffmpeg exits (on its own or via cancel()/kill())
            output = self.proc.communicate()
            print("LibAV output for %s:\n%s" % (self.url, output))
            self.proc = None

    def mergeStream(self):
        """Concatenate the recorded pieces into ``temp.mka`` with mkvmerge.

        Does nothing when there are no files to merge.
        """
        # Pieces in recording order (numeric, so piece 10 follows piece 9)
        files = sorted(self._listTemp(), key=self._pieceOrder)
        if not files:
            return
        tempDir = self._tempDir()
        # Merge audio tracks into a matroska audio file; a leading "+" on
        # each later file is how the original command chains the pieces.
        command = ['/usr/bin/mkvmerge', '-o', "%s/temp.mka" % tempDir, "%s/%s" % (tempDir, files[0])]
        command.extend("+%s/%s" % (tempDir, fname) for fname in files[1:])
        self.mergeproc = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # Wait for the merge to finish
        self.mergeproc.communicate()

    def transcodeStream(self):
        """Transcode ``temp.mka`` to ``out.mp3`` in the temp directory."""
        outFile = "%s/out.mp3" % self._tempDir()
        # Delete the existing output file
        if os.path.exists(outFile):
            os.unlink(outFile)
        # Convert the matroska file to mp3
        command = [
            os.environ.get("CONVERT_PROGRAM") or '/usr/bin/ffmpeg',
            '-i', "%s/temp.mka" % self._tempDir(),
            '-q:a', '0',
            '-ab', '128k',
            outFile,
        ]
        self.transcodeproc = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # Wait for the transcode to finish
        self.transcodeproc.communicate()

    def cleanup(self):
        """Move ``out.mp3`` to the output directory under a dated name, then empty the temp dir."""
        # Create a dated name for the file from the recording's start time
        newname = self.startdate.strftime("%Y-%m-%d_%H-%M-%S") + ".mp3"
        # Make its finished storage location
        os.makedirs("files/output/" + self.directory, exist_ok=True)
        # Move final recording to output dir
        os.rename("%s/out.mp3" % self._tempDir(), "files/output/%s/%s" % (self.directory, newname))
        self.clearTempDir()

    def cancel(self):
        """Stop recording: disable restarts, terminate ffmpeg, schedule a forced kill."""
        print("Closing %s" % self.url)
        # Turn off keep-alive for the downloader
        self.running = False
        # Terminate the download process if one is running right now.
        # Snapshot the handle: downloadStream() resets it to None.
        proc = self.proc
        if proc is not None:
            proc.terminate()
        # Always start the follow-up thread: the download loop may spawn one
        # more ffmpeg if it tested self.running just before it was cleared.
        Thread(target=self.kill).start()

    def kill(self):
        """Escalate shutdown of a lingering ffmpeg: terminate again, then kill.

        Waits ``killGrace`` seconds before each step.
        """
        print("Starting kill thread for %s" % self.url)
        time.sleep(self.killGrace)
        proc = self.proc
        if proc is not None:
            # One more chance to go quietly...
            proc.terminate()
            time.sleep(self.killGrace)
        else:
            print("Nothing to kill for %s" % self.url)
        proc = self.proc
        if proc is not None:
            # Kill it
            proc.kill()