Skip to content

Commit

Permalink
Implement %task purge and %task kill #154
Browse files Browse the repository at this point in the history
  • Loading branch information
Bo Peng committed Dec 7, 2018
1 parent 91579a9 commit 5fe5a33
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 15 deletions.
27 changes: 25 additions & 2 deletions src/sos_notebook/kernel.js
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,20 @@ define([
console.log(`Cannot find cell by cell ID ${info.cell_id} or task ID ${info.task_id}`)
return;
}
if (info.status == 'purged') {
if (has_status_table) {
let data = {
'output_type': 'update_display_data',
'transient': {'display_id': `task_${elem_id}`},
'metadata': {},
'data': {
'text/html': ''
}
}
cell.output_area.append_output(data);
}
return;
}
// if there is an existing status table, try to retrieve its information
// the new data does not have it
let timer_text = '';
Expand Down Expand Up @@ -2081,9 +2095,18 @@ td.task_timer
text-align: left;
}
td.task_timer pre
{
text-overflow: ellipsis;
overflow: hidden;
white-space: nowrap;
}
td.task_tags
{
text-align: left;
max-width: 33em;
}
td.task_icon {
Expand All @@ -2097,12 +2120,12 @@ td.task_status,
}
table.workflow_table span {
text-transform: uppercase;
/* text-transform: uppercase; */
font-family: monospace;
}
table.task_table span {
text-transform: uppercase;
/* text-transform: uppercase; */
font-family: monospace;
}
Expand Down
84 changes: 71 additions & 13 deletions src/sos_notebook/magics.py
Original file line number Diff line number Diff line change
Expand Up @@ -2020,11 +2020,16 @@ def status(self, args):
# <tr><th align="right" width="30%">Status</th><td align="left"><div class="one_liner">completed</div></td></tr>
status = result.split(
'>Status<', 1)[-1].split('</div', 1)[0].split('>')[-1]
host._task_engine.update_task_status(args.tasks[0], status)
self.sos_kernel.send_frontend_msg('task_status',
{
'update_only': True,
'queue': args.queue,
'task_id': args.tasks[0],
'status': status,
}
)
elif args.tags:
status_output = host._task_engine.query_tasks(tags=args.tags, verbosity=2)
env.log_to_file(f'tags={args.tags}')
env.log_to_file(status_output)
self.sos_kernel.send_response(self.sos_kernel.iopub_socket, 'stream',
{'name': 'stdout', 'text': status_output })
for line in status_output.split('\n'):
Expand Down Expand Up @@ -2053,18 +2058,71 @@ def resume(self, args):
def kill(self, args):
# kill specified task
from sos.hosts import Host
Host(args.queue)._task_engine.kill_tasks(args.tasks)
for tid in args.tasks:
self.sos_kernel.send_frontend_msg('task_status',
{
'task_id': tid,
'queue': args.queue,
'status': 'abort'
})
host = Host(args.queue)
if args.tasks:
# kill specified task
ret = host._task_engine.kill_tasks(args.tasks)
elif args.tags:
ret = host._task_engine.kill_tasks([], tags=args.tags)
else:
self.sos_kernel.warn('Please specify either a list of task or a tag')
return
self.sos_kernel.send_response(self.sos_kernel.iopub_socket, 'stream',
{'name': 'stdout', 'text': ret })
for line in ret.split('\n'):
if not line.strip():
continue
try:
# return creation time, start time, and duration
tid, tst = line.split('\t')
self.sos_kernel.send_frontend_msg('task_status',
{
'update_only': True,
'queue': args.queue,
'task_id': tid,
'status': tst
}
)
except Exception as e:
env.logger.warning(
f'Unrecognized response "{line}" ({e.__class__.__name__}): {e}')


def purge(self, args):
self.sos_kernel.warn(args)
return
# kill specified task
from sos.hosts import Host
host = Host(args.queue)
if args.tasks:
# kill specified task
ret = host._task_engine.purge_tasks(args.tasks)
elif args.tags:
ret = host._task_engine.purge_tasks([], tags=args.tags)
else:
self.sos_kernel.warn('Please specify either a list of task or a tag')
return
if ret:
self.sos_kernel.send_response(self.sos_kernel.iopub_socket, 'stream',
{'name': 'stdout', 'text': ret })
else:
self.sos_kernel.send_response(self.sos_kernel.iopub_socket, 'stream',
{'name': 'stderr', 'text': 'No matching task to purge' })
for line in ret.split('\n'):
if not line.strip():
continue
try:
# return creation time, start time, and duration
tid, tst = line.split('\t')
self.sos_kernel.send_frontend_msg('task_status',
{
'update_only': True,
'queue': args.queue,
'task_id': tid,
'status': tst
}
)
except Exception as e:
env.logger.warning(
f'Unrecognized response "{line}" ({e.__class__.__name__}): {e}')

def apply(self, code, silent, store_history, user_expressions, allow_stdin):
options, remaining_code = self.get_magic_and_code(code, False)
Expand Down

0 comments on commit 5fe5a33

Please sign in to comment.