在前两讲《初探.Net Remoting服务端 Loading Remoting配置内容的过程》和《初探.Net Remoting客户端 Loading Remoting配置内容的过程》中,我已经分析了Remoting的Sink机制。接下来提供一个具体的范例:CompressionSink(原始SourceCode源于《Advanced .NET Remoting》第一版)。CompressionSink在客户端和服务端各自插入一个负责数据压缩/解压缩的Sink,目的是减少大数据量传递对网络带宽的占用,提高传输效率。下载SourceCode。BTW,这个压缩Sink相对比较稳定,大家可以在各自的项目中放心使用。:-)
详细设计: 
提供一个Assembly: CompressionSink.dll 
它包括: 
    客户端: 
        CompressionSink.CompressionClientSinkProvider类和CompressionSink.CompressionClientSink类 
    服务端: 
        CompressionSink.CompressionServerSinkProvider类和CompressionSink.CompressionServerSink类 
    压缩类:CompressionHelper 
    压缩内核:NZipLib库。 
客户端的配置文件 : 
<?xml version="1.0" encoding="utf-8" ?>
<!-- Client-side Remoting configuration: HTTP channel with the SOAP formatter plus the compression sink provider. -->
<configuration>
<system.runtime.remoting>
<application>
<channels>
<channel ref="http">
<clientProviders>
<!-- The compression provider is listed AFTER the SOAP formatter; provider order here presumably determines sink order in the chain, so keep it mirrored with the server configuration. -->
<formatter ref="soap" />
<provider type="CompressionSink.CompressionClientSinkProvider, CompressionSink" />
</clientProviders>
</channel>
</channels>
<client>
<!-- Remote object endpoint; port 5555 and the SomeSAO.soap URI must match the server configuration. -->
<wellknown type="Service.SomeSAO, Service" url="http://localhost:5555/SomeSAO.soap" />
</client>
</application>
</system.runtime.remoting>
</configuration>
服务端的配置文件 :
<?xml version="1.0" encoding="utf-8" ?>
<!-- Server-side Remoting configuration: HTTP channel on port 5555 with the compression sink provider plus the SOAP formatter. -->
<configuration>
<system.runtime.remoting>
<application>
<channels>
<channel ref="http" port="5555">
<serverProviders>
<!-- The compression provider is listed BEFORE the SOAP formatter (the reverse of the client order), presumably so the request is decompressed before the formatter reads it. -->
<provider type="CompressionSink.CompressionServerSinkProvider, CompressionSink" />
<formatter ref="soap"/>
</serverProviders>
</channel>
</channels>
<service>
<!-- Publishes Service.SomeSAO as a Singleton at the SomeSAO.soap URI. -->
<wellknown mode="Singleton" type="Service.SomeSAO, Service" objectUri="SomeSAO.soap" />
</service>
</application>
</system.runtime.remoting>
</configuration>
 /// <summary>
 /// Client-side sink provider that places a <see cref="CompressionClientSink"/>
 /// on top of the sinks created by the remaining providers in the chain.
 /// </summary>
 public class CompressionClientSinkProvider : IClientChannelSinkProvider
 {
     private IClientChannelSinkProvider _nextProvider;

     /// <summary>
     /// Constructor with the signature used for providers declared in a configuration file.
     /// </summary>
     /// <param name="properties">Provider properties; currently unused.</param>
     /// <param name="providerData">Additional provider data; currently unused.</param>
     public CompressionClientSinkProvider(IDictionary properties, ICollection providerData)
     {
         // not yet needed
     }

     /// <summary>
     /// Gets or sets the next provider in the client sink provider chain.
     /// </summary>
     public IClientChannelSinkProvider Next
     {
         get
         {
             return _nextProvider;
         }
         set
         {
             _nextProvider = value;
         }
     }

     /// <summary>
     /// Creates the rest of the sink chain and puts a compression sink on top of it.
     /// </summary>
     /// <param name="channel">Channel the sink chain is being built for.</param>
     /// <param name="url">URL of the remote object.</param>
     /// <param name="remoteChannelData">Channel data describing the remote channel.</param>
     /// <returns>
     /// A new <see cref="CompressionClientSink"/> wrapping the downstream chain, or
     /// <c>null</c> when the downstream providers return no sink.
     /// </returns>
     public IClientChannelSink CreateSink(IChannelSender channel, string url, object remoteChannelData)
     {
         // create other sinks in the chain
         IClientChannelSink next = _nextProvider.CreateSink(channel, url, remoteChannelData);

         // A null downstream sink means the rest of the chain cannot serve this
         // endpoint; pass that on instead of wrapping null and failing later
         // with a NullReferenceException on the first call.
         if (next == null)
         {
             return null;
         }

         // put our sink on top of the chain and return it
         return new CompressionClientSink(next);
     }
 }
 /// <summary>
 /// Client-side channel sink that compresses outgoing request streams and
 /// decompresses incoming response streams via <c>CompressionHelper</c>.
 /// </summary>
 public class CompressionClientSink : BaseChannelSinkWithProperties,
                                      IClientChannelSink
 {
     // Assigned once in the constructor and never replaced.
     private readonly IClientChannelSink _nextSink;

     /// <summary>
     /// Creates the sink and links it to the next sink in the client chain.
     /// </summary>
     /// <param name="next">The sink that receives the compressed request.</param>
     public CompressionClientSink(IClientChannelSink next)
     {
         _nextSink = next;
     }

     /// <summary>
     /// Gets the next sink in the client sink chain.
     /// </summary>
     public IClientChannelSink NextChannelSink
     {
         get
         {
             return _nextSink;
         }
     }

     /// <summary>
     /// Compresses the request stream and forwards the asynchronous request to the next sink.
     /// </summary>
     /// <param name="sinkStack">Sink stack used to route the response back through this sink.</param>
     /// <param name="msg">The message being sent.</param>
     /// <param name="headers">Transport headers of the request.</param>
     /// <param name="stream">The serialized, uncompressed request stream.</param>
     public void AsyncProcessRequest(IClientChannelSinkStack sinkStack,
                                     IMessage msg,
                                     ITransportHeaders headers,
                                     Stream stream)
     {
         // generate a compressed stream using NZipLib
         stream = CompressionHelper.getCompressedStreamCopy(stream);

         // push onto stack (so AsyncProcessResponse is called on this sink)
         // and forward the request
         sinkStack.Push(this, null);
         _nextSink.AsyncProcessRequest(sinkStack, msg, headers, stream);
     }

     /// <summary>
     /// Decompresses the response of an asynchronous call and hands it back to the sink stack.
     /// </summary>
     /// <param name="sinkStack">Sink stack that continues response processing.</param>
     /// <param name="state">State object pushed together with this sink (always null here).</param>
     /// <param name="headers">Transport headers of the response.</param>
     /// <param name="stream">The compressed response stream.</param>
     public void AsyncProcessResponse(IClientResponseChannelSinkStack sinkStack,
                                      object state,
                                      ITransportHeaders headers,
                                      Stream stream)
     {
         // decompress (inflate) the response
         stream = CompressionHelper.getUncompressedStreamCopy(stream);

         // forward the response
         sinkStack.AsyncProcessResponse(headers, stream);
     }

     /// <summary>
     /// Delegates request stream creation to the next sink.
     /// </summary>
     /// <param name="msg">The message to be serialized.</param>
     /// <param name="headers">Transport headers of the request.</param>
     /// <returns>Whatever the next sink returns.</returns>
     public Stream GetRequestStream(IMessage msg,
                                    ITransportHeaders headers)
     {
         return _nextSink.GetRequestStream(msg, headers);
     }

     /// <summary>
     /// Processes a synchronous call: compresses the request, forwards it, and
     /// decompresses the response.
     /// </summary>
     /// <param name="msg">The message being sent.</param>
     /// <param name="requestHeaders">Transport headers of the request.</param>
     /// <param name="requestStream">The serialized, uncompressed request stream.</param>
     /// <param name="responseHeaders">Receives the response headers from the next sink.</param>
     /// <param name="responseStream">Receives the decompressed response stream.</param>
     public void ProcessMessage(IMessage msg,
                                ITransportHeaders requestHeaders,
                                Stream requestStream,
                                out ITransportHeaders responseHeaders,
                                out Stream responseStream)
     {
         // generate a compressed stream using NZipLib
         Stream localrequestStream =
             CompressionHelper.getCompressedStreamCopy(requestStream);

         Stream localresponseStream;

         // forward the call to the next sink
         _nextSink.ProcessMessage(msg,
                                  requestHeaders,
                                  localrequestStream,
                                  out responseHeaders,
                                  out localresponseStream);

         // decompress (inflate) the response
         responseStream =
             CompressionHelper.getUncompressedStreamCopy(localresponseStream);
     }
 }
 /// <summary>
 /// Server-side sink provider that places a <see cref="CompressionServerSink"/>
 /// on top of the sinks created by the remaining providers in the chain.
 /// </summary>
 public class CompressionServerSinkProvider : IServerChannelSinkProvider
 {
     private IServerChannelSinkProvider _nextProvider;

     /// <summary>
     /// Constructor with the signature used for providers declared in a configuration file.
     /// </summary>
     /// <param name="properties">Provider properties; currently unused.</param>
     /// <param name="providerData">Additional provider data; currently unused.</param>
     public CompressionServerSinkProvider(IDictionary properties, ICollection providerData)
     {
         // not yet needed
     }

     /// <summary>
     /// Gets or sets the next provider in the server sink provider chain.
     /// </summary>
     public IServerChannelSinkProvider Next
     {
         get
         {
             return _nextProvider;
         }
         set
         {
             _nextProvider = value;
         }
     }

     /// <summary>
     /// Creates the rest of the sink chain and puts a compression sink on top of it.
     /// </summary>
     /// <param name="channel">Channel the sink chain is being built for.</param>
     /// <returns>A new <see cref="CompressionServerSink"/> wrapping the downstream chain.</returns>
     public IServerChannelSink CreateSink(IChannelReceiver channel)
     {
         // create other sinks in the chain
         IServerChannelSink next = _nextProvider.CreateSink(channel);

         // put our sink on top of the chain and return it
         return new CompressionServerSink(next);
     }

     /// <summary>
     /// Intentionally empty: this provider contributes no channel data.
     /// </summary>
     /// <param name="channelData">Channel data store; left untouched.</param>
     public void GetChannelData(IChannelDataStore channelData)
     {
         // not yet needed
     }
 }
 using System;
 using System.IO;
 using System.Runtime.Remoting.Channels;
 using System.Runtime.Remoting.Messaging;

 namespace CompressionSink
 {
     /// <summary>
     /// Server-side channel sink that decompresses incoming request streams and
     /// compresses outgoing response streams via <c>CompressionHelper</c>.
     /// </summary>
     public class CompressionServerSink : BaseChannelSinkWithProperties,
         IServerChannelSink
     {
         // Assigned once in the constructor and never replaced.
         private readonly IServerChannelSink _nextSink;

         /// <summary>
         /// Creates the sink and links it to the next sink in the server chain.
         /// </summary>
         /// <param name="next">The sink that receives the decompressed request.</param>
         public CompressionServerSink(IServerChannelSink next)
         {
             _nextSink = next;
         }

         /// <summary>
         /// Gets the next sink in the server sink chain.
         /// </summary>
         public IServerChannelSink NextChannelSink
         {
             get
             {
                 return _nextSink;
             }
         }

         /// <summary>
         /// Compresses the response of an asynchronously processed call and
         /// hands it back to the sink stack.
         /// </summary>
         /// <param name="sinkStack">Sink stack that continues response processing.</param>
         /// <param name="state">State object pushed together with this sink (always null here).</param>
         /// <param name="msg">The response message.</param>
         /// <param name="headers">Transport headers of the response.</param>
         /// <param name="stream">The serialized, uncompressed response stream.</param>
         public void AsyncProcessResponse(IServerResponseChannelSinkStack sinkStack,
             object state,
             IMessage msg,
             ITransportHeaders headers,
             Stream stream)
         {
             // compressing the response
             stream = CompressionHelper.getCompressedStreamCopy(stream);

             // forwarding to the stack for further processing
             sinkStack.AsyncProcessResponse(msg, headers, stream);
         }

         /// <summary>
         /// This sink does not supply a response stream of its own.
         /// </summary>
         /// <returns>Always <c>null</c>.</returns>
         public Stream GetResponseStream(IServerResponseChannelSinkStack sinkStack,
             object state,
             IMessage msg,
             ITransportHeaders headers)
         {
             return null;
         }

         /// <summary>
         /// Processes an incoming call: decompresses the request, forwards it to
         /// the next sink and compresses the response stream it produces.
         /// </summary>
         /// <param name="sinkStack">Sink stack for the current call.</param>
         /// <param name="requestMsg">The request message, if already deserialized.</param>
         /// <param name="requestHeaders">Transport headers of the request.</param>
         /// <param name="requestStream">The compressed request stream.</param>
         /// <param name="responseMsg">Receives the response message from the next sink.</param>
         /// <param name="responseHeaders">Receives the response headers from the next sink.</param>
         /// <param name="responseStream">
         /// Receives the compressed response stream, or <c>null</c> when the next
         /// sink produced no response stream.
         /// </param>
         /// <returns>The processing status reported by the next sink.</returns>
         public ServerProcessing ProcessMessage(IServerChannelSinkStack sinkStack,
             IMessage requestMsg,
             ITransportHeaders requestHeaders,
             Stream requestStream,
             out IMessage responseMsg,
             out ITransportHeaders responseHeaders,
             out Stream responseStream)
         {
             // uncompressing the request
             Stream localrequestStream =
                 CompressionHelper.getUncompressedStreamCopy(requestStream);

             // pushing onto stack and forwarding the call
             sinkStack.Push(this, null);

             Stream localresponseStream;
             ServerProcessing srvProc = _nextSink.ProcessMessage(sinkStack,
                 requestMsg,
                 requestHeaders,
                 localrequestStream,
                 out responseMsg,
                 out responseHeaders,
                 out localresponseStream);

             // compressing the response; the next sink may hand back no stream
             // (presumably for one-way or asynchronously completed calls), in
             // which case there is nothing to compress and null is passed on
             // instead of failing inside the helper.
             if (localresponseStream != null)
             {
                 responseStream =
                     CompressionHelper.getCompressedStreamCopy(localresponseStream);
             }
             else
             {
                 responseStream = null;
             }

             // returning status information
             return srvProc;
         }
     }
 }
 
 /// <summary>
 /// Stream compression helpers built on the NZipLib Deflater/Inflater classes.
 /// Both methods return an in-memory copy positioned at its beginning.
 /// </summary>
 public class CompressionHelper
 {
     // Size in bytes of the scratch buffer used when copying between streams.
     private const int BufferSize = 4096;

     /// <summary>
     /// Reads <paramref name="inStream"/> to its end and returns a deflate-compressed copy.
     /// (Refactored by zendy.)
     /// </summary>
     /// <param name="inStream">Stream with the uncompressed data; read from its current position.</param>
     /// <returns>A <see cref="MemoryStream"/> holding the compressed data, positioned at offset 0.</returns>
     /// <exception cref="ArgumentNullException"><paramref name="inStream"/> is null.</exception>
     public static Stream getCompressedStreamCopy(Stream inStream)
     {
         if (inStream == null)
         {
             throw new ArgumentNullException("inStream");
         }

         MemoryStream outStream = new MemoryStream();
         Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION);
         DeflaterOutputStream compressStream = new DeflaterOutputStream(outStream, deflater);

         CopyStream(inStream, compressStream);

         // Finish() flushes the remaining compressed data; the compressing stream
         // is deliberately not closed because outStream is returned to the caller.
         compressStream.Finish();

         // modified by zendy: this rewind is essential, otherwise downstream sinks
         // fail when processing the stream; the original source code failed at
         // runtime because this step was missing.
         outStream.Seek(0, SeekOrigin.Begin);
         return outStream;
     }

     /// <summary>
     /// Inflates <paramref name="inStream"/> and returns the uncompressed data as a copy.
     /// The inflating wrapper around <paramref name="inStream"/> is closed afterwards.
     /// (Refactored by zendy.)
     /// </summary>
     /// <param name="inStream">Stream with deflate-compressed data.</param>
     /// <returns>A <see cref="MemoryStream"/> holding the uncompressed data, positioned at offset 0.</returns>
     /// <exception cref="ArgumentNullException"><paramref name="inStream"/> is null.</exception>
     public static Stream getUncompressedStreamCopy(Stream inStream)
     {
         if (inStream == null)
         {
             throw new ArgumentNullException("inStream");
         }

         MemoryStream outStream = new MemoryStream();
         InflaterInputStream unCompressStream = new InflaterInputStream(inStream);
         try
         {
             CopyStream(unCompressStream, outStream);
         }
         finally
         {
             // Close even when inflating throws (e.g. on corrupt input) so the
             // wrapped stream is not leaked.
             unCompressStream.Close();
         }

         // modified by zendy: this rewind is essential, otherwise downstream sinks
         // fail when processing the stream; the original source code failed at
         // runtime because this step was missing.
         outStream.Seek(0, SeekOrigin.Begin);
         return outStream;
     }

     // Copies everything from source to destination until Read reports no more data.
     private static void CopyStream(Stream source, Stream destination)
     {
         byte[] buffer = new byte[BufferSize];
         int count = source.Read(buffer, 0, buffer.Length);
         while (count > 0)
         {
             destination.Write(buffer, 0, count);
             count = source.Read(buffer, 0, buffer.Length);
         }
     }
 }
BTW,这个Sink还可以扩展:比如先判断需要压缩的Stream的大小,如果很大就压缩,否则不压缩(可以在responseHeaders和requestHeaders中添加"是否已经压缩"的标记)。