之前写过一篇文章写了tcp异步通讯,使用的异步回调AsyncCallback的方法,最近看到服务端处理高并发tcp连接有SocketAsyncEventArgs,于是简单写了个tcp服务端客户端通讯
目录
一、什么是SocketAsyncEventArgs
二、主要属性及方法
三、使用此类执行异步套接字操作的模式步骤
四、工具类
五、实现服务端(源码)
一、什么是SocketAsyncEventArgs
SocketAsyncEventArgs是一个套接字操作的类,主要作用是实现socket消息的异步接收和发送。由专用的高性能套接字应用程序使用的替代异步模式,主要功能是避免在大容量异步套接字 I/O 期间重复分配和同步对象。
二、主要属性及方法
在使用SocketAsyncEventArgs进行TCP或UDP通讯的时候最常用的几个成员分别是:Buffer,BufferList,BytesTransferred,SocketError,UserToken,BytesTransferred属性,SetBuffer方法和Completed事件。
SocketAsyncEventArgs接收和发送数据都需要设置buffer,一般用SetBuffer方法或设置BufferList。通过Completed来查看完成情况,而在Completed通过SocketError和BytesTransferred结合来判断当前操作是否成功能,如在receive状态当BytesTransferred为零的时候,说明对方已经把连接断开了。
由于SocketAsyncEventArgs是异步操作,在很多情况需要的一些其他信息传递到Completed事件中,这个时候可以通过UserToken属性来解决异步信息传递的问题。
使用注意:SocketAsyncEventArgs同一时间只能进行一个操作,通过Completed来确认当前操作是否完成,如果同步完成是不会触该事件需要自己手动调用处理
三、使用此类执行异步套接字操作的模式步骤
分配一个新的 SocketAsyncEventArgs 上下文对象,或从应用程序池中获取一个空闲对象。 将上下文对象的属性设置为即将 (完成回调方法执行的操作、数据缓冲区、缓冲区中的偏移量以及要传输的最大数据量,例如) 。 调用适当的套接字方法 (xxxAsync) 以启动异步操作。 如果异步套接字方法 (xxxAsync) 返回 true,请在回调中查询完成状态的上下文属性。 如果异步套接字方法 (xxxAsync) 返回 false,则操作同步完成。 可查询上下文属性获取操作结果。 重新使用上下文进行另一项操作,将其放回池中,或放弃它。
新的异步套接字操作上下文对象的生存期由应用程序代码和异步 I/O 引用的引用决定。 作为参数提交给异步套接字操作方法之一后,应用程序不必保留对异步套接字操作上下文对象的引用。 完成回调返回之前,应用程序会继续引用它。 但是,应用程序最好保留对上下文的引用,以便将来的异步套接字操作可以重复使用该引用。
四、工具类
AsyncUserToken类
public class AsyncUserToken { ///
///
///
///
///
///
public AsyncUserToken() { this.Buffer = new List
BufferManager类
internal class BufferManager { private int m_numBytes; // 缓冲池控制的字节总数 private byte[] m_buffer; // 由缓冲区管理器维护的底层字节数组 private Stack
public BufferManager(int totalBytes, int bufferSize) { m_numBytes = totalBytes; m_currentIndex = 0; m_bufferSize = bufferSize; m_freeIndexPool = new Stack
// 分配缓冲池使用的缓冲空间 public void InitBuffer() { // 创建一个大缓冲区,并将其分配给每个 SocketAsyncEventArg 对象 m_buffer = new byte[m_numBytes]; }
// 将缓冲池中的缓冲区分配给指定的 SocketAsyncEventArgs 对象 // // 如果成功设置了缓冲区,则为 true,否则为 false public bool SetBuffer(SocketAsyncEventArgs args) { if (m_freeIndexPool.Count > 0) { args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize); } else { if ((m_numBytes - m_bufferSize) < m_currentIndex) { return false; } args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize); m_currentIndex += m_bufferSize; } return true; }
// 删除 SocketAsyncEventArg 对象的缓冲区。 // 这就将缓冲区释放回缓冲区池 public void FreeBuffer(SocketAsyncEventArgs args) { m_freeIndexPool.Push(args.Offset); args.SetBuffer(null, 0, 0); } }
SocketEventPool类
internal class SocketEventPool { private Stack
public SocketEventPool(int capacity) { m_pool = new Stack
public void Push(SocketAsyncEventArgs item) { if (item == null) { throw new ArgumentNullException("不能将空事件加增事件池中"); } lock (m_pool) { m_pool.Push(item); } }
// 从池中移除 SocketAsyncEventArgs 实例,并返回从池中移除的对象 public SocketAsyncEventArgs Pop() { lock (m_pool) { return m_pool.Pop(); } }
// 池中 SocketAsyncEventArgs 实例的数量 public int Count { get { return m_pool.Count; } }
public void Clear() { m_pool.Clear(); } }
五、实现服务端
#region 定义变量、事件及委托 ///
///
#endregion
///
m_pool = new SocketEventPool(numConnections); m_maxNumberAcceptedClients = new Semaphore(numConnections, numConnections); }
///
for (int i = 0; i < m_maxConnectNum; i++) { readWriteEventArg = new SocketAsyncEventArgs(); readWriteEventArg.Completed += new EventHandler
// 将缓冲池中的字节缓冲区分配给 SocketAsyncEventArg 对象 m_bufferManager.SetBuffer(readWriteEventArg); //向池中添加 SocketAsyncEventArg m_pool.Push(readWriteEventArg); } } ///
listenSocket.Close(); int c_count = m_clients.Count; lock (m_clients) { m_clients.Clear(); }
if (ClientNumberChange != null) ClientNumberChange(-c_count, null); ClientDic.Clear(); comboBox1.Items.Clear(); } ///
// 开始接受客户端连接请求的操作 //在服务器监听套接字上执行接受操作时使用的上下文对象 public void StartAccept(SocketAsyncEventArgs acceptEventArg) { if (acceptEventArg == null) { acceptEventArg = new SocketAsyncEventArgs(); acceptEventArg.Completed += new EventHandler
m_maxNumberAcceptedClients.WaitOne(); if (!listenSocket.AcceptAsync(acceptEventArg)) { ProcessAccept(acceptEventArg); } }
// 该方法是与 Socket.AcceptAsync 相关联的回调方法操作,并在接受操作完成时调用 void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e) { if (stopclient != "停止监听") { ProcessAccept(e); } stopclient = ""; }
private void ProcessAccept(SocketAsyncEventArgs e) { stopclient = ""; try { Interlocked.Increment(ref m_clientCount); // 获取已接受客户端连接的套接字,并将其放入 ReadEventArg 对象的用户标记中 //这里的SocketAsyncEventArgsPool类一般是自己实现,MSDN有通过栈结构实现的程序池,也可以使用队列或链表 SocketAsyncEventArgs readEventArgs = m_pool.Pop(); AsyncUserToken userToken = (AsyncUserToken)readEventArgs.UserToken; userToken.Socket = e.AcceptSocket; userToken.ConnectTime = DateTime.Now; userToken.Remote = e.AcceptSocket.RemoteEndPoint; userToken.IPAddress = ((IPEndPoint)(e.AcceptSocket.RemoteEndPoint)).Address;
lock (m_clients) { m_clients.Add(userToken); } ClientDic.Add(userToken.Remote.ToString(), userToken);//添加客户端字典集合 this.Invoke(new Action(() => { comboBox1.Items.Add(userToken.Remote.ToString()); comboBox1.Text = userToken.Remote.ToString();
})); if (ClientNumberChange != null) ClientNumberChange(1, userToken); if (!e.AcceptSocket.ReceiveAsync(readEventArgs)) { ProcessReceive(readEventArgs); } } catch (Exception me) { MessageBox.Show(me.Message); }
// 接受下一个连接请求 if (e.SocketError == SocketError.OperationAborted) return; StartAccept(e); }
void IO_Completed(object sender, SocketAsyncEventArgs e) { // 确定刚刚完成的操作类型,并调用相关处理程序 switch (e.LastOperation) { case SocketAsyncOperation.Receive: ProcessReceive(e); break; case SocketAsyncOperation.Send: ProcessSend(e); break; default: throw new ArgumentException("套接字上最后完成的操作不是接收或发送"); }
} //异步接收操作完成时调用该方法。. //如果远程主机关闭了连接,则关闭套接字 //如果接收到数据,则将数据回传至客户端。 private void ProcessReceive(SocketAsyncEventArgs e) { try { // 检查远程主机是否关闭了连接 AsyncUserToken token = (AsyncUserToken)e.UserToken; if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success) { //读取数据 byte[] data = new byte[e.BytesTransferred]; Array.Copy(e.Buffer, e.Offset, data, 0, e.BytesTransferred); lock (token.Buffer) { token.Buffer.AddRange(data); } this.Invoke(new Action(() => { if (radioButton5.Checked) { string recvdata = ToHexStrFromByte(data, 0, data.Length); textBox3.AppendText(DateTime.Now + " 【接收】" + token.Remote + "发送的数据 :" + recvdata + "\r\n"); } else { textBox3.AppendText(DateTime.Now + " 【接收】" + token.Remote + "发送的数据 :" + Encoding.Default.GetString(data) + "\r\n"); } })); //继续接收. 为什么要这么写,请看Socket.ReceiveAsync方法的说明 if (!token.Socket.ReceiveAsync(e)) this.ProcessReceive(e); } else { CloseClientSocket(e); } } catch (Exception xe) { MessageBox.Show(xe.Message + "\r\n" + xe.StackTrace); } }
///
lock (m_clients) { m_clients.Remove(token); } //如果有事件,则调用事件,发送客户端数量变化通知 if (ClientNumberChange != null) ClientNumberChange(-1, token); // 关闭与客户端相关的套接字 try { token.Socket.Shutdown(SocketShutdown.Send); } catch (Exception) { } token.Socket.Close(); // 递减记录连接到服务器的客户端总数的计数器 Interlocked.Decrement(ref m_clientCount); m_maxNumberAcceptedClients.Release(); // 释放 SocketAsyncEventArg,以便另一个客户端重复使用 e.UserToken = new AsyncUserToken(); m_pool.Push(e); }
///
} catch (Exception e) { MessageBox.Show("发送消息异常:" + e.Message); } }
private void button1_Click(object sender, EventArgs e)//开启服务端监听 { if (textBox1.Text.Length <= 0) { MessageBox.Show("端口不为空"); return; } SocketManager(10, 412); Init(); if(Start(new IPEndPoint(IPAddress.Any, Convert.ToInt16(textBox1.Text)))) { button1.Enabled = false; button9.Enabled = true; } else { button1.Enabled = true; button9.Enabled = false; } }
private void button4_Click(object sender, EventArgs e)//清空接收区 { textBox3.Text = string.Empty; }
private void button3_Click(object sender, EventArgs e) {if(textBox2.Text.Length > 0) { if (comboBox1.Text.Length > 0) { if (radioButton1.Checked)//十六进制发送消息 { if ((textBox2.Text.Length % 2) != 0) textBox2.Text = textBox2.Text.Insert(textBox2.Text.Length - 1, 0.ToString()); byte[] returnBytes = new byte[textBox2.Text.Length / 2]; for (int i = 0; i < returnBytes.Length; i++) returnBytes[i] = Convert.ToByte(textBox2.Text.Substring(i * 2, 2), 16); //根据客户端IP端口找到字典集合socket SendMessage(ClientDic[comboBox1.Text].Socket, returnBytes); } else { SendMessage(ClientDic[comboBox1.Text].Socket, Encoding.Default.GetBytes(textBox2.Text)); } textBox3.AppendText(DateTime.Now +" 【发送】"+ ClientDic[comboBox1.Text].Socket.RemoteEndPoint + "数据 :" + textBox2.Text + "\r\n"); } } } string stopclient = ""; private void button9_Click(object sender, EventArgs e) { stopclient = "停止监听"; Stop(); button1.Enabled = true; button9.Enabled = false; }
精彩文章
发表评论