问题描述
我正在用 F# 基于原始套接字编写一个异步 Ping,以便用尽可能少的线程发起并行请求。没有使用 System.Net.NetworkInformation.Ping,因为它似乎会为每个请求分配一个线程。另外,我也想借此使用 F# 的异步工作流。
当目标主机不存在或没有响应时,下面的同步版本能够正确超时,但异步版本会挂起。主机有响应时,两个版本都能正常工作。不确定这是 .NET 的问题,还是 F# 的问题……
有什么想法吗?
(注意:进程必须以管理员身份运行,才能访问原始套接字)
这会抛出超时异常:
let result = Ping.Ping ( IPAddress.Parse( "192.168.33.22" ), 1000 )
但是,这会挂起:
let result = Ping.AsyncPing ( IPAddress.Parse( "192.168.33.22" ), 1000 )
|> Async.RunSynchronously
代码如下……
模块平
开放系统
开System.Net
开放的System.Net.Sockets
开放的System.Threading
// ---- ICMP报文类
键入IcmpMessage(T:字节)=
让可变m_type = T
让可变的m_ code = 0uy
让可变m_checksum = 0US
会员this.Type
以get()= m_type
成员这一点。code
以get()= M_ code
会员this.Checksum = m_checksum
抽象的字节:字节数组
默认this.Bytes
以get()=
[|
m_type
M_ code
字节(m_checksum)
字节(m_checksum>>→8)
|]
会员this.GetChecksum()=
让可变的总和= 0ul
让字节= this.Bytes
让可变I = 0
//总结uint16s
而I< bytes.Length - 1 DO
总之< - 总和+ UINT32(BitConverter.ToUInt16(字节,I))
I< - I + 2
//添加在最后一个字节,如果奇数大小的缓冲区
如果我<> bytes.Length然后
总之< - 总和+ UINT32(字节[I])
//洗牌位
总之< - (总和>>> 16)+(总和与放大器;&安培;&安培; 0xFFFFul)
总之< - 总和+(总和>>> 16)
总之< - ~~~总和
UINT16(和)
会员this.UpdateChecksum()=
m_checksum< - this.GetChecksum()
键入InformationMessage(T:字节)=
继承IcmpMessage(T)
让可变m_identifier = 0US
让可变m_sequenceNumber = 0US
会员this.Identifier = m_identifier
会员this.SequenceNumber = m_sequenceNumber
覆盖this.Bytes
以get()=
Array.append(base.Bytes)
[|
字节(m_identifier)
字节(m_identifier>>→8)
字节(m_sequenceNumber)
字节(m_sequenceNumber>>→8)
|]
键入EchoMessage()=
继承InformationMessage(8uy)
让可变M_DATA = Array.create 32 32uy
做base.UpdateChecksum()
会员this.Data
以get()= M_DATA
并设置(D)= M_DATA< - ð
this.UpdateChecksum()
覆盖this.Bytes
以get()=
Array.append(base.Bytes)
(this.Data)
// ----同步平
让平(主持人:ip地址,超时:INT)=
让可变EP =新IPEndPoint(主机,0)
让插座=新的Socket(AddressFamily.InterNetwork,SocketType.Raw,ProtocolType.Icmp)
socket.SetSocketOption(SocketOptionLevel.Socket,SocketOptionName.SendTimeout,超时)
socket.SetSocketOption(SocketOptionLevel.Socket,SocketOptionName.ReceiveTimeout,超时)
让包= EchoMessage()
让可变的缓冲= packet.Bytes
尝试
如果socket.SendTo(缓冲器,EP)所述; = 0,则
提高(SocketException())
缓存器; - Array.create(buffer.Length +20)0uy
让可变EPR = EP:>端点
如果socket.ReceiveFrom(缓冲液,&安培; EPR)所述; = 0,则
提高(SocketException())
最后
socket.Close()
缓冲
// ---- Entensions到F#异步类,允许多达5 paramters(而不仅仅是3)
有型异步
静态成员FromBeginEnd(?ARG1,ARG2,ARG3,ARG4,beginAction,endAction,cancelAction):异步<T> =
Async.FromBeginEnd((FUN(IAR,州) - > beginAction(ARG1,ARG2,ARG3,ARG4,IAR,状态)),endAction,cancelAction = cancelAction)
静态成员FromBeginEnd(?ARG1,ARG2,ARG3,ARG4,arg5,beginAction,endAction,cancelAction):异步<T> =
Async.FromBeginEnd((FUN(IAR,州) - > beginAction(ARG1,ARG2,ARG3,ARG4,arg5,IAR,状态)),endAction,cancelAction = cancelAction)
// ----扩展到Socket类提供异步的SendTo和ReceiveFrom
对于类型System.Net.Sockets.Socket
构件this.AsyncSendTo(缓冲器,偏移,大小,的SocketFlags,remoteEP)=
Async.FromBeginEnd(缓冲器,偏移,大小,的SocketFlags,remoteEP,
this.BeginSendTo,
this.EndSendTo)
构件this.AsyncReceiveFrom(缓冲器,偏移,大小,的SocketFlags,remoteEP)=
Async.FromBeginEnd(缓冲器,偏移,大小,的SocketFlags,remoteEP,
this.BeginReceiveFrom,
(有趣的asyncResult - > this.EndReceiveFrom(asyncResult,remoteEP)))
// ----异步平
让AsyncPing(主持人:ip地址,超时:INT)=
异步{
让EP = IPEndPoint(主机,0)
使用socket =新的Socket(AddressFamily.InterNetwork,SocketType.Raw,ProtocolType.Icmp)
socket.SetSocketOption(SocketOptionLevel.Socket,SocketOptionName.SendTimeout,超时)
socket.SetSocketOption(SocketOptionLevel.Socket,SocketOptionName.ReceiveTimeout,超时)
让包= EchoMessage()
让outbuffer = packet.Bytes
尝试
让!结果= socket.AsyncSendTo(outbuffer,0,outbuffer.Length,SocketFlags.None,EP)
如果结果< = 0,则
提高(SocketException())
让EPR = REF(EP:>终点)
让inbuffer = Array.create(outbuffer.Length + 256)0uy
让!结果= socket.AsyncReceiveFrom(inbuffer,0,inbuffer.Length,SocketFlags.None,EPR)
如果结果< = 0,则
提高(SocketException())
返回inbuffer
最后
socket.Close()
}
经过一番思考,我得出了下面的方案。这段代码为 Socket 增加了一个带超时参数的 AsyncReceiveEx 成员。它把看门狗定时器的细节隐藏在接收方法内部……非常整洁,而且自成一体。这正是我一直在寻找的!
完整的异步 Ping 示例见下文。
不确定这些锁是否必要,但有备无患……
型System.Net.Sockets.Socket
会员this.AsyncSend(缓冲区,偏移量,大小,的SocketFlags,ERR)=
Async.FromBeginEnd(缓冲区,偏移量,大小,的SocketFlags,错了,
this.BeginSend,
this.EndSend,
this.Close)
会员this.AsyncReceive(缓冲区,偏移量,大小,的SocketFlags,ERR)=
Async.FromBeginEnd(缓冲区,偏移量,大小,的SocketFlags,错了,
this.BeginReceive,
this.EndReceive,
this.Close)
会员this.AsyncReceiveEx(缓冲区,偏移量,大小,的SocketFlags,错了,(timeoutMS:INT))=
异步{
让TIMEDOUT = REF假
让我们完成= REF假
让计时器=新System.Timers.Timer的(双(timeoutMS),自动复位= FALSE)
timer.Elapsed.Add(乐趣_ - >
锁定TIMEDOUT(乐趣() - GT;
TIMEDOUT:=真
如果不是!完成
然后this.Close()
)
)
让完整的()=
锁定TIMEDOUT(乐趣() - GT;
timer.Stop()
timer.Dispose()
完成:=真
)
返回! Async.FromBeginEnd(缓冲区,偏移量,大小,的SocketFlags,错了,
(FUN(B,O,S,顺丰,电子,意法半导体,UO) - >
让结果= this.BeginReceive(B,O,S,顺丰,电子,意法半导体,UO)
timer.Start()
结果
),
(有趣的结果 - >
完成()
如果!TIMEDOUT
那么错误:= SocketError.TimedOut; 0
其他this.EndReceive(结果,错误)
),
(FUN() - GT;
完成()
this.Close()
)
)
}
下面是一个完整的 Ping 示例。为了避免耗尽源端口,并防止一次收到过多的应答,它每次只扫描一个 C 类子网。
模块平
开放系统
开System.Net
开放的System.Net.Sockets
开放的System.Threading
// ---- ICMP报文类
键入IcmpMessage(T:字节)=
让可变m_type = T
让可变的m_ code = 0uy
让可变m_checksum = 0US
会员this.Type
以get()= m_type
成员这一点。code
以get()= M_ code
会员this.Checksum = m_checksum
抽象的字节:字节数组
默认this.Bytes
以get()=
[|
m_type
M_ code
字节(m_checksum)
字节(m_checksum>>→8)
|]
会员this.GetChecksum()=
让可变的总和= 0ul
让字节= this.Bytes
让可变I = 0
//总结uint16s
而I< bytes.Length - 1 DO
总之< - 总和+ UINT32(BitConverter.ToUInt16(字节,I))
I< - I + 2
//添加在最后一个字节,如果奇数大小的缓冲区
如果我<> bytes.Length然后
总之< - 总和+ UINT32(字节[I])
//洗牌位
总之< - (总和>>> 16)+(总和与放大器;&安培;&安培; 0xFFFFul)
总之< - 总和+(总和>>> 16)
总之< - ~~~总和
UINT16(和)
会员this.UpdateChecksum()=
m_checksum< - this.GetChecksum()
键入InformationMessage(T:字节)=
继承IcmpMessage(T)
让可变m_identifier = 0US
让可变m_sequenceNumber = 0US
会员this.Identifier = m_identifier
会员this.SequenceNumber = m_sequenceNumber
覆盖this.Bytes
以get()=
Array.append(base.Bytes)
[|
字节(m_identifier)
字节(m_identifier>>→8)
字节(m_sequenceNumber)
字节(m_sequenceNumber>>→8)
|]
键入EchoMessage()=
继承InformationMessage(8uy)
让可变M_DATA = Array.create 32 32uy
做base.UpdateChecksum()
会员this.Data
以get()= M_DATA
并设置(D)= M_DATA< - ð
this.UpdateChecksum()
覆盖this.Bytes
以get()=
Array.append(base.Bytes)
(this.Data)
// ---- Entensions到F#异步类,允许多达5 paramters(而不仅仅是3)
有型异步
静态成员FromBeginEnd(?ARG1,ARG2,ARG3,ARG4,beginAction,endAction,cancelAction):异步<T> =
Async.FromBeginEnd((FUN(IAR,州) - > beginAction(ARG1,ARG2,ARG3,ARG4,IAR,状态)),endAction,cancelAction = cancelAction)
静态成员FromBeginEnd(?ARG1,ARG2,ARG3,ARG4,arg5,beginAction,endAction,cancelAction):异步<T> =
Async.FromBeginEnd((FUN(IAR,州) - > beginAction(ARG1,ARG2,ARG3,ARG4,arg5,IAR,状态)),endAction,cancelAction = cancelAction)
// ----扩展到Socket类提供异步的SendTo和ReceiveFrom
对于类型System.Net.Sockets.Socket
会员this.AsyncSend(缓冲区,偏移量,大小,的SocketFlags,ERR)=
Async.FromBeginEnd(缓冲区,偏移量,大小,的SocketFlags,错了,
this.BeginSend,
this.EndSend,
this.Close)
会员this.AsyncReceive(缓冲区,偏移量,大小,的SocketFlags,ERR)=
Async.FromBeginEnd(缓冲区,偏移量,大小,的SocketFlags,错了,
this.BeginReceive,
this.EndReceive,
this.Close)
会员this.AsyncReceiveEx(缓冲区,偏移量,大小,的SocketFlags,错了,(timeoutMS:INT))=
异步{
让TIMEDOUT = REF假
让我们完成= REF假
让计时器=新System.Timers.Timer的(双(timeoutMS),自动复位= FALSE)
timer.Elapsed.Add(乐趣_ - >
锁定TIMEDOUT(乐趣() - GT;
TIMEDOUT:=真
如果不是!完成
然后this.Close()
)
)
让完整的()=
锁定TIMEDOUT(乐趣() - GT;
timer.Stop()
timer.Dispose()
完成:=真
)
返回! Async.FromBeginEnd(缓冲区,偏移量,大小,的SocketFlags,错了,
(FUN(B,O,S,顺丰,电子,意法半导体,UO) - >
让结果= this.BeginReceive(B,O,S,顺丰,电子,意法半导体,UO)
timer.Start()
结果
),
(有趣的结果 - >
完成()
如果!TIMEDOUT
那么错误:= SocketError.TimedOut; 0
其他this.EndReceive(结果,错误)
),
(FUN() - GT;
完成()
this.Close()
)
)
}
// ----异步平
让AsyncPing(IP:ip地址,超时:INT)=
异步{
使用socket =新的Socket(AddressFamily.InterNetwork,SocketType.Raw,ProtocolType.Icmp)
socket.Connect(IPEndPoint(IP,0))
让pingTime = System.Diagnostics.Stopwatch()
让包= EchoMessage()
让outbuffer = packet.Bytes
让ERR = REF(SocketError())
我们的isAlive = REF假
尝试
pingTime.Start()
让!结果= socket.AsyncSend(outbuffer,0,outbuffer.Length,SocketFlags.None,ERR)
pingTime.Stop()
如果结果< = 0,则
提高(SocketException(INT(!ERR)))
让inbuffer = Array.create(outbuffer.Length + 256)0uy
pingTime.Start()
让!回复= socket.AsyncReceiveEx(inbuffer,0,inbuffer.Length,SocketFlags.None,呃,超时)
pingTime.Stop()
如果结果< = 0&功放;&安培;没有(!ERR = SocketError.TimedOut),然后
提高(SocketException(INT(!ERR)))
的isAlive:=未(!ERR = SocketError.TimedOut)
&功放;&安培; inbuffer。[25] = 0uy //类型0 =回音应答(redundent?有必要吗?)
&功放;&安培; inbuffer。[26] = 0uy // code = 0回音应答(redundent?有必要吗?)
最后
socket.Close()
返回(IP,pingTime.Elapsed,!的isAlive)
}
让主()=
让坪网=
序列{
在0..255节点做
让IP = IPAddress.Parse(sprintf的192.168%D%D网节点)
产量Ping.AsyncPing(IP,1000)
}
净在0..255做
坪网
|> Async.Parallel
|> Async.RunSynchronously
|> Seq.filter(FUN(_,_,活着) - >活着)
|> Seq.iter(乐趣(IP,时间,活着) - >
printfn%A%DMSIP time.Milliseconds)
主要()
System.Console.ReadKey()|>忽略
I am writing an asynchronous Ping using raw sockets in F#, to enable parallel requests using as few threads as possible. I am not using "System.Net.NetworkInformation.Ping", because it appears to allocate one thread per request. I am also interested in using F# async workflows.
The synchronous version below correctly times out when the target host does not exist/respond, but the asynchronous version hangs. Both work when the host does respond. Not sure if this is a .NET issue, or an F# one...
Any ideas?
(note: the process must run as Admin to allow Raw Socket access)
This throws a timeout:
let result = Ping.Ping ( IPAddress.Parse( "192.168.33.22" ), 1000 )
However, this hangs:
let result = Ping.AsyncPing ( IPAddress.Parse( "192.168.33.22" ), 1000 )
|> Async.RunSynchronously
Here's the code...
module Ping
open System
open System.Net
open System.Net.Sockets
open System.Threading
//---- ICMP Packet Classes
/// Base ICMP message: a 4-byte header made of type, code and a 16-bit checksum.
/// Derived messages extend the wire format by overriding Bytes.
type IcmpMessage (t : byte) =
    let mutable m_type = t
    let mutable m_code = 0uy
    let mutable m_checksum = 0us

    /// ICMP type byte passed to the constructor.
    member this.Type
        with get() = m_type

    /// ICMP code byte (always 0 in this class).
    member this.Code
        with get() = m_code

    /// Checksum last stored by UpdateChecksum (0 until it is first called).
    member this.Checksum = m_checksum

    /// Serialized message: type, code, then the checksum low byte first.
    abstract Bytes : byte array
    default this.Bytes
        with get() =
            [|
                m_type
                m_code
                byte(m_checksum)
                byte(m_checksum >>> 8)
            |]

    /// Computes the 16-bit ones'-complement checksum over the current Bytes
    /// (including whatever checksum value is currently stored in them).
    member this.GetChecksum() =
        let mutable sum = 0ul
        let bytes = this.Bytes
        let mutable i = 0
        // Sum up uint16s
        while i < bytes.Length - 1 do
            sum <- sum + uint32(BitConverter.ToUInt16( bytes, i ))
            i <- i + 2
        // Add in last byte, if an odd size buffer
        if i <> bytes.Length then
            sum <- sum + uint32(bytes.[i])
        // Fold the carries back into the low 16 bits, then complement
        sum <- (sum >>> 16) + (sum &&& 0xFFFFul)
        sum <- sum + (sum >>> 16)
        sum <- ~~~sum
        uint16(sum)

    /// Recomputes and stores the checksum for the current message contents.
    member this.UpdateChecksum() =
        // The checksum field must be zero while summing; otherwise a second
        // update would fold the previous checksum into the new one.
        m_checksum <- 0us
        m_checksum <- this.GetChecksum()
/// ICMP message that appends a 16-bit identifier and a 16-bit sequence number
/// (both fixed at 0 here) to the base header.
type InformationMessage (t : byte) =
    inherit IcmpMessage(t)

    let mutable m_identifier = 0us
    let mutable m_sequenceNumber = 0us

    member this.Identifier = m_identifier
    member this.SequenceNumber = m_sequenceNumber

    /// Base header bytes followed by identifier and sequence number, each low byte first.
    override this.Bytes
        with get() =
            let trailer =
                [|
                    byte(m_identifier)
                    byte(m_identifier >>> 8)
                    byte(m_sequenceNumber)
                    byte(m_sequenceNumber >>> 8)
                |]
            Array.append base.Bytes trailer
/// ICMP message of type 8 carrying a data payload (32 bytes of value 32 by default).
/// The checksum is computed on construction and again whenever Data is replaced.
type EchoMessage() =
    inherit InformationMessage( 8uy )

    let mutable m_data = Array.create 32 32uy
    do base.UpdateChecksum()

    /// Payload appended after the header fields.
    member this.Data
        with get() = m_data
        and set(d) =
            m_data <- d
            this.UpdateChecksum()

    /// Header bytes from the base class followed by the payload.
    override this.Bytes
        with get() =
            let header = base.Bytes
            Array.append header this.Data
//---- Synchronous Ping
/// Synchronously sends one echo message to `host` over a raw ICMP socket and
/// waits for a datagram in return.
///   host    - address to ping
///   timeout - send and receive timeout passed to the socket options
/// Returns the receive buffer (request length + 20 bytes).
/// Raises SocketException when the send or receive reports no bytes, or when
/// the socket itself fails (for example on a receive timeout).
let Ping (host : IPAddress, timeout : int ) =
    let ep = new IPEndPoint( host, 0 )
    // `use` disposes the socket on every exit path, including a failure while
    // the options below are being applied.
    use socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp )
    socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.SendTimeout, timeout )
    socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, timeout )
    let packet = EchoMessage()
    let request = packet.Bytes
    if socket.SendTo( request, ep ) <= 0 then
        raise (SocketException())
    // 20 extra bytes: presumably room for the IPv4 header delivered with the reply
    let reply = Array.create (request.Length + 20) 0uy
    let mutable epr = ep :> EndPoint
    if socket.ReceiveFrom( reply, &epr ) <= 0 then
        raise (SocketException())
    reply
//---- Extensions to the F# Async class to allow up to 5 parameters (not just 3)
type Async with

    /// Begin/End adapter for operations whose Begin method takes four leading
    /// arguments before the callback and state object.
    static member FromBeginEnd(arg1,arg2,arg3,arg4,beginAction,endAction,?cancelAction): Async<'T> =
        Async.FromBeginEnd(
            (fun (callback,stateObj) -> beginAction(arg1,arg2,arg3,arg4,callback,stateObj)),
            endAction,
            ?cancelAction=cancelAction)

    /// Begin/End adapter for operations whose Begin method takes five leading
    /// arguments before the callback and state object.
    static member FromBeginEnd(arg1,arg2,arg3,arg4,arg5,beginAction,endAction,?cancelAction): Async<'T> =
        Async.FromBeginEnd(
            (fun (callback,stateObj) -> beginAction(arg1,arg2,arg3,arg4,arg5,callback,stateObj)),
            endAction,
            ?cancelAction=cancelAction)
//---- Extensions to the Socket class to provide async SendTo and ReceiveFrom
type System.Net.Sockets.Socket with

    /// Async wrapper over BeginSendTo/EndSendTo; yields the value returned by EndSendTo.
    member this.AsyncSendTo( buffer, offset, size, socketFlags, remoteEP ) =
        Async.FromBeginEnd(
            buffer, offset, size, socketFlags, remoteEP,
            this.BeginSendTo,
            this.EndSendTo )

    /// Async wrapper over BeginReceiveFrom/EndReceiveFrom; `remoteEP` is handed to both
    /// calls. No cancel action is supplied.
    member this.AsyncReceiveFrom( buffer, offset, size, socketFlags, remoteEP ) =
        Async.FromBeginEnd(
            buffer, offset, size, socketFlags, remoteEP,
            this.BeginReceiveFrom,
            (fun iar -> this.EndReceiveFrom(iar, remoteEP) ) )
//---- Asynchronous Ping
/// Asynchronously sends one echo message to `host` over a raw ICMP socket and
/// awaits a datagram in return.
/// Returns the receive buffer (request length + 256 bytes).
/// Raises SocketException when the send or the receive reports no bytes.
/// NOTE: the ReceiveTimeout option is documented to apply to synchronous receive
/// calls only, so the awaited receive below is not bounded by `timeout`; with an
/// unresponsive host this workflow does not complete.
let AsyncPing (host : IPAddress, timeout : int ) =
    async {
        let target = IPEndPoint( host, 0 )
        use socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp )
        socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.SendTimeout, timeout )
        socket.SetSocketOption( SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, timeout )
        let packet = EchoMessage()
        let request = packet.Bytes
        try
            let! sent = socket.AsyncSendTo( request, 0, request.Length, SocketFlags.None, target )
            if sent <= 0 then
                raise (SocketException())
            let sender = ref (target :> EndPoint)
            let response = Array.create (request.Length + 256) 0uy
            let! received = socket.AsyncReceiveFrom( response, 0, response.Length, SocketFlags.None, sender )
            if received <= 0 then
                raise (SocketException())
            return response
        finally
            socket.Close()
    }
After some thought, came up with the following. This code adds an AsyncReceiveEx member to Socket, which includes a timeout value. It hides the details of the watchdog timer inside the receive method... very tidy and self contained. Now THIS is what I was looking for!
See the complete async ping example, further below.
Not sure if the locks are necessary, but better safe than sorry...
type System.Net.Sockets.Socket with

    /// Async wrapper over BeginSend/EndSend. Cancelling the workflow closes the socket.
    member this.AsyncSend( buffer, offset, size, socketFlags, err ) =
        Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                            this.BeginSend,
                            this.EndSend,
                            this.Close )

    /// Async wrapper over BeginReceive/EndReceive. Cancelling the workflow closes the socket.
    member this.AsyncReceive( buffer, offset, size, socketFlags, err ) =
        Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                            this.BeginReceive,
                            this.EndReceive,
                            this.Close )

    /// Async receive guarded by a watchdog timer.
    ///   err       - ref cell that receives the SocketError of the operation
    ///   timeoutMS - milliseconds after which a still-pending receive is abandoned
    /// Returns the value of EndReceive, or 0 with `err` set to SocketError.TimedOut
    /// when the watchdog fired first. On timeout (and on cancellation) the socket
    /// is closed, which is what releases the pending receive.
    member this.AsyncReceiveEx( buffer, offset, size, socketFlags, err, (timeoutMS:int) ) =
        async {
            let timedOut = ref false
            let completed = ref false
            let timer = new System.Timers.Timer( double(timeoutMS), AutoReset=false )
            timer.Elapsed.Add( fun _ ->
                lock timedOut (fun () ->
                    // Only a watchdog that beats the completion counts as a timeout;
                    // a late tick must not turn a successful receive into one.
                    if not !completed then
                        timedOut := true
                        this.Close()
                    )
                )
            // Idempotent: may be reached from both the end action and the cancel action.
            let complete() =
                lock timedOut (fun () ->
                    if not !completed then
                        completed := true
                        timer.Stop()
                        timer.Dispose()
                    )
            return! Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                        (fun (b,o,s,sf,e,st,uo) ->
                            let result = this.BeginReceive(b,o,s,sf,e,st,uo)
                            // The receive may already have completed (and disposed the
                            // timer) before we get here; never start a disposed timer.
                            lock timedOut (fun () ->
                                if not !completed then timer.Start()
                                )
                            result
                            ),
                        (fun result ->
                            complete()
                            if !timedOut
                                then err := SocketError.TimedOut; 0
                                else this.EndReceive( result, err )
                            ),
                        (fun () ->
                            complete()
                            this.Close()
                            )
                        )
            }
Here is a complete Ping example. To avoid running out of source ports and to prevent getting too many replies at once, it scans one class-c subnet at a time.
module Ping
open System
open System.Net
open System.Net.Sockets
open System.Threading
//---- ICMP Packet Classes
/// Base ICMP message: a 4-byte header made of type, code and a 16-bit checksum.
/// Derived messages extend the wire format by overriding Bytes.
type IcmpMessage (t : byte) =
    let mutable m_type = t
    let mutable m_code = 0uy
    let mutable m_checksum = 0us

    /// ICMP type byte passed to the constructor.
    member this.Type
        with get() = m_type

    /// ICMP code byte (always 0 in this class).
    member this.Code
        with get() = m_code

    /// Checksum last stored by UpdateChecksum (0 until it is first called).
    member this.Checksum = m_checksum

    /// Serialized message: type, code, then the checksum low byte first.
    abstract Bytes : byte array
    default this.Bytes
        with get() =
            [|
                m_type
                m_code
                byte(m_checksum)
                byte(m_checksum >>> 8)
            |]

    /// Computes the 16-bit ones'-complement checksum over the current Bytes
    /// (including whatever checksum value is currently stored in them).
    member this.GetChecksum() =
        let mutable sum = 0ul
        let bytes = this.Bytes
        let mutable i = 0
        // Sum up uint16s
        while i < bytes.Length - 1 do
            sum <- sum + uint32(BitConverter.ToUInt16( bytes, i ))
            i <- i + 2
        // Add in last byte, if an odd size buffer
        if i <> bytes.Length then
            sum <- sum + uint32(bytes.[i])
        // Fold the carries back into the low 16 bits, then complement
        sum <- (sum >>> 16) + (sum &&& 0xFFFFul)
        sum <- sum + (sum >>> 16)
        sum <- ~~~sum
        uint16(sum)

    /// Recomputes and stores the checksum for the current message contents.
    member this.UpdateChecksum() =
        // The checksum field must be zero while summing; otherwise a second
        // update would fold the previous checksum into the new one.
        m_checksum <- 0us
        m_checksum <- this.GetChecksum()
/// ICMP message that appends a 16-bit identifier and a 16-bit sequence number
/// (both fixed at 0 here) to the base header.
type InformationMessage (t : byte) =
    inherit IcmpMessage(t)

    let mutable m_identifier = 0us
    let mutable m_sequenceNumber = 0us

    member this.Identifier = m_identifier
    member this.SequenceNumber = m_sequenceNumber

    /// Base header bytes followed by identifier and sequence number, each low byte first.
    override this.Bytes
        with get() =
            let trailer =
                [|
                    byte(m_identifier)
                    byte(m_identifier >>> 8)
                    byte(m_sequenceNumber)
                    byte(m_sequenceNumber >>> 8)
                |]
            Array.append base.Bytes trailer
/// ICMP message of type 8 carrying a data payload (32 bytes of value 32 by default).
/// The checksum is computed on construction and again whenever Data is replaced.
type EchoMessage() =
    inherit InformationMessage( 8uy )

    let mutable m_data = Array.create 32 32uy
    do base.UpdateChecksum()

    /// Payload appended after the header fields.
    member this.Data
        with get() = m_data
        and set(d) =
            m_data <- d
            this.UpdateChecksum()

    /// Header bytes from the base class followed by the payload.
    override this.Bytes
        with get() =
            let header = base.Bytes
            Array.append header this.Data
//---- Extensions to the F# Async class to allow up to 5 parameters (not just 3)
type Async with

    /// Begin/End adapter for operations whose Begin method takes four leading
    /// arguments before the callback and state object.
    static member FromBeginEnd(arg1,arg2,arg3,arg4,beginAction,endAction,?cancelAction): Async<'T> =
        Async.FromBeginEnd(
            (fun (callback,stateObj) -> beginAction(arg1,arg2,arg3,arg4,callback,stateObj)),
            endAction,
            ?cancelAction=cancelAction)

    /// Begin/End adapter for operations whose Begin method takes five leading
    /// arguments before the callback and state object.
    static member FromBeginEnd(arg1,arg2,arg3,arg4,arg5,beginAction,endAction,?cancelAction): Async<'T> =
        Async.FromBeginEnd(
            (fun (callback,stateObj) -> beginAction(arg1,arg2,arg3,arg4,arg5,callback,stateObj)),
            endAction,
            ?cancelAction=cancelAction)
//---- Extensions to the Socket class to provide async Send, Receive and a receive with timeout
type System.Net.Sockets.Socket with

    /// Async wrapper over BeginSend/EndSend. Cancelling the workflow closes the socket.
    member this.AsyncSend( buffer, offset, size, socketFlags, err ) =
        Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                            this.BeginSend,
                            this.EndSend,
                            this.Close )

    /// Async wrapper over BeginReceive/EndReceive. Cancelling the workflow closes the socket.
    member this.AsyncReceive( buffer, offset, size, socketFlags, err ) =
        Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                            this.BeginReceive,
                            this.EndReceive,
                            this.Close )

    /// Async receive guarded by a watchdog timer.
    ///   err       - ref cell that receives the SocketError of the operation
    ///   timeoutMS - milliseconds after which a still-pending receive is abandoned
    /// Returns the value of EndReceive, or 0 with `err` set to SocketError.TimedOut
    /// when the watchdog fired first. On timeout (and on cancellation) the socket
    /// is closed, which is what releases the pending receive.
    member this.AsyncReceiveEx( buffer, offset, size, socketFlags, err, (timeoutMS:int) ) =
        async {
            let timedOut = ref false
            let completed = ref false
            let timer = new System.Timers.Timer( double(timeoutMS), AutoReset=false )
            timer.Elapsed.Add( fun _ ->
                lock timedOut (fun () ->
                    // Only a watchdog that beats the completion counts as a timeout;
                    // a late tick must not turn a successful receive into one.
                    if not !completed then
                        timedOut := true
                        this.Close()
                    )
                )
            // Idempotent: may be reached from both the end action and the cancel action.
            let complete() =
                lock timedOut (fun () ->
                    if not !completed then
                        completed := true
                        timer.Stop()
                        timer.Dispose()
                    )
            return! Async.FromBeginEnd( buffer, offset, size, socketFlags, err,
                        (fun (b,o,s,sf,e,st,uo) ->
                            let result = this.BeginReceive(b,o,s,sf,e,st,uo)
                            // The receive may already have completed (and disposed the
                            // timer) before we get here; never start a disposed timer.
                            lock timedOut (fun () ->
                                if not !completed then timer.Start()
                                )
                            result
                            ),
                        (fun result ->
                            complete()
                            if !timedOut
                                then err := SocketError.TimedOut; 0
                                else this.EndReceive( result, err )
                            ),
                        (fun () ->
                            complete()
                            this.Close()
                            )
                        )
            }
//---- Asynchronous Ping
/// Asynchronously pings `ip` once over a raw ICMP socket.
///   ip      - address to ping
///   timeout - milliseconds to wait for the reply before giving up
/// Returns (ip, time spent in send + receive, isAlive), where isAlive is true only
/// when a datagram arrived in time and its ICMP type and code are both 0 (echo reply).
/// Raises SocketException when the send fails, or when the receive fails for a
/// reason other than the timeout.
let AsyncPing (ip : IPAddress, timeout : int ) =
    async {
        use socket = new Socket( AddressFamily.InterNetwork, SocketType.Raw, ProtocolType.Icmp )
        socket.Connect( IPEndPoint( ip, 0 ) )
        let pingTime = System.Diagnostics.Stopwatch()
        let packet = EchoMessage()
        let outbuffer = packet.Bytes
        let err = ref (SocketError())
        let isAlive = ref false
        try
            pingTime.Start()
            let! result = socket.AsyncSend( outbuffer, 0, outbuffer.Length, SocketFlags.None, err )
            pingTime.Stop()
            if result <= 0 then
                raise (SocketException(int(!err)))
            let inbuffer = Array.create (outbuffer.Length + 256) 0uy
            pingTime.Start()
            let! reply = socket.AsyncReceiveEx( inbuffer, 0, inbuffer.Length, SocketFlags.None, err, timeout )
            pingTime.Stop()
            let timedOut = (!err = SocketError.TimedOut)
            // Check the receive count (`reply`), not the send count, which was
            // already validated above.
            if reply <= 0 && not timedOut then
                raise (SocketException(int(!err)))
            // The datagram starts with the IPv4 header; its length in 32-bit words is
            // the low nibble of byte 0. The ICMP type and code bytes follow it.
            let icmpOffset = int (inbuffer.[0] &&& 0x0Fuy) * 4
            isAlive := not timedOut
                       && reply >= icmpOffset + 2
                       && inbuffer.[icmpOffset] = 0uy       // Type 0 = echo reply
                       && inbuffer.[icmpOffset + 1] = 0uy   // Code 0 = echo reply
        finally
            socket.Close()
        return (ip, pingTime.Elapsed, !isAlive )
    }
/// Scans 192.168.0.0 - 192.168.255.255 one /24 subnet at a time: the 256 pings of a
/// subnet run in parallel with a 1000 ms timeout each, and every host reported alive
/// is printed with its round-trip time.
let main() =
    // Lazily builds the 256 ping workflows for subnet 192.168.<net>.x
    let pings net =
        seq {
            for node in 0..255 do
                let ip = IPAddress.Parse( sprintf "192.168.%d.%d" net node )
                yield Ping.AsyncPing( ip, 1000 )
            }
    for net in 0..255 do
        pings net
        |> Async.Parallel
        |> Async.RunSynchronously
        |> Seq.filter ( fun (_,_,alive) -> alive )
        |> Seq.iter ( fun (ip, time, _) ->
            // TotalMilliseconds is the whole duration; Milliseconds is only the
            // 0-999 component and would wrap for times of one second or more.
            printfn "%A %dms" ip (int time.TotalMilliseconds) )

main()
// Keep the console window open until a key is pressed
System.Console.ReadKey() |> ignore
这篇关于"使用异步 Socket.BeginReceive 时如何检测超时?"的文章就介绍到这里,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!