celluloid 官网的文档通读下来,基本有所心得。现结合实际生产,编写一个应用程序,展示如下。
程序目的: 比对新旧两套数据库的同名表记录条数是否一致。预先已将需比对的表名写入 diff_log 表中,添加了两个字段new_count和 old_count用于保存表的记录条数。


  1. require 'celluloid'
  2. require 'dbi'

  3. class MyWorker

  4.   # 定义一个 actor
  5.   include Celluloid

  6.   # 析构函数,当主 actor 退出时,所有的子 actor 将调用析构函数并退出
  7.   finalizer :my_finalizer
  8.   
  9.   # 构造函数:连接新旧两套数据库
  10.   def initialize
  11.     @newdb = DBI.connect('dbi:OCI8:db_new','new_user','new_pass') or exit
  12.     @olddb = DBI.connect('dbi:OCI8:db_old','old_user','old_pass') or exit
  13.   end
  14.   
  15.   # 统计记录条数,写入 diff_log 
  16.   def calc(table_name)
  17.     new_count = @newdb.select_one("select count(1) c from #{table_name}").by_field("C");
  18.     old_count = @olddb.select_one("select count(1) c from #{table_name}").by_field("C");
  19.     
  20.     sql = sprintf "update diff_log set new_count = %d,old_count = %d where table_name = '%s'",
  21.       new_count,old_count,table_name
  22.     puts sql
  23.     
  24.     @newdb.execute(sql)
  25.     @newdb.commit    
  26.   end
  27.   
  28.   def my_finalizer
  29.     @newdb.disconnect if @newdb
  30.     @olddb.disconnect if @olddb
  31.   end
  32. end
  33. # 开启一个进程池,可以同时运行 20 个 actor
  34. pool = MyWorker.pool(size: 20)

  35. dbh = DBI.connect('dbi:OCI8:db_new','new_user','new_pass') or exit
  36. # 主循环
  37. dbh.select_all("select table_name from diff_log where new_count is null") do |row|
  38.   table_name = row.by_field "TABLE_NAME"
  39.   # 异步调用
  40.   pool.async.calc(table_name)
  41. end

  42. dbh.disconnect if dbh
  43. # 休眠主进程。否则主进程将直接退出,所有子 actor 也将被 kill 掉。
  44. # 终止程序只有按 ctrl+c
  45. sleep

09-04 12:52