Class-ify code and make it more robust
This commit is contained in:
parent
01b183023d
commit
f55dec5bdb
306
watch.py
306
watch.py
|
@ -11,143 +11,192 @@ from fsevents import Observer, Stream
|
||||||
from paramiko.ssh_exception import SSHException
|
from paramiko.ssh_exception import SSHException
|
||||||
from re import compile as regexp
|
from re import compile as regexp
|
||||||
|
|
||||||
class sftpwatch:
    """Mirror local filesystem changes to a remote host over SFTP.

    Watches a local root directory with fsevents and pushes created/
    modified files (and propagates deletions) to the remote side over a
    paramiko SFTP session, applying local->remote path prefix mappings.
    """

    def __init__(
        self,
        ignore=None,
        mapping=None,
        host=None,
        user=None,
        password=None,
        rootdir=None,
    ):
        """Store connection settings and open the initial SFTP session.

        ignore   -- list of compiled regexes for paths to skip
        mapping  -- list of (local_prefix, remote_prefix) pairs
        host/user/password -- SSH credentials for the remote side
        rootdir  -- local directory to watch
        """
        # Defaults are built here, not in the signature, to avoid the
        # shared-mutable-default-argument pitfall.
        if ignore is None:
            ignore = [
                regexp(r'.*\.git.*'),
                regexp(r'.*\.DS_Store$'),
            ]
        self.ignore = ignore
        self.PATH_MAPPING = mapping if mapping is not None else []
        self.host = host
        self.user = user
        self.password = password
        self.root = rootdir
        self.observer = None
        self.stream = None
        self.sf = None          # paramiko SFTPClient once connected
        self.connected = False
        self.exit = False       # set by signal_handler to stop retry loops
        try:
            self.connect()
        except Exception as e:
            logging.critical("SSH Could not connect!")
            logging.critical(str(e))
            exit(1)

    def connect(self):
        """(Re)open the SFTP session from this instance's credentials."""
        # BUG FIX: previously read the module-level `args` namespace
        # (args.host/args.user/args.password) instead of the values
        # stored on the instance in __init__.
        self.sf = self.getsftp(self.host, self.user, self.password)

    def watch(self):
        """Start the fsevents observer on the configured root directory."""
        self.observer = Observer()
        self.observer.start()
        self.stream = Stream(self.file_event_callback, self.root,
                             file_events=True)
        self.observer.schedule(self.stream)

    def ssh_connect(self, hostname, username, password):
        """Connect to SSH server and return (authenticated) transport socket"""
        host_keys = paramiko.util.load_host_keys(
            expanduser('~/.ssh/known_hosts'))
        if hostname in host_keys:
            # list(...) so this also works on Python 3, where dict key
            # views are not subscriptable.
            hostkeytype = list(host_keys[hostname].keys())[0]
            hostkey = host_keys[hostname][hostkeytype]
            logging.debug('Using host key of type %s' % hostkeytype)
        else:
            # Refuse to talk to unknown hosts; exit non-zero so callers
            # can tell this failure apart from a clean shutdown.
            logging.critical("Host key not found")
            exit(1)
        t = paramiko.Transport((hostname, 22))
        t.set_keepalive(30)
        t.connect(hostkey, username, password)
        return t

    def getsftp(self, hostname, username, password):
        """Return a ready-to-roll paramiko sftp object"""
        t = self.ssh_connect(hostname, username, password)
        return paramiko.SFTPClient.from_transport(t)

    def transfer_file(self, localpath, remotepath):
        """Transfer file over sftp; return the number of bytes sent."""
        total = 0
        with self.sf.open(remotepath, 'wb') as destination:
            with open(localpath, 'rb') as source:
                while True:
                    data = source.read(8192)
                    if not data:
                        return total
                    destination.write(data)
                    total += len(data)

    def file_event_callback(self, event):
        """Respond to file events"""
        # Make sure we have sftp connectivity before doing anything else;
        # keep retrying every 5s until connected or asked to exit.
        while not self.exit:
            try:
                if self.sf is None:
                    # Raise instead of `assert`: asserts vanish under -O.
                    raise IOError("no SFTP session yet")
                self.sf.stat("/")   # cheap liveness probe
                break
            except (OSError, IOError):
                logging.warning("Attempting to connect...")
                try:
                    self.connect()
                    break
                except Exception as ee:
                    logging.warning("Could not Connect.")
                    logging.error("Error was: %s" % str(ee))
                    logging.warning("Trying again in 5 seconds...")
                    sleep(5)
        if self.exit:
            return

        # check ignored
        for expr in self.ignore:
            if expr.match(event.name) is not None:
                return

        # Determine file path relative to our root
        filePath = event.name.replace(self.root, "")
        logging.debug("Path from basedir: %s" % filePath)

        # Apply directory mapping (first matching prefix wins)
        for mapping in self.PATH_MAPPING:
            localMapPath, remoteMapPath = mapping
            if filePath[0:len(localMapPath)] == localMapPath:
                logging.debug("Using mapping: %s" % (str(mapping)))
                filePath = remoteMapPath + "/" + filePath[len(localMapPath):]
                break

        filePath = normpath(filePath)

        # Ensure path starts with /; startswith also survives the
        # empty-string edge case that indexing [0] would crash on.
        if not filePath.startswith("/"):
            filePath = "/" + filePath

        if event.mask & (fsevents.IN_MODIFY | fsevents.IN_CREATE |
                         fsevents.IN_MOVED_TO):
            logging.debug("\nFile was modified: %s" % event.name)
            logging.debug("Remote path: %s" % filePath)

            # Ensure directory exists: stat each path segment from the
            # root down and mkdir the ones that are missing.
            path_dirs = dirname(filePath).split("/")
            for i in range(1, len(path_dirs)):
                pathSegment = "/".join(path_dirs[0:i + 1])
                logging.debug("stat %s" % pathSegment)
                try:
                    self.sf.stat(pathSegment)
                except IOError:
                    logging.info("Creating %s" % pathSegment)
                    self.sf.mkdir(pathSegment)

            # If file, upload it (retry a few times before giving up)
            if isfile(event.name) or islink(event.name):
                tries = 0
                while True:
                    try:
                        bytesSent = self.transfer_file(event.name, filePath)
                        break
                    except IOError as ioe:
                        logging.error("Unable to upload file: %s" % str(ioe))
                        tries += 1
                        sleep(0.5)
                        if tries > 5:
                            return False
                logging.info("%s: sent %s KB to %s"
                             % (event.name, max(1, int(bytesSent / 1024)),
                                filePath))
            else:
                logging.info("Not a file: %s" % event.name)

        if event.mask & (fsevents.IN_MOVED_FROM | fsevents.IN_DELETE):
            logging.info("removing %s" % filePath)
            # Just delete it
            try:
                self.sf.remove(filePath)
            except (OSError, IOError):
                # Deliberately best-effort, but narrowed from a bare
                # `except:` so KeyboardInterrupt/SystemExit still escape.
                pass

        """
        We can respond to:

        done IN_MOVED_FROM - path is old file path
        done IN_MOVED_TO - path is new file path
        done IN_MODIFY - file was edited
        done IN_CREATE - file was created
        done IN_DELETE - file was deleted
        IN_ATTRIB - attributes modified - ignore for now
        """

    def signal_handler(self, signal, frame):
        """SIGINT handler: flag loops to stop and tear down the watcher."""
        logging.info('Cleaning up....')
        self.exit = True
        self.observer.unschedule(self.stream)
        self.observer.stop()
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
@ -171,17 +220,16 @@ if __name__ == "__main__":
|
||||||
logging.critical("At least one --map is required.")
|
logging.critical("At least one --map is required.")
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
||||||
for mapping in args.map:
|
path_maps = []
|
||||||
PATH_MAPPING.append(mapping.split(":"))
|
|
||||||
|
|
||||||
sf = getsftp(args.host, args.user, args.password)
|
for mapping in args.map:
|
||||||
|
path_maps.append(mapping.split(":"))
|
||||||
|
|
||||||
|
|
||||||
|
pywatch = sftpwatch(mapping=path_maps, host=args.host, user=args.user, password=args.password, rootdir=args.root)
|
||||||
|
|
||||||
|
signal.signal(signal.SIGINT, pywatch.signal_handler)
|
||||||
|
|
||||||
logging.info("watching %s" % args.root)
|
logging.info("watching %s" % args.root)
|
||||||
|
|
||||||
observer = Observer()
|
pywatch.watch()
|
||||||
observer.start()
|
|
||||||
stream = Stream(file_event_callback, args.root, file_events=True)
|
|
||||||
observer.schedule(stream)
|
|
||||||
|
|
||||||
signal.signal(signal.SIGINT, signal_handler)
|
|
||||||
signal.pause()
|
|
||||||
|
|
Loading…
Reference in New Issue