/* This program implements a parallel sorting of "Shi & Schaeffer" 
 * Parallel sorting with Regular sampling using Message passing Interface on the LAM 
 * A maximum of 8 process "Threads" under one sequential single-user  processor has been used.
 * 
 * Author Basem Shihada
 * 	  2001
 */


#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"


/* functions prototype section */

/* the compair function for the bult in qsort */

static int icompar(int *b , int *c)
{
 if ( *b > *c )
   return 1;
  if ( *b < *c )
    return -1;
  return 0;
 }

/* sublists creats the sublist depending on the pivots elements */

void sublists(int buff[],int start, int end,int subsize[],int at
			,int pivots[],int fp,int lp);

void heapsort(int a[], int tlen);
void shiftup(int a[],int k , int n);

int main (int argc , char *argv[]) /* the main program */
{
  const int n = 10000 ; /* the maximum size of elements */
  int my_rank ; 		/* rank of the process*/
  int p	     ;		/* number of processes */
  int source  ; 		/* rank of the sender */
  int dest    ; 		/* rank of the receiver*/
  int tag=0   ; 		/* tag for messages */
  int size    ; 		/* the size of the array */
  int rsize   ; 		/* the size in between the elements */
  int val=0   ; 		/* variable to read the elements from the files */
  int i=0,j=0 ; 		/* counters */
  int start,end ; 	/* the begining & the end of the arrays  */
  int buff[400] ; 	/* array which will hold the elements */
  int sample[10]; 	/* array which will hold the sample elements */
  int temp[400] ; 	/* array which will hold the p sample subarrays */
  int pivots[10]; 	/* the pivots array */
  int samp[10]  ; 	/* array which will hold the transformed elements */
  int subsize[10];	/* how many sublists will we need */
  int buff0[400] , buff1[400] , buff2[400] , buff3[400] ; /* tempurary arrays */
  int rbuff[400];    /* array to hold the received data */
  int tlen=0 ,ttlen=0 ,len0=0,len1=0,len2=0,len3=0; /* arrays length */
  int count ; 			/* receiving the actual length of the array */
  double starttime , endtime , totaltime; /* compute the processing time */
  FILE *pe0,*pe1,*pe2,*pe3 ;              /* file contains the input data */
  MPI_Status  status ; 							/* return the status of the receiver  */
  
 
  MPI_Init(&argc,&argv);  		      			/* start up the MPI    */
  MPI_Comm_rank(MPI_COMM_WORLD, &my_rank );  /* find the processor rank */
  MPI_Comm_size(MPI_COMM_WORLD, &p);         /* find the number of processors */
  

  MPI_Barrier(MPI_COMM_WORLD);               /* initialize the timmer */
  starttime = MPI_Wtime();
   
  /* compute the size of the sub arrays and the rsize */
  
  size = floor((n+p-1)/p);
  rsize = floor((size + p-1)/p);
  

  /* initialize the "buff" array by reading the values from a P files.
   * Each process has it's own storage local memory so each one will
   * have a buff[size]
   * apply the quick sort function and then chose the sample elements and
   * store them in the sample array
   */
  
  
  
 if (my_rank == 0){ if (( pe0 = fopen ("data0.txt","r"))==NULL)
    			printf("file data0.txt could not be opend \n");
 			for(i=0;i <= size-1; i++){
		   	 fscanf(pe0,"%d",&val);		
/* read integer from data0.txt file */
		     	 buff[i] = val;}           /* but an integer in array*/
	           }
 if (my_rank == 1){ if (( pe1 = fopen ("data1.txt","r"))==NULL)
    			printf("file data1.txt could not be opend \n");
 			for(i=0;i <= size-1; i++){
		   	 fscanf(pe1,"%d",&val);    
/* read integer from data1.txt file */
		   	 buff[i] = val;}           
/* but an integer in array*/
	           }	         		
 if (my_rank == 2){ if (( pe2 = fopen ("data2.txt","r"))==NULL)
    			printf("file data2.txt could not be opend \n");
 			for(i=0;i <= size-1; i++){
		   	 fscanf(pe2,"%d",&val);    /* read integer from data2.txt file */
		   	 buff[i] = val;}           /* but an integer in array*/
	           }
  if (my_rank == 3){ if (( pe3 = fopen ("data3.txt","r"))==NULL)
    			printf("file data3.txt could not be opend \n");
 			for(i=0;i <= size-1; i++){
		   	 fscanf(pe3,"%d",&val);    
/* read integer from data3.txt file */
		   	 buff[i] = val;}           /* but an integer in array*/
	           }

   /* apply quick sort on each of the buffers */
  
   qsort((void *)buff,size,sizeof(int),icompar);
           
   /* each processors will pick up the samples */ 
    start = 0;
    end = size-1;

        for(j=0;j<=p-1;j++)
         if ((j*rsize)<= end)
          sample[j] = buff[(j)*rsize];
         else
          sample[j] = buff[end];
         
       
  /* send the sample array to processor 0 to sort it using sequential
   * quick sort then determine the pivots elements.... send the pivots
   * elements to all processores to complete thier list partitioning
   */
  
  
  if(my_rank != 0 )
     {
      /* all other processors will send the sample array to PE0 */

      MPI_Send( sample,p,MPI_INT,0,tag,MPI_COMM_WORLD);
    }  
 else
   {
      for (i=0;i <= p-1; i++)
			temp[i] = sample[i];  /* input the sample of PE0 in temp */
      
      /* Pe0 will receive the samples from all other processors and put the
       * samples together in temp array  
       */
      
      for( source =1; source < p ;source++){
			MPI_Recv( samp,p,MPI_INT,source,tag,MPI_COMM_WORLD,&status );
	
		for ( j=0;j<= p-1;j++)
	  		temp[j+(p)*source] = samp[j];
   } /* end of the else */
      
  /* only Pe0 will perforn a quicksort for the temp array then chose a pivots
   * elements and send them to all other processors
   */
      
     qsort((void *)temp,p*p,sizeof(int),icompar); /* built in quick sort */

       for(i=1;i<=p-1;i++)
			pivots[i-1] = temp[(i*p+p/2)-1];   		  /* find the pivots */
     
    
    }  /* end of else statement */

 if (my_rank == 0 )
  {  /* distripute the pivots to all other processors */
     for(dest=1;dest<=p-1;dest++)
		MPI_Send(pivots,p,MPI_INT,dest,1,MPI_COMM_WORLD); }
   else
    {
     /* all processors will receive the pivots array */
      
      MPI_Recv(pivots,p,MPI_INT,0,1,MPI_COMM_WORLD,&status);
    } /* end of else statement */
  
/* divide each sorted list buff in to p sublists with the pivots as spliters
 * by using the subsize array */

  start = 0;
  end   = size-1;
  subsize[0] = start;
  subsize[p] = end;
   
  sublists(buff,start,end,subsize,1,pivots,0,p-2);
  subsize[p] = end ;

 /* divide the buff array into a sub lists according to the sublist array
  * to prepare them for distripution between the processors... each processor
  * will keep its sub array and then send the others to the appropriet
  * processor  */

  for ( i=0 ; i < p ; i++){

    	if (i==0) { for ( j=0 ;j <= subsize[i+1] - subsize[i];j++){
      					buff0[j] = buff[j];
							len0 = len0 +1;}
			 			if(my_rank != i)
			  				MPI_Send(buff0,len0,MPI_INT,i,my_rank,MPI_COMM_WORLD);
			 			}
	if(i==1) { for (j=0; j <  subsize[i+1] - subsize[i]; j++){
							buff1[j]= buff[(j+1)+ subsize[i]];
							len1=len1 + 1;}
			 			if(my_rank != i)
			  				MPI_Send(buff1,len1,MPI_INT,i,my_rank,MPI_COMM_WORLD);
			 	}
	if(i==2) { for (j=0; j< subsize[i+1] - subsize[i];j++){
							buff2[j] = buff[(j+1)+subsize[i]];
						l	en2=len2 +1;}
						if(my_rank != i)
			 				MPI_Send(buff2,len2,MPI_INT,i,my_rank,MPI_COMM_WORLD);
				}
	if(i==3) { for (j=0; j < subsize[i+1] - subsize[i];j++){		
      					buff3[j] = buff[(j+1)+subsize[i]];
      					len3=len3 + 1;}
					if(my_rank != i)
							MPI_Send(buff3,len3,MPI_INT,i,my_rank,MPI_COMM_WORLD);
				}
   
 }  /* end of the for loop */
  
 /* taking each own sub array and stor it in the temp array to keep it for
  * future concatinating the received data */

  if (my_rank ==0) { for(i=0;i< len0;i++)
                 		temp[i] = buff0[i];
                		tlen = len0;}
  if (my_rank ==1) { for(i=0;i< len1;i++)
  							temp[i] = buff1[i];
                     tlen = len1;}
  if (my_rank ==2) { for(i=0;i< len2;i++)
							temp[i] = buff2[i];
							tlen = len2;}
  if (my_rank ==3) { for(i=0;i< len3;i++)
        		         temp[i] = buff3[i];
                		tlen = len3;}

  /* each processor will receive its corresponding sub array and everytime
   * each processor is abdating the temp array */

  for(source = 0 ;source < p;source++){
    if(my_rank != source) {
    	ttlen = tlen;
    	MPI_Recv(rbuff,400,MPI_INT,source,source,MPI_COMM_WORLD,&status);
    	MPI_Get_count(&status,MPI_INT,&count);
    for(i= 0;i < count;i++){
     temp[i+tlen] = rbuff[i];
     ttlen = ttlen+1;}
     } /* end the if statement */
     tlen = ttlen;
     } /* end the external for loop */
        

   /* last phase is applying the heapsort for the temp array */

    heapsort(temp,tlen);
   /* compute the time taken to run this program */

 MPI_Barrier(MPI_COMM_WORLD);
 endtime = MPI_Wtime();
 totaltime = endtime - starttime;
 printf("Elapsed time = %e seconds \n", totaltime);
    
  
/* shut down the MPI  */
  MPI_Finalize();
/* close the used files */

  fclose(pe0);
  fclose(pe1);
  fclose(pe2);
  fclose(pe3);
  return 0;
 

} /* end of the main function */




void sublists(int buff[],int start, int end,int subsize[],int at,int pivots[],int fp,int lp)
{
  int mid = floor((fp+lp)/2);
  int pv = pivots[mid];
  int lb  = start;
  int ub  = end;
  int center;
  
  while ( lb<= ub)
    {
      center = floor((lb+ub)/2);
      if (buff[center] > pv )
	ub = center -1;
   else
     lb = center +1;
    }
  
  subsize[at+mid]= lb -1;
  if (fp < mid)
    sublists( buff,start,lb-1,subsize,at,pivots,fp,mid-1);
  if (mid < lp)
    sublists ( buff,lb,end,subsize,at,pivots,mid+1,lp);
  
} /* end of sublists function */




void heapsort(int a[],int tlen)
{
  int tmp;
  int i ;
  for ( i = tlen/2 ; i > 1 ; --i)
    shiftup(a,i,tlen);
  
  for ( i = tlen; i>1; --i){
    shiftup(a,1,i);
    tmp = a[1];
    a[1] = a[i];
    a[i] = tmp;}
  
} /* end of the heap sort function */


void shiftup(int a[],int k , int n)
{
  int j;
  int rootkey ;
  int nf ;
  
  rootkey = a[k];
  j = 2* k ;
  nf =(j <= n)  ;
  
  while ( nf ){
    if ( j < n){
     if( a[j+1] > a[j])
       j++;
    }
    if ( a[j] <= rootkey) {
      nf = 0; }
    else
      {
	a[k] = a[j];
	k = j;
	j = 2 * k ;
	nf = (j<=n);
      }
    
  }
  a[k] = rootkey;
 }
