2 #include <hpda/algorithm/internal/kmeans_loyd.h> 
    3 #include <hpda/extractor/extractor_base.h> 
    4 #include <hpda/extractor/raw_data.h> 
    5 #include <hpda/output/output_base.h> 
   11 template <
typename Po
intType, 
typename DistanceType> 
struct kmeans_traits {
 
   12   define_nt(mean_point, PointType);
 
   13   define_nt(average_distance, DistanceType);
 
   16 template <
typename PointFlag,
 
   17           typename PointContainer =
 
   18               std::vector<typename ::ff::util::type_of_nt<PointFlag>::type>>
 
   20   template <
typename AllPo
intsType>
 
   21   static PointContainer points(
const AllPointsType &all_points, 
int k) {
 
   23     auto it = all_points.begin();
 
   24     for (
int i = 0; i < k && i < all_points.size(); ++i) {
 
   25       pc.push_back((*it).template get<PointFlag>());
 
   32 template <
typename PointFlag,
 
   33           typename PointContainer =
 
   34               std::vector<typename ::ff::util::type_of_nt<PointFlag>::type>>
 
   36   template <
typename AllPo
intsType>
 
   37   static PointContainer points(
const AllPointsType &all_points, 
int k) {
 
   39     int t = all_points.size() / k;
 
   40     for (
int i = 0; i < k; ++i) {
 
   41       auto pos = t * i % all_points.size();
 
   42       auto &it = all_points[pos];
 
   43       pc.push_back(it.template get<PointFlag>());
 
   51 template <
typename InputObjType, 
typename PointFlag, 
typename DistanceType,
 
   52           typename ClassifiedID,
 
   59       int k, DistanceType delta,
 
   60       int max_points = std::numeric_limits<int>::max())
 
   62         m_calculate_flag(
false), m_k(k), m_delta(delta),
 
   63         m_max_points(max_points) {}
 
   65   typedef ::hpda::internal::processor_with_input<InputObjType> 
base;
 
   66   typedef loyd_kmeans_impl<InputObjType, PointFlag, DistanceType, ClassifiedID,
 
   70   typedef typename ::ff::util::type_of_nt<PointFlag>::type point_type;
 
   76   typedef ntobject<ClassifiedID, mean_point, average_distance>
 
   77       means_stream_output_type;
 
   79   typedef ::hpda::extractor::internal::raw_data_impl<means_stream_output_type>
 
   81   typedef ::hpda::extractor::internal::raw_data_impl<
 
   82       typename ::ff::util::append_type<InputObjType, ClassifiedID>::type>
 
   87     static point_type point(
const typename PointContainerType::iterator &p) {
 
   88       return (*p).template get<PointFlag>();
 
   92   std::vector<InputObjType> m_all_points;
 
   93   virtual bool process() {
 
   94     if (base::has_input_value()) {
 
   95       m_all_points.push_back(base::input_value().make_copy());
 
   96       base::consume_input_value();
 
   97       if (m_all_points.size() < m_max_points) {
 
  101       if (m_all_points.size() == 0) {
 
  106     using loyd_impl_type =
 
  107         loyd_impl<typename std::vector<InputObjType>::iterator, point_type,
 
  108                   DistanceType, point_traits<std::vector<InputObjType>>,
 
  109                   std::vector<point_type>>;
 
  111     auto initial_points = InitialPointPicker::points(m_all_points, m_k);
 
  113     loyd_impl_type lit(m_all_points.begin(), m_all_points.end(), initial_points,
 
  117     typedef typename ::ff::util::append_type<InputObjType, ClassifiedID>::type
 
  119     if (m_cluster_stream) {
 
  120       auto &clusters = lit.clusters();
 
  121       for (
int i = 0; i < lit.clusters().size(); ++i) {
 
  122         auto &cluster = lit.clusters()[i];
 
  123         for (
auto it : cluster) {
 
  126               ::ff::util::append_type<InputObjType, ClassifiedID>::value(*it,
 
  128           m_cluster_stream->add_data(ot);
 
  133     if (m_means_stream) {
 
  134       auto means = lit.means();
 
  135       for (
int i = 0; i < means.size(); ++i) {
 
  136         means_stream_output_type ot;
 
  137         ot.template set<ClassifiedID>(i);
 
  138         ot.template set<mean_point>(means[i]);
 
  139         ot.template set<average_distance>(0);
 
  140         m_means_stream->add_data(std::move(ot));
 
  143     m_all_points.clear();
 
  144     m_calculate_flag = 
true;
 
  148   data_with_cluster_stream_t *data_with_cluster_stream() {
 
  149     if (!m_cluster_stream) {
 
  150       if (m_calculate_flag) {
 
  151         throw std::runtime_error(
 
  152             "you should call data_with_cluster_stream() in " 
  153             "kmeans before dump the output");
 
  155       m_cluster_stream.reset(
new data_with_cluster_stream_t());
 
  156       m_cluster_stream->set_engine(functor::get_engine());
 
  157       m_cluster_stream->add_predecessor(
this);
 
  159     return m_cluster_stream.get();
 
  162   means_stream_t *means_stream() {
 
  163     if (!m_means_stream) {
 
  164       if (m_calculate_flag) {
 
  165         throw std::runtime_error(
 
  166             "you should call means_stream() in kmeans before dump the output");
 
  168       m_means_stream.reset(
new means_stream_t());
 
  169       m_means_stream->set_engine(functor::get_engine());
 
  170       m_means_stream->add_predecessor(
this);
 
  172     return m_means_stream.get();
 
  177   std::unique_ptr<data_with_cluster_stream_t> m_cluster_stream;
 
  178   std::unique_ptr<means_stream_t> m_means_stream;
 
  179   bool m_calculate_flag;
 
  181   DistanceType m_delta;
 
  186 template <
typename InputObjType, 
typename PointFlag, 
typename DistanceType,
 
  187           typename ClassifiedID>
 
  188 using kmeans_processor = internal::loyd_kmeans_impl<InputObjType, PointFlag,
 
  189                                                     DistanceType, ClassifiedID>;