import requests import os import time import smtplib import schedule import logging import sys from keycloak import KeycloakAdmin from time import sleep from datetime import datetime from email.message import EmailMessage from dotenv import load_dotenv log = logging.getLogger('notifier') log.setLevel("INFO") handler = logging.StreamHandler() handler.setFormatter(logging.Formatter('%(asctime)s : %(levelname)s : %(name)s : %(message)s')) log.addHandler(handler) load_dotenv() # Keycloak credentials keycloak_username = os.getenv('KEYCLOAK_USERNAME') keycloak_password = os.getenv('KEYCLOAK_PASSWORD') keycloak_url = os.getenv('KEYCLOAK_URL') keycloak_realm = os.getenv('KEYCLOAK_REALM') keycloak_admin = KeycloakAdmin( server_url=keycloak_url, username=keycloak_username, password=keycloak_password, realm_name=keycloak_realm, verify=True) keycloak_data = { 'username': keycloak_username, 'password': keycloak_password, 'url': keycloak_url, 'realm': keycloak_realm, 'admin': keycloak_admin } def get_user(user_id): user = keycloak_data['admin'].get_user(user_id) return user def get_keycloak_token(): headers = { 'Content-Type': 'application/x-www-form-urlencoded', } data = { 'username': keycloak_data['username'], 'password': keycloak_data['password'], 'grant_type': 'password', 'client_id': 'admin-cli' } try: response = requests.post(keycloak_data['url'] + 'realms/' + keycloak_data['realm'] + '/protocol/openid-connect/token', headers=headers, data=data) except Exception as e: log.error(e) log.info("Something went wrong. Sleeping it off for 10s") sleep(10) token = response.json()['access_token'] return token def send_mail(message, event): # SMTP configuration sender = os.getenv('SMTP_SENDER') receiver = os.getenv('SMTP_RECEIVER') password = os.getenv('SMTP_PASSWORD') host = os.getenv('SMTP_HOST') port = os.getenv('SMTP_PORT') # Email configuration msg = EmailMessage() msg.set_content(message) msg['Subject'] = 'New Keycloak Event: ' + event msg['From'] = sender msg['To'] = receiver # Start TLS and send email server = smtplib.SMTP_SSL(host, port) server.login(sender, password) server.sendmail( sender, receiver, str(msg) ) server.quit() def get_keycloak_events(token, event_type): headers = { 'Accept': 'application/json', 'Authorization': 'Bearer ' + str(token), } response = requests.get(keycloak_data['url'] + 'admin/realms/' + keycloak_data['realm'] + '/events?type=' + event_type, headers=headers) if response.status_code == 200: events = response.json() # read a text file as a list of lines # find the last line, change to a file you have tsfile = './log/ts_' + event_type + '.log' f = open(tsfile, "r") line_list = f.readlines() f.close() last_timestamp = line_list[-1] events_total = 0 events_relevant = 0 events_notif = 0 for event in events: events_total += 1 try: time = event.get('time') timestring = str(time or '1000000') unixtimestring = timestring[:-3] human_time = str(datetime.fromtimestamp(int(unixtimestring)).strftime('%Y-%m-%d %H:%M:%S') or '') user_id = event.get('userId') if(event_type == 'REGISTER'): events_relevant += 1 email = str(event.get('details').get('email') or 'n/A') first_name = str(event.get('details').get('first_name') or 'n/A') last_name = str(event.get('details').get('last_name') or 'n/A') username = str(event.get('details').get('username') or 'n/A') message = event_type + ': ' + first_name + ' ' + last_name + ' / ' + username + ' at' + \ ' Time: ' + human_time + ' Email: ' + email elif (event_type == 'UPDATE_PROFILE'): events_relevant += 1 if(event.get('details').get('updated_email')): message = event_type + ': ' + user_id + ' changed email from ' + str(event.get('details').get('previous_email') or 'n/A') \ + ' to ' + event.get('details').get('updated_email') + ' at ' + human_time else: continue f = open(tsfile, "r") line_list = f.readlines() f.close() last_timestamp_new = line_list[-1] if time > int(last_timestamp): events_notif += 1 log.info("Sending Email: " + message) send_mail(message, event_type) log.info("Email sent.") if time > int(last_timestamp_new): log.info("Updating file.") with open(tsfile, "w") as ts: ts.write(str(time)) log.info("Updated file.") sleep(2) # Time in seconds.dd except Exception as e: log.warning(e) continue log.info("Total events checked : " + str(events_total or "0")) log.info("Relevant events : " + str(events_relevant or "0")) log.info("Notifications generated : " + str(events_notif or "0")) else: log.error('Error: ' + str(response.status_code)) if response.status_code == 401: log.warning("Token probably expired. Terminating notifier to restart...") sys.exit() def job(): log.info("Starting job.") get_keycloak_events(token, 'REGISTER') get_keycloak_events(token, 'UPDATE_PROFILE') log.info("Job's done!") log.info("Grabbing token...") token = get_keycloak_token() log.info("Got token!") schedule.every(1).minutes.do(job) while True: schedule.run_pending() time.sleep(1)