Ticket #1026: chunked-file-transfers.patch
File chunked-file-transfers.patch, 16.2 KB (added 5 years ago; author not recorded) |
---|
-
xpra/client/client_base.py
283 283 }) 284 284 285 285 def init_authenticated_packet_handlers(self): 286 self.set_packet_handlers(self._packet_handlers, {"send-file" : self._process_send_file}) 286 self.set_packet_handlers(self._packet_handlers, { 287 "send-file" : self._process_send_file, 288 "ack-file-chunk" : self._process_ack_file_chunk, 289 "send-file-chunk" : self._process_send_file_chunk, 290 }) 287 291 288 292 289 293 def init_aliases(self): -
xpra/net/file_transfer.py
import os
import time
import subprocess, shlex
import hashlib
import uuid

from xpra.log import Logger
printlog = Logger("printing")
filelog = Logger("file")

from xpra.child_reaper import getChildReaper
from xpra.util import typedict, csv, nonl
from xpra.simple_stats import std_unit

DELETE_PRINTER_FILE = os.environ.get("XPRA_DELETE_PRINTER_FILE", "1")=="1"
#maximum size of each file chunk in bytes, 0 disables chunked transfers:
FILE_CHUNKS_SIZE = max(0, int(os.environ.get("XPRA_FILE_CHUNKS_SIZE", "1024")))
#how many chunked transfers may be in flight at the same time:
MAX_CONCURRENT_FILES = 10
#abort a chunked transfer if no progress is made within this delay (ms):
CHUNK_TIMEOUT = 10*1000

#NOTE(review): MIMETYPE_EXTS, safe_open_download_file and the
#FileTransferAttributes class are elided from this view of the patch -
#confirm them against the full xpra/net/file_transfer.py.


class FileTransferHandler(FileTransferAttributes):
    """
    Mixin for endpoints that can exchange files, either in a single
    "send-file" packet, or as a sequence of "send-file-chunk" packets,
    each acknowledged by the peer with "ack-file-chunk".
    Chunked transfer state is kept in dictionaries keyed by a random
    chunk-id (see send_file / _process_send_file).
    #NOTE(review): the class header is not visible in this patch view,
    #the name and base class are assumed - confirm against the full file.
    """

    def init_attributes(self):
        #NOTE(review): the method header is elided from this view of the
        #patch, "init_attributes" is assumed - confirm against the full file.
        self.remote_file_transfer = False
        self.remote_printing = False
        self.remote_open_files = False
        self.remote_file_size_limit = 0
        self.remote_file_chunks = 0
        #chunked transfer state, keyed by chunk-id:
        self.send_chunks_in_progress = {}
        self.receive_chunks_in_progress = {}
        #subclasses without a main-loop scheduler get one from glib:
        if not getattr(self, "timeout_add", None):
            from xpra.gtk_common.gobject_compat import import_glib
            glib = import_glib()
            self.timeout_add = glib.timeout_add

    def parse_file_transfer_caps(self, c):
        """ Record the peer's file-transfer capabilities from its hello typedict. """
        self.remote_file_transfer = c.boolget("file-transfer")
        self.remote_printing = c.boolget("printing")
        self.remote_open_files = c.boolget("open-files")
        #fix: this is a size in MB, not a boolean - was c.boolget(..):
        self.remote_file_size_limit = c.intget("file-size-limit")
        #never chunk beyond what the peer's size limit allows:
        self.remote_file_chunks = max(0, min(self.remote_file_size_limit*1024*1024, c.intget("file-chunks")))

    def get_info(self):
        info = FileTransferAttributes.get_info(self)
        info["remote"] = {
            "file-transfer"     : self.remote_file_transfer,
            "file-size-limit"   : self.remote_file_size_limit,
            "file-chunks"       : self.remote_file_chunks,
            "open-files"        : self.remote_open_files,
            "printing"          : self.remote_printing,
            }
        return info

    def check_digest(self, filename, digest, expected_digest, algo="sha1"):
        """ Raises an Exception if the digest does not match the expected value. """
        filelog("%s digest: %s - expected: %s", algo, digest, expected_digest)
        if digest!=expected_digest:
            filelog.error("Error: data does not match, invalid %s file digest for '%s'", algo, filename)
            filelog.error(" received %s, expected %s", digest, expected_digest)
            raise Exception("failed %s digest verification" % algo)


    def _check_chunk_receiving(self, chunk_id, chunk_no):
        """ Timer callback: give up on a receive if no chunk arrived since chunk_no. """
        chunk_state = self.receive_chunks_in_progress.get(chunk_id)
        filelog("_check_chunk_receiving(%s, %s) chunk_state=%s", chunk_id, chunk_no, chunk_state)
        #fix: the transfer may have completed or been cancelled already
        #(chunk_state is then None - the old code would raise a TypeError),
        #and we must compare against the chunk number this timer was armed
        #with (the old "==0" test never detected stalls past the first chunk):
        if chunk_state and chunk_state[-1]==chunk_no:
            filelog.error("Error: chunked file transfer timed out")
            del self.receive_chunks_in_progress[chunk_id]
            try:
                #fix: close the download file descriptor, it was leaked:
                os.close(chunk_state[1])
            except OSError:
                pass

    def _process_send_file_chunk(self, packet):
        """
        Handle one "send-file-chunk" packet:
        append the data to the file, ack it, and when the last chunk
        arrives verify size + digest and process the downloaded file.
        """
        chunk_id, chunk, file_data, has_more = packet[1:5]
        filelog("_process_send_file_chunk%s", (chunk_id, chunk, "%i bytes" % len(file_data), has_more))
        chunk_state = self.receive_chunks_in_progress.get(chunk_id)
        if not chunk_state:
            filelog.error("Error: cannot find the file transfer id '%s'", nonl(chunk_id))
            self.send("ack-file-chunk", chunk_id, False, "file transfer id not found", chunk)
            return
        fd = chunk_state[1]
        if chunk_state[-1]+1!=chunk:
            #chunks must arrive in strict sequence:
            filelog.error("Error: chunk number mismatch, expected %i but got %i", chunk_state[-1]+1, chunk)
            self.send("ack-file-chunk", chunk_id, False, "chunk number mismatch", chunk)
            del self.receive_chunks_in_progress[chunk_id]
            os.close(fd)
            return
        #update chunk number:
        chunk_state[-1] = chunk
        digest = chunk_state[8]
        written = chunk_state[9]
        try:
            os.write(fd, file_data)
            digest.update(file_data)
            written += len(file_data)
            chunk_state[9] = written
        except OSError as e:
            filelog.error("Error: cannot write file chunk")
            filelog.error(" %s", e)
            self.send("ack-file-chunk", chunk_id, False, "write error: %s" % e, chunk)
            #fix: also drop the stale transfer state (it was leaked):
            del self.receive_chunks_in_progress[chunk_id]
            try:
                os.close(fd)
            except OSError:
                pass
            return
        self.send("ack-file-chunk", chunk_id, True, "", chunk)
        if has_more:
            #remote end will send more after receiving the ack,
            #so re-arm the stall detection timer for this chunk number:
            self.timeout_add(CHUNK_TIMEOUT, self._check_chunk_receiving, chunk_id, chunk)
            return
        #fix: transfer is complete, remove its state so the slot is freed
        #(the old code kept it forever, eventually hitting MAX_CONCURRENT_FILES):
        del self.receive_chunks_in_progress[chunk_id]
        os.close(fd)
        #check file size and digest then process it:
        filename, mimetype, printit, openit, filesize, options = chunk_state[2:8]
        if written!=filesize:
            filelog.error("Error: expected a file of %i bytes, got %i", filesize, written)
            return
        expected_digest = options.get("sha1")
        if expected_digest:
            self.check_digest(filename, digest.hexdigest(), expected_digest)
        start_time = chunk_state[0]
        filelog("%i bytes received in %i chunks, took %ims", filesize, chunk, (time.time()-start_time)*1000)
        self.do_process_downloaded_file(filename, mimetype, printit, openit, filesize, options)

    def _process_send_file(self, packet):
        """
        The remote end is sending us a file:
        either the whole file in this packet, or - if options contains a
        "file-chunk-id" - just the transfer announcement, with the data
        to follow in "send-file-chunk" packets.
        """
        basefilename, mimetype, printit, openit, filesize, file_data, options = packet[1:8]
        options = typedict(options)
        if printit:
            l = printlog
            assert self.printing
        else:
            l = filelog
            assert self.file_transfer
        l("receiving file: %s", [basefilename, mimetype, printit, openit, filesize, "%s bytes" % len(file_data), options])
        assert filesize>0, "invalid file size: %s" % filesize
        if filesize>self.file_size_limit*1024*1024:
            l.error("Error: file '%s' is too large:", basefilename)
            l.error(" %iMB, the file size limit is %iMB", filesize//1024//1024, self.file_size_limit)
            return
        filename, fd = safe_open_download_file(basefilename, mimetype)
        chunk_id = options.get("file-chunk-id")
        if chunk_id:
            if len(self.receive_chunks_in_progress)>=MAX_CONCURRENT_FILES:
                self.send("ack-file-chunk", chunk_id, False, "too many file transfers in progress", 0)
                os.close(fd)
                return
            digest = hashlib.sha1()
            chunk = 0
            chunk_state = [time.time(), fd, filename, mimetype, printit, openit, filesize, options, digest, 0, chunk]
            self.receive_chunks_in_progress[chunk_id] = chunk_state
            self.send("ack-file-chunk", chunk_id, True, "", chunk)
            self.timeout_add(CHUNK_TIMEOUT, self._check_chunk_receiving, chunk_id, chunk)
            return
        #not chunked, the whole file is in this packet:
        assert file_data, "no data!"
        if len(file_data)!=filesize:
            l.error("Error: invalid data size for file '%s'", basefilename)
            l.error(" expected %i bytes, got %i", filesize, len(file_data))
            #fix: the file was already opened above, don't leak the fd:
            os.close(fd)
            return
        #verify any digests the sender included:
        def check_digest(algo, libfn):
            #NOTE(review): the header of this nested helper is elided from
            #this view of the patch - reconstructed, confirm against the full file.
            digest = options.get(algo)
            if digest:
                u = libfn()
                u.update(file_data)
                l("%s digest: %s - expected: %s", algo, u.hexdigest(), digest)
                self.check_digest(basefilename, u.hexdigest(), digest, algo)
        check_digest("sha1", hashlib.sha1)
        check_digest("md5", hashlib.md5)
        try:
            os.write(fd, file_data)
        finally:
            os.close(fd)
        self.do_process_downloaded_file(filename, mimetype, printit, openit, filesize, options)

    def do_process_downloaded_file(self, filename, mimetype, printit, openit, filesize, options):
        """ A file has been fully received (directly or in chunks): print or open it. """
        filelog.info("downloaded %s bytes to %s file%s:", filesize, (mimetype or "temporary"), ["", " for printing"][int(printit)])
        filelog.info(" '%s'", filename)
        if printit:
            self._print_file(filename, mimetype, options)
            return
        #NOTE(review): the tail of this method (the "openit" handling) is
        #elided from this view of the patch - confirm against the full file.

    def send_file(self, filename, mimetype, data, filesize=0, printit=False, openit=False, options=None):
        """
        Send a file to the remote end.
        If both ends support chunking and the file is larger than the
        negotiated chunk size, only announce the transfer here (with a
        "file-chunk-id" in options) and stream the data from
        _process_ack_file_chunk; otherwise send everything in one packet.
        Returns True if the transfer was started.
        #NOTE(review): the signature and the capability/size-limit checks of
        #this method are elided from this view of the patch - reconstructed
        #here, confirm against the full file.
        """
        if options is None:
            options = {}
        if printit:
            action = "print"
            l = printlog
        else:
            action = "transfer"
            l = filelog
        l("send_file%s", (filename, mimetype, type(data), "%i bytes" % filesize, printit, openit, options))
        absfile = os.path.abspath(filename)
        basefilename = os.path.basename(filename)
        def sizewarn(location="local"):
            filelog.warn("Warning: cannot %s the file '%s'", action, basefilename)
            filelog.warn(" this file is too large: %sB", std_unit(filesize, unit=1024))
        if filesize>self.file_size_limit*1024*1024:
            sizewarn()
            return False
        if filesize>self.remote_file_size_limit*1024*1024:
            sizewarn("remote")
            return False
        #always send a sha1 digest so the other end can verify the data:
        u = hashlib.sha1()
        u.update(data)
        filelog("sha1 digest(%s)=%s", absfile, u.hexdigest())
        options["sha1"] = u.hexdigest()
        chunk_size = min(self.file_chunks, self.remote_file_chunks)
        if chunk_size>0 and filesize>chunk_size:
            if len(self.send_chunks_in_progress)>=MAX_CONCURRENT_FILES:
                raise Exception("too many file transfers in progress")
            #chunking is supported and the file is big enough:
            chunk_id = uuid.uuid4().hex
            options["file-chunk-id"] = chunk_id
            chunk_state = [time.time(), data, chunk_size, 0]
            self.send_chunks_in_progress[chunk_id] = chunk_state
            cdata = ""
            #timer to check that the other end is requesting more chunks:
            self.timeout_add(CHUNK_TIMEOUT, self._check_chunk_sending, chunk_id, 0)
        else:
            #send everything now:
            cdata = self.compressed_wrapper("file-data", data)
            assert len(cdata)<=filesize     #compressed wrapper ensures this is true
        self.send("send-file", basefilename, mimetype, printit, openit, filesize, cdata, options)
        return True

    def _check_chunk_sending(self, chunk_id, chunk_no):
        """ Timer callback: give up on a send if the peer never acked chunk_no. """
        chunk_state = self.send_chunks_in_progress.get(chunk_id)
        filelog("_check_chunk_sending(%s, %s) chunk_state found: %s", chunk_id, chunk_no, bool(chunk_state))
        if chunk_state and chunk_state[3]==chunk_no:
            filelog.error("Error: chunked file transfer timed out on chunk %i", chunk_no)
            del self.send_chunks_in_progress[chunk_id]

    def _process_ack_file_chunk(self, packet):
        """
        The other end received our send-file or send-file-chunk:
        carve out and send the next chunk of data (or finish up).
        """
        filelog("%s", packet)
        chunk_id, state, error_message, chunk = packet[1:5]
        if not state:
            filelog.error("Error: remote end is cancelling the file transfer:")
            filelog.error(" %s", error_message)
            #fix: the id may already be gone (e.g. after a send timeout),
            #so don't raise a KeyError here:
            self.send_chunks_in_progress.pop(chunk_id, None)
            return
        chunk_state = self.send_chunks_in_progress.get(chunk_id)
        if not chunk_state:
            filelog.error("Error: cannot find the file transfer id '%s'", nonl(chunk_id))
            return
        if chunk_state[-1]!=chunk:
            #fix: log the chunk number, not the whole state list
            #(%i on a list raises at format time):
            filelog.error("Error: chunk number mismatch (%i vs %i)", chunk_state[-1], chunk)
            return
        start_time, data, chunk_size, chunk = chunk_state
        if not data:
            #all sent!
            filelog("%i chunks of %i bytes sent in %ims", chunk, chunk_size, (time.time()-start_time)*1000)
            del self.send_chunks_in_progress[chunk_id]
            return
        assert chunk_size>0
        #carve out another chunk:
        cdata = self.compressed_wrapper("file-data", data[:chunk_size])
        data = data[chunk_size:]
        chunk += 1
        self.send_chunks_in_progress[chunk_id] = [start_time, data, chunk_size, chunk]
        self.timeout_add(CHUNK_TIMEOUT, self._check_chunk_sending, chunk_id, chunk)
        self.send("send-file-chunk", chunk_id, chunk, cdata, bool(data))
xpra/server/server_base.py
660 660 "command_request": self._process_command_request, 661 661 "printers": self._process_printers, 662 662 "send-file": self._process_send_file, 663 "ack-file-chunk": self._process_ack_file_chunk, 664 "send-file-chunk": self._process_send_file_chunk, 663 665 "webcam-start": self._process_webcam_start, 664 666 "webcam-stop": self._process_webcam_stop, 665 667 "webcam-frame": self._process_webcam_frame, … … 1703 1705 def _process_send_file(self, proto, packet): 1704 1706 ss = self._server_sources.get(proto) 1705 1707 if not ss: 1706 log.warn("Warning: invalid client source for file packet")1708 log.warn("Warning: invalid client source for send-file packet") 1707 1709 return 1708 1710 ss._process_send_file(packet) 1709 1711 1712 def _process_ack_file_chunk(self, proto, packet): 1713 ss = self._server_sources.get(proto) 1714 if not ss: 1715 log.warn("Warning: invalid client source for ack-file-chunk packet") 1716 return 1717 ss._process_ack_file_chunk(packet) 1718 1719 def _process_send_file_chunk(self, proto, packet): 1720 ss = self._server_sources.get(proto) 1721 if not ss: 1722 log.warn("Warning: invalid client source for send-file-chunk packet") 1723 return 1724 ss._process_send_file_chunk(packet) 1725 1710 1726 def _process_print(self, proto, packet): 1711 1727 #ie: from the xpraforwarder we call this command: 1712 1728 #command = ["xpra", "print", "socket:/path/tosocket", filename, mimetype, source, title, printer, no_copies, print_options]