This commit is contained in:
The-Repo-Club 2022-01-19 09:14:52 +00:00
parent ba6955b6b9
commit 66ff693868
No known key found for this signature in database
GPG Key ID: E30EC2FBFB05C44F
5 changed files with 792 additions and 0 deletions

Binary file not shown.

View File

@ -0,0 +1,298 @@
#!/usr/bin/python
import sys, os
import requests
import time
import json
import platform
import urllib
from PySide6.QtWidgets import QDialog, QWidget, QLabel, QLineEdit, QPlainTextEdit, QMenu
from PySide6.QtWidgets import QPushButton, QDialogButtonBox, QVBoxLayout, QSystemTrayIcon
from discord_webhook import DiscordWebhook, DiscordEmbed
from PySide6 import QtCore, QtWidgets, QtGui
from PySide6.QtGui import QIcon, QAction
from os.path import expanduser, isfile
from PySide6.QtCore import QSize
from datetime import datetime
import struct
# for all params, see https://discordapp.com/developers/docs/resources/webhook#execute-webhook
# Discord webhook url:
url = "https://discord.com/api/webhooks/932406481704349766/TJV5m3oprNRQYM1k4_ar8TsokMLuHxoupvsTe9UJlD8Bk7GAcB8xAgxGeLac-U2EZ879"
avatar_url = "https://cdn.discordapp.com/icons/895523133975572510/6511420be1ad5c7e9cfba0e2214281c1.webp?size=96"
username = "LinkNSync Update"
path = os.path.dirname(os.path.realpath(__file__))
class MainWindow(QDialog):
def __init__(self):
QDialog.__init__(self)
self.hidden = False
tray_icon = QIcon(path+"/tray_icon.png")
self.tray = QSystemTrayIcon(self)
self.tray.setIcon(tray_icon)
self.tray.setToolTip("lol")
self.tray.setVisible(True)
self.menu = QMenu()
self.showHide = QAction("Show/Hide", self)
self.menu.addAction(self.showHide)
self.showHide.triggered.connect(self.showHideApp)
self.quit = QAction("Quit", self)
self.menu.addAction(self.quit)
self.quit.triggered.connect(self.quitApp)
self.menu.updatesEnabled()
self.menu.setUpdatesEnabled(True)
self.tray.setContextMenu(self.menu)
self.tray.show()
self.resize(620,900)
self.setWindowTitle("Discord Bot")
self.msg_id_text_label = QLabel(self)
self.msg_id_text_label.setText('Message ID:')
self.msg_id_text_label.move(10, 10)
self.msg_id_text_entry = QLineEdit(self)
self.msg_id_text_entry.move(110, 5)
self.msg_id_text_entry.resize(505, 25)
self.content_text_label = QLabel(self)
self.content_text_label.setText('Content:')
self.content_text_label.move(10, 40)
self.content_text_entry = QLineEdit(self)
self.content_text_entry.move(110, 35)
self.content_text_entry.resize(505, 25)
self.embed_text_label = QLabel(self)
self.embed_text_label.setText('Title:')
self.embed_text_label.move(10, 70)
self.embed_text_entry = QLineEdit(self)
self.embed_text_entry.move(110, 65)
self.embed_text_entry.resize(250, 25)
self.embed_url_entry = QLineEdit(self)
self.embed_url_entry.move(365, 65)
self.embed_url_entry.resize(250, 25)
self.embed_text_label = QLabel(self)
self.embed_text_label.setText('Footer:')
self.embed_text_label.move(10, 100)
self.embed_footer_entry = QLineEdit(self)
self.embed_footer_entry.move(110, 95)
self.embed_footer_entry.resize(505, 25)
self.embed_text_label = QLabel(self)
self.embed_text_label.setText('Image:')
self.embed_text_label.move(10, 130)
self.embed_image_entry = QLineEdit(self)
self.embed_image_entry.move(110, 125)
self.embed_image_entry.resize(505, 25)
self.msg_text_label = QLabel(self)
self.msg_text_label.setText('Message:')
self.msg_text_label.move(10, 160)
self.msg_text_entry = QPlainTextEdit(self)
self.msg_text_entry.move(110, 155)
self.msg_text_entry.resize(505, 300)
msg_text_button = QPushButton('Send message!', self)
msg_text_button.clicked.connect(self.send_msg)
msg_text_button.move(50, 465)
msg_text_button.resize(150,25)
json_text_button_exit = QPushButton('Get JSON', self)
json_text_button_exit.clicked.connect(self.getJson)
json_text_button_exit.move(280, 465)
json_text_button_exit.resize(100,25)
quit_text_button_exit = QPushButton('Quit', self)
quit_text_button_exit.clicked.connect(self.quitApp)
quit_text_button_exit.move(480, 465)
quit_text_button_exit.resize(100,25)
self.main_window_text_history = QPlainTextEdit(self)
self.main_window_text_history.setReadOnly(1)
self.main_window_text_history.move(10, 495)
self.main_window_text_history.resize(605, 400)
load_history_file(self)
def showHideApp(self):
hidden = self.hidden
if hidden == False:
mainWin.hide()
self.hidden = True
else:
mainWin.show()
self.hidden = False
def send_msg(self):
msg_id_text = self.msg_id_text_entry.text()
msg_text = self.msg_text_entry.document().toPlainText()
time_now = datetime.now()
current_time = time_now.strftime("%d/%m/%Y %H:%M:%S")
color_embed = QtWidgets.QColorDialog.getColor()
if color_embed.isValid():
color_ = int(color_embed.name().replace('#',''), 16)
else:
self.quitApp()
content_text = self.content_text_entry.text()
title_text = self.embed_text_entry.text()
title_url = self.embed_url_entry.text()
footer_text = self.embed_footer_entry.text()
embed_image = self.embed_image_entry.text()
if embed_image != "":
isURL = is_url(embed_image)
if isURL:
isImage = is_image(embed_image)
if not isImage:
self.dialog = noImageTypePopup()
self.dialog.show()
webhook = DiscordWebhook(url=url, content=content_text, avatar_url=avatar_url, username=username)
embed = DiscordEmbed(title=title_text, url=title_url, description=msg_text, color=color_)
embed.set_image(url=embed_image)
if footer_text == "":
footer_text = current_time
embed.set_footer(text=footer_text, icon_url=avatar_url)
# add embed object to webhook
webhook.add_embed(embed)
if msg_id_text is None:
response = webhook.execute()
else:
response = webhook.edit(msg_id_text)
jsonPost = json.dumps(response.json(), indent=4)
jsonGet = json.loads(jsonPost)
message_id = jsonGet['id']
msg_data = {
"username": username,
"avatar_url": avatar_url,
"content": content_text,
"embeds": [
{
"title": title_text,
"url": title_url,
"description": msg_text,
"color": color_,
"image": {
"url": embed_image,
},
"footer": {
"text": footer_text,
"icon_url": avatar_url,
}
}
]
}
json_object = json.dumps(msg_data, indent = 4)
self.main_window_text_history.appendPlainText(json_object + "\n\n")
save_history_file(self, message_id, json_object)
def quitApp(self):
sys.exit()
def getJson(self):
msg_id_text = self.msg_id_text_entry.text()
if msg_id_text is not None:
if platform.system() == 'Linux':
home = expanduser("~")
file = home+'/.cache/discord_history/'+msg_id_text+".json"
if isfile(file):
f = open(file)
data = json.load(f)
json_object = json.dumps(data, indent = 4)
self.content_text_entry.setText(data['content'])
self.embed_text_entry.setText(data['embeds'][0]['title'])
self.embed_url_entry.setText(data['embeds'][0]['url'])
self.embed_footer_entry.setText(data['embeds'][0]['footer']['text'])
self.embed_image_entry.setText(data['embeds'][0]['image']['url'])
self.msg_text_entry.setPlainText(data['embeds'][0]['description'])
self.main_window_text_history.setPlainText(json_object + "\n")
class noImageTypePopup(QDialog):
def __init__(self, parent=None):
super().__init__()
print("Opening a new popup window...")
self.setWindowTitle("Error!")
QBtn = QDialogButtonBox.Ok | QDialogButtonBox.Cancel
self.buttonBox = QDialogButtonBox(QBtn)
self.buttonBox.accepted.connect(self.accept)
self.buttonBox.rejected.connect(self.reject)
self.layout = QVBoxLayout()
message = QLabel("Something happened, please chack the url again?")
self.layout.addWidget(message)
self.layout.addWidget(self.buttonBox)
self.setLayout(self.layout)
def save_history_file(self, message_id, json_object):
if platform.system() == 'Linux':
home = expanduser("~")
discord_history = open(home+'/.cache/discord_history/'+message_id+".json",'w+')
discord_history.write(json_object+"\n")
def load_history_file(self):
if platform.system() == 'Linux':
home = expanduser("~")
if isfile(home+'/.cache/discord_history'):
discord_history = open(home+'/.cache/discord_history','r+')
lines = discord_history.readlines()
# Strips the newline character
for line in lines:
self.main_window_text_history.appendPlainText(line.replace('\n',''))
def is_url(url):
if not url:
raise ValueError("url is required")
try:
resp = requests.head(url)
return True if resp.status_code == 200 else False
except Exception as e:
return False
def is_image(image_url):
try:
image_formats = ("image/png", "image/jpeg", "image/jpg")
r = requests.head(image_url)
if r.headers["content-type"] in image_formats:
return True
return False
except Exception as e:
return False
def mkDirs():
if platform.system() == 'Linux':
home = expanduser("~")
path = home+"/.cache/discord_history/"
isExist = os.path.exists(path)
if not isExist:
os.mkdir(path)
if __name__ == "__main__":
mkDirs()
app = QtWidgets.QApplication(sys.argv)
mainWin = MainWindow()
mainWin.show()
sys.exit( app.exec() )

View File

@ -0,0 +1,481 @@
import logging
import json
import time
import datetime
import requests
from webhook_exceptions import *
logger = logging.getLogger(__name__)
class DiscordWebhook:
"""
Webhook for Discord
"""
def __init__(self, url=None, content=None, username=None, avatar_url=None, **kwargs):
"""
Init Webhook for Discord
---------
:param ``url``: your discord webhook url (type: str, list)\n
:keyword ``content:`` the message contents (type: str)\n
:keyword ``username:`` override the default username of the webhook\n
:keyword ``avatar_url:`` override the default avatar of the webhook\n
:keyword ``tts:`` true if this is a TTS message\n
:keyword ``file``: to apply file(s) with message
(For example: file=f.read() (here, f = variable that contain attachement path as "rb" mode))\n
:keyword ``filename:`` apply custom file name on attached file content(s)\n
:keyword ``embeds:`` list of embedded rich content\n
:keyword ``allowed_mentions:`` allowed mentions for the message\n
:keyword ``proxies:`` dict of proxies\n
:keyword ``timeout:`` (optional) amount of seconds to wait for a response from Discord
"""
self.url = url
self.content = content
self.username = username
self.avatar_url = avatar_url
self.tts = kwargs.get("tts", False)
self.files = kwargs.get("files", dict())
self.embeds = kwargs.get("embeds", [])
self.proxies = kwargs.get("proxies")
self.allowed_mentions = kwargs.get("allowed_mentions")
self.timeout = kwargs.get("timeout")
self.rate_limit_retry = kwargs.get("rate_limit_retry")
def add_file(self, file, filename):
"""
adds a file to the webhook
:param file: file content
:param filename: filename
:return:
"""
self.files["_{}".format(filename)] = (filename, file)
def add_embed(self, embed):
"""
adds an embedded rich content
:param embed: embed object or dict
"""
self.embeds.append(embed.__dict__ if isinstance(embed, DiscordEmbed) else embed)
def remove_embed(self, index):
"""
removes embedded rich content from `self.embeds`
:param index: index of embed in `self.embeds`
"""
self.embeds.pop(index)
def remove_file(self, filename):
"""
removes file from `self.files` using specified `filename` if it exists
:param filename: filename
"""
filename = "_{}".format(filename)
if filename in self.files:
del self.files[filename]
def get_embeds(self):
"""
gets all self.embeds as list
:return: self.embeds
"""
return self.embeds
def set_proxies(self, proxies):
"""
sets proxies
:param proxies: dict of proxies
:type proxies: dict
"""
self.proxies = proxies
def set_content(self, content):
"""
sets content
:param content: content string
:type content: string
"""
self.content = content
@property
def json(self):
"""
convert webhook data to json
:return webhook data as json:
"""
embeds = self.embeds
self.embeds = []
# convert DiscordEmbed to dict
for embed in embeds:
self.add_embed(embed)
data = {
key: value
for key, value in self.__dict__.items()
if value and key not in {"url", "files", "filename"}
}
embeds_empty = not any(data["embeds"]) if "embeds" in data else True
if embeds_empty and "content" not in data and bool(self.files) is False:
logger.error("webhook message is empty! set content or embed data")
return data
def remove_embeds(self):
"""
Sets `self.embeds` to empty `list`.
"""
self.embeds = []
def remove_files(self):
"""
Sets `self.files` to empty `dict`.
"""
self.files = {}
def api_post_request(self, url):
if bool(self.files) is False:
response = requests.post(url, json=self.json, proxies=self.proxies,
params={'wait': True},
timeout=self.timeout)
else:
self.files["payload_json"] = (None, json.dumps(self.json))
response = requests.post(url, files=self.files,
proxies=self.proxies,
timeout=self.timeout)
return response
def api_patch_request(self, url, message_id):
if bool(self.files) is False:
response = requests.patch(url+'/messages/'+str(message_id), json=self.json, proxies=self.proxies, params={'wait': True}, timeout=self.timeout)
else:
self.files["payload_json"] = (None, json.dumps(self.json))
response = requests.patch(url+'/messages/'+str(previous_sent_message_id), files=self.files, proxies=self.proxies, timeout=self.timeout)
return response
def execute(self, remove_embeds=False, remove_files=False):
"""
executes the Webhook
:param remove_embeds: if set to True, calls `self.remove_embeds()` to empty `self.embeds` after webhook is executed
:param remove_files: if set to True, calls `self.remove_files()` to empty `self.files` after webhook is executed
:return: Webhook response
"""
webhook_urls = self.url if isinstance(self.url, list) else [self.url]
urls_len = len(webhook_urls)
responses = []
for i, url in enumerate(webhook_urls):
response = self.api_post_request(url)
if response.status_code in [200, 204]:
logger.debug(
"[{index}/{length}] Webhook executed".format(
index=i+1, length=urls_len
)
)
elif response.status_code == 429 and self.rate_limit_retry:
while response.status_code == 429:
errors = json.loads(
response.content.decode('utf-8'))
wh_sleep = (int(errors['retry_after']) / 1000) + 0.15
time.sleep(wh_sleep)
logger.error(
"Webhook rate limited: sleeping for {wh_sleep} "
"seconds...".format(
wh_sleep=wh_sleep
)
)
response = self.api_post_request(url)
if response.status_code in [200, 204]:
logger.debug(
"[{index}/{length}] Webhook executed".format(
index=i + 1, length=urls_len
)
)
break
else:
logger.error(
"[{index}/{length}] Webhook status code {status_code}: {content}".format(
index=i+1,
length=urls_len,
status_code=response.status_code,
content=response.content.decode("utf-8"),
)
)
responses.append(response)
if remove_embeds:
self.remove_embeds()
if remove_files:
self.remove_files()
return responses[0] if len(responses) == 1 else responses
def edit(self, message_id , remove_embeds=False, remove_files=False):
"""
executes the Webhook
:param remove_embeds: if set to True, calls `self.remove_embeds()` to empty `self.embeds` after webhook is executed
:param remove_files: if set to True, calls `self.remove_files()` to empty `self.files` after webhook is executed
:return: Webhook response
"""
webhook_urls = self.url if isinstance(self.url, list) else [self.url]
urls_len = len(webhook_urls)
responses = []
for i, url in enumerate(webhook_urls):
response = self.api_patch_request(url, message_id)
if response.status_code in [200, 204]:
logger.debug(
"[{index}/{length}] Webhook executed".format(
index=i+1, length=urls_len
)
)
elif response.status_code == 429 and self.rate_limit_retry:
while response.status_code == 429:
errors = json.loads(
response.content.decode('utf-8'))
wh_sleep = (int(errors['retry_after']) / 1000) + 0.15
time.sleep(wh_sleep)
logger.error(
"Webhook rate limited: sleeping for {wh_sleep} "
"seconds...".format(
wh_sleep=wh_sleep
)
)
response = self.api_patch_request(url, message_id)
if response.status_code in [200, 204]:
logger.debug(
"[{index}/{length}] Webhook executed".format(
index=i + 1, length=urls_len
)
)
break
else:
logger.error(
"[{index}/{length}] Webhook status code {status_code}: {content}".format(
index=i+1,
length=urls_len,
status_code=response.status_code,
content=response.content.decode("utf-8"),
)
)
responses.append(response)
if remove_embeds:
self.remove_embeds()
if remove_files:
self.remove_files()
return responses[0] if len(responses) == 1 else responses
def delete(self, sent_webhook):
"""
deletes the webhook passed as a response
:param sent_webhook: webhook.execute() response
:return: Response
"""
sent_webhook = sent_webhook if isinstance(sent_webhook, list) else [sent_webhook]
webhook_len = len(sent_webhook)
responses = []
for i, webhook in enumerate(sent_webhook):
url = webhook.url.split('?')[0] # removes any query params
previous_sent_message_id = json.loads(webhook.content.decode('utf-8'))['id']
response = requests.delete(url+'/messages/'+str(previous_sent_message_id), proxies=self.proxies, timeout=self.timeout)
if response.status_code in [200, 204]:
logger.debug(
"[{index}/{length}] Webhook deleted".format(
index=i + 1,
length=webhook_len,
)
)
else:
logger.error(
"[{index}/{length}] Webhook status code {status_code}: {content}".format(
index=i + 1,
length=webhook_len,
status_code=response.status_code,
content=response.content.decode("utf-8"),
)
)
responses.append(response)
return responses[0] if len(responses) == 1 else responses
class DiscordEmbed:
"""
Discord Embed
"""
def __init__(self, title=None, description=None, hex_color='33ccff', **kwargs):
"""
Init Discord Embed
-----------
:keyword ``title:`` title of embed\n
:keyword ``description:`` description body of embed\n
:keyword ``url:`` add an url to make your embeded title a clickable link\n
:keyword ``timestamp:`` timestamp of embed content\n
:keyword ``color:`` color code of the embed as int\n
:keyword ``hex_color:`` color code of the embed as a hex string\n
:keyword ``footer:`` footer texts\n
:keyword ``image:`` your image url here\n
:keyword ``thumbnail:`` your thumbnail url here\n
:keyword ``video:`` to apply video with embeded, your video source url here\n
:keyword ``provider:`` provider information\n
:keyword ``author:`` author information\n
:keyword ``fields:`` fields information
"""
self.title = title
self.description = description
self.url = kwargs.get("url")
self.timestamp = kwargs.get("timestamp")
self.color = kwargs.get("color")
if self.color:
self.set_color(self.color)
self.hex_color = hex_color
self.footer = kwargs.get("footer")
self.image = kwargs.get("image")
self.thumbnail = kwargs.get("thumbnail")
self.video = kwargs.get("video")
self.provider = kwargs.get("provider")
self.author = kwargs.get("author")
self.fields = kwargs.get("fields", [])
def set_title(self, title):
"""
set title of embed
:param title: title of embed
"""
self.title = title
def set_description(self, description):
"""
set description of embed
:param description: description of embed
"""
self.description = description
def set_url(self, url):
"""
set url of embed
:param url: url of embed
"""
self.url = url
def set_timestamp(self, timestamp=None):
"""
set timestamp of embed content
:param timestamp: (optional) timestamp of embed content
"""
if timestamp is None:
timestamp = time.time()
self.timestamp = str(datetime.datetime.utcfromtimestamp(timestamp))
def set_color(self, color):
"""
set color code of the embed as decimal(int) or hex(string)
:param color: color code of the embed as decimal(int) or hex(string)
"""
self.color = int(color, 16) if isinstance(color, str) else color
if self.color not in range(16777216):
raise ColourNotInRangeException(color)
def set_footer(self, **kwargs):
"""
set footer information of embed
:keyword text: footer text
:keyword icon_url: url of footer icon (only supports http(s) and attachments)
:keyword proxy_icon_url: a proxied url of footer icon
"""
self.footer = {
"text": kwargs.get("text"),
"icon_url": kwargs.get("icon_url"),
"proxy_icon_url": kwargs.get("proxy_icon_url"),
}
def set_image(self, **kwargs):
"""
set image of embed
:keyword url: source url of image (only supports http(s) and attachments)
:keyword proxy_url: a proxied url of the image
:keyword height: height of image
:keyword width: width of image
"""
self.image = {
"url": kwargs.get("url"),
"proxy_url": kwargs.get("proxy_url"),
"height": kwargs.get("height"),
"width": kwargs.get("width"),
}
def set_thumbnail(self, **kwargs):
"""
set thumbnail of embed
:keyword url: source url of thumbnail (only supports http(s) and attachments)
:keyword proxy_url: a proxied thumbnail of the image
:keyword height: height of thumbnail
:keyword width: width of thumbnail
"""
self.thumbnail = {
"url": kwargs.get("url"),
"proxy_url": kwargs.get("proxy_url"),
"height": kwargs.get("height"),
"width": kwargs.get("width"),
}
def set_video(self, **kwargs):
"""
set video of embed
:keyword url: source url of video
:keyword height: height of video
:keyword width: width of video
"""
self.video = {
"url": kwargs.get("url"),
"height": kwargs.get("height"),
"width": kwargs.get("width"),
}
def set_provider(self, **kwargs):
"""
set provider of embed
:keyword name: name of provider
:keyword url: url of provider
"""
self.provider = {
"name": kwargs.get("name"),
"url": kwargs.get("url"),
}
def set_author(self, **kwargs):
"""
set author of embed
:keyword name: name of author
:keyword url: url of author
:keyword icon_url: url of author icon (only supports http(s) and attachments)
:keyword proxy_icon_url: a proxied url of author icon
"""
self.author = {
"name": kwargs.get("name"),
"url": kwargs.get("url"),
"icon_url": kwargs.get("icon_url"),
"proxy_icon_url": kwargs.get("proxy_icon_url"),
}
def add_embed_field(self, **kwargs):
"""
set field of embed
:keyword name: name of the field
:keyword value: value of the field
:keyword inline: (optional) whether or not this field should display inline
"""
self.fields.append(
{
"name": kwargs.get("name"),
"value": kwargs.get("value"),
"inline": kwargs.get("inline", True),
}
)
def del_embed_field(self, index):
"""
remove field from `self.fields`
:param index: index of field in `self.fields`
"""
self.fields.pop(index)
def get_embed_fields(self):
"""
get all `self.fields` as list
:return: `self.fields`
"""
return self.fields

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.4 KiB

View File

@ -0,0 +1,13 @@
class ColourNotInRangeException(Exception):
"""
A valid colour must take an integer value between 0 and 16777216 inclusive
This Exception will be raised when a colour is not in that range.
"""
def __init__(self, color):
self.color = color
def __str__(self):
return repr('"{}" is not in valid range of colors. The valid ranges '
'of colors are 0 to 16777215 inclusive (INTEGERS) and 0 '
'to FFFFFF inclusive (HEXADECIMAL)'.format(self.color))