YPC  0.2.0
first_match_parser.h
1 #include "ypc/corecommon/package.h"
2 #include "ypc/stbox/ebyte.h"
3 #include "ypc/stbox/stx_common.h"
4 #ifdef EXAMPLE_FM_NORMAL
5 #include <glog/logging.h>
6 typedef ypc::bytes bytes;
7 #else
8 #include "ypc/core_t/analyzer/data_source.h"
9 #include "ypc/stbox/tsgx/log.h"
10 typedef stbox::bytes bytes;
11 #endif
12 #include "user_type.h"
13 #include "ypc/corecommon/data_source.h"
14 #include "ypc/corecommon/to_type.h"
15 #include <hpda/extractor/raw_data.h>
16 #include <hpda/output/memory_output.h>
17 #include <hpda/processor/query/filter.h>
18 #include <hpda/processor/transform/concat.h>
19 #include <string.h>
20 
21 define_nt(input_buf, std::string);
22 typedef ff::net::ntpackage<0, input_buf> input_buf_t;
23 
25 public:
28  std::vector<std::shared_ptr<ypc::data_source_with_dhash>> &source)
29  : m_datasources(source){};
30 
// Runs the lookup over every configured data source and returns the matches
// as one `bytes` value.
//
// Visible flow: log "do parse"; return "no data source" when m_datasources is
// empty; otherwise chain all data sources into a single concat stream, convert
// that stream to user_item_t records, keep the records whose ZJHM field equals
// pkg.get<input_buf>(), run the engine, and append "<XM> : <ZJHM> ." to the
// result for every record held by `mo`.
//
// NOTE(review): this listing's embedded line numbers skip 33, 46 and 56, so the
// declarations of `pkg`, of the filter stage and of `mo` are not visible here.
// The cross-reference footer mentions ypc::make_package, hpda filter_impl and
// hpda memory_output_impl, so presumably `pkg` is decoded from `param`, the
// lambda below is the predicate of an hpda filter, and `mo` is a memory_output
// fed by that filter -- confirm against the real header.
31  inline bytes do_parse(const bytes &param) {
32  LOG(INFO) << "do parse";
// NOTE(review): `counter` is incremented in the predicate below but is never
// read in the visible code.
34  int counter = 0;
35  if (m_datasources.size() == 0) {
// NOTE(review): constructs stbox::bytes explicitly although the return type is
// the `bytes` alias, which is ypc::bytes under EXAMPLE_FM_NORMAL -- confirm
// that the conversion exists in that configuration.
36  return stbox::bytes("no data source");
37  }
38 
// The first data source seeds the concat stage; the remaining ones are attached
// as additional upstream inputs.
39  typedef ypc::nt<stbox::bytes> ntt;
40  hpda::processor::concat<ntt::data> concator(m_datasources[0].get());
41  for (size_t i = 1; i < m_datasources.size(); i++) {
42  concator.add_upper_stream(m_datasources[i].get());
43  }
44 
// Converts the concatenated stream into user_item_t records.
45  ypc::to_type<bytes, user_item_t> converter(&concator);
// The statement these arguments belong to starts on the missing line 46.
// Predicate: accept a record only when its ZJHM equals the requested value.
47  &converter, [&](const user_item_t &v) {
48  counter++;
49  std::string zjhm = v.get<ZJHM>();
50  if (zjhm == pkg.get<input_buf>()) {
51  return true;
52  }
53  return false;
54  });
55 
// Runs the engine obtained from `mo` (declared on the missing line 56).
57  mo.get_engine()->run();
58  LOG(INFO) << "do parse done";
59 
// Formats every value held by `mo` as "<XM> : <ZJHM> ." and appends it.
// NOTE(review): `auto it` copies each element, and stbox::printf is called
// regardless of EXAMPLE_FM_NORMAL -- confirm both are intended.
60  bytes result;
61  for (auto it : mo.values()) {
62  stbox::printf("found\n");
63  result += it.get<XM>();
64  result += " : ";
65  result += it.get<ZJHM>();
66  result += " .";
67  }
68  return result;
69  }
70 
71  inline bool merge_parse_result(const std::vector<bytes> &block_result,
72  const bytes &param, bytes &result) {
73  bytes s;
74  for (auto k : block_result) {
75  s = s + k;
76  }
77  result = s;
78  return false;
79  }
80 
81 protected:
82  std::vector<std::shared_ptr<ypc::data_source_with_dhash>> m_datasources;
83 };
84 
hpda::processor::internal::concat_impl
Definition: concat.h:8
first_match_parser
Definition: first_match_parser.h:24
ypc::nt
Definition: nt_cols.h:6
hpda::processor::internal::filter_impl
Definition: filter.h:9
ypc::utc::bytes
Definition: bytes.h:143
hpda::output::internal::memory_output_impl
Definition: memory_output.h:9
ypc::to_type
Definition: to_type.h:15
ypc::make_package
Definition: package.h:48