YPC  0.2.0
group.h
1 #pragma once
2 #include <hpda/processor/processor_base.h>
3 
4 namespace hpda {
5 namespace processor {
6 namespace internal {
/// Common base of the aggregators in this file (sum, avg, max, min, count).
/// It declares no members; it only carries the input and output column types
/// as template arguments.
template <typename InputColumnType, typename OutputColumnType>
class aggregator_base {
  // Intentionally empty: each derived aggregator supplies its own
  // begin_group(), group(), end_group(), grouping() and output().
};
9 
10 template <typename InputColumnType, typename OutputColumnType>
11 class sum : public aggregator_base<InputColumnType, OutputColumnType> {
12 public:
13  typedef InputColumnType input_type;
14  typedef OutputColumnType output_type;
15 
16  sum() : m_data(), m_grouping(false) {}
17 
18  void begin_group() { m_data = 0; }
19  template <typename T> void group(const T &t) {
20  m_data += t.template get<InputColumnType>();
21  m_grouping = true;
22  }
23  void end_group() { m_grouping = false; }
24  typename ::ff::util::internal::nt_traits<output_type>::type output() const {
25  return m_data;
26  }
27  bool grouping() { return m_grouping; }
28 
29 protected:
30  typename ::ff::util::internal::nt_traits<OutputColumnType>::type m_data;
31  bool m_grouping;
32 };
33 
34 template <typename InputColumnType, typename OutputColumnType>
35 class avg : public aggregator_base<InputColumnType, OutputColumnType> {
36 public:
37  typedef InputColumnType input_type;
38  typedef OutputColumnType output_type;
39 
40  avg() : m_data(), m_grouping(false) {}
41  void begin_group() {
42  m_data = 0;
43  m_count = 0;
44  }
45  template <typename T> void group(const T &t) {
46  m_data += t.template get<InputColumnType>();
47  m_count += 1;
48  m_grouping = true;
49  }
50  void end_group() {
51  if (m_count == 0) {
52  return;
53  }
54  m_data = m_data / m_count;
55  m_grouping = false;
56  }
57 
58  bool grouping() { return m_grouping; }
59  typename ::ff::util::internal::nt_traits<output_type>::type output() const {
60  return m_data;
61  }
62 
63 protected:
64  typename ::ff::util::internal::nt_traits<OutputColumnType>::type m_data;
65  size_t m_count;
66  bool m_grouping;
67 };
68 
69 template <typename InputColumnType, typename OutputColumnType>
70 class max : public aggregator_base<InputColumnType, OutputColumnType> {
71 public:
72  typedef InputColumnType input_type;
73  typedef OutputColumnType output_type;
74 
75  max() : m_data(), m_grouping(false) {}
76 
77  void begin_group() { m_data = 0; }
78  template <typename T> void group(const T &t) {
79  auto d = t.template get<InputColumnType>();
80  m_data = m_data > d ? m_data : d;
81  m_grouping = true;
82  }
83  void end_group() { m_grouping = false; }
84  bool grouping() {
85  return m_grouping;
86  }
87  typename ::ff::util::internal::nt_traits<output_type>::type output() const {
88  return m_data;
89  }
90 
91 protected:
92  typename ::ff::util::internal::nt_traits<OutputColumnType>::type m_data;
93  bool m_grouping;
94 };
95 
96 template <typename InputColumnType, typename OutputColumnType>
97 class min : public aggregator_base<InputColumnType, OutputColumnType> {
98 public:
99  typedef InputColumnType input_type;
100  typedef OutputColumnType output_type;
101 
102  min() : m_data(), m_grouping(false) {}
103  void begin_group() { m_data = 0; }
104  template <typename T> void group(const T &t) {
105  auto d = t.template get<InputColumnType>();
106  m_data = m_data < d ? m_data : d;
107  m_grouping = true;
108  }
109  void end_group() { m_grouping = false; }
110  bool grouping() { return m_grouping; }
111  typename ::ff::util::internal::nt_traits<output_type>::type output() const {
112  return m_data;
113  }
114 
115 protected:
116  typename ::ff::util::internal::nt_traits<OutputColumnType>::type m_data;
117  bool m_grouping;
118 };
119 
120 template <typename InputColumnType, typename OutputColumnType>
121 class count : public aggregator_base<InputColumnType, OutputColumnType> {
122 public:
123  typedef InputColumnType input_type;
124  typedef OutputColumnType output_type;
125 
126  count() : m_data(), m_grouping(false) {}
127  void begin_group() { m_data = 0; }
128  template <typename T> void group(const T &t) {
129  m_data += 1;
130  m_grouping = true;
131  }
132  void end_group() { m_grouping = false; }
133  bool grouping() { return m_grouping; }
134  typename ::ff::util::internal::nt_traits<output_type>::type output() const {
135  return m_data;
136  }
137 
138 protected:
139  typename ::ff::util::internal::nt_traits<OutputColumnType>::type m_data;
140  bool m_grouping;
141 };
142 
143 template <typename InputObjType, typename OutputObjType, typename GroupByType,
144  typename AggregatorType>
145 class groupby_impl : public processor_base<InputObjType, OutputObjType> {
146 public:
// Constructs the group-by stage, storing a copy of `aggregator`;
// m_continue starts out false (it is not read anywhere in the visible code).
// NOTE(review): this listing is a Doxygen source view that dropped original
// lines 148 and 150 — presumably an upstream-processor parameter and the
// processor_base initializer (note the missing ':' before m_aggregator).
// The constructor as shown is incomplete; check the real group.h.
147  groupby_impl(
149  const AggregatorType &aggregator)
151  m_aggregator(aggregator), m_continue(false) {}
152  virtual ~groupby_impl() {}
153 
155 
156  virtual bool process() {
157  typename ::ff::util::internal::nt_traits<GroupByType>::type t;
158  if (!base::has_input_value()) {
159  if (m_aggregator.grouping()) {
160  m_aggregator.end_group();
161  m_data.template set<typename AggregatorType::output_type>(
162  m_aggregator.output());
163  return true;
164  }
165  return false;
166  }
167  t = base::input_value().template get<GroupByType>();
168  if (m_aggregator.grouping()) {
169  if (m_last_group_id == t) {
170  m_aggregator.group(base::input_value());
171  base::consume_input_value();
172  return false;
173  } else {
174  m_aggregator.end_group();
175  m_data.template set<typename AggregatorType::output_type>(
176  m_aggregator.output());
177  m_aggregator.begin_group();
178  m_last_group_id = t;
179  m_data.template set<GroupByType>(t);
180  m_aggregator.group(base::input_value());
181  base::consume_input_value();
182  return true;
183  }
184  } else {
185  m_aggregator.begin_group();
186  m_last_group_id = t;
187  m_data.template set<GroupByType>(t);
188  m_aggregator.group(base::input_value());
189  base::consume_input_value();
190  return false;
191  }
192  }
193 
194  virtual OutputObjType output_value() {
195  return m_data.make_copy();
196  }
197 
198 protected:
199  typename ::ff::util::internal::nt_traits<GroupByType>::type m_last_group_id;
200  OutputObjType m_data;
201  AggregatorType m_aggregator;
202  bool m_continue;
203 };
204 } // namespace internal
205 template <typename InputObjType, typename GroupByType, typename AggregatorType,
206  typename... ARGS>
207 using groupby = internal::groupby_impl<InputObjType, ntobject<ARGS...>,
208  GroupByType, AggregatorType>;
209 
210 template <typename InputObjType, typename OutputObjType, typename GroupByType,
211  typename AggregatorType>
212 using groupby_t = internal::groupby_impl<InputObjType, OutputObjType,
213  GroupByType, AggregatorType>;
214 
215 namespace group {
// NOTE(review): this Doxygen listing dropped original lines 217, 219, 221,
// 223 and 225, which presumably complete each template header below with a
// declaration (possibly one alias per aggregator: sum/avg/max/min/count —
// unverified). As shown, the namespace content is incomplete; consult the
// real group.h before editing.
216 template <typename InputObjType, typename OutputObjType>
218 template <typename InputObjType, typename OutputObjType>
220 template <typename InputObjType, typename OutputObjType>
222 template <typename InputObjType, typename OutputObjType>
224 template <typename InputObjType, typename OutputObjType>
226 }
227 } // namespace processor
228 } // namespace hpda
hpda::processor::internal::processor_base
Definition: processor_base.h:9
hpda::processor::internal::min
Definition: group.h:97
hpda::processor::internal::count
Definition: group.h:121
hpda::processor::internal::sum
Definition: group.h:11
hpda::internal::processor_with_input
Definition: processor_with_input.h:10
hpda::processor::internal::aggregator_base
Definition: group.h:8
hpda::processor::internal::avg
Definition: group.h:35
hpda::internal::processor_with_output< InputObjType >
hpda::processor::internal::max
Definition: group.h:70
hpda::processor::internal::groupby_impl
Definition: group.h:145