1 简介
AMI
AMD
区别:
(1)AMI使得客户端可以异步,AMD使得服务端可以异步。
(2)使用AMI服务端代码不需要修改 ,使用AMD客户端代码不需要修改。
(3)对同步模型的支持
2 使用元数据修改代码生成
3 使用AMI
3.1 代码映射
C++ 代码生成器为每个AMI 操作生成以下代码:
(1)一个抽象的回调类, Ice run time 用它来通知应用,操作已完成。类名是按这样的模式取的 AMI_class_op。
这个类提供两个方法:
void ice_response(<params>);
void ice_exception(const Ice::Exception &);
3.2 一个使用AMI的例子
ModelPrx::checkedCast(base);
- 一个回调对象不能同时用于多个调用。需要聚合来自多个答复的信息的应用可以创建一个单独的对象,让回调对象对它进行委托。
- 对回调对象的调用来自Ice run time 的客户线程池中的线程,因此,如果在答复到达的同时,应用可能要与回调对象进行交互,就有可能需要进行同步。
- 客户线程池中的线程的数目决定了,同时可以为多少异步调用发出回调。客户线程池的缺省尺寸是一,意味着对回调对象的调用是序列化的。如果线程池的尺寸增大了,而同一回调对象被用于多个调用,应用就可能需要进行同步。
4 使用AMD
异步分派方法的型构与AMI 方法的类似:返回类型是void,参数由一个回调对象、以及操作的in 参数组成。在AMI 中,回调对象是由应用提供的,而在AMD 中,回调对象是由Ice run time 提供的,同时它还提供了一些方法,用于返回操作的结果,或报告异常。我们的实现不需要在分派方法返回之前调用回调对象;回调对象可以在任何时候,由任何线程调用,但只能被调用一次。
4.1 代码映射
C++ 代码生成器为每个AMD 操作生成以下代码:
(1)一个抽象的回调类.
实现用它来通知Ice run time,操作已完成。
类名是按这样的模式取的:AMD_class_op。
类的方法:
void ice_response(<params>);
void ice_exception(const Ice::Exception &);服务器可以用这个版本的ice_exception 报告用户异常或本地异常。
void ice_exception(const std::exception &);服务器可以用这个版本的ice_exception 报告标准的异常。
void ice_exception() 服务器可以用这个版本的ice_exception 报告未知异常。
(2)分派方法.
其名字有后缀_async。
异步分派方法,它的第一个参数就是由ICE实现的回调类AMD_class_op ,在这个方法里,我们要两种方案:
- 直接做具体工作,完成后在末尾调用回调类的ice_response方法告知客户端已完成。这种方案就和之前普通版的服务端一样,是同步执行的。
- 把 回调类和请求所需要的参数放入一个指定的位置,再由其它线程取出执行和通知客户端。这种方案就是异步分派方法,具体实现时还可以有多种方式,如使用命令模 式把参数和具体操作直接封装成一个对象放入队列,然后由另一线程(或线程池)取出执行。
这个方法的返回类型是void,第一个参数是一个智能指针,指向上面描述的回调类的一个实例。其他的参数由操作的各个in 参数组成,次序是声明时的次序。
例如,假定我们定义了下面这个操作:
interface I {
["amd"] int foo(short s, out long l);
};
下面是为操作foo 生成的回调类:
class AMD_I_foo : public ... {
public:
void ice_response(Ice::Int, Ice::Long);
void ice_exception(const Ice::Exception &);
void ice_exception(const std::exception &);
void ice_exception();
};
下面是为操作foo 的异步调用生成的分派方法:
void foo_async(const AMD_I_fooPtr &, Ice::Short);
4.2 例子
sequence<float> Row;
sequence<Row> Grid;
exception RangeError {};
interface Model {
["ami", "amd"] Grid interpolate(Grid data, float factor)
throws RangeError;
};
服务端的servant(servant就是服务端实际工作的代码)
我们的servant 类派生自Model,并且提供了interpolate_async
方法的定义:
class ModelI : virtual public Model,
virtual public IceUtil::Mutex {
public:
virtual void interpolate_async(const AMD_Model_interpolatePtr &,const Grid &,Ice::Float,const Ice::Current &);
private:
std::list<JobPtr> _jobs;
};
interpolate_async 的实现使用了同步来在一个Job 中安全地记录回调对象,并把它放进一个队列中:
void ModelI::interpolate_async(const AMD_Model_interpolatePtr & cb,const Grid & data,Ice::Float factor,const Ice::Current & current)
{
IceUtil::Mutex::Lock sync(*this);
JobPtr job = new Job(cb, data, factor);
_jobs.push_back(job);
}
在把信息放进队列之后,该操作把控制返回给Ice run time,使分派线程能够去处理另外的请求。一个应用线程从队列中移除下一个Job,并调用
execute 来进行插值。下面是Job 的定义:
class Job : public IceUtil::Shared {
public:
Job(const AMD_Model_interpolatePtr &,const Grid &,Ice::Float);
void execute();
private:
bool interpolateGrid();
AMD_Model_interpolatePtr _cb;
Grid _grid;
Ice::Float _factor;
};
typedef IceUtil::Handle<Job> JobPtr;
execute 的实现使用interpolateGrid ( 没有给出) 来完成计算工
作:
Job::Job(const AMD_Model_interpolatePtr & cb,const Grid & grid,Ice::Float factor) :_cb(cb), _grid(grid), _factor(factor)
{
}
void Job::execute()
{
if(!interpolateGrid()) {
_cb->ice_exception(RangeError());
return;
}
_cb->ice_response(_grid);
}
如果interpolateGrid 返回false, ice_exception 就会被调用,表明发生了范围错误。在调用了ice_exception 之后,使用return 语句是必要的,因为ice_exception 并没有抛出异常;它仅仅是整编了参数,并把它发送给客户。如果插值成功, ice_response 会被调用,把修改后的栅格返回给客户。
(1)Slice定义(hello.ice)
interface Hello
{
["ami", "amd"] idempotent void sayHello(int delay)
throws RequestCanceledException;
void shutdown();
};
(2)服务端声明文件代码:(helloI.h)
#include <Hello.h>
#include <WorkQueue.h>
class HelloI : virtual publicDemo::Hello
{
public:
HelloI(const WorkQueuePtr&);
//分派函数
virtual void sayHello_async(const Demo::AMD_Hello_sayHelloPtr&, int,const Ice::Current&);
virtual void shutdown(const Ice::Current&);
private:
WorkQueuePtr _workQueue;
};
(3)服务端实现文件代码(helloI.cpp):
HelloI::HelloI(constWorkQueuePtr& workQueue) :
_workQueue(workQueue)
{
}
void
HelloI::sayHello_async(
constDemo::AMD_Hello_sayHelloPtr& cb,
int delay, const Ice::Current&)
{
if(delay == 0)
{
cout << "Hello World!"<< endl;
cb->ice_response();
}
else
{
_workQueue->add(cb, delay);
}
}
void
HelloI::shutdown(constIce::Current& curr)
{
cout << "Shutting down..."<< endl;
_workQueue->destroy();
curr.adapter->getCommunicator()->shutdown();
}
int
AsyncServer::run(int argc, char*argv[])
{
if(argc > 1)
{
cerr << appName() <<": too many arguments" << endl;
return EXIT_FAILURE;
}
callbackOnInterrupt();
Ice::ObjectAdapterPtr adapter =communicator()->createObjectAdapter("Hello");
_workQueue = new WorkQueue();
Demo::HelloPtr hello = newHelloI(_workQueue); //定义服务端servrant
adapter->add(hello, communicator()->stringToIdentity(“hello”));//将serverant加入适配器
// 启动工作队列
_workQueue->start();
adapter->activate(); //
communicator()->waitForShutdown();
_workQueue->getThreadControl().join();//等待工作队列停止
return EXIT_SUCCESS;
}
class WorkQueue : publicIceUtil::Thread
{
public:
WorkQueue();
virtual void run();
//将回调对象加入队列
void add(const Demo::AMD_Hello_sayHelloPtr&, int);
void destroy();
private:
//存储回调对象以及调用参数的的结构体
struct CallbackEntry
{
Demo::AMD_Hello_sayHelloPtr cb;
int delay;
};
IceUtil::Monitor<IceUtil::Mutex> _monitor;
std::list<CallbackEntry> _callbacks;
bool _done;
};
typedefIceUtil::Handle<WorkQueue> WorkQueuePtr;
VoidWorkQueue::run()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock
lock(_monitor);
while(!_done)
{
if(_callbacks.size() == 0)
{
_monitor.wait();
}
if(_callbacks.size() != 0)
{
//从队列中取下一个回调对象
CallbackEntry entry = _callbacks.front();
//等待delay秒
_monitor.timedWait(IceUtil::Time::milliSeconds(entry.delay));
if(!_done)
{
_callbacks.pop_front();
//…处理业务逻辑
Do Something here。
entry.cb->ice_response();
}
}
}
void WorkQueue::add(const
Demo::AMD_Hello_sayHelloPtr&cb,int
delay)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock
lock(_monitor);
if(!_done)
{
CallbackEntry entry;
entry.cb =cb;
entry.delay = delay;
if(_callbacks.size() == 0)
{
_monitor.notify();
}
_callbacks.push_back(entry);
}
else
{
//发送异常通知
cb->ice_exception(Demo::RequestCanceledException());
}
}
5 总结
同步的远地调用是对本地的方法调用的自然扩展,它利用了程序员的面向对象编程经验,使初学分布式应用开发的程序员的学习曲线平缓下来。但是,同步调用的阻塞本质使得有些应用任务的实现变得更为困难,甚至不可能,因此, Ice 提供了一个直截了当的接口,你可以用这个接口来访问Ice 的异步设施。如果使用异步方法调用,发出调用的线程可以调用一个操作,然后马上就重获控制,不用阻塞起来等待操作完成。当Ice run time
收到结果时,它会通过回调通知应用。与此类似,异步方法分派允许servant 在任何时候发送操作的结果,而不一定要在操作实现中发送。通过把费时的请求放在队列中,后面再进行处理, servant 可以改善可伸缩性,并节省线程资源。
注:本文内容主要来源于Ice官方文档