Jinzhe Zeng's Blog

苟利国家生死以,岂因祸福避趋之

0%

用QQ监控PBS任务

2018/08/07

环境准备

1.安装酷Q和HTTP API插件

酷Q:https://cqp.cc/
CoolQ HTTP API 插件:https://github.com/richardchien/coolq-http-api
将HTTP API插件放置于app文件夹中,启动并登陆酷Q,在菜单中启用插件。
HTTP API插件的默认地址为http://localhost:5700/

2.安装cqhttp

1
pip install cqhttp

安装后在Python中使用:

1
2
3
from cqhttp import CQHttp
bot = CQHttp(api_root='http://219.228.63.56:5700/')
bot.send_group_msg(group_id=599070209,message='hello world')

default

第二行定义了一个机器人,第三行让这个机器人发一条hello world的消息。

开始编写程序

1.获取任务状态

1
2
3
4
5
6
import subprocess as sp
class CQJobMonitor():
def jobstate(self):
states=sp.check_output(self.command.split()).decode('utf-8').split("\n")
states=[' '.join(line.split()) for line in states if any([keyword in line for keyword in self.keywords])]
return states

这里self.command为获取任务状态的命令,如qstatself.keywords为关键词,如['jzzeng']。如此即可返回含有关键词的任务状态。

2.发送消息

1
2
3
4
5
6
7
8
import time
from cqhttp import CQHttp
class CQJobMonitor():
def sendstate(self):
localtime = time.asctime(time.localtime(time.time()))
message=localtime+"\n"+self.tip+"\n"
message+="\n".join(self.jobstate())
self.bot.send_group_msg(group_id=self.group_id,message=message)

调用定义好的CQHttp类发送消息,顺便带上当前时间。

3.循环发送

设置每300秒发送一次:

1
2
3
4
5
6
7
8
9
from threading import Timer
import os
class CQJobMonitor():
def loopmonitor(self):
while not os.path.exists("pause"):
timer=Timer(self.timeinterval,self.sendstate)
timer.start()
timer.join()
print("Exit per signal.")

这里self.timeinterval设置为300.

运行

完整的程序如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import os
import subprocess as sp
from cqhttp import CQHttp
import time
from threading import Timer

class CQJobMonitor():
truedef __init__(self,command="qstat",cqroot='http://219.228.63.56:5700/',group_id=312676525,keywords=['jzzeng'],timeinterval=300):
truetrueself.tip="JobMonitor"
truetrueprint(self.tip)
truetrueself.command=command
truetrueself.group_id=group_id
truetrueself.keywords=keywords
truetrueself.bot = CQHttp(api_root=cqroot)
truetrueself.timeinterval=timeinterval

truedef jobstate(self):
truetruestates=sp.check_output(self.command.split()).decode('utf-8').split("\n")
truetruestates=[' '.join(line.split()) for line in states if any([keyword in line for keyword in self.keywords])]
truetruereturn states

truedef sendstate(self):
truetruelocaltime = time.asctime(time.localtime(time.time()))
truetruemessage=localtime+"\n"+self.tip+"\n"
truetruemessage+="\n".join(self.jobstate())
truetrueself.bot.send_group_msg(group_id=self.group_id,message=message)

truedef loopmonitor(self):
truetruewhile not os.path.exists("pause"):
truetruetruetimer=Timer(self.timeinterval,self.sendstate)
truetruetruetimer.start()
truetruetruetimer.join()
truetrueprint("Exit per signal.")

代码已上传至pypi,可用pip install CQJobMonitor安装。
编写monitor.py

1
2
from CQJobMonitor import CQJobMonitor
CQJobMonitor(command="qstat",cqroot='http://219.228.63.56:5700/',group_id=312676525,keywords=['jzzeng'],timeinterval=300).loopmonitor()

并在后台运行:

1
nohup python monitor.py >/dev/null &

image

即可定时发送任务情况。

Jinzhe Zeng 微信

微信

Jinzhe Zeng 支付宝

支付宝

  • 本文作者: Jinzhe Zeng
  • 本文链接: https://njzjz.win/2018/08/06/qqpbs/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

欢迎关注我的其它发布渠道