YPC  0.2.0
intersection.h
1 #pragma once
2 #include <hpda/processor/processor_base.h>
3 #include <hpda/processor/set/helper/copy_helper.h>
4 #include <hpda/processor/set/helper/upper_stream_helper.h>
5 
6 namespace hpda {
7 namespace processor {
8 namespace internal {
9 
10 template <typename OutputObjType, typename CT>
12  : public ::hpda::internal::processor_with_output<OutputObjType> {
13 
14 public:
15  ordered_intersection_impl() : m_set_engine(false) {}
16 
17  template <typename KT, typename InputObjType>
18  void add_upper_stream(
20  return upper_stream_helper<OutputObjType, CT, KT,
21  InputObjType>::add_upper_stream(upper_stream,
22  m_upper_streams,
23  m_traits,
24  m_filler,
25  m_set_engine,
26  this);
27  }
28 
29  bool has_intersection() {
30  CT val = m_traits[0](m_upper_streams[0]);
31  for (size_t i = 1; i < m_upper_streams.size(); i++) {
32  CT tmp = m_traits[i](m_upper_streams[i]);
33  if (val != tmp) {
34  return false;
35  }
36  }
37  return true;
38  }
39 
40  void next_value() {
41  CT val = m_traits[0](m_upper_streams[0]);
42  for (size_t i = 1; i < m_upper_streams.size(); i++) {
43  CT tmp = m_traits[i](m_upper_streams[i]);
44  if (val > tmp) {
45  val = tmp;
46  }
47  }
48  for (size_t i = 0; i < m_upper_streams.size(); i++) {
49  CT tmp = m_traits[i](m_upper_streams[i]);
50  if (val == tmp) {
51  m_upper_streams[i]->reset_done_value();
52  }
53  }
54  }
55 
56  void fill_output() {
57  for (size_t i = 0; i < m_upper_streams.size(); i++) {
58  m_filler[i](m_upper_streams[i], m_data);
59  }
60  }
61 
62  virtual bool process() {
63  if (m_upper_streams.size() < 2) {
64  return false;
65  }
66  for (auto &s : m_upper_streams) {
67  bool has_value = s->has_value();
68  if (!has_value) {
69  return false;
70  }
71  }
72 
73  bool ret = has_intersection();
74  if (ret) {
75  fill_output();
76  }
77  next_value();
78  return ret;
79  }
80 
81  virtual OutputObjType output_value() { return m_data; }
82 
83 protected:
84  std::vector<functor *> m_upper_streams;
85  typedef std::function<CT(functor *)> kt_traits_t;
86  std::vector<kt_traits_t> m_traits;
87  bool m_set_engine;
88 
89  typedef std::function<void(functor *, OutputObjType &)> output_t;
90  std::vector<output_t> m_filler;
91  OutputObjType m_data;
92 };
93 
94 } // namespace internal
95 template <typename CT, typename... ARGS>
97  internal::ordered_intersection_impl<ntobject<ARGS...>, CT>;
98 } // namespace processor
99 } // namespace hpda
hpda::processor::internal::upper_stream_helper
Definition: upper_stream_helper.h:11
hpda::processor::internal::ordered_intersection_impl
Definition: intersection.h:11
hpda::internal::processor_with_output
Definition: processor_with_output.h:9
hpda::functor
Definition: functor.h:7