简介:

 SocketAsyncEventArgs是一个套接字操作得类,主要作用是实现socket消息的异步接收和发送,跟Socket的BeginSend和BeginReceive方法异步处理没有多大区别,它的优势在于完成端口的实现来处理大数据的并发情况。

BufferManager类, 管理传输流的大小SocketEventPool类: 管理SocketAsyncEventArgs的一个应用池. 有效地重复使用. AsyncUserToken类: 这个可以根据自己的实际情况来定义.主要作用就是存储客户端的信息.SocketManager类: 核心,实现Socket监听,收发信息等操作.额外功能   1.自动检测无效连接并断开    2.自动释放资源

BufferManager类

using System;

using System.Collections.Generic;

using System.Linq;

using System.Net.Sockets;

using System.Text;

namespace Plates.Service

{

class BufferManager

{

int m_numBytes; // the total number of bytes controlled by the buffer pool

byte[] m_buffer; // the underlying byte array maintained by the Buffer Manager

Stack m_freeIndexPool; //

int m_currentIndex;

int m_bufferSize;

public BufferManager(int totalBytes, int bufferSize)

{

m_numBytes = totalBytes;

m_currentIndex = 0;

m_bufferSize = bufferSize;

m_freeIndexPool = new Stack();

}

// Allocates buffer space used by the buffer pool

public void InitBuffer()

{

// create one big large buffer and divide that

// out to each SocketAsyncEventArg object

m_buffer = new byte[m_numBytes];

}

// Assigns a buffer from the buffer pool to the

// specified SocketAsyncEventArgs object

//

// true if the buffer was successfully set, else false

public bool SetBuffer(SocketAsyncEventArgs args)

{

if (m_freeIndexPool.Count > 0)

{

args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);

}

else

{

if ((m_numBytes - m_bufferSize) < m_currentIndex)

{

return false;

}

args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);

m_currentIndex += m_bufferSize;

}

return true;

}

// Removes the buffer from a SocketAsyncEventArg object.

// This frees the buffer back to the buffer pool

public void FreeBuffer(SocketAsyncEventArgs args)

{

m_freeIndexPool.Push(args.Offset);

args.SetBuffer(null, 0, 0);

}

}

}

    

SocketEventPool类:

using System;

using System.Collections.Generic;

using System.Linq;

using System.Net.Sockets;

using System.Text;

namespace Plates.Service

{

class SocketEventPool

{

Stack m_pool;

public SocketEventPool(int capacity)

{

m_pool = new Stack(capacity);

}

public void Push(SocketAsyncEventArgs item)

{

if (item == null) { throw new ArgumentNullException("Items added to a SocketAsyncEventArgsPool cannot be null"); }

lock (m_pool)

{

m_pool.Push(item);

}

}

// Removes a SocketAsyncEventArgs instance from the pool

// and returns the object removed from the pool

public SocketAsyncEventArgs Pop()

{

lock (m_pool)

{

return m_pool.Pop();

}

}

// The number of SocketAsyncEventArgs instances in the pool

public int Count

{

get { return m_pool.Count; }

}

public void Clear()

{

m_pool.Clear();

}

}

}

 AsyncUserToken类

using System;

using System.Collections;

using System.Collections.Generic;

using System.Linq;

using System.Net;

using System.Net.Sockets;

using System.Text;

namespace Plates.Service

{

class AsyncUserToken

{

///

/// 客户端IP地址

///

public IPAddress IPAddress { get; set; }

///

/// 远程地址

///

public EndPoint Remote { get; set; }

///

/// 通信SOKET

///

public Socket Socket { get; set; }

///

/// 连接时间

///

public DateTime ConnectTime { get; set; }

///

/// 所属用户信息

///

public UserInfoModel UserInfo { get; set; }

///

/// 数据缓存区

///

public List Buffer { get; set; }

public AsyncUserToken()

{

this.Buffer = new List();

}

}

}

  SocketManager类

using Plates.Common;

using System;

using System.Collections;

using System.Collections.Generic;

using System.Linq;

using System.Net;

using System.Net.Sockets;

using System.Text;

using System.Threading;

namespace Plates.Service

{

class SocketManager

{

private int m_maxConnectNum; //最大连接数

private int m_revBufferSize; //最大接收字节数

BufferManager m_bufferManager;

const int opsToAlloc = 2;

Socket listenSocket; //监听Socket

SocketEventPool m_pool;

int m_clientCount; //连接的客户端数量

Semaphore m_maxNumberAcceptedClients;

List m_clients; //客户端列表

#region 定义委托

///

/// 客户端连接数量变化时触发

///

/// 当前增加客户的个数(用户退出时为负数,增加时为正数,一般为1)

/// 增加用户的信息

public delegate void OnClientNumberChange(int num, AsyncUserToken token);

///

/// 接收到客户端的数据

///

/// 客户端

/// 客户端数据

public delegate void OnReceiveData(AsyncUserToken token, byte[] buff);

#endregion

#region 定义事件

///

/// 客户端连接数量变化事件

///

public event OnClientNumberChange ClientNumberChange;

///

/// 接收到客户端的数据事件

///

public event OnReceiveData ReceiveClientData;

#endregion

#region 定义属性

///

/// 获取客户端列表

///

public List ClientList { get { return m_clients; } }

#endregion

///

/// 构造函数

///

/// 最大连接数

/// 缓存区大小

public SocketManager(int numConnections, int receiveBufferSize)

{

m_clientCount = 0;

m_maxConnectNum = numConnections;

m_revBufferSize = receiveBufferSize;

// allocate buffers such that the maximum number of sockets can have one outstanding read and

//write posted to the socket simultaneously

m_bufferManager = new BufferManager(receiveBufferSize * numConnections * opsToAlloc, receiveBufferSize);

m_pool = new SocketEventPool(numConnections);

m_maxNumberAcceptedClients = new Semaphore(numConnections, numConnections);

}

///

/// 初始化

///

public void Init()

{

// Allocates one large byte buffer which all I/O operations use a piece of. This gaurds

// against memory fragmentation

m_bufferManager.InitBuffer();

m_clients = new List();

// preallocate pool of SocketAsyncEventArgs objects

SocketAsyncEventArgs readWriteEventArg;

for (int i = 0; i < m_maxConnectNum; i++)

{

readWriteEventArg = new SocketAsyncEventArgs();

readWriteEventArg.Completed += new EventHandler(IO_Completed);

readWriteEventArg.UserToken = new AsyncUserToken();

// assign a byte buffer from the buffer pool to the SocketAsyncEventArg object

m_bufferManager.SetBuffer(readWriteEventArg);

// add SocketAsyncEventArg to the pool

m_pool.Push(readWriteEventArg);

}

}

///

/// 启动服务

///

///

public bool Start(IPEndPoint localEndPoint)

{

try

{

m_clients.Clear();

listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);

listenSocket.Bind(localEndPoint);

// start the server with a listen backlog of 100 connections

listenSocket.Listen(m_maxConnectNum);

// post accepts on the listening socket

StartAccept(null);

return true;

}

catch (Exception)

{

return false;

}

}

///

/// 停止服务

///

public void Stop()

{

foreach (AsyncUserToken token in m_clients)

{

try

{

token.Socket.Shutdown(SocketShutdown.Both);

}

catch (Exception) { }

}

try

{

listenSocket.Shutdown(SocketShutdown.Both);

}

catch (Exception) { }

listenSocket.Close();

int c_count = m_clients.Count;

lock (m_clients) { m_clients.Clear(); }

if (ClientNumberChange != null)

ClientNumberChange(-c_count, null);

}

public void CloseClient(AsyncUserToken token)

{

try

{

token.Socket.Shutdown(SocketShutdown.Both);

}

catch (Exception) { }

}

// Begins an operation to accept a connection request from the client

//

// The context object to use when issuing

// the accept operation on the server's listening socket

public void StartAccept(SocketAsyncEventArgs acceptEventArg)

{

if (acceptEventArg == null)

{

acceptEventArg = new SocketAsyncEventArgs();

acceptEventArg.Completed += new EventHandler(AcceptEventArg_Completed);

}

else

{

// socket must be cleared since the context object is being reused

acceptEventArg.AcceptSocket = null;

}

m_maxNumberAcceptedClients.WaitOne();

if (!listenSocket.AcceptAsync(acceptEventArg))

{

ProcessAccept(acceptEventArg);

}

}

// This method is the callback method associated with Socket.AcceptAsync

// operations and is invoked when an accept operation is complete

//

void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)

{

ProcessAccept(e);

}

private void ProcessAccept(SocketAsyncEventArgs e)

{

try

{

Interlocked.Increment(ref m_clientCount);

// Get the socket for the accepted client connection and put it into the

//ReadEventArg object user token

SocketAsyncEventArgs readEventArgs = m_pool.Pop();

AsyncUserToken userToken = (AsyncUserToken)readEventArgs.UserToken;

userToken.Socket = e.AcceptSocket;

userToken.ConnectTime = DateTime.Now;

userToken.Remote = e.AcceptSocket.RemoteEndPoint;

userToken.IPAddress = ((IPEndPoint)(e.AcceptSocket.RemoteEndPoint)).Address;

lock (m_clients) { m_clients.Add(userToken); }

if (ClientNumberChange != null)

ClientNumberChange(1, userToken);

if (!e.AcceptSocket.ReceiveAsync(readEventArgs))

{

ProcessReceive(readEventArgs);

}

}

catch (Exception me)

{

RuncomLib.Log.LogUtils.Info(me.Message + "\r\n" + me.StackTrace);

}

// Accept the next connection request

if (e.SocketError == SocketError.OperationAborted) return;

StartAccept(e);

}

void IO_Completed(object sender, SocketAsyncEventArgs e)

{

// determine which type of operation just completed and call the associated handler

switch (e.LastOperation)

{

case SocketAsyncOperation.Receive:

ProcessReceive(e);

break;

case SocketAsyncOperation.Send:

ProcessSend(e);

break;

default:

throw new ArgumentException("The last operation completed on the socket was not a receive or send");

}

}

// This method is invoked when an asynchronous receive operation completes.

// If the remote host closed the connection, then the socket is closed.

// If data was received then the data is echoed back to the client.

//

private void ProcessReceive(SocketAsyncEventArgs e)

{

try

{

// check if the remote host closed the connection

AsyncUserToken token = (AsyncUserToken)e.UserToken;

if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)

{

//读取数据

byte[] data = new byte[e.BytesTransferred];

Array.Copy(e.Buffer, e.Offset, data, 0, e.BytesTransferred);

lock (token.Buffer)

{

token.Buffer.AddRange(data);

}

//注意:你一定会问,这里为什么要用do-while循环?

//如果当客户发送大数据流的时候,e.BytesTransferred的大小就会比客户端发送过来的要小,

//需要分多次接收.所以收到包的时候,先判断包头的大小.够一个完整的包再处理.

//如果客户短时间内发送多个小数据包时, 服务器可能会一次性把他们全收了.

//这样如果没有一个循环来控制,那么只会处理第一个包,

//剩下的包全部留在token.Buffer中了,只有等下一个数据包过来后,才会放出一个来.

do

{

//判断包的长度

byte[] lenBytes = token.Buffer.GetRange(0, 4).ToArray();

int packageLen = BitConverter.ToInt32(lenBytes, 0);

if (packageLen > token.Buffer.Count - 4)

{ //长度不够时,退出循环,让程序继续接收

break;

}

//包够长时,则提取出来,交给后面的程序去处理

byte[] rev = token.Buffer.GetRange(4, packageLen).ToArray();

//从数据池中移除这组数据

lock (token.Buffer)

{

token.Buffer.RemoveRange(0, packageLen + 4);

}

//将数据包交给后台处理,这里你也可以新开个线程来处理.加快速度.

if(ReceiveClientData != null)

ReceiveClientData(token, rev);

//这里API处理完后,并没有返回结果,当然结果是要返回的,却不是在这里, 这里的代码只管接收.

//若要返回结果,可在API处理中调用此类对象的SendMessage方法,统一打包发送.不要被微软的示例给迷惑了.

} while (token.Buffer.Count > 4);

//继续接收. 为什么要这么写,请看Socket.ReceiveAsync方法的说明

if (!token.Socket.ReceiveAsync(e))

this.ProcessReceive(e);

}

else

{

CloseClientSocket(e);

}

}

catch (Exception xe)

{

RuncomLib.Log.LogUtils.Info(xe.Message + "\r\n" + xe.StackTrace);

}

}

// This method is invoked when an asynchronous send operation completes.

// The method issues another receive on the socket to read any additional

// data sent from the client

//

//

private void ProcessSend(SocketAsyncEventArgs e)

{

if (e.SocketError == SocketError.Success)

{

// done echoing data back to the client

AsyncUserToken token = (AsyncUserToken)e.UserToken;

// read the next block of data send from the client

bool willRaiseEvent = token.Socket.ReceiveAsync(e);

if (!willRaiseEvent)

{

ProcessReceive(e);

}

}

else

{

CloseClientSocket(e);

}

}

//关闭客户端

private void CloseClientSocket(SocketAsyncEventArgs e)

{

AsyncUserToken token = e.UserToken as AsyncUserToken;

lock (m_clients) { m_clients.Remove(token); }

//如果有事件,则调用事件,发送客户端数量变化通知

if (ClientNumberChange != null)

ClientNumberChange(-1, token);

// close the socket associated with the client

try

{

token.Socket.Shutdown(SocketShutdown.Send);

}

catch (Exception) { }

token.Socket.Close();

// decrement the counter keeping track of the total number of clients connected to the server

Interlocked.Decrement(ref m_clientCount);

m_maxNumberAcceptedClients.Release();

// Free the SocketAsyncEventArg so they can be reused by another client

e.UserToken = new AsyncUserToken();

m_pool.Push(e);

}

///

/// 对数据进行打包,然后再发送

///

///

///

///

public void SendMessage(AsyncUserToken token, byte[] message)

{

if (token == null || token.Socket == null || !token.Socket.Connected)

return;

try

{

//对要发送的消息,制定简单协议,头4字节指定包的大小,方便客户端接收(协议可以自己定)

byte[] buff = new byte[message.Length + 4];

byte[] len = BitConverter.GetBytes(message.Length);

Array.Copy(len, buff, 4);

Array.Copy(message, 0, buff, 4, message.Length);

//token.Socket.Send(buff); //这句也可以发送, 可根据自己的需要来选择

//新建异步发送对象, 发送消息

SocketAsyncEventArgs sendArg = new SocketAsyncEventArgs();

sendArg.UserToken = token;

sendArg.SetBuffer(buff, 0, buff.Length); //将数据放置进去.

token.Socket.SendAsync(sendArg);

}

catch (Exception e){

RuncomLib.Log.LogUtils.Info("SendMessage - Error:" + e.Message);

}

}

}

}

使用方法:

SocketManager m_socket = new SocketManager(200, 1024);  

m_socket.Init();  

m_socket.Start(new IPEndPoint(IPAddress.Any, 13909));  

//m_socket.Stop();

下载地址:https://download.csdn.net/download/a876106354/88563747

推荐文章

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。