streamcluster.cpp 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984
  1. /***********************************************
  2. streamcluster.cpp
  3. : original source code of streamcluster with minor
  4. modification regarding function calls
  5. - original code from PARSEC Benchmark Suite
  6. - parallelization with CUDA API has been applied by
  7. Sang-Ha (a.k.a Shawn) Lee - sl4ge@virginia.edu
  8. University of Virginia
  9. Department of Electrical and Computer Engineering
  10. Department of Computer Science
  11. :revised by
  12. Jianbin Fang - j.fang@tudelft.nl
  13. Delft University of Technology
  14. Faculty of Electrical Engineering, Mathematics and Computer Science
  15. Department of Software Technology
  16. Parallel and Distributed Systems Group
  17. on 15/03/2011
  18. ***********************************************/
  19. #include "streamcluster.h"
  20. #include "CLHelper.h"
  21. #include "streamcluster_cl.h"
  22. using namespace std;
  23. #define MAXNAMESIZE 1024 // max filename length
  24. #define SEED 1
  25. /* increase this to reduce probability of random error */
  26. /* increasing it also ups running time of "speedy" part of the code */
  27. /* SP = 1 seems to be fine */
  28. #define SP 1 // number of repetitions of speedy must be >=1
  29. /* higher ITER --> more likely to get correct # of centers */
  30. /* higher ITER also scales the running time almost linearly */
  31. #define ITER 3 // iterate ITER* k log k times; ITER >= 1
  32. //#define PRINTINFO //comment this out to disable output
  33. //#define PROFILE_TMP // comment this out to disable instrumentation code
  34. //#define ENABLE_THREADS // comment this out to disable threads
  35. //#define INSERT_WASTE //uncomment this to insert waste computation into dist function
  36. #define CACHE_LINE 512 // cache line in byte
  37. /* global */
  38. static char *switch_membership; //whether to switch membership in pgain
  39. static bool *is_center; //whether a point is a center
  40. static int *center_table; //index table of centers
  41. static int nproc; //# of threads
  42. /* timing info */
  43. static double serial;
  44. static double cpu_gpu_memcpy;
  45. static double memcpy_back;
  46. static double gpu_malloc;
  47. static double kernel;
  48. static double gpu_free;
  49. static int cnt_speedy;
  50. // instrumentation code
  51. #ifdef PROFILE_TMP
  52. double time_local_search;
  53. double time_speedy;
  54. double time_select_feasible;
  55. double time_gain;
  56. double time_shuffle;
  57. double time_gain_dist;
  58. double time_gain_init;
  59. double time_FL;
  60. #endif
  61. void inttofile(int data, char *filename){
  62. FILE *fp = fopen(filename, "w");
  63. fprintf(fp, "%d ", data);
  64. fclose(fp);
  65. }
  66. int isIdentical(float *i, float *j, int D){
  67. // tells whether two points of D dimensions are identical
  68. int a = 0;
  69. int equal = 1;
  70. while (equal && a < D) {
  71. if (i[a] != j[a]) equal = 0;
  72. else a++;
  73. }
  74. if (equal) return 1;
  75. else return 0;
  76. }
  77. /* comparator for floating point numbers */
  78. static int floatcomp(const void *i, const void *j)
  79. {
  80. float a, b;
  81. a = *(float *)(i);
  82. b = *(float *)(j);
  83. if (a > b) return (1);
  84. if (a < b) return (-1);
  85. return(0);
  86. }
  87. /* shuffle points into random order */
  88. void shuffle(Points *points)
  89. {
  90. #ifdef PROFILE_TMP
  91. double t1 = gettime();
  92. #endif
  93. long i, j;
  94. Point temp;
  95. for (i=0;i<points->num-1;i++) {
  96. j=(lrand48()%(points->num - i)) + i;
  97. temp = points->p[i];
  98. points->p[i] = points->p[j];
  99. points->p[j] = temp;
  100. }
  101. #ifdef PROFILE_TMP
  102. double t2 = gettime();
  103. time_shuffle += t2-t1;
  104. #endif
  105. }
  106. /* shuffle an array of integers */
  107. void intshuffle(int *intarray, int length)
  108. {
  109. #ifdef PROFILE_TMP
  110. double t1 = gettime();
  111. #endif
  112. long i, j;
  113. int temp;
  114. for (i=0;i<length;i++) {
  115. j=(lrand48()%(length - i))+i;
  116. temp = intarray[i];
  117. intarray[i]=intarray[j];
  118. intarray[j]=temp;
  119. }
  120. #ifdef PROFILE_TMP
  121. double t2 = gettime();
  122. time_shuffle += t2-t1;
  123. #endif
  124. }
  125. #ifdef INSERT_WASTE
  126. float waste(float s )
  127. {
  128. for( int i =0 ; i< 4; i++ ) {
  129. s += pow(s,0.78);
  130. }
  131. return s;
  132. }
  133. #endif
  134. /* compute Euclidean distance squared between two points */
  135. float dist(Point p1, Point p2, int dim)
  136. {
  137. int i;
  138. float result=0.0;
  139. for (i=0;i<dim;i++)
  140. result += (p1.coord[i] - p2.coord[i])*(p1.coord[i] - p2.coord[i]);
  141. #ifdef INSERT_WASTE
  142. float s = waste(result);
  143. result += s;
  144. result -= s;
  145. #endif
  146. return(result);
  147. }
  148. /* run speedy on the points, return total cost of solution */
  149. float pspeedy(Points *points, float z, long *kcenter, int pid, pthread_barrier_t* barrier)
  150. {
  151. #ifdef PROFILE_TMP
  152. double t1 = gettime();
  153. #endif
  154. cnt_speedy++;
  155. #ifdef ENABLE_THREADS
  156. pthread_barrier_wait(barrier);
  157. #endif
  158. //my block
  159. long bsize = points->num/nproc;
  160. long k1 = bsize * pid;
  161. long k2 = k1 + bsize;
  162. if( pid == nproc-1 ) k2 = points->num;
  163. static float totalcost;
  164. static bool open = false;
  165. static float* costs; //cost for each thread.
  166. static int i;
  167. #ifdef ENABLE_THREADS
  168. static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
  169. static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
  170. #endif
  171. #ifdef PRINTINFO
  172. if( pid == 0 ){
  173. fprintf(stderr, "Speedy: facility cost %lf\n", z);
  174. }
  175. #endif
  176. /* create center at first point, send it to itself */
  177. for( int k = k1; k < k2; k++ ) {
  178. float distance = dist(points->p[k],points->p[0],points->dim);
  179. points->p[k].cost = distance * points->p[k].weight;
  180. points->p[k].assign=0;
  181. }
  182. if( pid==0 ) {
  183. *kcenter = 1;
  184. costs = (float*)malloc(sizeof(float)*nproc);
  185. }
  186. if( pid != 0 ) { // we are not the master threads. we wait until a center is opened.
  187. while(1) {
  188. #ifdef ENABLE_THREADS
  189. pthread_mutex_lock(&mutex);
  190. while(!open) pthread_cond_wait(&cond,&mutex);
  191. pthread_mutex_unlock(&mutex);
  192. #endif
  193. if( i >= points->num ) break;
  194. for( int k = k1; k < k2; k++ )
  195. {
  196. float distance = dist(points->p[i],points->p[k],points->dim);
  197. if( distance*points->p[k].weight < points->p[k].cost )
  198. {
  199. points->p[k].cost = distance * points->p[k].weight;
  200. points->p[k].assign=i;
  201. }
  202. }
  203. #ifdef ENABLE_THREADS
  204. pthread_barrier_wait(barrier);
  205. pthread_barrier_wait(barrier);
  206. #endif
  207. }
  208. }
  209. else { // I am the master thread. I decide whether to open a center and notify others if so.
  210. for(i = 1; i < points->num; i++ ) {
  211. bool to_open = ((float)lrand48()/(float)INT_MAX)<(points->p[i].cost/z); //--cambine: what standard?
  212. if( to_open ) {
  213. (*kcenter)++;
  214. #ifdef ENABLE_THREADS
  215. pthread_mutex_lock(&mutex);
  216. #endif
  217. open = true;
  218. #ifdef ENABLE_THREADS
  219. pthread_mutex_unlock(&mutex);
  220. pthread_cond_broadcast(&cond);
  221. #endif
  222. for( int k = k1; k < k2; k++ ) { //--cambine: for a new open, compute new cost and center.
  223. float distance = dist(points->p[i],points->p[k],points->dim);
  224. if( distance*points->p[k].weight < points->p[k].cost ) {
  225. points->p[k].cost = distance * points->p[k].weight;
  226. points->p[k].assign=i;
  227. }
  228. }
  229. #ifdef ENABLE_THREADS
  230. pthread_barrier_wait(barrier);
  231. #endif
  232. open = false;
  233. #ifdef ENABLE_THREADS
  234. pthread_barrier_wait(barrier);
  235. #endif
  236. }
  237. }
  238. #ifdef ENABLE_THREADS
  239. pthread_mutex_lock(&mutex);
  240. #endif
  241. open = true;
  242. #ifdef ENABLE_THREADS
  243. pthread_mutex_unlock(&mutex);
  244. pthread_cond_broadcast(&cond);
  245. #endif
  246. }
  247. #ifdef ENABLE_THREADS
  248. pthread_barrier_wait(barrier);
  249. #endif
  250. open = false;
  251. float mytotal = 0;
  252. for( int k = k1; k < k2; k++ ) {
  253. mytotal += points->p[k].cost;
  254. }
  255. costs[pid] = mytotal;
  256. #ifdef ENABLE_THREADS
  257. pthread_barrier_wait(barrier);
  258. #endif
  259. // aggregate costs from each thread
  260. if( pid == 0 )
  261. {
  262. totalcost=z*(*kcenter);
  263. for( int i = 0; i < nproc; i++ )
  264. {
  265. totalcost += costs[i];
  266. }
  267. free(costs);
  268. }
  269. #ifdef ENABLE_THREADS
  270. pthread_barrier_wait(barrier);
  271. #endif
  272. #ifdef PRINTINFO
  273. if( pid == 0 )
  274. {
  275. fprintf(stderr, "Speedy opened %d facilities for total cost %lf\n",
  276. *kcenter, totalcost);
  277. fprintf(stderr, "Distance Cost %lf\n", totalcost - z*(*kcenter));
  278. }
  279. #endif
  280. #ifdef PROFILE_TMP
  281. double t2 = gettime();
  282. if( pid== 0 ) {
  283. time_speedy += t2 -t1;
  284. }
  285. #endif
  286. return(totalcost);
  287. }
  288. /* facility location on the points using local search */
  289. /* z is the facility cost, returns the total cost and # of centers */
  290. /* assumes we are seeded with a reasonable solution */
  291. /* cost should represent this solution's cost */
  292. /* halt if there is < e improvement after iter calls to gain */
  293. /* feasible is an array of numfeasible points which may be centers */
  294. float pFL(Points *points, int *feasible, int numfeasible,
  295. float z, long *k, int kmax, float cost, long iter, float e,
  296. int pid, pthread_barrier_t* barrier)
  297. {
  298. #ifdef PROFILE_TMP
  299. double t1 = gettime();
  300. #endif
  301. #ifdef ENABLE_THREADS
  302. pthread_barrier_wait(barrier);
  303. #endif
  304. long i;
  305. long long x;
  306. float change;
  307. long numberOfPoints;
  308. change = cost;
  309. /* continue until we run iter iterations without improvement */
  310. /* stop instead if improvement is less than e */
  311. while (change/cost > 1.0*e) {
  312. change = 0.0;
  313. numberOfPoints = points->num;
  314. /* randomize order in which centers are considered */
  315. if( pid == 0 ) {
  316. intshuffle(feasible, numfeasible);
  317. }
  318. #ifdef ENABLE_THREADS
  319. pthread_barrier_wait(barrier);
  320. #endif
  321. for (i=0;i<iter;i++) {
  322. x = i%numfeasible;
  323. //printf("--cambine: feasible x=%ld, z=%f, k=%ld, kmax=%d\n", x, z, *k, kmax);
  324. change += pgain(feasible[x], points, z, k, kmax, is_center, center_table, switch_membership,
  325. &serial, &cpu_gpu_memcpy, &memcpy_back, &gpu_malloc, &kernel);
  326. }
  327. cost -= change;
  328. #ifdef PRINTINFO
  329. if( pid == 0 ) {
  330. fprintf(stderr, "%d centers, cost %lf, total distance %lf\n",
  331. *k, cost, cost - z*(*k));
  332. }
  333. #endif
  334. #ifdef ENABLE_THREADS
  335. pthread_barrier_wait(barrier);
  336. #endif
  337. }
  338. #ifdef PROFILE_TMP
  339. double t2 = gettime();
  340. time_FL += t2 - t1;
  341. #endif
  342. return(cost);
  343. }
  344. int selectfeasible_fast(Points *points, int **feasible, int kmin, int pid, pthread_barrier_t* barrier)
  345. {
  346. #ifdef PROFILE_TMP
  347. double t1 = gettime();
  348. #endif
  349. int numfeasible = points->num;
  350. if (numfeasible > (ITER*kmin*log((float)kmin)))
  351. numfeasible = (int)(ITER*kmin*log((float)kmin));
  352. *feasible = (int *)malloc(numfeasible*sizeof(int));
  353. float* accumweight;
  354. float totalweight;
  355. /*
  356. Calcuate my block.
  357. For now this routine does not seem to be the bottleneck, so it is not parallelized.
  358. When necessary, this can be parallelized by setting k1 and k2 to
  359. proper values and calling this routine from all threads ( it is called only
  360. by thread 0 for now ).
  361. Note that when parallelized, the randomization might not be the same and it might
  362. not be difficult to measure the parallel speed-up for the whole program.
  363. */
  364. // long bsize = numfeasible;
  365. long k1 = 0;
  366. long k2 = numfeasible;
  367. float w;
  368. int l,r,k;
  369. /* not many points, all will be feasible */
  370. if (numfeasible == points->num) {
  371. for (int i=k1;i<k2;i++)
  372. (*feasible)[i] = i;
  373. return numfeasible;
  374. }
  375. accumweight= (float*)malloc(sizeof(float)*points->num);
  376. accumweight[0] = points->p[0].weight;
  377. totalweight=0;
  378. for( int i = 1; i < points->num; i++ ) {
  379. accumweight[i] = accumweight[i-1] + points->p[i].weight;
  380. }
  381. totalweight=accumweight[points->num-1];
  382. for(int i=k1; i<k2; i++ ) {
  383. w = (lrand48()/(float)INT_MAX)*totalweight;
  384. //binary search
  385. l=0;
  386. r=points->num-1;
  387. if( accumweight[0] > w ) {
  388. (*feasible)[i]=0;
  389. continue;
  390. }
  391. while( l+1 < r ) {
  392. k = (l+r)/2;
  393. if( accumweight[k] > w ) {
  394. r = k;
  395. }
  396. else {
  397. l=k;
  398. }
  399. }
  400. (*feasible)[i]=r;
  401. }
  402. free(accumweight);
  403. #ifdef PROFILE_TMP
  404. double t2 = gettime();
  405. time_select_feasible += t2-t1;
  406. #endif
  407. return numfeasible;
  408. }
  409. /* compute approximate kmedian on the points */
  410. float pkmedian(Points *points, long kmin, long kmax, long* kfinal,
  411. int pid, pthread_barrier_t* barrier )
  412. {
  413. int i;
  414. float cost;
  415. float lastcost;
  416. float hiz, loz, z;
  417. static long k;
  418. static int *feasible;
  419. static int numfeasible;
  420. static float* hizs;
  421. if( pid==0 ) hizs = (float*)calloc(nproc,sizeof(float));
  422. hiz = loz = 0.0;
  423. long numberOfPoints = points->num;
  424. long ptDimension = points->dim;
  425. //my block
  426. long bsize = points->num/nproc;
  427. long k1 = bsize * pid;
  428. long k2 = k1 + bsize;
  429. if( pid == nproc-1 ) k2 = points->num;
  430. #ifdef PRINTINFO
  431. if( pid == 0 )
  432. {
  433. printf("Starting Kmedian procedure\n");
  434. printf("%i points in %i dimensions\n", numberOfPoints, ptDimension);
  435. }
  436. #endif
  437. #ifdef ENABLE_THREADS
  438. pthread_barrier_wait(barrier);
  439. #endif
  440. float myhiz = 0;
  441. for (long kk=k1;kk < k2; kk++ ) {
  442. myhiz += dist(points->p[kk], points->p[0],
  443. ptDimension)*points->p[kk].weight;
  444. }
  445. hizs[pid] = myhiz;
  446. #ifdef ENABLE_THREADS
  447. pthread_barrier_wait(barrier);
  448. #endif
  449. for( int i = 0; i < nproc; i++ ) {
  450. hiz += hizs[i];
  451. }
  452. loz=0.0; z = (hiz+loz)/2.0;
  453. /* NEW: Check whether more centers than points! */
  454. if (points->num <= kmax) { //--cambine: just ignore for the timebeing
  455. /* just return all points as facilities */
  456. for (long kk=k1;kk<k2;kk++) {
  457. points->p[kk].assign = kk;
  458. points->p[kk].cost = 0;
  459. }
  460. cost = 0;
  461. if( pid== 0 ) {
  462. free(hizs);
  463. *kfinal = k;
  464. }
  465. return cost;
  466. }
  467. if( pid == 0 ) shuffle(points); //--cambine: why need shuffle?
  468. cost = pspeedy(points, z, &k, pid, barrier);
  469. #ifdef PRINTINFO
  470. if( pid == 0 )
  471. printf("thread %d: Finished first call to speedy, cost=%lf, k=%i\n",pid,cost,k);
  472. #endif
  473. i=0;
  474. /* give speedy SP chances to get at least kmin/2 facilities */
  475. while ((k < kmin)&&(i<SP)) {
  476. cost = pspeedy(points, z, &k, pid, barrier);
  477. i++;
  478. }
  479. #ifdef PRINTINFO
  480. if( pid==0)
  481. printf("thread %d: second call to speedy, cost=%lf, k=%d\n",pid,cost,k);
  482. #endif
  483. /* if still not enough facilities, assume z is too high */
  484. while (k < kmin) {
  485. #ifdef PRINTINFO
  486. if( pid == 0 ) {
  487. printf("%lf %lf\n", loz, hiz);
  488. printf("Speedy indicates we should try lower z\n");
  489. }
  490. #endif
  491. if (i >= SP) {hiz=z; z=(hiz+loz)/2.0; i=0;}
  492. if( pid == 0 ) shuffle(points);
  493. cost = pspeedy(points, z, &k, pid, barrier);
  494. i++;
  495. }
  496. /* now we begin the binary search for real */
  497. /* must designate some points as feasible centers */
  498. /* this creates more consistancy between FL runs */
  499. /* helps to guarantee correct # of centers at the end */
  500. if( pid == 0 )
  501. {
  502. numfeasible = selectfeasible_fast(points,&feasible,kmin,pid,barrier); //--cambine?
  503. for( int i = 0; i< points->num; i++ ) {
  504. is_center[points->p[i].assign]= true;
  505. }
  506. }
  507. #ifdef ENABLE_THREADS
  508. pthread_barrier_wait(barrier);
  509. #endif
  510. while(1) {
  511. #ifdef PRINTINFO
  512. if( pid==0 )
  513. {
  514. printf("loz = %lf, hiz = %lf\n", loz, hiz);
  515. printf("Running Local Search...\n");
  516. }
  517. #endif
  518. /* first get a rough estimate on the FL solution */
  519. // pthread_barrier_wait(barrier);
  520. lastcost = cost;
  521. cost = pFL(points, feasible, numfeasible,
  522. z, &k, kmax, cost, (long)(ITER*kmax*log((float)kmax)), 0.1, pid, barrier);
  523. /* if number of centers seems good, try a more accurate FL */
  524. if (((k <= (1.1)*kmax)&&(k >= (0.9)*kmin))||
  525. ((k <= kmax+2)&&(k >= kmin-2))) {
  526. #ifdef PRINTINFO
  527. if( pid== 0)
  528. {
  529. printf("Trying a more accurate local search...\n");
  530. }
  531. #endif
  532. /* may need to run a little longer here before halting without
  533. improvement */
  534. cost = pFL(points, feasible, numfeasible,
  535. z, &k, kmax, cost, (long)(ITER*kmax*log((float)kmax)), 0.001, pid, barrier);
  536. }
  537. if (k > kmax) {
  538. /* facilities too cheap */
  539. /* increase facility cost and up the cost accordingly */
  540. loz = z; z = (hiz+loz)/2.0;
  541. cost += (z-loz)*k;
  542. }
  543. if (k < kmin) {
  544. /* facilities too expensive */
  545. /* decrease facility cost and reduce the cost accordingly */
  546. hiz = z; z = (hiz+loz)/2.0;
  547. cost += (z-hiz)*k;
  548. }
  549. /* if k is good, return the result */
  550. /* if we're stuck, just give up and return what we have */
  551. if (((k <= kmax)&&(k >= kmin))||((loz >= (0.999)*hiz)) )
  552. {
  553. break;
  554. }
  555. #ifdef ENABLE_THREADS
  556. pthread_barrier_wait(barrier);
  557. #endif
  558. }
  559. //clean up...
  560. if( pid==0 ) {
  561. free(feasible);
  562. free(hizs);
  563. *kfinal = k;
  564. }
  565. return cost;
  566. }
  567. /* compute the means for the k clusters */
  568. int contcenters(Points *points)
  569. {
  570. long i, ii;
  571. float relweight;
  572. for (i=0;i<points->num;i++) {
  573. /* compute relative weight of this point to the cluster */
  574. if (points->p[i].assign != i) {
  575. relweight=points->p[points->p[i].assign].weight + points->p[i].weight;
  576. relweight = points->p[i].weight/relweight;
  577. for (ii=0;ii<points->dim;ii++) {
  578. points->p[points->p[i].assign].coord[ii]*=1.0-relweight;
  579. points->p[points->p[i].assign].coord[ii]+=
  580. points->p[i].coord[ii]*relweight;
  581. }
  582. points->p[points->p[i].assign].weight += points->p[i].weight;
  583. }
  584. }
  585. return 0;
  586. }
  587. /* copy centers from points to centers */
  588. void copycenters(Points *points, Points* centers, long* centerIDs, long offset)
  589. {
  590. long i;
  591. long k;
  592. bool *is_a_median = (bool *) calloc(points->num, sizeof(bool));
  593. /* mark the centers */
  594. for ( i = 0; i < points->num; i++ ) {
  595. is_a_median[points->p[i].assign] = 1;
  596. }
  597. k=centers->num;
  598. /* count how many */
  599. for ( i = 0; i < points->num; i++ ) {
  600. if ( is_a_median[i] ) {
  601. memcpy( centers->p[k].coord, points->p[i].coord, points->dim * sizeof(float));
  602. centers->p[k].weight = points->p[i].weight;
  603. centerIDs[k] = i + offset;
  604. k++;
  605. }
  606. }
  607. centers->num = k;
  608. free(is_a_median);
  609. }
  610. void* localSearchSub(void* arg_) {
  611. pkmedian_arg_t* arg= (pkmedian_arg_t*)arg_;
  612. pkmedian(arg->points,arg->kmin,arg->kmax,arg->kfinal,arg->pid,arg->barrier);
  613. return NULL;
  614. }
  615. void localSearch( Points* points, long kmin, long kmax, long* kfinal ) {
  616. #ifdef PROFILE_TMP
  617. double t1 = gettime();
  618. #endif
  619. pthread_barrier_t barrier;
  620. #ifdef ENABLE_THREADS
  621. pthread_barrier_init(&barrier,NULL,nproc);
  622. #endif
  623. pthread_t* threads = new pthread_t[nproc];
  624. pkmedian_arg_t* arg = new pkmedian_arg_t[nproc];
  625. for( int i = 0; i < nproc; i++ ) {
  626. arg[i].points = points;
  627. arg[i].kmin = kmin;
  628. arg[i].kmax = kmax;
  629. arg[i].pid = i;
  630. arg[i].kfinal = kfinal;
  631. arg[i].barrier = &barrier;
  632. #ifdef ENABLE_THREADS
  633. pthread_create(threads+i,NULL,localSearchSub,(void*)&arg[i]);
  634. #else
  635. localSearchSub(&arg[0]);
  636. #endif
  637. }
  638. for ( int i = 0; i < nproc; i++) {
  639. #ifdef ENABLE_THREADS
  640. pthread_join(threads[i],NULL);
  641. #endif
  642. }
  643. delete[] threads;
  644. delete[] arg;
  645. #ifdef ENABLE_THREADS
  646. pthread_barrier_destroy(&barrier);
  647. #endif
  648. #ifdef PROFILE_TMP
  649. double t2 = gettime();
  650. time_local_search += t2-t1;
  651. #endif
  652. }
  653. void outcenterIDs( Points* centers, long* centerIDs, char* outfile ) {
  654. FILE* fp = fopen(outfile, "w");
  655. if( fp==NULL ) {
  656. fprintf(stderr, "error opening %s\n",outfile);
  657. exit(1);
  658. }
  659. int* is_a_median = (int*)calloc( sizeof(int), centers->num );
  660. for( int i =0 ; i< centers->num; i++ ) {
  661. is_a_median[centers->p[i].assign] = 1;
  662. }
  663. for( int i = 0; i < centers->num; i++ ) {
  664. if( is_a_median[i] ) {
  665. fprintf(fp, "%u\n", centerIDs[i]);
  666. fprintf(fp, "%lf\n", centers->p[i].weight);
  667. for( int k = 0; k < centers->dim; k++ ) {
  668. fprintf(fp, "%lf ", centers->p[i].coord[k]);
  669. }
  670. fprintf(fp,"\n\n");
  671. }
  672. }
  673. fclose(fp);
  674. }
  675. void streamCluster( PStream* stream,
  676. long kmin, long kmax, int dim,
  677. long chunksize, long centersize, char* outfile )
  678. {
  679. float* block = (float*)malloc( chunksize*dim*sizeof(float) );
  680. float* centerBlock = (float*)malloc(centersize*dim*sizeof(float) );
  681. long* centerIDs = (long*)malloc(centersize*dim*sizeof(long));
  682. if( block == NULL ) {
  683. fprintf(stderr,"not enough memory for a chunk!\n");
  684. exit(1);
  685. }
  686. Points points;
  687. points.dim = dim;
  688. points.num = chunksize;
  689. points.p = (Point *)malloc(chunksize*sizeof(Point));
  690. for( int i = 0; i < chunksize; i++ ) {
  691. points.p[i].coord = &block[i*dim];
  692. }
  693. Points centers;
  694. centers.dim = dim;
  695. centers.p = (Point *)malloc(centersize*sizeof(Point));
  696. centers.num = 0;
  697. for( int i = 0; i< centersize; i++ ) {
  698. centers.p[i].coord = &centerBlock[i*dim];
  699. centers.p[i].weight = 1.0;
  700. }
  701. long IDoffset = 0;
  702. long kfinal;
  703. while(1) {
  704. size_t numRead = stream->read(block, dim, chunksize );
  705. fprintf(stderr,"read %d points\n",numRead);
  706. if( stream->ferror() || numRead < (unsigned int)chunksize && !stream->feof() ) {
  707. fprintf(stderr, "error reading data!\n");
  708. exit(1);
  709. }
  710. points.num = numRead;
  711. for( int i = 0; i < points.num; i++ ) {
  712. points.p[i].weight = 1.0;
  713. }
  714. switch_membership = (char*)malloc(points.num*sizeof(char));
  715. is_center = (bool*)calloc(points.num,sizeof(bool));
  716. center_table = (int*)malloc(points.num*sizeof(int));
  717. localSearch(&points,kmin, kmax,&kfinal);
  718. fprintf(stderr,"finish local search\n");
  719. contcenters(&points);
  720. if( kfinal + centers.num > centersize ) {
  721. //here we don't handle the situation where # of centers gets too large.
  722. fprintf(stderr,"oops! no more space for centers\n");
  723. exit(1);
  724. }
  725. #ifdef PRINTINFO
  726. printf("finish cont center\n");
  727. #endif
  728. copycenters(&points, &centers, centerIDs, IDoffset);
  729. IDoffset += numRead;
  730. #ifdef PRINTINFO
  731. printf("finish copy centers\n");
  732. #endif
  733. free(is_center);
  734. free(switch_membership);
  735. free(center_table);
  736. if( stream->feof() ) {
  737. break;
  738. }
  739. }
  740. //finally cluster all temp centers
  741. switch_membership = (char*)malloc(centers.num*sizeof(char));
  742. is_center = (bool*)calloc(centers.num,sizeof(bool));
  743. center_table = (int*)malloc(centers.num*sizeof(int));
  744. localSearch( &centers, kmin, kmax ,&kfinal );
  745. contcenters(&centers);
  746. outcenterIDs( &centers, centerIDs, outfile);
  747. }
  748. int main(int argc, char **argv)
  749. {
  750. char *outfilename = new char[MAXNAMESIZE];
  751. char *infilename = new char[MAXNAMESIZE];
  752. long kmin, kmax, n, chunksize, clustersize;
  753. int dim;
  754. #ifdef PARSEC_VERSION
  755. #define __PARSEC_STRING(x) #x
  756. #define __PARSEC_XSTRING(x) __PARSEC_STRING(x)
  757. printf("PARSEC Benchmark Suite Version "__PARSEC_XSTRING(PARSEC_VERSION)"\n");
  758. fflush(NULL);
  759. #else
  760. printf("PARSEC Benchmark Suite\n");
  761. fflush(NULL);
  762. #endif //PARSEC_VERSION
  763. #ifdef ENABLE_PARSEC_HOOKS
  764. __parsec_bench_begin(__parsec_streamcluster);
  765. #endif
  766. if (argc<11) {
  767. fprintf(stderr,"usage: %s k1 k2 d n chunksize clustersize infile outfile nproc\n",
  768. argv[0]);
  769. fprintf(stderr," k1: Min. number of centers allowed\n");
  770. fprintf(stderr," k2: Max. number of centers allowed\n");
  771. fprintf(stderr," d: Dimension of each data point\n");
  772. fprintf(stderr," n: Number of data points\n");
  773. fprintf(stderr," chunksize: Number of data points to handle per step\n");
  774. fprintf(stderr," clustersize: Maximum number of intermediate centers\n");
  775. fprintf(stderr," infile: Input file (if n<=0)\n");
  776. fprintf(stderr," outfile: Output file\n");
  777. fprintf(stderr," nproc: Number of threads to use\n");
  778. fprintf(stderr,"\n");
  779. fprintf(stderr, "if n > 0, points will be randomly generated instead of reading from infile.\n");
  780. exit(1);
  781. }
  782. kmin = atoi(argv[1]);
  783. kmax = atoi(argv[2]);
  784. dim = atoi(argv[3]);
  785. n = atoi(argv[4]);
  786. chunksize = atoi(argv[5]);
  787. clustersize = atoi(argv[6]);
  788. strcpy(infilename, argv[7]);
  789. strcpy(outfilename, argv[8]);
  790. nproc = atoi(argv[9]);
  791. _clCmdParams(argc, argv);
  792. try{
  793. _clInit(platform_idx, device_idx, device_type);
  794. }
  795. catch(std::string msg){
  796. std::cout<<"exception caught in main function->"<<msg<<std::endl;
  797. return -1;
  798. }
  799. srand48(SEED);
  800. PStream* stream;
  801. if( n > 0 ) {
  802. stream = new SimStream(n);
  803. }
  804. else {
  805. stream = new FileStream(infilename);
  806. }
  807. #ifdef PROFILE_TMP
  808. double t1 = gettime();
  809. #endif
  810. #ifdef ENABLE_PARSEC_HOOKS
  811. __parsec_roi_begin();
  812. #endif
  813. #ifdef PROFILE_TMP
  814. serial = 0.0;
  815. cpu_gpu_memcpy = 0.0;
  816. gpu_malloc = 0.0;
  817. gpu_free = 0.0;
  818. kernel = 0.0;
  819. time_FL = 0.0;
  820. cnt_speedy = 0;
  821. #endif
  822. std::cout<<"before sc"<<std::endl;
  823. streamCluster(stream, kmin, kmax, dim, chunksize, clustersize, outfilename );
  824. std::cout<<"after sc"<<std::endl;
  825. #ifdef PROFILE_TMP
  826. gpu_free = gettime();
  827. #endif
  828. freeDevMem();
  829. #ifdef PROFILE_TMP
  830. gpu_free = gettime() - gpu_free;
  831. #endif
  832. _clRelease();
  833. #ifdef ENABLE_PARSEC_HOOKS
  834. __parsec_roi_end();
  835. #endif
  836. #ifdef PROFILE_TMP
  837. double t2 = gettime();
  838. printf("time = %lf\n",t2-t1);
  839. #endif
  840. delete stream;
  841. #ifdef PROFILE_TMP
  842. printf("time pgain = %lf\n", time_gain);
  843. printf("time pgain_dist = %lf\n", time_gain_dist);
  844. printf("time pgain_init = %lf\n", time_gain_init);
  845. printf("time pselect = %lf\n", time_select_feasible);
  846. printf("time pspeedy = %lf\n", time_speedy);
  847. printf("time pshuffle = %lf\n", time_shuffle);
  848. printf("time FL = %lf\n", time_FL);
  849. printf("time localSearch = %lf\n", time_local_search);
  850. printf("\n");
  851. printf("====GPU Timing info====\n");
  852. printf("time serial = %lf\n", serial);
  853. printf("time CPU to GPU memory copy = %lf\n", cpu_gpu_memcpy);
  854. printf("time GPU to CPU memory copy back = %lf\n", memcpy_back);
  855. printf("time GPU malloc = %lf\n", gpu_malloc);
  856. printf("time GPU free = %lf\n", gpu_free);
  857. printf("time kernel = %lf\n", kernel);
  858. FILE *fp = fopen("PD.txt", "w");
  859. fprintf(fp, "%lf, %lf, %lf, %lf, %lf, %lf, %lf, %lf, %lf\n", time_FL, cpu_gpu_memcpy, memcpy_back, kernel, gpu_malloc, gpu_free, 0.0);
  860. fclose(fp);
  861. #endif
  862. _clStatistics();
  863. #ifdef ENABLE_PARSEC_HOOKS
  864. __parsec_bench_end();
  865. #endif
  866. return 0;
  867. }