10 #include "ypc/corecommon/package.h"
11 #include <boost/property_tree/json_parser.hpp>
12 #include <boost/property_tree/ptree.hpp>
13 #include <ff/net/middleware/ntpackage.h>
14 #include <ff/sql/mysql.hpp>
15 #include <ff/util/ntobject.h>
25 void *create_item_reader(
const char *extra_param,
int len);
27 int reset_for_read(
void *handle);
28 int read_item_data(
void *handle,
char *buf,
int *len);
29 int close_item_reader(
void *handle);
30 uint64_t get_item_number(
void *handle);
40 virtual int reset_for_read() = 0;
41 virtual int read_item_data(
char *buf,
int *len) = 0;
42 virtual int close_item_reader() = 0;
43 virtual int get_item_number() = 0;
46 template <
typename T,
typename Table>
56 : m_extra_param(extra_param) {
57 boost::property_tree::ptree pt;
58 boost::property_tree::json_parser::read_json(m_extra_param, pt);
60 auto url = pt.get_child(
"url").get_value<std::string>();
61 auto username = pt.get_child(
"username").get_value<std::string>();
62 auto password = pt.get_child(
"password").get_value<std::string>();
63 auto dbname = pt.get_child(
"dbname").get_value<std::string>();
65 new ff::sql::mysql<ff::sql::cppconn>(url, username, password, dbname));
68 virtual int reset_for_read() {
73 template <
typename... ARGS>
75 template <
typename TableT,
typename EngineT,
typename RetT>
76 static void read_item_data(EngineT *engine, RetT &ret) {
77 ret = TableT::template select<ARGS...>(engine).eval();
83 template <
typename RT,
typename ST>
84 static void assign(RT &to,
const ST &from) {
85 to.template set<ARGT>(from.template get<ARGT>());
89 template <
typename ARGT1,
typename ARGT2,
typename... ARGS>
91 template <
typename RT,
typename ST>
92 static void assign(RT &to,
const ST &from) {
93 to.template set<ARGT1>(from.template get<ARGT1>());
98 template <
typename... ARGS>
100 template <
typename PT>
101 static void to(PT &_to, const ::ff::util::ntobject<ARGS...> &from) {
106 virtual int read_item_data(
char *buf,
int *len) {
108 if (m_all_items.empty()) {
112 if (m_to_read_index >= m_all_items.size()) {
116 assign_package<item_t>::to(v, m_all_items[m_to_read_index]);
120 ff::net::marshaler lm(ff::net::marshaler::length_retriver);
122 *len =
static_cast<int>(lm.get_length());
125 ff::net::marshaler sm(buf, *len, ff::net::marshaler::serializer);
131 virtual int close_item_reader() {
135 virtual int get_item_number() {
136 if (m_all_items.empty()) {
139 return static_cast<int>(m_all_items.size());
141 const item_t &item_at(
size_t index)
const {
return m_all_items[index]; }
144 void read_all_items() {
145 read_helper<item_t>::template read_item_data<Table>(m_engine.get(),
150 const std::string m_extra_param;
151 std::unique_ptr<::ff::sql::mysql<ff::sql::cppconn>> m_engine;
152 typename Table::row_collection_type m_all_items;
153 size_t m_to_read_index;
158 #define impl_mysql_reader(type) \
159 void *create_item_reader(const char *extra_param, int len) { \
161 ypc::plugins::mysql_reader *reader = \
162 new type(std::string(extra_param, len)); \
164 } catch (const std::exception &e) { \
165 std::cout << "create_item_reader got: " << e.what() << std::endl; \
169 int reset_for_read(void *handle) { \
170 ypc::plugins::mysql_reader *reader = (ypc::plugins::mysql_reader *)handle; \
171 return reader->reset_for_read(); \
173 int read_item_data(void *handle, char *buf, int *len) { \
174 ypc::plugins::mysql_reader *reader = (ypc::plugins::mysql_reader *)handle; \
175 return reader->read_item_data(buf, len); \
177 int close_item_reader(void *handle) { \
178 ypc::plugins::mysql_reader *reader = (ypc::plugins::mysql_reader *)handle; \
179 reader->close_item_reader(); \
184 uint64_t get_item_number(void *handle) { \
185 ypc::plugins::mysql_reader *reader = (ypc::plugins::mysql_reader *)handle; \
186 return reader->get_item_number(); \