Problem description
I have an application that uses Akka, and now I want to connect to it via a socket connection. Therefore I use a mechanism similar to the one from the Scala page.
But if I try to tell while I have an open OutputStream, no message is received by the target.
Here is my source code:
object Connector {
  def main(args: Array[String]) {
    val port = 1337
    val conf = ConfigFactory.load
    val system = ActorSystem("SDDB", conf.getConfig("SDDB"))
    val master = system.actorOf(Props[TestActor])
    master ! "a"
    try {
      val listener = new ServerSocket(port)
      println("listening on port: " + port)
      while (true)
        new ConnectionThread(listener accept, master).start
      listener close
    } catch {
      case e: IOException =>
        System.err.println("Could not listen on port: " + port + ".")
        System.exit(-1)
    } finally {
      system.shutdown
    }
  }
}

case class ConnectionThread(socket: Socket, master: ActorRef)
    extends Thread("ConnectionThread") {
  private val Select_* = """select (\w+) from (\w+) on (\d{4})-(\d\d)-(\d\d)""".r
  private implicit var id = 0L
  private implicit val timeout = Timeout(25.0 seconds)
  master ! "b"

  override def run {
    master ! "c"
    try {
      master ! "d"
      val in = new ObjectInputStream(socket getInputStream)
      master ! "e"
      val out = new ObjectOutputStream(socket getOutputStream)
      out writeObject("listening")
      out flush

      master ! "f"
      val command = in.readObject.asInstanceOf[String]
      println("client sent: '" + command + "'")
      // process the command
      master ! "g"
      out.writeObject("EOF")
      out.flush
      out.close
      in.close
      socket.close
    } catch {
      case e: SocketException =>
      case e: IOException => e printStackTrace
    }
  }
}

class TestActor extends Actor with ActorLogging {
  log info("TestActor running")

  def receive = {
    case s: String =>
      log info("received: " + s)
  }
}
I get the output:
listening on port: 1337
[INFO] TestActor running
[INFO] received: a
[INFO] received: b
[INFO] received: c
[INFO] received: d
Now I expected it to go on until g, but instead I get:
client sent: 'select content from testdata on 2012-07-06'
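The "// process the command" step is left open in the code above. As a minimal sketch of how the regular expression from ConnectionThread could be used as an extractor on the command shown here: the names CommandParser, SelectPattern and SelectCommand are made up for this illustration and do not appear in the original code, and the field names are guesses at what the capture groups mean.

```scala
import scala.util.matching.Regex

object CommandParser {
  // Same expression as Select_* in the question, renamed for readability (hypothetical name).
  val SelectPattern: Regex =
    """select (\w+) from (\w+) on (\d{4})-(\d\d)-(\d\d)""".r

  // Hypothetical result type; the question does not define one.
  final case class SelectCommand(column: String, table: String, year: Int, month: Int, day: Int)

  // Returns Some(command) only when the whole line matches the pattern.
  def parse(line: String): Option[SelectCommand] = line match {
    case SelectPattern(column, table, y, m, d) =>
      Some(SelectCommand(column, table, y.toInt, m.toInt, d.toInt))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    println(parse("select content from testdata on 2012-07-06"))
    println(parse("drop everything"))
  }
}
```

Returning an Option keeps the "unknown command" case explicit, so the connection thread can decide what to send back instead of throwing a MatchError.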
I figured out that it works until I open a stream of the socket, probably because tell and ask are socket-based as well and use the output stream of the socket that the thread runs in. Afterwards the socket connection works, but I am not able to send any message to the actor system.
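A note on the stream side of this, independent of Akka: the ObjectOutputStream constructor writes a small stream header, and the ObjectInputStream constructor reads that header before it returns. On a socket, the reading constructor therefore waits until the peer has created and flushed its ObjectOutputStream. The sketch below shows the header with in-memory streams only; StreamHeaderDemo and its methods are names invented for this example. Whether this is related to the missing messages is not established here.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, EOFException}
import java.io.{ObjectInputStream, ObjectOutputStream}

object StreamHeaderDemo {
  // Bytes that reach the underlying stream from the constructor alone.
  def headerBytes(): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes) // constructor writes the stream header
    out.flush()                             // make sure the header is pushed through
    bytes.toByteArray
  }

  // True when an ObjectInputStream can be constructed over `data`.
  // With no header available, an in-memory stream fails with EOFException;
  // a socket stream would wait for the header instead.
  def canOpen(data: Array[Byte]): Boolean =
    try {
      new ObjectInputStream(new ByteArrayInputStream(data))
      true
    } catch {
      case _: EOFException => false
    }

  def main(args: Array[String]): Unit = {
    println("header length: " + headerBytes().length)
    println("opens without header: " + canOpen(Array.empty[Byte]))
    println("opens with header: " + canOpen(headerBytes()))
  }
}
```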
There is no way for me to drop the Connector and the ConnectionThread. How can I fix it?
Solution
I must admit that I did not completely understand the example from the documentation. But I figured out that using a ConnectionHelper instead of directly addressing the ActorRef works pretty well.
I changed my code to the following:
object Connector {
  def main(args: Array[String]) {
    val port = 1337
    val conf = ConfigFactory.load
    val system = ActorSystem("SDDB", conf.getConfig("SDDB"))
    // val master = system.actorOf(Props[TestActor], "master")
    // master ! "a"
    try {
      val listener = new ServerSocket(port)
      println("listening on port: " + port)
      while (true)
        // new ConnectionThread(listener accept, master.asInstanceOf[TestActor]).start
        new ConnectionThread(listener accept, system).start
      listener close
    } catch {
      case e: IOException =>
        System.err.println("Could not listen on port: " + port + ".")
        System.exit(-1)
    } finally {
      // master ! PoisonPill
      system.shutdown
    }
  }
}

case class ConnectionThread(socket: Socket, sys: ActorSystem)
    extends Thread("ConnectionThread") {
  private val Select_* = """select (\w+) from (\w+) on (\d{4})-(\d\d)-(\d\d)""".r
  private implicit var id = 0L
  private implicit val timeout = Timeout(25.0 seconds)
  private val conHelper = new ConnectionHelper

  override def run {
    try {
      val out = new ObjectOutputStream(socket getOutputStream)
      val in = new ObjectInputStream(socket getInputStream)
      conHelper tell "funzt"
      out writeObject ("Hi")
      out.flush
      val command = in.readObject.asInstanceOf[String]
      println("received: " + command)
      out writeObject ("test")
      out.flush
      out writeObject ("EOF")
      out.flush
      out.close
      in.close
      socket.close
    }
  }

  private class ConnectionHelper {
    val tester = sys.actorOf(Props[TestActor])
    def tell(s: String) { tester ! s }
  }
}
I don't really understand why this works and the code from my question does not. I welcome all explanations.
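One difference between the two versions, besides the ConnectionHelper, is that the working one constructs the ObjectOutputStream before the ObjectInputStream. The following is a minimal sketch of a handshake in that order on both ends, using only the JDK over a loopback socket and no actors. LoopbackDemo, serve and roundTrip are names made up for this illustration, and the sketch does not claim to explain the Akka behaviour.

```scala
import java.io.{ObjectInputStream, ObjectOutputStream}
import java.net.{InetAddress, ServerSocket, Socket}

object LoopbackDemo {
  // Server side of one connection: output stream first, then input stream.
  private def serve(socket: Socket): Unit = {
    val out = new ObjectOutputStream(socket.getOutputStream)
    out.flush() // send our stream header before waiting for the peer's header
    val in = new ObjectInputStream(socket.getInputStream)
    try {
      out.writeObject("Hi")
      out.flush()
      val command = in.readObject().asInstanceOf[String]
      out.writeObject("received: " + command)
      out.writeObject("EOF")
      out.flush()
    } finally {
      out.close()
      in.close()
      socket.close()
    }
  }

  // Starts a throwaway server on an ephemeral port, connects a client to it,
  // and returns the three objects the server sends.
  def roundTrip(command: String): List[String] = {
    val loopback = InetAddress.getLoopbackAddress
    val listener = new ServerSocket(0, 1, loopback)
    val server = new Thread(new Runnable {
      def run(): Unit = serve(listener.accept())
    })
    server.start()
    val socket = new Socket(loopback, listener.getLocalPort)
    try {
      val out = new ObjectOutputStream(socket.getOutputStream)
      out.flush()
      val in = new ObjectInputStream(socket.getInputStream)
      val greeting = in.readObject().asInstanceOf[String]
      out.writeObject(command)
      out.flush()
      val reply = in.readObject().asInstanceOf[String]
      val end = in.readObject().asInstanceOf[String]
      List(greeting, reply, end)
    } finally {
      socket.close()
      server.join(2000) // bounded wait so a failed run cannot hang forever
      listener.close()
    }
  }

  def main(args: Array[String]): Unit =
    roundTrip("select content from testdata on 2012-07-06").foreach(println)
}
```

Creating and flushing the output stream first on both sides means neither ObjectInputStream constructor has to wait on a header that the other side has not sent yet.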