xpra icon
Bug tracker and wiki

This bug tracker and wiki are being discontinued
please use https://github.com/Xpra-org/xpra instead.


Ticket #1026: chunked-file-transfers.patch

File chunked-file-transfers.patch, 16.2 KB (added by Antoine Martin, 5 years ago)

mostly complete patch, just needs cleaning up and cancelling the timers

  • xpra/client/client_base.py

     
    283283            })
    284284
    285285    def init_authenticated_packet_handlers(self):
    286         self.set_packet_handlers(self._packet_handlers, {"send-file" : self._process_send_file})
     286        self.set_packet_handlers(self._packet_handlers, {
     287                                                         "send-file"        : self._process_send_file,
     288                                                         "ack-file-chunk"   : self._process_ack_file_chunk,
     289                                                         "send-file-chunk"  : self._process_send_file_chunk,
     290                                                         })
    287291
    288292
    289293    def init_aliases(self):
  • xpra/net/file_transfer.py

     
    88import time
    99import subprocess, shlex
    1010import hashlib
     11import uuid
    1112
    1213from xpra.log import Logger
    1314printlog = Logger("printing")
     
    1415filelog = Logger("file")
    1516
    1617from xpra.child_reaper import getChildReaper
    17 from xpra.util import typedict, csv
     18from xpra.util import typedict, csv, nonl
    1819from xpra.simple_stats import std_unit
    1920
    2021DELETE_PRINTER_FILE = os.environ.get("XPRA_DELETE_PRINTER_FILE", "1")=="1"
     22FILE_CHUNKS_SIZE = max(0, int(os.environ.get("XPRA_FILE_CHUNKS_SIZE", "1024")))
     23MAX_CONCURRENT_FILES = 10
     24CHUNK_TIMEOUT = 10*1000
    2125
    2226MIMETYPE_EXTS = {
    2327                 "application/postscript"   : "ps",
     
    5155    except:
    5256        pass
    5357    fd = os.open(filename, flags)
     58    filelog("using filename '%s'", filename)
    5459    return filename, fd
    5560
    5661
     
    6974        #printing and file transfer:
    7075        self.file_transfer = file_transfer
    7176        self.file_size_limit = file_size_limit
     77        self.file_chunks = min(self.file_size_limit*1024*1024, FILE_CHUNKS_SIZE)
    7278        self.printing = printing
    7379        self.open_files = open_files
    7480        self.open_command = open_command
     
    7884        return {
    7985                "file-transfer"     : self.file_transfer,
    8086                "file-size-limit"   : self.file_size_limit,
     87                "file-chunks"       : self.file_chunks,
    8188                "open-files"        : self.open_files,
    8289                "printing"          : self.printing,
    8390                }
     
    104111        self.remote_printing = False
    105112        self.remote_open_files = False
    106113        self.remote_file_size_limit = 0
     114        self.remote_file_chunks = 0
     115        self.send_chunks_in_progress = {}
     116        self.receive_chunks_in_progress = {}
     117        if not getattr(self, "timeout_add", None):
     118            from xpra.gtk_common.gobject_compat import import_glib
     119            glib = import_glib()
     120            self.timeout_add = glib.timeout_add
    107121
    108122    def parse_file_transfer_caps(self, c):
    109123        self.remote_file_transfer = c.boolget("file-transfer")
     
    110124        self.remote_printing = c.boolget("printing")
    111125        self.remote_open_files = c.boolget("open-files")
    112126        self.remote_file_size_limit = c.boolget("file-size-limit")
     127        self.remote_file_chunks = max(0, min(self.remote_file_size_limit*1024*1024, c.intget("file-chunks")))
    113128
    114129    def get_info(self):
    115130        info = FileTransferAttributes.get_info(self)
    116131        info["remote"] = {
    117132                          "file-transfer"   : self.remote_file_transfer,
     133                          "file-size-limit" : self.remote_file_size_limit,
     134                          "file-chunks"     : self.remote_file_chunks,
     135                          "open-files"      : self.remote_open_files,
    118136                          "printing"        : self.remote_printing,
    119                           "open-files"      : self.remote_open_files,
    120                           "file-size-limit" : self.remote_file_size_limit,
    121137                          }
    122138        return info
    123139
     140    def check_digest(self, filename, digest, expected_digest, algo="sha1"):
     141        filelog("%s digest: %s - expected: %s", algo, digest, expected_digest)
     142        if digest!=expected_digest:
     143            filelog.error("Error: data does not match, invalid %s file digest for '%s'", algo, filename)
     144            filelog.error(" received %s, expected %s", digest, expected_digest)
     145            raise Exception("failed %s digest verification" % algo)
    124146
     147
     148    def _check_chunk_receiving(self, chunk_id, chunk_no):
     149        chunk_state = self.receive_chunks_in_progress.get(chunk_id)
     150        filelog("_check_chunk_receiving(%s, %s) chunk_state=%s", chunk_id, chunk_no, chunk_state)
     151        if chunk_state[-1]==0:
     152            filelog.error("Error: chunked file transfer timed out")
     153            del self.receive_chunks_in_progress[chunk_id]
     154
     155    def _process_send_file_chunk(self, packet):
     156        chunk_id, chunk, file_data, has_more = packet[1:5]
     157        filelog("_process_send_file_chunk%s", (chunk_id, chunk, "%i bytes" % len(file_data), has_more))
     158        chunk_state = self.receive_chunks_in_progress.get(chunk_id)
     159        if not chunk_state:
     160            filelog.error("Error: cannot find the file transfer id '%s'", nonl(chunk_id))
     161            self.send("ack-file-chunk", chunk_id, False, "file transfer id not found", chunk)
     162            return
     163        fd = chunk_state[1]
     164        if chunk_state[-1]+1!=chunk:
     165            filelog.error("Error: chunk number mismatch, expected %i but got %i", chunk_state[-1]+1, chunk)
     166            self.send("ack-file-chunk", chunk_id, False, "chunk number mismatch", chunk)
     167            del self.receive_chunks_in_progress[chunk_id]
     168            os.close(fd)
     169            return
     170        #update chunk number:
     171        chunk_state[-1] = chunk
     172        digest = chunk_state[8]
     173        written = chunk_state[9]
     174        try:
     175            os.write(fd, file_data)
     176            digest.update(file_data)
     177            written += len(file_data)
     178            chunk_state[9] = written
     179        except OSError as e:
     180            filelog.error("Error: cannot write file chunk")
     181            filelog.error(" %s", e)
     182            self.send("ack-file-chunk", chunk_id, False, "write error: %s" % e, chunk)
     183            try:
     184                os.close(fd)
     185            except:
     186                pass
     187            return
     188        self.send("ack-file-chunk", chunk_id, True, "", chunk)
     189        if has_more:
     190            #remote end will send more after receiving the ack
     191            self.timeout_add(CHUNK_TIMEOUT, self._check_chunk_receiving, chunk_id, chunk)
     192            return
     193        os.close(fd)
     194        #check file size and digest then process it:
     195        filename, mimetype, printit, openit, filesize, options = chunk_state[2:8]
     196        if written!=filesize:
     197            filelog.error("Error: expected a file of %i bytes, got %i", filesize, written)
     198            return
     199        expected_digest = options.get("sha1")
     200        if expected_digest:
     201            self.check_digest(filename, digest.hexdigest(), expected_digest)
     202        start_time = chunk_state[0]
     203        filelog("%i bytes received in %i chunks, took %ims", filesize, chunk, (time.time()-start_time)*1000)
     204        self.do_process_downloaded_file(filename, mimetype, printit, openit, filesize, options)
     205
    125206    def _process_send_file(self, packet):
    126207        #the remote end is sending us a file
    127         basefilename, mimetype, printit, openit, filesize, file_data, options = packet[1:11]
     208        basefilename, mimetype, printit, openit, filesize, file_data, options = packet[1:8]
    128209        options = typedict(options)
    129210        if printit:
    130211            l = printlog
     
    132213        else:
    133214            l = filelog
    134215            assert self.file_transfer
    135         l("received file: %s", [basefilename, mimetype, printit, openit, filesize, "%s bytes" % len(file_data), options])
     216        l("receiving file: %s", [basefilename, mimetype, printit, openit, filesize, "%s bytes" % len(file_data), options])
    136217        assert filesize>0, "invalid file size: %s" % filesize
    137218        if filesize>self.file_size_limit*1024*1024:
    138219            l.error("Error: file '%s' is too large:", basefilename)
    139220            l.error(" %iMB, the file size limit is %iMB", filesize//1024//1024, self.file_size_limit)
    140221            return
     222        filename, fd = safe_open_download_file(basefilename, mimetype)
     223        chunk_id = options.get("file-chunk-id")
     224        if chunk_id:
     225            if len(self.receive_chunks_in_progress)>=MAX_CONCURRENT_FILES:
     226                self.send("ack-file-chunk", chunk_id, False, "too many file transfers in progress", 0)
     227                os.close(fd)
     228                return
     229            digest = hashlib.sha1()
     230            chunk = 0
     231            chunk_state = [time.time(), fd, filename, mimetype, printit, openit, filesize, options, digest, 0, chunk]
     232            self.receive_chunks_in_progress[chunk_id] = chunk_state
     233            self.send("ack-file-chunk", chunk_id, True, "", chunk)
     234            self.timeout_add(CHUNK_TIMEOUT, self._check_chunk_receiving, chunk_id, chunk)
     235            return
     236        #not chunked, full file:
    141237        assert file_data, "no data!"
    142238        if len(file_data)!=filesize:
    143239            l.error("Error: invalid data size for file '%s'", basefilename)
     
    150246                u = libfn()
    151247                u.update(file_data)
    152248                l("%s digest: %s - expected: %s", algo, u.hexdigest(), digest)
    153                 if digest!=u.hexdigest():
    154                     l.error("Error: data does not match, invalid %s file digest for '%s'", algo, basefilename)
    155                     l.error(" received %s, expected %s", u.hexdigest(), digest)
    156                     raise Exception("invalid %s digest" % algo)
     249                self.check_digest(basefilename, u.hexdigest(), digest, algo)
    157250        check_digest("sha1", hashlib.sha1)
    158251        check_digest("md5", hashlib.md5)
    159 
    160         filename, fd = safe_open_download_file(basefilename, mimetype)
    161252        try:
    162253            os.write(fd, file_data)
    163254        finally:
    164255            os.close(fd)
    165         l.info("downloaded %s bytes to %s file%s:", filesize, (mimetype or "temporary"), ["", " for printing"][int(printit)])
    166         l.info(" '%s'", filename)
     256        self.do_process_downloaded_file(filename, mimetype, printit, openit, filesize, options)
     257
     258    def do_process_downloaded_file(self, filename, mimetype, printit, openit, filesize, options):
     259        filelog.info("downloaded %s bytes to %s file%s:", filesize, (mimetype or "temporary"), ["", " for printing"][int(printit)])
     260        filelog.info(" '%s'", filename)
    167261        if printit:
    168262            self._print_file(filename, mimetype, options)
    169263            return
     
    269363        l("send_file%s", (filename, mimetype, type(data), "%i bytes" % filesize, printit, openit, options))
    270364        absfile = os.path.abspath(filename)
    271365        basefilename = os.path.basename(filename)
    272         cdata = self.compressed_wrapper("file-data", data)
    273         assert len(cdata)<=filesize     #compressed wrapper ensures this is true
    274366        def sizewarn(location="local"):
    275367            filelog.warn("Warning: cannot %s the file '%s'", action, basefilename)
    276368            filelog.warn(" this file is too large: %sB", std_unit(filesize, unit=1024))
     
    286378        u.update(data)
    287379        filelog("sha1 digest(%s)=%s", absfile, u.hexdigest())
    288380        options["sha1"] = u.hexdigest()
     381        chunk_size = min(self.file_chunks, self.remote_file_chunks)
     382        if chunk_size>0 and filesize>chunk_size:
     383            if len(self.send_chunks_in_progress)>=MAX_CONCURRENT_FILES:
     384                raise Exception("too many file transfers in progress")
     385            #chunking is supported and the file is big enough
     386            chunk_id = uuid.uuid4().hex
     387            options["file-chunk-id"] = chunk_id
     388            chunk_state = [time.time(), data, chunk_size, 0]
     389            self.send_chunks_in_progress[chunk_id] = chunk_state
     390            cdata = ""
     391            #timer to check that the other end is requesting more chunks:
     392            self.timeout_add(CHUNK_TIMEOUT, self._check_chunk_sending, chunk_id, 0)
     393        else:
     394            #send everything now:
     395            cdata = self.compressed_wrapper("file-data", data)
     396            assert len(cdata)<=filesize     #compressed wrapper ensures this is true
    289397        self.send("send-file", basefilename, mimetype, printit, openit, filesize, cdata, options)
    290398        return True
     399
     400    def _check_chunk_sending(self, chunk_id, chunk_no):
     401        chunk_state = self.send_chunks_in_progress.get(chunk_id)
     402        filelog("_check_chunk_sending(%s, %s) chunk_state found: %s", chunk_id, chunk_no, bool(chunk_state))
     403        if chunk_state and chunk_state[3]==chunk_no:
     404            filelog.error("Error: chunked file transfer timed out on chunk %i", chunk_no)
     405            del self.send_chunks_in_progress[chunk_id]
     406
     407    def _process_ack_file_chunk(self, packet):
     408        #the other end received our send-file or send-file-chunk,
     409        #send some more file data
     410        filelog("%s", packet)
     411        chunk_id, state, error_message, chunk = packet[1:5]
     412        if not state:
     413            filelog.error("Error: remote end is cancelling the file transfer:")
     414            filelog.error(" %s", error_message)
     415            del self.send_chunks_in_progress[chunk_id]
     416            return
     417        chunk_state = self.send_chunks_in_progress.get(chunk_id)
     418        if not chunk_state:
     419            filelog.error("Error: cannot find the file transfer id '%s'", nonl(chunk_id))
     420            return
     421        if chunk_state[-1]!=chunk:
     422            filelog.error("Error: chunk number mismatch (%i vs %i)", chunk_state, chunk)
     423            return
     424        start_time, data, chunk_size, chunk = chunk_state
     425        if not data:
     426            #all sent!
     427            filelog("%i chunks of %i bytes sent in %ims", chunk, chunk_size, (time.time()-start_time)*1000)
     428            del self.send_chunks_in_progress[chunk_id]
     429            return
     430        assert chunk_size>0
     431        #carve out another chunk:
     432        cdata = self.compressed_wrapper("file-data", data[:chunk_size])
     433        data = data[chunk_size:]
     434        chunk += 1
     435        self.send_chunks_in_progress[chunk_id] = start_time, data, chunk_size, chunk
     436        self.timeout_add(CHUNK_TIMEOUT, self._check_chunk_sending, chunk_id, chunk)
     437        self.send("send-file-chunk", chunk_id, chunk, cdata, bool(data))
  • xpra/server/server_base.py

     
    660660            "command_request":                      self._process_command_request,
    661661            "printers":                             self._process_printers,
    662662            "send-file":                            self._process_send_file,
     663            "ack-file-chunk":                       self._process_ack_file_chunk,
     664            "send-file-chunk":                      self._process_send_file_chunk,
    663665            "webcam-start":                         self._process_webcam_start,
    664666            "webcam-stop":                          self._process_webcam_stop,
    665667            "webcam-frame":                         self._process_webcam_frame,
     
    17031705    def _process_send_file(self, proto, packet):
    17041706        ss = self._server_sources.get(proto)
    17051707        if not ss:
    1706             log.warn("Warning: invalid client source for file packet")
     1708            log.warn("Warning: invalid client source for send-file packet")
    17071709            return
    17081710        ss._process_send_file(packet)
    17091711
     1712    def _process_ack_file_chunk(self, proto, packet):
     1713        ss = self._server_sources.get(proto)
     1714        if not ss:
     1715            log.warn("Warning: invalid client source for ack-file-chunk packet")
     1716            return
     1717        ss._process_ack_file_chunk(packet)
     1718
     1719    def _process_send_file_chunk(self, proto, packet):
     1720        ss = self._server_sources.get(proto)
     1721        if not ss:
     1722            log.warn("Warning: invalid client source for send-file-chunk packet")
     1723            return
     1724        ss._process_send_file_chunk(packet)
     1725
    17101726    def _process_print(self, proto, packet):
    17111727        #ie: from the xpraforwarder we call this command:
    17121728        #command = ["xpra", "print", "socket:/path/tosocket", filename, mimetype, source, title, printer, no_copies, print_options]