2 #include <hpda/algorithm/internal/kmeans_loyd.h>
3 #include <hpda/extractor/extractor_base.h>
4 #include <hpda/extractor/raw_data.h>
5 #include <hpda/output/output_base.h>
11 template <
typename Po
intType,
typename DistanceType>
struct kmeans_traits {
12 define_nt(mean_point, PointType);
13 define_nt(average_distance, DistanceType);
16 template <
typename PointFlag,
17 typename PointContainer =
18 std::vector<typename ::ff::util::type_of_nt<PointFlag>::type>>
20 template <
typename AllPo
intsType>
21 static PointContainer points(
const AllPointsType &all_points,
int k) {
23 auto it = all_points.begin();
24 for (
int i = 0; i < k && i < all_points.size(); ++i) {
25 pc.push_back((*it).template get<PointFlag>());
32 template <
typename PointFlag,
33 typename PointContainer =
34 std::vector<typename ::ff::util::type_of_nt<PointFlag>::type>>
36 template <
typename AllPo
intsType>
37 static PointContainer points(
const AllPointsType &all_points,
int k) {
39 int t = all_points.size() / k;
40 for (
int i = 0; i < k; ++i) {
41 auto pos = t * i % all_points.size();
42 auto &it = all_points[pos];
43 pc.push_back(it.template get<PointFlag>());
51 template <
typename InputObjType,
typename PointFlag,
typename DistanceType,
52 typename ClassifiedID,
59 int k, DistanceType delta,
60 int max_points = std::numeric_limits<int>::max())
62 m_calculate_flag(
false), m_k(k), m_delta(delta),
63 m_max_points(max_points) {}
65 typedef ::hpda::internal::processor_with_input<InputObjType>
base;
66 typedef loyd_kmeans_impl<InputObjType, PointFlag, DistanceType, ClassifiedID,
70 typedef typename ::ff::util::type_of_nt<PointFlag>::type point_type;
76 typedef ntobject<ClassifiedID, mean_point, average_distance>
77 means_stream_output_type;
79 typedef ::hpda::extractor::internal::raw_data_impl<means_stream_output_type>
81 typedef ::hpda::extractor::internal::raw_data_impl<
82 typename ::ff::util::append_type<InputObjType, ClassifiedID>::type>
87 static point_type point(
const typename PointContainerType::iterator &p) {
88 return (*p).template get<PointFlag>();
92 std::vector<InputObjType> m_all_points;
93 virtual bool process() {
94 if (base::has_input_value()) {
95 m_all_points.push_back(base::input_value().make_copy());
96 base::consume_input_value();
97 if (m_all_points.size() < m_max_points) {
101 if (m_all_points.size() == 0) {
106 using loyd_impl_type =
107 loyd_impl<typename std::vector<InputObjType>::iterator, point_type,
108 DistanceType, point_traits<std::vector<InputObjType>>,
109 std::vector<point_type>>;
111 auto initial_points = InitialPointPicker::points(m_all_points, m_k);
113 loyd_impl_type lit(m_all_points.begin(), m_all_points.end(), initial_points,
117 typedef typename ::ff::util::append_type<InputObjType, ClassifiedID>::type
119 if (m_cluster_stream) {
120 auto &clusters = lit.clusters();
121 for (
int i = 0; i < lit.clusters().size(); ++i) {
122 auto &cluster = lit.clusters()[i];
123 for (
auto it : cluster) {
126 ::ff::util::append_type<InputObjType, ClassifiedID>::value(*it,
128 m_cluster_stream->add_data(ot);
133 if (m_means_stream) {
134 auto means = lit.means();
135 for (
int i = 0; i < means.size(); ++i) {
136 means_stream_output_type ot;
137 ot.template set<ClassifiedID>(i);
138 ot.template set<mean_point>(means[i]);
139 ot.template set<average_distance>(0);
140 m_means_stream->add_data(std::move(ot));
143 m_all_points.clear();
144 m_calculate_flag =
true;
148 data_with_cluster_stream_t *data_with_cluster_stream() {
149 if (!m_cluster_stream) {
150 if (m_calculate_flag) {
151 throw std::runtime_error(
152 "you should call data_with_cluster_stream() in "
153 "kmeans before dump the output");
155 m_cluster_stream.reset(
new data_with_cluster_stream_t());
156 m_cluster_stream->set_engine(functor::get_engine());
157 m_cluster_stream->add_predecessor(
this);
159 return m_cluster_stream.get();
162 means_stream_t *means_stream() {
163 if (!m_means_stream) {
164 if (m_calculate_flag) {
165 throw std::runtime_error(
166 "you should call means_stream() in kmeans before dump the output");
168 m_means_stream.reset(
new means_stream_t());
169 m_means_stream->set_engine(functor::get_engine());
170 m_means_stream->add_predecessor(
this);
172 return m_means_stream.get();
177 std::unique_ptr<data_with_cluster_stream_t> m_cluster_stream;
178 std::unique_ptr<means_stream_t> m_means_stream;
179 bool m_calculate_flag;
181 DistanceType m_delta;
186 template <
typename InputObjType,
typename PointFlag,
typename DistanceType,
187 typename ClassifiedID>
188 using kmeans_processor = internal::loyd_kmeans_impl<InputObjType, PointFlag,
189 DistanceType, ClassifiedID>;