2 #include <hpda/processor/processor_base.h>
7 template <
typename InputColumnType,
typename OutputColumnType>
10 template <
typename InputColumnType,
typename OutputColumnType>
13 typedef InputColumnType input_type;
14 typedef OutputColumnType output_type;
16 sum() : m_data(), m_grouping(
false) {}
18 void begin_group() { m_data = 0; }
19 template <
typename T>
void group(
const T &t) {
20 m_data += t.template get<InputColumnType>();
23 void end_group() { m_grouping =
false; }
24 typename ::ff::util::internal::nt_traits<output_type>::type output()
const {
27 bool grouping() {
return m_grouping; }
30 typename ::ff::util::internal::nt_traits<OutputColumnType>::type m_data;
34 template <
typename InputColumnType,
typename OutputColumnType>
37 typedef InputColumnType input_type;
38 typedef OutputColumnType output_type;
40 avg() : m_data(), m_grouping(
false) {}
45 template <
typename T>
void group(
const T &t) {
46 m_data += t.template get<InputColumnType>();
54 m_data = m_data / m_count;
58 bool grouping() {
return m_grouping; }
59 typename ::ff::util::internal::nt_traits<output_type>::type output()
const {
64 typename ::ff::util::internal::nt_traits<OutputColumnType>::type m_data;
69 template <
typename InputColumnType,
typename OutputColumnType>
72 typedef InputColumnType input_type;
73 typedef OutputColumnType output_type;
75 max() : m_data(), m_grouping(
false) {}
77 void begin_group() { m_data = 0; }
78 template <
typename T>
void group(
const T &t) {
79 auto d = t.template get<InputColumnType>();
80 m_data = m_data > d ? m_data : d;
83 void end_group() { m_grouping =
false; }
87 typename ::ff::util::internal::nt_traits<output_type>::type output()
const {
92 typename ::ff::util::internal::nt_traits<OutputColumnType>::type m_data;
96 template <
typename InputColumnType,
typename OutputColumnType>
99 typedef InputColumnType input_type;
100 typedef OutputColumnType output_type;
102 min() : m_data(), m_grouping(
false) {}
103 void begin_group() { m_data = 0; }
104 template <
typename T>
void group(
const T &t) {
105 auto d = t.template get<InputColumnType>();
106 m_data = m_data < d ? m_data : d;
109 void end_group() { m_grouping =
false; }
110 bool grouping() {
return m_grouping; }
111 typename ::ff::util::internal::nt_traits<output_type>::type output()
const {
116 typename ::ff::util::internal::nt_traits<OutputColumnType>::type m_data;
120 template <
typename InputColumnType,
typename OutputColumnType>
123 typedef InputColumnType input_type;
124 typedef OutputColumnType output_type;
126 count() : m_data(), m_grouping(
false) {}
127 void begin_group() { m_data = 0; }
128 template <
typename T>
void group(
const T &t) {
132 void end_group() { m_grouping =
false; }
133 bool grouping() {
return m_grouping; }
134 typename ::ff::util::internal::nt_traits<output_type>::type output()
const {
139 typename ::ff::util::internal::nt_traits<OutputColumnType>::type m_data;
143 template <
typename InputObjType,
typename OutputObjType,
typename GroupByType,
144 typename AggregatorType>
149 const AggregatorType &aggregator)
151 m_aggregator(aggregator), m_continue(
false) {}
156 virtual bool process() {
157 typename ::ff::util::internal::nt_traits<GroupByType>::type t;
158 if (!base::has_input_value()) {
159 if (m_aggregator.grouping()) {
160 m_aggregator.end_group();
161 m_data.template set<typename AggregatorType::output_type>(
162 m_aggregator.output());
167 t = base::input_value().template get<GroupByType>();
168 if (m_aggregator.grouping()) {
169 if (m_last_group_id == t) {
170 m_aggregator.group(base::input_value());
171 base::consume_input_value();
174 m_aggregator.end_group();
175 m_data.template set<typename AggregatorType::output_type>(
176 m_aggregator.output());
177 m_aggregator.begin_group();
179 m_data.template set<GroupByType>(t);
180 m_aggregator.group(base::input_value());
181 base::consume_input_value();
185 m_aggregator.begin_group();
187 m_data.template set<GroupByType>(t);
188 m_aggregator.group(base::input_value());
189 base::consume_input_value();
194 virtual OutputObjType output_value() {
195 return m_data.make_copy();
199 typename ::ff::util::internal::nt_traits<GroupByType>::type m_last_group_id;
200 OutputObjType m_data;
201 AggregatorType m_aggregator;
205 template <
typename InputObjType,
typename GroupByType,
typename AggregatorType,
208 GroupByType, AggregatorType>;
210 template <
typename InputObjType,
typename OutputObjType,
typename GroupByType,
211 typename AggregatorType>
213 GroupByType, AggregatorType>;
216 template <
typename InputObjType,
typename OutputObjType>
218 template <
typename InputObjType,
typename OutputObjType>
220 template <
typename InputObjType,
typename OutputObjType>
222 template <
typename InputObjType,
typename OutputObjType>
224 template <
typename InputObjType,
typename OutputObjType>