xpra icon
Bug tracker and wiki

This bug tracker and wiki are being discontinued
please use https://github.com/Xpra-org/xpra instead.


Ticket #1424: html5-zerocopy.patch

File html5-zerocopy.patch, 7.9 KB (added by Antoine Martin, 4 years ago)

implement zero copy in network layer - only aggregate chunks if we have to, do it using fast set method

  • html5/js/Protocol.js

     
    9191        this.cipher_out = null;
    9292        this.mode = 'binary';   // Current WebSocket mode: 'binary', 'base64'
    9393        this.rQ = [];                   // Receive queue
    94         this.rQi = 0;                   // Receive queue index
    95         this.rQmax = 10000;             // Max receive queue size before compacting
    9694        this.sQ = [];                   // Send queue
    9795        this.mQ = [];                   // Worker message queue
     96        this.header = [];
    9897
    9998        //Queue processing via intervals
    10099        this.process_interval = 4;  //milliseconds
     
    107106        var me = this;
    108107        // init
    109108        this.rQ          = [];
    110         this.rQi                = 0;
    111109        this.sQ          = [];
    112110        this.websocket  = null;
    113111        // connect the socket
     
    124122        };
    125123        this.websocket.onmessage = function (e) {
    126124                // push arraybuffer values onto the end
    127                 var u8 = new Uint8Array(e.data);
    128                 for (var i = 0; i < u8.length; i++) {
    129                         me.rQ.push(u8[i]);
    130                 }
    131                 // wait for 8 bytes
    132                 //if (me.rQ.length >= 8) {
    133                         //me._process();
    134                 //}
     125                me.rQ.push(new Uint8Array(e.data));
    135126        };
    136127        this.start_processing();
    137128}
     
    152143        var me = this;
    153144        if(this.rQ_interval_id === null){
    154145                this.rQ_interval_id = setInterval(function(){
    155                         if (me.rQ.length >= 8) {
     146                        if (me.rQ.length > 0) {
    156147                                me.process_receive_queue();
    157148                        }
    158149                }, this.process_interval);
     
    185176        this.mQ_interval_id = null;
    186177}
    187178
     179
    188180XpraProtocol.prototype.process_receive_queue = function() {
    189         // peek at first 8 bytes of buffer
    190         var buf = this._buffer_peek(8);
     181        var i = 0, j = 0;
     182        if (this.header.length<8 && this.rQ.length>0) {
     183                //add from receive queue data to header until we get the 8 bytes we need:
     184                while (this.header.length<8 && this.rQ.length>0) {
     185                        var slice = this.rQ[0];
     186                        var needed = 8-this.header.length;
     187                        var n = Math.min(needed, slice.length);
     188                        //console.log("header size", this.header.length, ", adding", n, "bytes from", slice.length);
     189                        //copy at most n characters:
     190                        for (i = 0; i < n; i++) {
     191                                this.header.push(slice[i]);
     192                        }
     193                        if (slice.length>needed) {
     194                                //replace the slice with what is left over:
     195                                this.rQ[0] = slice.subarray(n);
     196                        }
     197                        else {
     198                                //this slice has been fully consumed already:
     199                                this.rQ.shift();
     200                        }
     201                }
    191202
    192         if (buf[0] !== ord("P")) {
    193                 msg = "invalid packet header format: " + buf[0];
    194                 if (buf.length>1) {
    195                         msg += ": ";
    196                         for (c in buf) {
    197                                 msg += String.fromCharCode(c);
     203                //verify the header format:
     204                if (this.header[0] !== ord("P")) {
     205                        msg = "invalid packet header format: " + this.header[0];
     206                        if (this.header.length>1) {
     207                                msg += ": ";
     208                                for (c in this.header) {
     209                                        msg += String.fromCharCode(c);
     210                                }
    198211                        }
     212                        throw msg;
    199213                }
    200                 throw msg;
    201214        }
    202215
    203         var proto_flags = buf[1];
     216        if (this.header.length<8) {
     217                //we need more data to continue
     218                return;
     219        }
     220
     221        var proto_flags = this.header[1];
    204222        var proto_crypto = proto_flags & 0x2;
    205 
    206223        if (proto_flags!=0) {
    207224                // check for crypto protocol flag
    208225                if (!(proto_crypto)) {
     
    210227                }
    211228        }
    212229
    213         var level = buf[2];
    214         var index = buf[3];
     230        var level = this.header[2];
     231        if (level & 0x20) {
     232                throw "lzo compression is not supported";
     233        }
     234        var index = this.header[3];
     235        if (index>=20) {
     236                throw "invalid packet index: "+index;
     237        }
    215238        var packet_size = 0;
    216         for (var i=0; i<4; i++) {
    217                 //debug("size header["+i+"]="+buf[4+i]);
     239        for (i=0; i<4; i++) {
    218240                packet_size = packet_size*0x100;
    219                 packet_size += buf[4+i];
     241                packet_size += this.header[4+i];
    220242        }
     243
    221244        // work out padding if necessary
    222245        var padding = 0
    223246        if (proto_crypto) {
     
    224247                padding = (this.cipher_in_block_size - packet_size % this.cipher_in_block_size);
    225248                packet_size += padding;
    226249        }
    227         //debug("packet_size="+packet_size+", level="+level+", index="+index);
    228250
    229         // wait for packet to be complete
    230         // the header is still on the buffer so wait for packetsize+headersize bytes!
    231         if (this.rQ.length < packet_size+8) {
    232                 // we already shifted the header off the buffer?
    233                 //debug("packet is not complete yet");
     251        // verify that we have enough data for the full payload:
     252        var rsize = 0;
     253        for (i=0,j=this.rQ.length;i<j;++i) {
     254                rsize += this.rQ[i].length;
     255        }
     256        if (rsize<packet_size) {
    234257                return;
    235258        }
    236259
    237         // packet is complete but header is still on buffer
    238         this._buffer_shift(8);
    239         //debug("got a full packet, shifting off "+packet_size);
    240         var packet_data = this._buffer_shift(packet_size);
     260        // done parsing the header, the next packet will need a new one:
     261        this.header = []
    241262
     263        var packet_data = null;
     264        if (this.rQ[0].length==packet_size) {
     265                //exact match: the payload is in a buffer already:
     266                packet_data = this.rQ.shift();
     267        }
     268        else {
     269                //aggregate all the buffers into "packet_data" until we get exactly "packet_size" bytes:
     270                packet_data = new Uint8Array(packet_size);
     271                rsize = 0;
     272                while (rsize < packet_size) {
     273                        var slice = this.rQ[0];
     274                        var needed = packet_size - rsize;
     275                        //console.log("slice:", slice.length, "bytes, needed", needed);
     276                        if (slice.length>needed) {
     277                                //add part of this slice:
     278                                packet_data.set(slice.subarray(0, needed), rsize);
     279                                rsize += needed;
     280                                this.rQ[0] = slice.subarray(needed);
     281                        }
     282                        else {
     283                                //add this slice in full:
     284                                packet_data.set(slice, rsize);
     285                                rsize += slice.length;
     286                                this.rQ.shift();
     287                        }
     288                }
     289        }
     290
    242291        // decrypt if needed
    243292        if (proto_crypto) {
    244293                this.cipher_in.update(forge.util.createBuffer(uintToString(packet_data)));
    245294                var decrypted = this.cipher_in.output.getBytes();
    246295                packet_data = [];
    247                 for (var i=0; i<decrypted.length; i++)
     296                for (i=0; i<decrypted.length; i++)
    248297                        packet_data.push(decrypted[i].charCodeAt(0));
    249298                packet_data = packet_data.slice(0, -1 * padding);
    250299        }
     
    255304                        // lz4
    256305                        // python-lz4 inserts the length of the uncompressed data as an int
    257306                        // at the start of the stream
    258                         var d = packet_data.splice(0, 4);
    259                         // will always be little endian
     307                        var d = packet_data.subarray(0, 4);
     308                        // output buffer length is stored as little endian
    260309                        var length = d[0] | (d[1] << 8) | (d[2] << 16) | (d[3] << 24);
    261310                        // decode the LZ4 block
    262                         var inflated = new Buffer(length);
    263                         var uncompressedSize = LZ4.decodeBlock(packet_data, inflated);
    264                         if(!proto_crypto)
    265                                 inflated = inflated.slice(0, uncompressedSize);
    266                 } else if (level & 0x20) {
    267                         // lzo
     311                        console.log("lz4 decompress packet size", packet_size, ", lz4 length=", length);
     312                        var inflated = new Uint8Array(length);
     313                        var uncompressedSize = LZ4.decodeBlock(packet_data, inflated, 4);
     314                        // if lz4 errors out at the end of the buffer, ignore it:
     315                        if (uncompressedSize<=0 && packet_size+uncompressedSize!=0) {
     316                                console.error("failed to decompress lz4 data, error code:", uncompressedSize);
     317                                return;
     318                        }
    268319                } else {
    269320                        // zlib
    270321                        var inflated = new Zlib.Inflate(packet_data).decompress();
     
    287338                        }
    288339                        this.raw_packets = {}
    289340                        // pass to our packet handler
    290 
    291341                        if((packet[0] === 'draw') && (packet[6] !== 'scroll')){
    292342                                var img_data = packet[7];
    293343                                if (typeof img_data === 'string') {
    294344                                        var uint = new Uint8Array(img_data.length);
    295                                         for(var i=0,j=img_data.length;i<j;++i) {
     345                                        for(i=0,j=img_data.length;i<j;++i) {
    296346                                                uint[i] = img_data.charCodeAt(i);
    297347                                        }
    298348                                        packet[7] = uint;
    299349                                }
    300                                 else {
    301                                         packet[7] = new Uint8Array(packet[7]);
    302                                 }
    303350                        }
    304                         if(this.is_worker){
     351                        if (this.is_worker){
    305352                                this.mQ[this.mQ.length] = packet;
    306353                        } else {
    307354                                this.packet_handler(packet, this.packet_ctx);
     
    314361        }
    315362
    316363        // see if buffer still has unread packets
    317         if (this.rQ.length >= 8) {
     364        if (this.rQ.length > 0) {
    318365                this.process_receive_queue();
    319366        }
    320367}
     
    405452        this.cipher_out.start({iv: caps['cipher.iv']});
    406453}
    407454
    408 XpraProtocol.prototype._buffer_peek = function(bytes) {
    409         return this.rQ.slice(0, 0+bytes);
    410 }
    411455
    412 XpraProtocol.prototype._buffer_shift = function(bytes) {
    413         return this.rQ.splice(0, 0+bytes);;
    414 }
    415 
    416 
    417456/*
    418457If we are in a web worker, set up an instance of the protocol
    419458*/