YPC  0.2.0
difference.h
1 #pragma once
2 #include <hpda/processor/processor_base.h>
3 #include <hpda/processor/set/helper/copy_helper.h>
4 #include <hpda/processor/set/helper/upper_stream_helper.h>
5 
6 namespace hpda {
7 namespace processor {
8 namespace internal {
9 
10 template <typename OutputObjType, typename CT>
12  : public ::hpda::internal::processor_with_output<OutputObjType> {
13 
14 public:
15  ordered_difference_impl() : m_set_engine(false) {}
16 
17  template <typename KT, typename InputObjType>
18  void add_upper_stream(
20  return upper_stream_helper<OutputObjType, CT, KT,
21  InputObjType>::add_upper_stream(upper_stream,
22  m_upper_streams,
23  m_traits,
24  m_filler,
25  m_set_engine,
26  this);
27  }
28 
29  bool has_difference() {
30  CT val_0 = m_traits[0](m_upper_streams[0]);
31  CT val_min = val_0;
32  for (size_t i = 1; i < m_upper_streams.size(); i++) {
33  if (m_upper_streams[i]->has_value()) {
34  CT val_i = m_traits[i](m_upper_streams[i]);
35  val_min = std::min(val_min, val_i);
36  }
37  }
38  bool flag_min = false;
39  for (size_t i = 1; i < m_upper_streams.size(); i++) {
40  if (m_upper_streams[i]->has_value()) {
41  CT val_i = m_traits[i](m_upper_streams[i]);
42  if (val_i == val_min) {
43  flag_min = true;
44  m_upper_streams[i]->reset_done_value();
45  }
46  }
47  }
48  if (flag_min) {
49  if (val_0 == val_min) {
50  m_upper_streams[0]->reset_done_value();
51  }
52  return false;
53  }
54 
55  m_filler[0](m_upper_streams[0], m_data);
56  m_upper_streams[0]->reset_done_value();
57  return true;
58  }
59 
60  virtual bool process() {
61  if (m_upper_streams.size() < 2) {
62  return false;
63  }
64  if (!m_upper_streams[0]->has_value()) {
65  return false;
66  }
67  return has_difference();
68  }
69 
70  virtual OutputObjType output_value() { return m_data; }
71 
72 protected:
73  std::vector<functor *> m_upper_streams;
74  typedef std::function<CT(functor *)> kt_traits_t;
75  std::vector<kt_traits_t> m_traits;
76  bool m_set_engine;
77 
78  typedef std::function<void(functor *, OutputObjType &)> output_t;
79  std::vector<output_t> m_filler;
80  OutputObjType m_data;
81 };
82 
83 } // namespace internal
84 template <typename CT, typename... ARGS>
85 using ordered_difference =
86  internal::ordered_difference_impl<ntobject<ARGS...>, CT>;
87 } // namespace processor
88 } // namespace hpda
hpda::processor::internal::upper_stream_helper
Definition: upper_stream_helper.h:11
hpda::processor::internal::ordered_difference_impl
Definition: difference.h:11
hpda::internal::processor_with_output
Definition: processor_with_output.h:9
hpda::functor
Definition: functor.h:7