diff --git a/watch.py b/watch.py index f3d3d30..6f75464 100755 --- a/watch.py +++ b/watch.py @@ -11,143 +11,192 @@ from fsevents import Observer, Stream from paramiko.ssh_exception import SSHException from re import compile as regexp -# Path regexps to ignore -ignore = [ - regexp(r'.*\.git.*'), - regexp(r'.*\.DS_Store$') -] - -# Local dir : remote path mappings -PATH_MAPPING = [ -] - -def ssh_connect(hostname, username, password): - """Connect to SSH server and return (authenticated) transport socket""" - host_keys = paramiko.util.load_host_keys(expanduser('~/.ssh/known_hosts')) - if hostname in host_keys: - hostkeytype = host_keys[hostname].keys()[0] - hostkey = host_keys[hostname][hostkeytype] - logging.debug('Using host key of type %s' % hostkeytype) - else: - logging.critical("Host key not found") - exit(0) +class sftpwatch: + def __init__( + self, + ignore = [ + regexp(r'.*\.git.*'), + regexp(r'.*\.DS_Store$') + ], + mapping = [], + host = None, + user = None, + password = None, + rootdir = None + + ): + self.ignore = ignore + self.PATH_MAPPING = mapping + self.host = host + self.user = user + self.password = password + self.root = rootdir + self.observer = None + self.stream = None + self.sf = None + self.connected = False + self.exit = False + try: + self.connect(); + except Exception as e: + logging.critical("SSH Could not connect!") + logging.critical(str(e)) + exit(1) - try: + def connect(self): + self.sf = self.getsftp(args.host, args.user, args.password) + + def watch(self): + self.observer = Observer() + self.observer.start() + self.stream = Stream(self.file_event_callback, self.root, file_events=True) + self.observer.schedule(self.stream) + + + def ssh_connect(self, hostname, username, password): + """Connect to SSH server and return (authenticated) transport socket""" + host_keys = paramiko.util.load_host_keys(expanduser('~/.ssh/known_hosts')) + if hostname in host_keys: + hostkeytype = host_keys[hostname].keys()[0] + hostkey = host_keys[hostname][hostkeytype] + logging.debug('Using host key of type %s' % hostkeytype) + else: + logging.critical("Host key not found") + exit(0) + t = paramiko.Transport((hostname, 22)) t.set_keepalive(30) t.connect(hostkey, username, password) return t - except SSHException as sshe: - logging.critical(str(sshe)) - logging.critical("SSH: unable to connect!") - exit(1) -def getsftp(hostname, username, password): - """Return a ready-to-roll paramiko sftp object""" - t = ssh_connect(hostname, username, password) - return paramiko.SFTPClient.from_transport(t) + def getsftp(self, hostname, username, password): + """Return a ready-to-roll paramiko sftp object""" + t = self.ssh_connect(hostname, username, password) + return paramiko.SFTPClient.from_transport(t) -def transfer_file(localpath, remotepath): - """Transfer file over sftp""" - with sf.open(remotepath, 'wb') as destination: - with open(localpath, 'rb') as source: - total = 0 - while True: - data = source.read(8192) - if not data: - return total - destination.write(data) - total += len(data) + def transfer_file(self, localpath, remotepath): + """Transfer file over sftp""" + with self.sf.open(remotepath, 'wb') as destination: + with open(localpath, 'rb') as source: + total = 0 + while True: + data = source.read(8192) + if not data: + return total + destination.write(data) + total += len(data) -def file_event_callback(event): - """Respond to file events""" - - # check ignored - for expr in ignore: - if not expr.match(event.name) == None: - return - - # Determine file path relative to our root - filePath = event.name.replace(args.root, "") - logging.debug("Path from basedir: %s" % filePath) - - # Apply directory mapping - for mapping in PATH_MAPPING: - localMapPath,remoteMapPath = mapping - if filePath[0:len(localMapPath)]==localMapPath: - logging.debug("Using mapping: %s" % (str(mapping))) - filePath = remoteMapPath + "/" + filePath[len(localMapPath):] - break - - filePath = normpath(filePath) - - # Ensure path starts with / - if filePath[0] != "/": - filePath = "/" + filePath - - if event.mask & (fsevents.IN_MODIFY|fsevents.IN_CREATE|fsevents.IN_MOVED_TO): - logging.debug("\nFile was modified: %s" % event.name) + def file_event_callback(self, event): + """Respond to file events""" - logging.debug("Remote path: %s" % filePath) - # Ensure directory exists - path_dirs = dirname(filePath).split("/") - - for i in range(1,len(path_dirs)): - pathSegment = "/".join(path_dirs[0:i+1]) - logging.debug("stat %s" % pathSegment) + # Make sure we have sftp connectivity + while not self.exit: try: - sf.stat(pathSegment) - except IOError as e: - logging.info("Creating %s" % pathSegment) - sf.mkdir(pathSegment) - - # If file, upload it - if isfile(event.name) or islink(event.name): - tries = 0 - while True: + assert not self.sf == None + self.sf.stat("/") + break + except (OSError, AssertionError) as e: + logging.warning("Attempting to connect...") try: - bytesSent = transfer_file(event.name, filePath) + self.connect() break - except IOError as ioe: - logging.error("Unable to upload file: %s" % str(ioe)) - # reconnect to SSH on error - #sf.close() - #sf = getsftp(args.host, args.user, args.password) - tries+=1 - sleep(0.5) - if tries > 5: - return False - - logging.info("%s: sent %s KB to %s" % (event.name, max(1, int(bytesSent/1024)), filePath)) - else: - logging.info("Not a file: %s" % event.name) + except Exception as ee: + logging.warning("Could not Connect.") + logging.error("Error was: %s" % str(ee)) + logging.warning("Trying again in 5 seconds...") + sleep(5) + + if self.exit: + return + + # check ignored + for expr in self.ignore: + if not expr.match(event.name) == None: + return - if event.mask & (fsevents.IN_MOVED_FROM|fsevents.IN_DELETE): - logging.info("removing %s" % filePath) - # Just delete it - try: - sf.remove(filePath) - except: - # Silently fail so we don't delete. - pass + # Determine file path relative to our root + filePath = event.name.replace(self.root, "") + logging.debug("Path from basedir: %s" % filePath) - """ - We can respond to: + # Apply directory mapping + for mapping in self.PATH_MAPPING: + localMapPath,remoteMapPath = mapping + if filePath[0:len(localMapPath)]==localMapPath: + logging.debug("Using mapping: %s" % (str(mapping))) + filePath = remoteMapPath + "/" + filePath[len(localMapPath):] + break - done IN_MOVED_FROM - path is old file path - done IN_MOVED_TO - path is new file path - done IN_MODIFY - file was edited - done IN_CREATE - file was created - done IN_DELETE - file was deleted - IN_ATTRIB - attributes modified - ignore for now - """ + filePath = normpath(filePath) + + # Ensure path starts with / + if filePath[0] != "/": + filePath = "/" + filePath -def signal_handler(signal, frame): - logging.info('Cleaning up....') + if event.mask & (fsevents.IN_MODIFY|fsevents.IN_CREATE|fsevents.IN_MOVED_TO): + logging.debug("\nFile was modified: %s" % event.name) + + logging.debug("Remote path: %s" % filePath) + + # Ensure directory exists + path_dirs = dirname(filePath).split("/") + + for i in range(1,len(path_dirs)): + pathSegment = "/".join(path_dirs[0:i+1]) + logging.debug("stat %s" % pathSegment) + try: + self.sf.stat(pathSegment) + except IOError as e: + logging.info("Creating %s" % pathSegment) + self.sf.mkdir(pathSegment) + + # If file, upload it + if isfile(event.name) or islink(event.name): + tries = 0 + while True: + try: + bytesSent = self.transfer_file(event.name, filePath) + break + except IOError as ioe: + logging.error("Unable to upload file: %s" % str(ioe)) + # reconnect to SSH on error + #sf.close() + #sf = getsftp(args.host, args.user, args.password) + tries+=1 + sleep(0.5) + if tries > 5: + return False + + logging.info("%s: sent %s KB to %s" % (event.name, max(1, int(bytesSent/1024)), filePath)) + else: + logging.info("Not a file: %s" % event.name) - observer.unschedule(stream) - observer.stop() + if event.mask & (fsevents.IN_MOVED_FROM|fsevents.IN_DELETE): + logging.info("removing %s" % filePath) + # Just delete it + try: + self.sf.remove(filePath) + except: + # Silently fail so we don't delete unexpected stuff + pass + + """ + We can respond to: + + done IN_MOVED_FROM - path is old file path + done IN_MOVED_TO - path is new file path + done IN_MODIFY - file was edited + done IN_CREATE - file was created + done IN_DELETE - file was deleted + IN_ATTRIB - attributes modified - ignore for now + """ + #self.sf.close() + + def signal_handler(self, signal, frame): + logging.info('Cleaning up....') + self.exit = True + self.observer.unschedule(self.stream) + self.observer.stop() if __name__ == "__main__": @@ -171,17 +220,16 @@ if __name__ == "__main__": logging.critical("At least one --map is required.") exit(1) - for mapping in args.map: - PATH_MAPPING.append(mapping.split(":")) + path_maps = [] - sf = getsftp(args.host, args.user, args.password) + for mapping in args.map: + path_maps.append(mapping.split(":")) + + + pywatch = sftpwatch(mapping=path_maps, host=args.host, user=args.user, password=args.password, rootdir=args.root) + + signal.signal(signal.SIGINT, pywatch.signal_handler) logging.info("watching %s" % args.root) - observer = Observer() - observer.start() - stream = Stream(file_event_callback, args.root, file_events=True) - observer.schedule(stream) - - signal.signal(signal.SIGINT, signal_handler) - signal.pause() + pywatch.watch()