用QQ监控PBS任务

本文于2018年8月20日发表于知乎专栏,查看原文
本文于2018年8月7日发表于微信公众号,查看原文

环境准备

1.安装酷Q和HTTP API插件

酷Q:https://cqp.cc/
CoolQ HTTP API 插件:https://github.com/richardchien/coolq-http-api
将HTTP API插件放置于app文件夹中,启动并登陆酷Q,在菜单中启用插件。
HTTP API插件的默认地址为http://localhost:5700/

2.安装cqhttp

1
pip install cqhttp

安装后在Python中使用:

1
2
3
from cqhttp import CQHttp
bot = CQHttp(api_root='http://219.228.63.56:5700/')
bot.send_group_msg(group_id=599070209,message='hello world')

default

第二行定义了一个机器人,第三行让这个机器人发一条hello world的消息。

开始编写程序

1.获取任务状态

1
2
3
4
5
6
import subprocess as sp
class CQJobMonitor():
def jobstate(self):
states=sp.check_output(self.command.split()).decode('utf-8').split("\n")
states=[' '.join(line.split()) for line in states if any([keyword in line for keyword in self.keywords])]
return states

这里self.command为获取任务状态的命令,如qstatself.keywords为关键词,如['jzzeng']。如此即可返回含有关键词的任务状态。

2.发送消息

1
2
3
4
5
6
7
8
import time
from cqhttp import CQHttp
class CQJobMonitor():
def sendstate(self):
localtime = time.asctime(time.localtime(time.time()))
message=localtime+"\n"+self.tip+"\n"
message+="\n".join(self.jobstate())
self.bot.send_group_msg(group_id=self.group_id,message=message)

调用定义好的CQHttp类发送消息,顺便带上当前时间。

3.循环发送

设置每300秒发送一次:

1
2
3
4
5
6
7
8
9
from threading import Timer
import os
class CQJobMonitor():
def loopmonitor(self):
while not os.path.exists("pause"):
timer=Timer(self.timeinterval,self.sendstate)
timer.start()
timer.join()
print("Exit per signal.")

这里self.timeinterval设置为300.

运行

完整的程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import os
import subprocess as sp
from cqhttp import CQHttp
import time
from threading import Timer

class CQJobMonitor():
def __init__(self,command="qstat",cqroot='http://219.228.63.56:5700/',group_id=312676525,keywords=['jzzeng'],timeinterval=300):
self.tip="JobMonitor"
print(self.tip)
self.command=command
self.group_id=group_id
self.keywords=keywords
self.bot = CQHttp(api_root=cqroot)
self.timeinterval=timeinterval

def jobstate(self):
states=sp.check_output(self.command.split()).decode('utf-8').split("\n")
states=[' '.join(line.split()) for line in states if any([keyword in line for keyword in self.keywords])]
return states

def sendstate(self):
localtime = time.asctime(time.localtime(time.time()))
message=localtime+"\n"+self.tip+"\n"
message+="\n".join(self.jobstate())
self.bot.send_group_msg(group_id=self.group_id,message=message)

def loopmonitor(self):
while not os.path.exists("pause"):
timer=Timer(self.timeinterval,self.sendstate)
timer.start()
timer.join()
print("Exit per signal.")

代码已上传至pypi,可用pip install CQJobMonitor安装。
编写monitor.py

1
2
from CQJobMonitor import CQJobMonitor
CQJobMonitor(command="qstat",cqroot='http://219.228.63.56:5700/',group_id=312676525,keywords=['jzzeng'],timeinterval=300).loopmonitor()

并在后台运行:

1
nohup python monitor.py >/dev/null &

image

即可定时发送任务情况。