CondStream to es6 class
This commit is contained in:
parent
f3ba7a4a2d
commit
041b3fa508
@ -1,134 +1,140 @@
|
|||||||
|
"use strict";
|
||||||
var Dragonfly = global.Dragonfly;
|
var Dragonfly = global.Dragonfly;
|
||||||
|
|
||||||
var fs = require( "fs" );
|
var fs = require( "fs" );
|
||||||
var path = require( "path" );
|
var path = require( "path" );
|
||||||
var crypto = require( "crypto" );
|
var crypto = require( "crypto" );
|
||||||
var util = require( "util" );
|
|
||||||
var ReadStream = require( "stream" ).Readable;
|
var ReadStream = require( "stream" ).Readable;
|
||||||
|
|
||||||
var ConditionalStream = function( tmpPath, triggerLimit )
|
class ConditionalStream extends String
|
||||||
{
|
{
|
||||||
if( !tmpPath )
|
constructor( tmpPath, triggerLimit )
|
||||||
{
|
{
|
||||||
throw new Error( "Temp path is not defined" );
|
super();
|
||||||
}
|
// XXX: Dirty fix for incompat on node js v5
|
||||||
|
Object.setPrototypeOf( this, new.target.prototype );
|
||||||
|
|
||||||
this.size = 0;
|
if( !tmpPath )
|
||||||
this.limit = triggerLimit * 1024;
|
|
||||||
this.stream = false;
|
|
||||||
this.hexData = "";
|
|
||||||
this.tmpPath = tmpPath;
|
|
||||||
|
|
||||||
this.file = false;
|
|
||||||
|
|
||||||
this.__discard = false;
|
|
||||||
|
|
||||||
this.__ended = false;
|
|
||||||
this.__finished = false;
|
|
||||||
};
|
|
||||||
|
|
||||||
util.inherits( ConditionalStream, String );
|
|
||||||
|
|
||||||
|
|
||||||
ConditionalStream.prototype.write = function( data )
|
|
||||||
{
|
|
||||||
var _self = this;
|
|
||||||
this.size += data.length;
|
|
||||||
|
|
||||||
if( this.stream )
|
|
||||||
{
|
|
||||||
this.hexData = false;
|
|
||||||
this.stream.write( data );
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.hexData += data.toString( "hex" );
|
|
||||||
|
|
||||||
// Trigger
|
|
||||||
if( this.limit < this.size )
|
|
||||||
{
|
|
||||||
this.file = path.join( this.tmpPath, "ss_" + crypto.randomBytes( 8 ).toString( "hex" ) );
|
|
||||||
|
|
||||||
this.stream = fs.createWriteStream( this.file, { mode: 0600 } );
|
|
||||||
this.stream.addListener( "finish", this.__end.bind( this ) );
|
|
||||||
this.stream.write( this.hexData, "hex" );
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
ConditionalStream.prototype.end = function( handler )
|
|
||||||
{
|
|
||||||
var _self = this;
|
|
||||||
if( this.stream )
|
|
||||||
{
|
|
||||||
this.stream.addListener( "close", function() {
|
|
||||||
_self.__finished = true;
|
|
||||||
handler( _self );
|
|
||||||
} );
|
|
||||||
this.stream.end();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
setTimeout( function()
|
|
||||||
{
|
{
|
||||||
_self.__finished = true;
|
throw new Error( "Temp path is not defined" );
|
||||||
handler( _self )
|
}
|
||||||
} , 0 );
|
|
||||||
|
this.size = 0;
|
||||||
|
this.limit = triggerLimit * 1024;
|
||||||
|
this.stream = false;
|
||||||
|
this.hexData = "";
|
||||||
|
this.tmpPath = tmpPath;
|
||||||
|
|
||||||
|
this.file = false;
|
||||||
|
|
||||||
|
this.__discard = false;
|
||||||
|
|
||||||
|
this.__ended = false;
|
||||||
|
this.__finished = false;
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
ConditionalStream.prototype.discard = function()
|
write( data )
|
||||||
{
|
|
||||||
var _self = this;
|
|
||||||
|
|
||||||
this.__discard = true;
|
|
||||||
if( this.__finished && this.file )
|
|
||||||
{
|
{
|
||||||
fs.unlink( this.file, function( e )
|
var _self = this;
|
||||||
|
this.size += data.length;
|
||||||
|
|
||||||
|
if( this.stream )
|
||||||
{
|
{
|
||||||
Dragonfly.Debug( "Client Data Closed: " + _self.file );
|
this.hexData = false;
|
||||||
if( _self.__error ) throw new Error( _self.__error );
|
this.stream.write( data );
|
||||||
} );
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.hexData += data.toString( "hex" );
|
||||||
|
|
||||||
|
// Trigger
|
||||||
|
if( this.limit < this.size )
|
||||||
|
{
|
||||||
|
this.file = path.join( this.tmpPath, "ss_" + crypto.randomBytes( 8 ).toString( "hex" ) );
|
||||||
|
|
||||||
|
this.stream = fs.createWriteStream( this.file, { mode: "0600" } );
|
||||||
|
this.stream.addListener( "finish", this.__end.bind( this ) );
|
||||||
|
this.stream.write( this.hexData, "hex" );
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
ConditionalStream.prototype.__end = function()
|
end( handler )
|
||||||
{
|
|
||||||
this.__finished = true;
|
|
||||||
if( this.__discard ) this.discard();
|
|
||||||
};
|
|
||||||
|
|
||||||
ConditionalStream.prototype.toString = function( enc )
|
|
||||||
{
|
|
||||||
if( this.stream )
|
|
||||||
{
|
{
|
||||||
this.discard();
|
var _self = this;
|
||||||
this.__error = "Received data is too large to process";
|
if( this.stream )
|
||||||
|
{
|
||||||
|
this.stream.addListener( "close", function() {
|
||||||
|
_self.__finished = true;
|
||||||
|
handler( _self );
|
||||||
|
} );
|
||||||
|
this.stream.end();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
setTimeout( function()
|
||||||
|
{
|
||||||
|
_self.__finished = true;
|
||||||
|
handler( _self )
|
||||||
|
} , 0 );
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return new Buffer( this.hexData, "hex" ).toString( enc );
|
discard()
|
||||||
};
|
|
||||||
|
|
||||||
ConditionalStream.prototype.resultStream = function()
|
|
||||||
{
|
|
||||||
var _self = this;
|
|
||||||
if( !this.__finished ) throw new Error( "Data is not finished yet" );
|
|
||||||
if( this.__discard ) throw new Error( "Data is discarded" );
|
|
||||||
|
|
||||||
if( this.stream )
|
|
||||||
{
|
{
|
||||||
var rt = fs.createReadStream( this.file );
|
var _self = this;
|
||||||
rt.addListener( "close", () => _self.discard() );
|
|
||||||
return rt;
|
this.__discard = true;
|
||||||
|
if( this.__finished && this.file )
|
||||||
|
{
|
||||||
|
fs.unlink( this.file, function( e )
|
||||||
|
{
|
||||||
|
Dragonfly.Debug( "Client Data Closed: " + _self.file );
|
||||||
|
if( _self.__error ) throw new Error( _self.__error );
|
||||||
|
} );
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var st = new ReadStream();
|
toString( enc )
|
||||||
st._read = function(){};
|
{
|
||||||
|
if( this.stream )
|
||||||
|
{
|
||||||
|
this.discard();
|
||||||
|
this.__error = "Received data is too large to process";
|
||||||
|
}
|
||||||
|
|
||||||
setTimeout( function() {
|
return new Buffer( this.hexData, "hex" ).toString( enc );
|
||||||
st.push( _self.hexData, "hex" );
|
}
|
||||||
st.push( null );
|
|
||||||
}, 0 );
|
|
||||||
|
|
||||||
return st;
|
resultStream()
|
||||||
};
|
{
|
||||||
|
var _self = this;
|
||||||
|
if( !this.__finished ) throw new Error( "Data is not finished yet" );
|
||||||
|
if( this.__discard ) throw new Error( "Data is discarded" );
|
||||||
|
|
||||||
|
if( this.stream )
|
||||||
|
{
|
||||||
|
var rt = fs.createReadStream( this.file );
|
||||||
|
rt.addListener( "close", () => _self.discard() );
|
||||||
|
return rt;
|
||||||
|
}
|
||||||
|
|
||||||
|
var st = new ReadStream();
|
||||||
|
st._read = function(){};
|
||||||
|
|
||||||
|
setTimeout( function() {
|
||||||
|
st.push( _self.hexData, "hex" );
|
||||||
|
st.push( null );
|
||||||
|
}, 0 );
|
||||||
|
|
||||||
|
return st;
|
||||||
|
}
|
||||||
|
|
||||||
|
__end()
|
||||||
|
{
|
||||||
|
this.__finished = true;
|
||||||
|
if( this.__discard ) this.discard();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
module.exports = ConditionalStream;
|
module.exports = ConditionalStream;
|
||||||
|
Loading…
Reference in New Issue
Block a user