1 #include "ypc/corecommon/package.h"
2 #include "ypc/stbox/ebyte.h"
3 #include "ypc/stbox/stx_common.h"
4 #ifdef EXAMPLE_FM_NORMAL
5 #include <glog/logging.h>
8 #include "ypc/core_t/analyzer/data_source.h"
9 #include "ypc/stbox/tsgx/log.h"
10 typedef stbox::bytes bytes;
12 #include "user_type.h"
13 #include "ypc/corecommon/data_source.h"
14 #include "ypc/corecommon/to_type.h"
15 #include <hpda/extractor/raw_data.h>
16 #include <hpda/output/memory_output.h>
17 #include <hpda/processor/query/filter.h>
18 #include <hpda/processor/transform/concat.h>
21 define_nt(input_buf, std::string);
22 typedef ff::net::ntpackage<0, input_buf> input_buf_t;
28 std::vector<std::shared_ptr<ypc::data_source_with_dhash>> &source)
29 : m_datasources(source){};
31 inline bytes do_parse(
const bytes &param) {
32 LOG(INFO) <<
"do parse";
35 if (m_datasources.size() == 0) {
36 return stbox::bytes(
"no data source");
41 for (
size_t i = 1; i < m_datasources.size(); i++) {
42 concator.add_upper_stream(m_datasources[i].get());
47 &converter, [&](
const user_item_t &v) {
49 std::string zjhm = v.get<ZJHM>();
50 if (zjhm == pkg.get<input_buf>()) {
57 mo.get_engine()->run();
58 LOG(INFO) <<
"do parse done";
61 for (
auto it : mo.values()) {
62 stbox::printf(
"found\n");
63 result += it.get<XM>();
65 result += it.get<ZJHM>();
71 inline bool merge_parse_result(
const std::vector<bytes> &block_result,
72 const bytes &param, bytes &result) {
74 for (
auto k : block_result) {
82 std::vector<std::shared_ptr<ypc::data_source_with_dhash>> m_datasources;