@@ -170,6 +170,80 @@ static VFIOUserFDs *vfio_user_getfds(int numfds)
170
170
* Functions only called by iothread
171
171
*/
172
172
173
+ /*
174
+ * Process a received message.
175
+ */
176
+ static void vfio_user_process (VFIOProxy * proxy , VFIOUserMsg * msg , bool isreply )
177
+ {
178
+
179
+ /*
180
+ * Replies signal a waiter, if none just check for errors
181
+ * and free the message buffer.
182
+ *
183
+ * Requests get queued for the BH.
184
+ */
185
+ if (isreply ) {
186
+ msg -> complete = true;
187
+ if (msg -> type == VFIO_MSG_WAIT ) {
188
+ qemu_cond_signal (& msg -> cv );
189
+ } else {
190
+ if (msg -> hdr -> flags & VFIO_USER_ERROR ) {
191
+ error_printf ("vfio_user_rcv: error reply on async request " );
192
+ error_printf ("command %x error %s\n" , msg -> hdr -> command ,
193
+ strerror (msg -> hdr -> error_reply ));
194
+ }
195
+ /* youngest nowait msg has been ack'd */
196
+ if (proxy -> last_nowait == msg ) {
197
+ proxy -> last_nowait = NULL ;
198
+ }
199
+ vfio_user_recycle (proxy , msg );
200
+ }
201
+ } else {
202
+ QTAILQ_INSERT_TAIL (& proxy -> incoming , msg , next );
203
+ qemu_bh_schedule (proxy -> req_bh );
204
+ }
205
+ }
206
+
207
+ /*
208
+ * Complete a partial message read
209
+ */
210
+ static int vfio_user_complete (VFIOProxy * proxy , Error * * errp )
211
+ {
212
+ VFIOUserMsg * msg = proxy -> part_recv ;
213
+ size_t msgleft = proxy -> recv_left ;
214
+ bool isreply ;
215
+ char * data ;
216
+ int ret ;
217
+
218
+ data = (char * )msg -> hdr + (msg -> hdr -> size - msgleft );
219
+ while (msgleft > 0 ) {
220
+ ret = qio_channel_read (proxy -> ioc , data , msgleft , errp );
221
+
222
+ /* error or would block */
223
+ if (ret <= 0 ) {
224
+ /* try for rest on next iternation */
225
+ if (ret == QIO_CHANNEL_ERR_BLOCK ) {
226
+ proxy -> recv_left = msgleft ;
227
+ }
228
+ return ret ;
229
+ }
230
+
231
+ msgleft -= ret ;
232
+ data += ret ;
233
+ }
234
+
235
+ /*
236
+ * Read complete message, process it.
237
+ */
238
+ proxy -> part_recv = NULL ;
239
+ proxy -> recv_left = 0 ;
240
+ isreply = (msg -> hdr -> flags & VFIO_USER_TYPE ) == VFIO_USER_REPLY ;
241
+ vfio_user_process (proxy , msg , isreply );
242
+
243
+ /* return positive value */
244
+ return 1 ;
245
+ }
246
+
173
247
static void vfio_user_recv (void * opaque )
174
248
{
175
249
VFIOProxy * proxy = opaque ;
@@ -206,6 +280,23 @@ static int vfio_user_recv_one(VFIOProxy *proxy)
206
280
char * buf = NULL ;
207
281
Error * local_err = NULL ;
208
282
283
+ /*
284
+ * Complete any partial reads
285
+ */
286
+ if (proxy -> part_recv != NULL ) {
287
+ ret = vfio_user_complete (proxy , & local_err );
288
+
289
+ /* still not complete, try later */
290
+ if (ret == QIO_CHANNEL_ERR_BLOCK ) {
291
+ return ret ;
292
+ }
293
+
294
+ if (ret <= 0 ) {
295
+ goto fatal ;
296
+ }
297
+ /* else fall into reading another msg */
298
+ }
299
+
209
300
/*
210
301
* Read header
211
302
*/
@@ -214,25 +305,22 @@ static int vfio_user_recv_one(VFIOProxy *proxy)
214
305
if (ret == QIO_CHANNEL_ERR_BLOCK ) {
215
306
return ret ;
216
307
}
308
+
309
+ /* read error or other side closed connection */
217
310
if (ret <= 0 ) {
218
- /* read error or other side closed connection */
219
- if (ret == 0 ) {
220
- error_setg (& local_err , "vfio_user_recv server closed socket" );
221
- } else {
222
- error_prepend (& local_err , "vfio_user_recv" );
223
- }
224
311
goto fatal ;
225
312
}
313
+
226
314
if (ret < sizeof (msg )) {
227
- error_setg (& local_err , "vfio_user_recv short read of header" );
315
+ error_setg (& local_err , "short read of header" );
228
316
goto fatal ;
229
317
}
230
318
231
319
/*
232
320
* Validate header
233
321
*/
234
322
if (hdr .size < sizeof (VFIOUserHdr )) {
235
- error_setg (& local_err , "vfio_user_recv bad header size" );
323
+ error_setg (& local_err , "bad header size" );
236
324
goto fatal ;
237
325
}
238
326
switch (hdr .flags & VFIO_USER_TYPE ) {
@@ -243,7 +331,7 @@ static int vfio_user_recv_one(VFIOProxy *proxy)
243
331
isreply = true;
244
332
break ;
245
333
default :
246
- error_setg (& local_err , "vfio_user_recv unknown message type" );
334
+ error_setg (& local_err , "unknown message type" );
247
335
goto fatal ;
248
336
}
249
337
@@ -258,7 +346,7 @@ static int vfio_user_recv_one(VFIOProxy *proxy)
258
346
}
259
347
}
260
348
if (msg == NULL ) {
261
- error_setg (& local_err , "vfio_user_recv unexpected reply" );
349
+ error_setg (& local_err , "unexpected reply" );
262
350
goto err ;
263
351
}
264
352
QTAILQ_REMOVE (& proxy -> pending , msg , next );
@@ -268,7 +356,7 @@ static int vfio_user_recv_one(VFIOProxy *proxy)
268
356
*/
269
357
if (numfds != 0 ) {
270
358
if (msg -> fds == NULL || msg -> fds -> recv_fds < numfds ) {
271
- error_setg (& local_err , "vfio_user_recv unexpected FDs" );
359
+ error_setg (& local_err , "unexpected FDs" );
272
360
goto err ;
273
361
}
274
362
msg -> fds -> recv_fds = numfds ;
@@ -288,15 +376,14 @@ static int vfio_user_recv_one(VFIOProxy *proxy)
288
376
*/
289
377
if (isreply ) {
290
378
if (hdr .size > msg -> rsize ) {
291
- error_setg (& local_err ,
292
- "vfio_user_recv reply larger than recv buffer" );
379
+ error_setg (& local_err , "reply larger than recv buffer" );
293
380
goto err ;
294
381
}
295
382
* msg -> hdr = hdr ;
296
383
data = (char * )msg -> hdr + sizeof (hdr );
297
384
} else {
298
385
if (hdr .size > max_xfer_size + sizeof (VFIOUserDMARW )) {
299
- error_setg (& local_err , "vfio_user_recv request larger than max" );
386
+ error_setg (& local_err , "request larger than max" );
300
387
goto err ;
301
388
}
302
389
buf = g_malloc0 (hdr .size );
@@ -306,11 +393,20 @@ static int vfio_user_recv_one(VFIOProxy *proxy)
306
393
msg -> type = VFIO_MSG_REQ ;
307
394
}
308
395
396
+ /*
397
+ * Read rest of message.
398
+ */
309
399
msgleft = hdr .size - sizeof (hdr );
310
400
while (msgleft > 0 ) {
311
401
ret = qio_channel_read (proxy -> ioc , data , msgleft , & local_err );
312
402
313
- /* error or would block */
403
+ /* prepare to complete read on next iternation */
404
+ if (ret == QIO_CHANNEL_ERR_BLOCK ) {
405
+ proxy -> part_recv = msg ;
406
+ proxy -> recv_left = msgleft ;
407
+ return ret ;
408
+ }
409
+
314
410
if (ret <= 0 ) {
315
411
goto fatal ;
316
412
}
@@ -319,32 +415,7 @@ static int vfio_user_recv_one(VFIOProxy *proxy)
319
415
data += ret ;
320
416
}
321
417
322
- /*
323
- * Replies signal a waiter, if none just check for errors
324
- * and free the message buffer.
325
- *
326
- * Requests get queued for the BH.
327
- */
328
- if (isreply ) {
329
- msg -> complete = true;
330
- if (msg -> type == VFIO_MSG_WAIT ) {
331
- qemu_cond_signal (& msg -> cv );
332
- } else {
333
- if (hdr .flags & VFIO_USER_ERROR ) {
334
- error_printf ("vfio_user_rcv error reply on async request " );
335
- error_printf ("command %x error %s\n" , hdr .command ,
336
- strerror (hdr .error_reply ));
337
- }
338
- /* youngest nowait msg has been ack'd */
339
- if (proxy -> last_nowait == msg ) {
340
- proxy -> last_nowait = NULL ;
341
- }
342
- vfio_user_recycle (proxy , msg );
343
- }
344
- } else {
345
- QTAILQ_INSERT_TAIL (& proxy -> incoming , msg , next );
346
- qemu_bh_schedule (proxy -> req_bh );
347
- }
418
+ vfio_user_process (proxy , msg , isreply );
348
419
return 0 ;
349
420
350
421
/*
@@ -355,6 +426,11 @@ static int vfio_user_recv_one(VFIOProxy *proxy)
355
426
vfio_user_shutdown (proxy );
356
427
proxy -> state = VFIO_PROXY_ERROR ;
357
428
429
+ /* set error if server side closed */
430
+ if (ret == 0 ) {
431
+ error_setg (& local_err , "server closed socket" );
432
+ }
433
+
358
434
err :
359
435
for (i = 0 ; i < numfds ; i ++ ) {
360
436
close (fdp [i ]);
@@ -365,6 +441,7 @@ static int vfio_user_recv_one(VFIOProxy *proxy)
365
441
msg -> complete = true;
366
442
qemu_cond_signal (& msg -> cv );
367
443
}
444
+ error_prepend (& local_err , "vfio_user_recv: " );
368
445
error_report_err (local_err );
369
446
return -1 ;
370
447
}
0 commit comments