我正在尝试使用proc ds2的多线程功能,以便获得比常规数据步骤更好的性能。
fred.testdata是一个SPDE数据集,其中包含500万个观察值。我的代码如下:
proc ds2;

    /* Thread program: each thread instance reads rows from fred.testdata
       and derives claimtypedet from the value of claim. */
    thread home_claims_thread / overwrite = yes;

        /* Declarations currently disabled by the author. */
        /*declare char(10) producttype;
        declare char(12) wrknat_clmtype;
        declare char(7) claimtypedet;
        declare char(1) event_flag;*/
        /*declare date week_ending having format date9.;*/

        method run();
            /*declare char(7) _week_ending;*/
            set fred.testdata;

            /* Map the claim code to its detailed claim type. */
            if claim = 'X' then claimtypedet= 'ABC';
            else if claim = 'Y' then claimtypedet= 'DEF';

            /* Date derivation currently disabled by the author. */
            /*_week_ending = COMPRESS(exposmth,'M');
            week_ending = to_date(substr(_week_ending,1,4) || '-' || substr(_week_ending,5,2) || '-01');*/
        end;

    endthread;

    /* Data program: collects the rows produced by 8 instances of the
       thread program into home_claims. */
    data home_claims / overwrite = yes;
        declare thread home_claims_thread t;

        method run();
            set from t threads=8;
        end;
    enddata;

run;
quit;
我没有列出所有的IF语句,只列出了其中几条,否则会占用好几页(您应该能明白大意)。就目前的代码而言,它的运行速度比普通数据步骤快得多,但是,如果发生以下任何情况,则会出现严重的性能问题:
我的问题是:
谢谢
最佳答案
可以尝试两种方法:使用 proc hpds2 让SAS处理并行执行,或者使用更手动的方法。请注意,这两种方法都无法保证始终保持观测的原始顺序。
方法1:PROC HPDS2
HPDS2是执行大规模并行数据处理的一种方式。在单机模式下,它会让每个内核并行运行,然后把所有结果重新合并在一起。您只需要对代码进行一些小的修改即可运行它。hpds2 的约定是:在 proc 语句上通过 data= 和 out= 选项声明输入和输出数据集。您的 data 和 set 语句将始终使用以下语法:
/* Fixed skeleton for the DS2 program inside PROC HPDS2: the data statement
   names DS2GTF.out and the set statement names DS2GTF.in. */
data DS2GTF.out;
    method run();
        set DS2GTF.in;
        <code>;
    end;
enddata;
知道这一点,我们可以修改您的代码以在HPDS2上运行:
/* HPDS2 version of the DS2 program from the question.
   Input : fred.testdata (the data set read by the original thread program;
           the earlier spelling fred.test_data did not match it).
   Output: home_claims.
   The input and output data sets are declared on the PROC statement; inside
   the DS2 program they are referred to as DS2GTF.in and DS2GTF.out. */
proc hpds2 data=fred.testdata
           out=home_claims;

    data DS2GTF.out;

        /* Declarations currently disabled by the author. */
        /*declare char(10) producttype;
        declare char(12) wrknat_clmtype;
        declare char(7) claimtypedet;
        declare char(1) event_flag;*/
        /*declare date week_ending having format date9.;*/

        method run();
            /*declare char(7) _week_ending;*/
            set DS2GTF.in;

            /* Map the claim code to its detailed claim type. */
            if claim = 'X' then claimtypedet= 'ABC';
            else if claim = 'Y' then claimtypedet= 'DEF';

            /* Date derivation currently disabled by the author. */
            /*_week_ending = COMPRESS(exposmth,'M');
            week_ending = to_date(substr(_week_ending,1,4) || '-' || substr(_week_ending,5,2) || '-01');*/
        end;

    enddata;

run;
quit;
方法2:使用rsubmit分割数据并追加
下面的代码利用 rsubmit 和直接观测访问(firstobs=/obs=)按块读取数据,然后将所有块追加在一起。如果您已为数据设置了 Block I/O,这种方法会特别有效。
options sascmd='!sascmd'
autosignon=yes
noconnectwait
noconnectpersist
;
/* Default number of parallel sessions: the CPUCOUNT system option. */
%let cpucount = %sysfunc(getoption(cpucount));

/*
 * parallel_execute
 *
 * Splits &data into &threads contiguous row ranges, processes each range in
 * its own rsubmit session (worker1..worker&threads) and appends the
 * resulting chunks into &out.
 *
 * Parameters:
 *   data=     input data set, one- or two-level name
 *   out=      output data set, one- or two-level name
 *   threads=  number of rsubmit sessions (default: &cpucount)
 *
 * Each worker writes its chunk as _<out member>_<i> into this session's WORK
 * directory, which the worker reaches through the libref WORKDIR.
 * Row order of the original data set is only preserved chunk by chunk.
 */
%macro parallel_execute(data=, out=, threads=&cpucount);

    /* Keep the working variables local so a caller's macro variables with
       the same names (i, n, rc, ...) are not overwritten. %syslput _USER_
       still copies them to the workers. */
    %local dsid n rc i firstobs lastobs workdir
           inlib indsn outlib outdsn baselib;

    /* Get total obs from data */
    %let dsid = %sysfunc(open(&data.));
    %if &dsid. = 0 %then %do;
        %put ERROR: parallel_execute could not open data set &data..;
        %return;
    %end;
    %let n  = %sysfunc(attrn(&dsid., nlobs));
    %let rc = %sysfunc(close(&dsid.));

    /* NLOBS is negative when the number of observations is not available;
       the row ranges cannot be computed in that case. */
    %if &n. < 0 %then %do;
        %put ERROR: parallel_execute cannot determine the number of observations in &data..;
        %return;
    %end;

    /* Work library location of this session, to be used by the workers */
    %let workdir = %sysfunc(getoption(work));

    /* Check for an input libname. Without one the input lives in this
       session's WORK, which the workers see as WORKDIR. */
    %if %length(%scan(&data., 2, .)) > 0 %then %do;
        %let inlib = %scan(&data., 1, .);
        %let indsn = %scan(&data., 2, .);
    %end;
    %else %do;
        %let inlib = workdir;
        %let indsn = &data.;
    %end;

    /* Check for an output libname. baselib is the library used locally for
       the final delete/append; a one-level name means WORK. */
    %if %length(%scan(&out., 2, .)) > 0 %then %do;
        %let outlib  = %scan(&out., 1, .);
        %let outdsn  = %scan(&out., 2, .);
        %let baselib = &outlib.;
    %end;
    %else %do;
        %let outlib  = workdir;
        %let outdsn  = &out.;
        %let baselib = work;
    %end;

    /* Run &threads rsubmit sessions */
    %do i = 1 %to &threads.;

        /* Determine the records that each worker will read.
           Integer form of floor((i-1)*n/threads)+1 .. floor(i*n/threads),
           which is algebraically the same split as
           n-(n/threads)*(threads-i+1)+1 .. n-(n/threads)*(threads-i)
           without going through floating point. */
        %let firstobs = %eval((&i. - 1) * &n. / &threads. + 1);
        %let lastobs  = %eval(&i. * &n. / &threads.);

        /* Sign on to a remote session and send over all user-made macro
           variables (done once per worker, after every value is set) */
        %syslput _USER_ / remote=worker&i.;

        /* Run code on remote session &i */
        rsubmit remote=worker&i. inheritlib=(&inlib.);

            libname workdir "&workdir.";

            data workdir._&outdsn._&i.;
                set &inlib..&indsn.(firstobs=&firstobs. obs=&lastobs.);
                /* <PUT CODE HERE>;*/
            run;

        endrsubmit;

    %end;

    /* Wait for everything to complete */
    waitfor _ALL_;

    /* Append all of the chunks together.
       DELETE takes member names of the procedure library, so the output
       library is given on LIB= and only the member name is deleted; this
       also makes a two-level out= name work. The chunks always live in
       WORK, hence the explicit libref on DATA=. */
    proc datasets lib=&baselib. nolist;
        delete &outdsn.;

        %do i = 1 %to &threads.;
            append base=&baselib..&outdsn.
                   data=work._&outdsn._&i.
                   force
            ;
        %end;

        /* Optional: remove all temporary data (chunks are in WORK) */
        /* delete _&outdsn._:;*/
    quit;

    /* No "libname workdir clear" here: WORKDIR is only assigned inside the
       worker sessions, never in this session. */

%mend;
您可以使用以下代码测试其功能:
data pricedata;
set sashelp.pricedata;
run;
%parallel_execute(data=pricedata, out=test, threads=3);
如果查看WORK目录中的临时文件,您会发现它将数据集均匀地分配给了3个并行进程,并且各块的观测数之和等于原始数据集的总观测数。
_test_1 = 340
_test_2 = 340
_test_3 = 340
TOTAL = 1020
pricedata = 1020