YPC  0.2.0
kmeans.h
1 #pragma once
2 #include <hpda/algorithm/internal/kmeans_loyd.h>
3 #include <hpda/extractor/extractor_base.h>
4 #include <hpda/extractor/raw_data.h>
5 #include <hpda/output/output_base.h>
6 
7 namespace hpda {
8 
9 namespace algorithm {
10 namespace kmeans {
11 template <typename PointType, typename DistanceType> struct kmeans_traits {
12  define_nt(mean_point, PointType);
13  define_nt(average_distance, DistanceType);
14 };
15 
16 template <typename PointFlag,
17  typename PointContainer =
18  std::vector<typename ::ff::util::type_of_nt<PointFlag>::type>>
20  template <typename AllPointsType>
21  static PointContainer points(const AllPointsType &all_points, int k) {
22  PointContainer pc;
23  auto it = all_points.begin();
24  for (int i = 0; i < k && i < all_points.size(); ++i) {
25  pc.push_back((*it).template get<PointFlag>());
26  it++;
27  }
28  return pc;
29  }
30 };
31 
32 template <typename PointFlag,
33  typename PointContainer =
34  std::vector<typename ::ff::util::type_of_nt<PointFlag>::type>>
36  template <typename AllPointsType>
37  static PointContainer points(const AllPointsType &all_points, int k) {
38  PointContainer pc;
39  int t = all_points.size() / k;
40  for (int i = 0; i < k; ++i) {
41  auto pos = t * i % all_points.size();
42  auto &it = all_points[pos];
43  pc.push_back(it.template get<PointFlag>());
44  }
45  return pc;
46  }
47 };
48 
49 namespace internal {
50 
51 template <typename InputObjType, typename PointFlag, typename DistanceType,
52  typename ClassifiedID,
53  typename InitialPointPicker = initial_point_picker_even_k<PointFlag>>
55  : public ::hpda::output::internal::output_base<InputObjType> {
56 public:
59  int k, DistanceType delta,
60  int max_points = std::numeric_limits<int>::max())
62  m_calculate_flag(false), m_k(k), m_delta(delta),
63  m_max_points(max_points) {}
64 
65  typedef ::hpda::internal::processor_with_input<InputObjType> base;
66  typedef loyd_kmeans_impl<InputObjType, PointFlag, DistanceType, ClassifiedID,
67  InitialPointPicker>
68  self_type;
69 
70  typedef typename ::ff::util::type_of_nt<PointFlag>::type point_type;
71 
72  typedef
75  average_distance;
76  typedef ntobject<ClassifiedID, mean_point, average_distance>
77  means_stream_output_type;
78 
79  typedef ::hpda::extractor::internal::raw_data_impl<means_stream_output_type>
81  typedef ::hpda::extractor::internal::raw_data_impl<
82  typename ::ff::util::append_type<InputObjType, ClassifiedID>::type>
84 
85  template <typename PointContainerType> struct point_traits {
86  public:
87  static point_type point(const typename PointContainerType::iterator &p) {
88  return (*p).template get<PointFlag>();
89  }
90  };
91 
92  std::vector<InputObjType> m_all_points;
93  virtual bool process() {
94  if (base::has_input_value()) {
95  m_all_points.push_back(base::input_value().make_copy());
96  base::consume_input_value();
97  if (m_all_points.size() < m_max_points) {
98  return false;
99  }
100  } else {
101  if (m_all_points.size() == 0) {
102  return false;
103  }
104  }
105 
106  using loyd_impl_type =
107  loyd_impl<typename std::vector<InputObjType>::iterator, point_type,
108  DistanceType, point_traits<std::vector<InputObjType>>,
109  std::vector<point_type>>;
110 
111  auto initial_points = InitialPointPicker::points(m_all_points, m_k);
112 
113  loyd_impl_type lit(m_all_points.begin(), m_all_points.end(), initial_points,
114  m_k, m_delta);
115  lit.run();
116 
117  typedef typename ::ff::util::append_type<InputObjType, ClassifiedID>::type
118  output_obj_type;
119  if (m_cluster_stream) {
120  auto &clusters = lit.clusters();
121  for (int i = 0; i < lit.clusters().size(); ++i) {
122  auto &cluster = lit.clusters()[i];
123  for (auto it : cluster) {
124  // TODO optimize for rvalue
125  output_obj_type ot =
126  ::ff::util::append_type<InputObjType, ClassifiedID>::value(*it,
127  i);
128  m_cluster_stream->add_data(ot);
129  }
130  }
131  }
132 
133  if (m_means_stream) {
134  auto means = lit.means();
135  for (int i = 0; i < means.size(); ++i) {
136  means_stream_output_type ot;
137  ot.template set<ClassifiedID>(i);
138  ot.template set<mean_point>(means[i]);
139  ot.template set<average_distance>(0);
140  m_means_stream->add_data(std::move(ot));
141  }
142  }
143  m_all_points.clear();
144  m_calculate_flag = true;
145  return true;
146  }
147 
148  data_with_cluster_stream_t *data_with_cluster_stream() {
149  if (!m_cluster_stream) {
150  if (m_calculate_flag) {
151  throw std::runtime_error(
152  "you should call data_with_cluster_stream() in "
153  "kmeans before dump the output");
154  }
155  m_cluster_stream.reset(new data_with_cluster_stream_t());
156  m_cluster_stream->set_engine(functor::get_engine());
157  m_cluster_stream->add_predecessor(this);
158  }
159  return m_cluster_stream.get();
160  };
161 
162  means_stream_t *means_stream() {
163  if (!m_means_stream) {
164  if (m_calculate_flag) {
165  throw std::runtime_error(
166  "you should call means_stream() in kmeans before dump the output");
167  }
168  m_means_stream.reset(new means_stream_t());
169  m_means_stream->set_engine(functor::get_engine());
170  m_means_stream->add_predecessor(this);
171  }
172  return m_means_stream.get();
173  }
174 
175 protected:
176 
177  std::unique_ptr<data_with_cluster_stream_t> m_cluster_stream;
178  std::unique_ptr<means_stream_t> m_means_stream;
179  bool m_calculate_flag;
180  int m_k;
181  DistanceType m_delta;
182  int m_max_points;
183 };
184 } // namespace internal
185 
186 template <typename InputObjType, typename PointFlag, typename DistanceType,
187  typename ClassifiedID>
188 using kmeans_processor = internal::loyd_kmeans_impl<InputObjType, PointFlag,
189  DistanceType, ClassifiedID>;
190 
191 } // namespace kmeans
192 
193 } // namespace algorithm
194 } // namespace hpda
hpda::algorithm::kmeans::initial_point_picker_even_k
Definition: kmeans.h:35
hpda::algorithm::kmeans::kmeans_traits
Definition: kmeans.h:11
hpda::extractor::internal::raw_data_impl
Definition: raw_data.h:11
hpda::algorithm::kmeans::internal::loyd_kmeans_impl
Definition: kmeans.h:54
hpda::internal::processor_with_input
Definition: processor_with_input.h:10
hpda::output::internal::output_base
Definition: output_base.h:10
hpda::algorithm::kmeans::initial_point_picker_first_k
Definition: kmeans.h:19
hpda::internal::processor_with_output< InputObjType >
hpda::algorithm::kmeans::internal::loyd_kmeans_impl::point_traits
Definition: kmeans.h:85