blobsend/blobsend/client_ssh.py

252 lines
8.6 KiB
Python

import os
import shlex

import paramiko

from blobsend.client_base import BaseChunkClient
from blobsend import CHUNK_SIZE, hash_chunk, FilePool
"""
ssh client
- assumes this utility (blobcopy) is installed on the remote end
"""
# Command used to launch the helper utility on the remote host; the clients in
# this module run it as "<REMOTE_UTILITY> chunks <path>" to list chunk hashes.
# The built-in default points into one developer's checkout, so it can be
# overridden with the BLOBSEND_REMOTE_UTILITY environment variable. An unset or
# empty variable falls back to the original default.
REMOTE_UTILITY = (
    os.environ.get("BLOBSEND_REMOTE_UTILITY")
    or "/Users/dave/code/blobsend/testenv/bin/_blobsend_ssh_remote"
)
class SshChunkClient(BaseChunkClient):
    """
    Chunk client that reads and writes a remote file over SFTP.

    A single SSH connection is used: the remote hashing utility is executed
    over it, and every pooled file handle is opened through its own SFTP
    session on that same connection.
    """

    def __init__(self, server, username, fpath, is_src, chunk_size=CHUNK_SIZE, password=None, sshkey=None):
        """
        Connect to the server and prepare a pool of handles on the remote file.

        :param server: hostname to connect to
        :param username: login name
        :param fpath: path of the file on the remote host
        :param is_src: when false, the remote file is created if it is missing
        :param chunk_size: chunk size handed to the base class constructor
        :param password: password used for login when no sshkey is given
        :param sshkey: path of a private key file; it is loaded as an RSA key
        """
        super().__init__(chunk_size)
        self.fpath = fpath

        self.ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without any
        # verification - confirm this is acceptable for the deployment.
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        connect_args = dict(
            hostname=server,
            username=username
        )
        if sshkey:
            print("using ssh key", sshkey)
            connect_args.update(pkey=paramiko.RSAKey.from_private_key_file(sshkey))
        else:
            connect_args.update(password=password)
        self.ssh.connect(**connect_args)

        self.sftp = self.ssh.open_sftp()

        # If the file doesnt exist and we are the destination, create it
        if not is_src:
            try:
                with self.sftp.open(self.fpath, "r"):
                    pass
            except FileNotFoundError:
                with self.sftp.open(self.fpath, "wb"):
                    pass

        def _fpool_open():
            # each pooled handle gets a separate sftp session on the shared connection
            sftp = self.ssh.open_sftp()
            f = sftp.open(self.fpath, "r+")
            f.seek(0)
            return f

        # presumably the first argument is the pool size - confirm against FilePool
        self.fpool = FilePool(8, _fpool_open)

    def get_hashes(self):
        """
        Run the remote utility's "chunks" command and yield its results.

        Each output line is expected to hold a chunk number and a hash separated
        by one space. Yields (chunk_number, chunk_hash) tuples with the number
        converted to int.

        :raises Exception: if the remote command exits with a non-zero status
        """
        # quote the path so spaces or shell metacharacters cannot alter the command
        command = "{} chunks {}".format(REMOTE_UTILITY, shlex.quote(self.fpath))
        stdin, stdout, stderr = self.ssh.exec_command(command)
        stdin.close()

        for line in iter(lambda: stdout.readline(1024), ""):
            chunk_number, chunk_hash = line.strip().split(" ")
            yield (int(chunk_number), chunk_hash, )

        exit_status = stdout.channel.recv_exit_status()
        if exit_status != 0:
            raise Exception("hash command exit code was {}: {}".format(exit_status, stderr.read()))

    def get_chunk(self, chunk_number):
        """
        Read and return the chunk at the given index.

        :raises Exception: if the chunk's start offset lies past the file length
        """
        position = chunk_number * self.chunk_size
        if position > self.get_length():
            raise Exception("requested chunk {} is beyond EOF".format(chunk_number))
        with self.fpool.get() as f:
            f.seek(position)
            return f.read(self.chunk_size)

    def put_chunk(self, chunk_number, contents):
        """
        Write contents at the offset of the given chunk index.
        """
        position = chunk_number * self.chunk_size
        with self.fpool.get() as f:
            f.seek(position)
            f.write(contents)

    def get_length(self):
        """
        Return the current length of the remote file in bytes.
        """
        with self.fpool.get() as f:
            f.seek(0, 2)  # seek to end
            return f.tell()

    def set_length(self, length):
        """
        Truncate the remote file when it is longer than length.
        """
        if length < self.get_length():
            with self.fpool.get() as f:
                f.truncate(length)
        # do nothing for the case of extending the file
        # put_chunk handles it

    def close(self):
        """
        Close the file pool, then the SFTP session and the SSH connection.
        """
        try:
            self.fpool.close()
        finally:
            # closing the connection also tears down the pool's sftp sessions
            try:
                self.sftp.close()
            finally:
                self.ssh.close()

    @staticmethod
    def from_uri(uri, extra_args, is_src):
        """
        instantiate a client from the given uri

        The uri supplies hostname, username, path and password; the optional
        "sshkey" entry of extra_args supplies the private key file path.
        """
        return SshChunkClient(uri.hostname, uri.username, uri.path, is_src, password=uri.password, sshkey=extra_args.get("sshkey"))
class SshChunkClientParallelConnections(BaseChunkClient):
    """
    Chunk client that reads and writes a remote file over SFTP.

    Unlike a single-connection client, every pooled file handle is opened over
    its own, separately established SSH connection. The main connection is used
    for the existence check and for running the remote hashing utility.
    """

    def __init__(self, server, username, fpath, is_src, chunk_size=CHUNK_SIZE, password=None, sshkey=None):
        """
        Connect to the server and prepare a pool of handles on the remote file.

        :param server: hostname to connect to
        :param username: login name
        :param fpath: path of the file on the remote host
        :param is_src: when false, the remote file is created if it is missing
        :param chunk_size: chunk size handed to the base class constructor
        :param password: password used for login when no sshkey is given
        :param sshkey: path of a private key file; it is loaded as an RSA key
        """
        super().__init__(chunk_size)
        self.fpath = fpath

        self.ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without any
        # verification - confirm this is acceptable for the deployment.
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        connect_args = dict(
            hostname=server,
            username=username
        )
        if sshkey:
            print("using ssh key", sshkey)
            connect_args.update(pkey=paramiko.RSAKey.from_private_key_file(sshkey))
        else:
            connect_args.update(password=password)
        self.ssh.connect(**connect_args)

        self.sftp = self.ssh.open_sftp()

        # If the file doesnt exist and we are the destination, create it
        if not is_src:
            try:
                with self.sftp.open(self.fpath, "r"):
                    pass
            except FileNotFoundError:
                with self.sftp.open(self.fpath, "wb"):
                    pass

        # connections created for pooled handles, remembered so close() can shut them
        # down; must exist before the pool is built in case it opens handles eagerly
        self._pool_connections = []

        def _fpool_open():
            ssh = paramiko.SSHClient()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            ssh.connect(**connect_args)
            self._pool_connections.append(ssh)
            sftp = ssh.open_sftp()
            f = sftp.open(self.fpath, "r+")
            f.seek(0)
            return f

        # presumably the first argument is the pool size - confirm against FilePool
        self.fpool = FilePool(8, _fpool_open)

    def get_hashes(self):
        """
        Run the remote utility's "chunks" command and yield its results.

        Each output line is expected to hold a chunk number and a hash separated
        by one space. Yields (chunk_number, chunk_hash) tuples with the number
        converted to int.

        :raises Exception: if the remote command exits with a non-zero status
        """
        # quote the path so spaces or shell metacharacters cannot alter the command
        command = "{} chunks {}".format(REMOTE_UTILITY, shlex.quote(self.fpath))
        stdin, stdout, stderr = self.ssh.exec_command(command)
        stdin.close()

        for line in iter(lambda: stdout.readline(1024), ""):
            chunk_number, chunk_hash = line.strip().split(" ")
            yield (int(chunk_number), chunk_hash, )

        exit_status = stdout.channel.recv_exit_status()
        if exit_status != 0:
            raise Exception("hash command exit code was {}: {}".format(exit_status, stderr.read()))

    def get_chunk(self, chunk_number):
        """
        Read and return the chunk at the given index.

        :raises Exception: if the chunk's start offset lies past the file length
        """
        position = chunk_number * self.chunk_size
        if position > self.get_length():
            raise Exception("requested chunk {} is beyond EOF".format(chunk_number))
        with self.fpool.get() as f:
            f.seek(position)
            return f.read(self.chunk_size)

    def put_chunk(self, chunk_number, contents):
        """
        Write contents at the offset of the given chunk index.
        """
        position = chunk_number * self.chunk_size
        with self.fpool.get() as f:
            f.seek(position)
            f.write(contents)

    def get_length(self):
        """
        Return the current length of the remote file in bytes.
        """
        with self.fpool.get() as f:
            f.seek(0, 2)  # seek to end
            return f.tell()

    def set_length(self, length):
        """
        Truncate the remote file when it is longer than length.
        """
        if length < self.get_length():
            with self.fpool.get() as f:
                f.truncate(length)
        # do nothing for the case of extending the file
        # put_chunk handles it

    def close(self):
        """
        Close the file pool, the per-handle connections and the main connection.
        """
        try:
            self.fpool.close()
        finally:
            try:
                for pool_ssh in self._pool_connections:
                    pool_ssh.close()
            finally:
                try:
                    self.sftp.close()
                finally:
                    self.ssh.close()

    @staticmethod
    def from_uri(uri, extra_args, is_src):
        """
        instantiate a client from the given uri

        The uri supplies hostname, username, path and password; the optional
        "sshkey" entry of extra_args supplies the private key file path.
        """
        return SshChunkClientParallelConnections(uri.hostname, uri.username, uri.path, is_src, password=uri.password, sshkey=extra_args.get("sshkey"))
class SshChunkClientOld(BaseChunkClient):
    """
    Chunk client that reads and writes a remote file over SFTP using one
    password-authenticated connection and one shared file handle.

    Every operation seeks on the same handle, so instances are not safe to use
    from several threads at once.
    """

    def __init__(self, server, username, password, fpath, is_src, chunk_size=CHUNK_SIZE):
        """
        Connect to the server and open the remote file for reading and writing.

        :param server: hostname to connect to
        :param username: login name
        :param password: password used for login
        :param fpath: path of the file on the remote host
        :param is_src: when false, the remote file is created if it is missing
        :param chunk_size: chunk size handed to the base class constructor
        """
        super().__init__(chunk_size)
        self.fpath = fpath

        self.ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without any
        # verification - confirm this is acceptable for the deployment.
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh.connect(hostname=server,
                         username=username,
                         password=password)

        self.sftp = self.ssh.open_sftp()

        # If the file doesnt exist and we are the destination, create it
        if not is_src:
            try:
                with self.sftp.open(self.fpath, "r"):
                    pass
            except FileNotFoundError:
                with self.sftp.open(self.fpath, "wb"):
                    pass

        self.file = self.sftp.open(self.fpath, "r+")

    def get_hashes(self):
        """
        Run the remote utility's "chunks" command and yield its results.

        Each output line is expected to hold a chunk number and a hash separated
        by one space. Yields (chunk_number, chunk_hash) tuples with the number
        converted to int.

        :raises Exception: if the remote command exits with a non-zero status
        """
        # quote the path so spaces or shell metacharacters cannot alter the command
        command = "{} chunks {}".format(REMOTE_UTILITY, shlex.quote(self.fpath))
        stdin, stdout, stderr = self.ssh.exec_command(command)
        stdin.close()

        for line in iter(lambda: stdout.readline(1024), ""):
            chunk_number, chunk_hash = line.strip().split(" ")
            yield (int(chunk_number), chunk_hash, )

        exit_status = stdout.channel.recv_exit_status()
        if exit_status != 0:
            raise Exception("hash command exit code was {}: {}".format(exit_status, stderr.read()))

    def get_chunk(self, chunk_number):
        """
        Read and return the chunk at the given index.

        :raises Exception: if the chunk's start offset lies past the file length
        """
        position = chunk_number * self.chunk_size
        if position > self.get_length():
            raise Exception("requested chunk {} is beyond EOF".format(chunk_number))
        self.file.seek(position)  # TODO not thread safe
        return self.file.read(self.chunk_size)

    def put_chunk(self, chunk_number, contents):
        """
        Write contents at the offset of the given chunk index.
        """
        position = chunk_number * self.chunk_size
        self.file.seek(position)
        self.file.write(contents)

    def get_length(self):
        """
        Return the current length of the remote file in bytes.
        """
        self.file.seek(0, 2)  # seek to end
        return self.file.tell()

    def set_length(self, length):
        """
        Truncate the remote file when it is longer than length.
        """
        if length < self.get_length():
            # this class has no file pool; truncate through the shared handle
            self.file.truncate(length)
        # do nothing for the case of extending the file
        # put_chunk handles it

    def close(self):
        """
        Close the file handle, then the SFTP session and the SSH connection.
        """
        try:
            self.file.close()
        finally:
            try:
                self.sftp.close()
            finally:
                self.ssh.close()

    @staticmethod
    def from_uri(uri, is_src):
        """
        instantiate a client from the given uri

        The uri supplies hostname, username, password and path. Note that this
        signature takes no extra_args, unlike the other clients in this module.
        """
        return SshChunkClientOld(uri.hostname, uri.username, uri.password, uri.path, is_src)