This repository has been archived on 2024-08-02. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
Keycloak_Notifier/notifier.py
2024-02-05 06:11:53 +01:00

192 lines
6.1 KiB
Python

import requests
import os
import time
import smtplib
import schedule
import logging
import sys
from keycloak import KeycloakAdmin
from time import sleep
from datetime import datetime
from email.message import EmailMessage
from dotenv import load_dotenv
log = logging.getLogger('notifier')
log.setLevel("INFO")
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s : %(levelname)s : %(name)s : %(message)s'))
log.addHandler(handler)
load_dotenv()
# Keycloak credentials
keycloak_username = os.getenv('KEYCLOAK_USERNAME')
keycloak_password = os.getenv('KEYCLOAK_PASSWORD')
keycloak_url = os.getenv('KEYCLOAK_URL')
keycloak_realm = os.getenv('KEYCLOAK_REALM')
keycloak_admin = KeycloakAdmin(
server_url=keycloak_url,
username=keycloak_username,
password=keycloak_password,
realm_name=keycloak_realm,
verify=True)
keycloak_data = {
'username': keycloak_username,
'password': keycloak_password,
'url': keycloak_url,
'realm': keycloak_realm,
'admin': keycloak_admin
}
def get_user(user_id):
user = keycloak_data['admin'].get_user(user_id)
return user
def get_keycloak_token():
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
}
data = {
'username': keycloak_data['username'],
'password': keycloak_data['password'],
'grant_type': 'password',
'client_id': 'admin-cli'
}
try:
response = requests.post(keycloak_data['url'] + 'realms/' + keycloak_data['realm'] +
'/protocol/openid-connect/token', headers=headers, data=data)
except Exception as e:
log.error(e)
log.info("Something went wrong. Sleeping it off for 10s")
sleep(10)
token = response.json()['access_token']
return token
def send_mail(message, event):
# SMTP configuration
sender = os.getenv('SMTP_SENDER')
receiver = os.getenv('SMTP_RECEIVER')
password = os.getenv('SMTP_PASSWORD')
host = os.getenv('SMTP_HOST')
port = os.getenv('SMTP_PORT')
# Email configuration
msg = EmailMessage()
msg.set_content(message)
msg['Subject'] = 'New Keycloak Event: ' + event
msg['From'] = sender
msg['To'] = receiver
# Start TLS and send email
server = smtplib.SMTP_SSL(host, port)
server.login(sender, password)
server.sendmail(
sender,
receiver,
str(msg)
)
server.quit()
def get_keycloak_events(token, event_type):
headers = {
'Accept': 'application/json',
'Authorization': 'Bearer ' + str(token),
}
response = requests.get(keycloak_data['url'] + 'admin/realms/' +
keycloak_data['realm'] + '/events?type=' + event_type, headers=headers)
if response.status_code == 200:
events = response.json()
# read a text file as a list of lines
# find the last line, change to a file you have
tsfile = './log/ts_' + event_type + '.log'
f = open(tsfile, "r")
line_list = f.readlines()
f.close()
last_timestamp = line_list[-1]
events_total = 0
events_relevant = 0
events_notif = 0
for event in events:
events_total += 1
try:
time = event.get('time')
timestring = str(time or '1000000')
unixtimestring = timestring[:-3]
human_time = str(datetime.fromtimestamp(int(unixtimestring)).strftime('%Y-%m-%d %H:%M:%S') or '')
user_id = event.get('userId')
if(event_type == 'REGISTER'):
events_relevant += 1
email = str(event.get('details').get('email') or 'n/A')
first_name = str(event.get('details').get('first_name') or 'n/A')
last_name = str(event.get('details').get('last_name') or 'n/A')
username = str(event.get('details').get('username') or 'n/A')
message = event_type + ': ' + first_name + ' ' + last_name + ' / ' + username + ' at' + \
' Time: ' + human_time + ' Email: ' + email
elif (event_type == 'UPDATE_PROFILE'):
events_relevant += 1
if(event.get('details').get('updated_email')):
message = event_type + ': ' + user_id + ' changed email from ' + str(event.get('details').get('previous_email') or 'n/A') \
+ ' to ' + event.get('details').get('updated_email') + ' at ' + human_time
else:
continue
f = open(tsfile, "r")
line_list = f.readlines()
f.close()
last_timestamp_new = line_list[-1]
if time > int(last_timestamp):
events_notif += 1
log.info("Sending Email: " + message)
send_mail(message, event_type)
log.info("Email sent.")
if time > int(last_timestamp_new):
log.info("Updating file.")
with open(tsfile, "w") as ts:
ts.write(str(time))
log.info("Updated file.")
sleep(2) # Time in seconds.dd
except Exception as e:
log.warning(e)
continue
log.info("Total events checked : " + str(events_total or "0"))
log.info("Relevant events : " + str(events_relevant or "0"))
log.info("Notifications generated : " + str(events_notif or "0"))
else:
log.error('Error: ' + str(response.status_code))
if response.status_code == 401:
log.warning("Token probably expired. Terminating notifier to restart...")
sys.exit()
def job():
log.info("Starting job.")
get_keycloak_events(token, 'REGISTER')
get_keycloak_events(token, 'UPDATE_PROFILE')
log.info("Job's done!")
log.info("Grabbing token...")
token = get_keycloak_token()
log.info("Got token!")
schedule.every(1).minutes.do(job)
while True:
schedule.run_pending()
time.sleep(1)