@@ -25,19 +25,22 @@ namespace postgres_asio
25
25
{
26
26
inline static boost::shared_ptr<PGresult> make_shared (PGresult* p) { return boost::shared_ptr<PGresult>(p, PQclear); }
27
27
28
- connection::connection (boost::asio::io_service& fg, boost::asio::io_service& bg) :
28
+ connection::connection (boost::asio::io_service& fg, boost::asio::io_service& bg, std::string trace_id ) :
29
29
_fg_ios (fg),
30
30
_bg_ios (bg),
31
31
_socket (fg),
32
32
_pg_conn (NULL ),
33
- _warn_timeout (60000 )
33
+ _warn_timeout (60000 ),
34
+ _trace_id (trace_id)
34
35
{
35
- _log_id = to_string (boost::uuids::random_generator ()());
36
+ if (!_trace_id.size ())
37
+ _trace_id = to_string (boost::uuids::random_generator ()());
38
+ BOOST_LOG_TRIVIAL (trace) << _trace_id << " , " << BOOST_CURRENT_FUNCTION;
36
39
}
37
40
38
41
connection::~connection ()
39
42
{
40
- BOOST_LOG_TRIVIAL (debug ) << _log_id << " , postgres diconnecting " ;
43
+ BOOST_LOG_TRIVIAL (trace ) << _trace_id << " , " << BOOST_CURRENT_FUNCTION ;
41
44
PQfinish (_pg_conn);
42
45
}
43
46
@@ -51,11 +54,10 @@ namespace postgres_asio
51
54
uint32_t connection::backend_pid () const { return (uint32_t )PQbackendPID (_pg_conn); }
52
55
int connection::socket () const { return PQsocket (_pg_conn); }
53
56
bool connection::set_client_encoding (std::string s) { return (PQsetClientEncoding (_pg_conn, s.c_str ()) == 0 ); }
54
- void connection::set_log_id (std::string id) { _log_id = id; }
55
- std::string connection::get_log_id () const { return _log_id; }
57
+ std::string connection::trace_id () const { return _trace_id; }
56
58
void connection::set_warning_timout (uint32_t ms) { _warn_timeout = ms; }
57
59
58
- // connect is a blocking thingh - pass this to bg thread pool
60
+ // connect is a blocking thing - pass this to bg thread pool
59
61
void connection::connect (std::string connect_string, on_connect_callback cb)
60
62
{
61
63
auto self (shared_from_this ()); // keeps connection alive until cb is done
@@ -103,33 +105,34 @@ namespace postgres_asio
103
105
{
104
106
if (duration > _warn_timeout)
105
107
{
106
- BOOST_LOG_TRIVIAL (warning) << _log_id << " , postgres::connect - took long time, t=" << duration;
108
+ BOOST_LOG_TRIVIAL (warning) << _trace_id << " , postgres::connect - took long time, t=" << duration;
107
109
}
108
110
109
- BOOST_LOG_TRIVIAL (info) << _log_id << " , postgres::connect PQconnectdb complete, t=" << duration;
111
+ BOOST_LOG_TRIVIAL (info) << _trace_id << " , postgres::connect PQconnectdb complete, t=" << duration;
110
112
_socket.assign (boost::asio::ip::tcp::v4 (), socket ());
111
113
_fg_ios.post ([this , self, cb](){ cb (0 ); });
112
114
return ;
113
115
}
114
- BOOST_LOG_TRIVIAL (error) << _log_id << " , postgres::connect PQconnectdb failed, status=" << status << " , " << last_error () << " , t=" << duration;
116
+ BOOST_LOG_TRIVIAL (error) << _trace_id << " , postgres::connect PQconnectdb failed, status=" << status << " , " << last_error () << " , t=" << duration;
115
117
_fg_ios.post ([this , self, status, cb](){ cb (status); });
116
118
}
117
119
118
120
void connection::exec (std::string statement, on_query_callback cb)
119
121
{
120
- BOOST_LOG_TRIVIAL (trace) << _log_id << " , " << BOOST_CURRENT_FUNCTION << " , s=" << statement.substr (0 , STATEMENT_LOG_BYTES);
122
+ BOOST_LOG_TRIVIAL (trace) << _trace_id << " , " << BOOST_CURRENT_FUNCTION << " , s=" << statement.substr (0 , STATEMENT_LOG_BYTES);
121
123
auto self (shared_from_this ()); // keeps connection alive until cb is done
122
124
_start_ts = now ();
123
125
_current_statement = statement;
124
126
if (PQsendQuery (_pg_conn, statement.c_str ()) == 0 ) // 1 os good, 0 is bad...
125
127
{
126
- BOOST_LOG_TRIVIAL (error) << _log_id << " , postgres::exec PQsendQuery failed fast: s=" << statement.substr (0 , STATEMENT_LOG_BYTES);
128
+ BOOST_LOG_TRIVIAL (error) << _trace_id << " , postgres::exec PQsendQuery failed fast: s=" << statement.substr (0 , STATEMENT_LOG_BYTES);
127
129
_fg_ios.post ([this , self, cb](){ cb (PGRES_FATAL_ERROR, NULL ); });
128
130
return ;
129
131
}
130
132
_socket.async_read_some (boost::asio::null_buffers (), boost::bind (&connection::_fg_socket_rx_cb, this , boost::asio::placeholders::error, self, cb));
131
133
}
132
134
135
+
133
136
std::pair<int , boost::shared_ptr<PGresult>> connection::exec (std::string statement)
134
137
{
135
138
std::promise<std::pair<int , boost::shared_ptr<PGresult>>> p;
@@ -145,18 +148,18 @@ namespace postgres_asio
145
148
146
149
void connection::_fg_socket_rx_cb (const boost::system::error_code& ec, boost::shared_ptr<connection> self, on_query_callback cb)
147
150
{
148
- BOOST_LOG_TRIVIAL (trace) << _log_id << " , " << BOOST_CURRENT_FUNCTION;
151
+ BOOST_LOG_TRIVIAL (trace) << _trace_id << " , " << BOOST_CURRENT_FUNCTION;
149
152
if (ec)
150
153
{
151
- BOOST_LOG_TRIVIAL (warning) << _log_id << " , postgres::exec asio ec:" << ec.message ();
154
+ BOOST_LOG_TRIVIAL (warning) << _trace_id << " , postgres::exec asio ec:" << ec.message ();
152
155
cb (ec.value (), NULL );
153
156
return ;
154
157
}
155
158
156
159
int res = PQconsumeInput (_pg_conn);
157
160
if (!res)
158
161
{
159
- BOOST_LOG_TRIVIAL (warning) << _log_id << " , postgres::exec PQconsumeInput read error" ;
162
+ BOOST_LOG_TRIVIAL (warning) << _trace_id << " , postgres::exec PQconsumeInput read error" ;
160
163
cb (PGRES_FATAL_ERROR, NULL ); // we reuse a error code here...
161
164
return ;
162
165
}
@@ -169,11 +172,11 @@ namespace postgres_asio
169
172
{
170
173
if (PQisBusy (_pg_conn))
171
174
{
172
- BOOST_LOG_TRIVIAL (trace) << _log_id << " , " << BOOST_CURRENT_FUNCTION << " , PQisBusy() - reading more" ;
175
+ BOOST_LOG_TRIVIAL (trace) << _trace_id << " , " << BOOST_CURRENT_FUNCTION << " , PQisBusy() - reading more" ;
173
176
_socket.async_read_some (boost::asio::null_buffers (), boost::bind (&connection::_fg_socket_rx_cb, this , boost::asio::placeholders::error, self, cb));
174
177
return ;
175
178
}
176
- BOOST_LOG_TRIVIAL (trace) << _log_id << " , " << BOOST_CURRENT_FUNCTION << " , parsing result" ;
179
+ BOOST_LOG_TRIVIAL (trace) << _trace_id << " , " << BOOST_CURRENT_FUNCTION << " , parsing result" ;
177
180
auto r = make_shared (PQgetResult (_pg_conn));
178
181
if (r.get () == NULL )
179
182
break ;
@@ -187,7 +190,7 @@ namespace postgres_asio
187
190
188
191
if (_results.size () == 0 )
189
192
{
190
- BOOST_LOG_TRIVIAL (error) << _log_id << " , postgres::exec returned no result, t=" << duration << " , s=" << _current_statement.substr (0 , STATEMENT_LOG_BYTES);
193
+ BOOST_LOG_TRIVIAL (error) << _trace_id << " , postgres::exec returned no result, t=" << duration << " , s=" << _current_statement.substr (0 , STATEMENT_LOG_BYTES);
191
194
cb (PGRES_FATAL_ERROR, NULL ); // we reuse a error code here...
192
195
return ;
193
196
}
@@ -206,24 +209,39 @@ namespace postgres_asio
206
209
case PGRES_NONFATAL_ERROR:
207
210
case PGRES_COPY_BOTH:
208
211
case PGRES_SINGLE_TUPLE:
209
- BOOST_LOG_TRIVIAL (debug) << _log_id << " , postgres::exec complete, t=" << duration << " , s=" << _current_statement.substr (0 , STATEMENT_LOG_BYTES);
212
+ BOOST_LOG_TRIVIAL (debug) << _trace_id << " , postgres::exec complete, t=" << duration << " , s=" << _current_statement.substr (0 , STATEMENT_LOG_BYTES);
210
213
if (duration > _warn_timeout)
211
214
{
212
- BOOST_LOG_TRIVIAL (warning) << _log_id << " , postgres::exec complete - took long time, t=" << duration << " , s = " << _current_statement.substr (0 , STATEMENT_LOG_BYTES);
215
+ BOOST_LOG_TRIVIAL (warning) << _trace_id << " , postgres::exec complete - took long time, t=" << duration << " , s = " << _current_statement.substr (0 , STATEMENT_LOG_BYTES);
213
216
}
214
217
cb (0 , std::move (last_result));
215
218
break ;
216
219
case PGRES_BAD_RESPONSE:
217
220
case PGRES_FATAL_ERROR:
218
- BOOST_LOG_TRIVIAL (error) << _log_id << " , postgres::exec failed " << last_error () << " , t=" << duration << " , s=" << _current_statement.substr (0 , STATEMENT_LOG_BYTES);
221
+ BOOST_LOG_TRIVIAL (error) << _trace_id << " , postgres::exec failed " << last_error () << " , t=" << duration << " , s=" << _current_statement.substr (0 , STATEMENT_LOG_BYTES);
219
222
cb (status, std::move (last_result));
220
223
break ;
221
224
default :
222
- BOOST_LOG_TRIVIAL (warning) << _log_id << " , postgres::exec unknown status code, t=" << duration << " , s=" << _current_statement.substr (0 , STATEMENT_LOG_BYTES);
225
+ BOOST_LOG_TRIVIAL (warning) << _trace_id << " , postgres::exec unknown status code, t=" << duration << " , s=" << _current_statement.substr (0 , STATEMENT_LOG_BYTES);
223
226
cb (status, std::move (last_result));
224
227
break ;
225
228
}
226
229
}
227
230
231
+
232
+ // connection_pool::connection_pool(boost::asio::io_service& fg, boost::asio::io_service& bg) : _fg_ios(fg), _bg_ios(bg)
233
+ // {
234
+ // }
235
+
236
+ // //TBD reuse connections.
237
+ // boost::shared_ptr<postgres_asio::connection> connection_pool::create()
238
+ // {
239
+ // return boost::make_shared<postgres_asio::connection>(_fg_ios, _bg_ios);
240
+ // }
241
+ //
242
+ // // TBD
243
+ // void connection_pool::release(boost::shared_ptr<postgres_asio::connection>)
244
+ // {
245
+ // }
228
246
};
229
247
0 commit comments