问题描述
有人可以帮助我弄清楚如何使用IB API Python套接字执行基本请求吗? (我使用的是最新的IB API,似乎它支持Python,所以不需要人们以前使用的Ibpy)
Can someone help me to figure out how to do basic request by using IB API Python socket? (I am using the latest IB API and it seems it support Python so should not need the Ibpy which people used to use)
我这样的代码可以简单地工作并使其连接到TWS.问题是:我不知道如何看到"从IB发回的消息.
My code like this can simply work and make it connect to TWS.The problem is : I have no idea how to "see" the message sending back from IB.
from ibapi import wrapper
from ibapi.client import EClient
from ibapi.contract import *
w = wrapper.EWrapper()
myTWS = EClient(w)
myTWS.connect(host='localhost', port=7496, clientId=100)
print("serverVersion:%s connectionTime:%s" % (myTWS.serverVersion(),
myTWS.twsConnectionTime()))
myTWS.startApi()
c = Contract()
c.m_symbol = "AAPL"
c.m_secType = "STK"
c.m_exchange = "ISLAND"
c.m_currency = "USD"
myTWS.reqRealTimeBars(999, c, 5, "MIDPOINT", True, [])
我知道在使用IBPy之前它就像Register()一样.我只是不知道如何在当前的IB原始python API中进行操作.有人可以帮我举个简单的例子吗?提前致谢.
I know that it was something like Register() before with IBPy. I just don't know how to do it in this current IB original python API. Can someone help by giving me a simple example? Thanks in advance.
推荐答案
您必须继承/重写/实现wrapper.EWrapper.那就是您告诉EClient
发送从TWS接收到的数据的地方.
You have to subclass/override/implement the wrapper.EWrapper. That's where you're telling EClient
to send the data received from TWS.
我从示例程序中删除了几乎所有内容,然后运行了.
I removed almost everything from the sample program and this runs.
from ibapi import wrapper
from ibapi.client import EClient
from ibapi.utils import iswrapper #just for decorator
from ibapi.common import *
from ibapi.contract import *
from ibapi.ticktype import *
class TestApp(wrapper.EWrapper, EClient):
def __init__(self):
wrapper.EWrapper.__init__(self)
EClient.__init__(self, wrapper=self)
@iswrapper
def nextValidId(self, orderId:int):
print("setting nextValidOrderId: %d", orderId)
self.nextValidOrderId = orderId
#here is where you start using api
contract = Contract()
contract.symbol = "AAPL"
contract.secType = "STK"
contract.currency = "USD"
contract.exchange = "SMART"
self.reqMktData(1101, contract, "", False, None)
@iswrapper
def error(self, reqId:TickerId, errorCode:int, errorString:str):
print("Error. Id: " , reqId, " Code: " , errorCode , " Msg: " , errorString)
@iswrapper
def tickPrice(self, reqId: TickerId , tickType: TickType, price: float,
attrib:TickAttrib):
print("Tick Price. Ticker Id:", reqId, "tickType:", tickType, "Price:", price)
#this will disconnect and end this program because loop finishes
self.done = True
def main():
app = TestApp()
app.connect("127.0.0.1", 7496, clientId=123)
print("serverVersion:%s connectionTime:%s" % (app.serverVersion(),
app.twsConnectionTime()))
app.run()
if __name__ == "__main__":
main()
一旦调用app.run()
,该程序就会启动一个几乎无限的循环来读取消息,因此,由于必须开始循环,因此您将需要一些其他方式来构造程序.
Once you call app.run()
the program starts an almost infinite loop reading messages so you'll need some other way to structure your program since the loop must be started.
这篇关于IB API Python示例未使用Ibpy的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!