Commit e2427e25 by Oleksandr Barabash

init commit

parents
.idea/
__pycache__/
**.pyc
Cake in a bot bot
""" Bot App """
import json
import sys
import traceback
from datetime import datetime
from http import HTTPStatus
import marshmallow_dataclass
from aiohttp import web
from aiohttp.web import Request, Response, json_response
from azure.cosmos import PartitionKey
from botbuilder.core import (
BotFrameworkAdapterSettings,
TurnContext,
BotFrameworkAdapter,
)
from botbuilder.schema import Activity, ActivityTypes
from marshmallow import ValidationError, EXCLUDE
from bots import TeamsMessagingExtensionsActionPreviewBot
from bots.exceptions import ConversationNotFound, DataParsingError
from config import AppConfig, COSMOS_CLIENT, CosmosDBConfig
from entities.json.notification import Notification, NotificationCosmos
from utils.cosmos_client import ItemNotFound
from utils.json_func import json_loads
app_config = AppConfig()
# Create adapter.
# See https://aka.ms/about-bot-adapter to learn more about how bots work.
app_settings = BotFrameworkAdapterSettings(app_config.APP_ID,
app_config.APP_PASSWORD)
ADAPTER = BotFrameworkAdapter(app_settings)
# noinspection PyShadowingNames
async def on_error(context: TurnContext, error: Exception):
""" Executed on any error """
# This check writes out errors to console log .vs. app insights.
# NOTE: In production environment,
# you should consider logging this to Azure application insights.
print(f"\n [on_turn_error] unhandled error: {error}", file=sys.stderr)
traceback.print_exc()
# Send a message to the user
await context.send_activity(
"The bot encountered an error or bug.\r\n"
"To continue to run this bot, please fix the bot source code."
)
# Send a trace activity if we're talking to the Bot Framework Emulator
if context.activity.channel_id == "emulator":
# Create a trace activity that contains the error object
trace_activity = Activity(
label="TurnError",
name="on_turn_error Trace",
timestamp=datetime.utcnow(),
type=ActivityTypes.trace,
value=f"{error}",
value_type="https://www.botframework.com/schemas/error",
)
# Send a trace activity,
# which will be displayed in Bot Framework Emulator
await context.send_activity(trace_activity)
ADAPTER.on_turn_error = on_error
BOT = TeamsMessagingExtensionsActionPreviewBot(app_settings, ADAPTER)
async def v1_get_initiations(request: Request) -> Response:
""" Get Initiations by Notification ID """
# noinspection PyBroadException
try:
notification_id = request.match_info['notification_id']
inits = await COSMOS_CLIENT.get_initiation_items(notification_id)
data = dict(data=[dict(initiator=init.initiator,
timestamp=init.timestamp,
id=init.id) for init in inits])
return Response(body=json.dumps(data), status=HTTPStatus.OK)
except ItemNotFound as e:
print("ItemNotFound:", e)
return Response(status=HTTPStatus.NOT_FOUND)
except Exception as e:
print("error:", e)
return Response(status=HTTPStatus.BAD_REQUEST)
async def v1_get_notification(request: Request) -> Response:
""" Get Notification by ID """
# noinspection PyBroadException
try:
notification_id = request.match_info['notification_id']
notification = await COSMOS_CLIENT.get_notification(notification_id)
acks = await COSMOS_CLIENT.get_acknowledge_items(notification_id)
data = dict(data=dict(
timestamp=notification.timestamp,
status="DELIVERED",
acknowledged=[dict(username=ack.username,
timestamp=ack.timestamp) for ack in acks],
))
return Response(body=json.dumps(data), status=HTTPStatus.OK)
except ItemNotFound as e:
print("ItemNotFound:", e)
return Response(status=HTTPStatus.NOT_FOUND)
except Exception as e:
print("error:", e)
return Response(status=HTTPStatus.BAD_REQUEST)
async def v1_notification(request: Request) -> Response:
""" Notify channel with the link """
# todo(s1z): add auth
# noinspection PyBroadException
try:
request_body = await request.text()
schema = Notification.get_schema(unknown=EXCLUDE)
notification = schema.load(json_loads(request_body, {})).to_db()
message_id = await BOT.send_notification(notification)
response_body = json.dumps({"data": {"messageId": message_id}})
return Response(body=response_body, status=HTTPStatus.OK)
except ConversationNotFound:
return Response(status=HTTPStatus.NOT_FOUND,
reason="Conversation not found")
except DataParsingError:
return Response(status=HTTPStatus.BAD_REQUEST,
reason="Bad data structure")
except Exception as e:
print("error:", e)
return Response(status=HTTPStatus.INTERNAL_SERVER_ERROR)
async def v1_messages(request: Request) -> Response:
""" messages endpoint """
if "application/json" in request.headers["Content-Type"]:
body = await request.json()
else:
return Response(status=HTTPStatus.UNSUPPORTED_MEDIA_TYPE)
activity = Activity().deserialize(body)
auth_header = (request.headers["Authorization"]
if "Authorization" in request.headers else "")
invoke_response = await ADAPTER.process_activity(
activity, auth_header, BOT.on_turn
)
if invoke_response:
return json_response(data=invoke_response.body,
status=invoke_response.status)
return Response(status=HTTPStatus.OK)
@web.middleware
async def error_middleware(request, handler):
""" Error handler """
try:
response = await handler(request)
if response.status != 404:
return response
message = response.reason
except web.HTTPException as ex:
if ex.status != 404:
raise
message = ex.reason
return Response(status=HTTPStatus.NOT_FOUND,
body=json.dumps({"error": message}))
APP = web.Application(middlewares=[error_middleware])
APP.router.add_post("/api/v1/messages", v1_messages)
APP.router.add_post("/api/v1/notification", v1_notification)
APP.router.add_get("/api/v1/notification/{notification_id}",
v1_get_notification)
APP.router.add_get("/api/v1/initiations/{notification_id}", v1_get_initiations)
BOT.add_web_app(APP)
BOT.add_cosmos_client(COSMOS_CLIENT)
if __name__ == "__main__":
try:
web.run_app(APP, host="0.0.0.0", port=app_config.PORT)
except Exception as error:
raise error
{
"type": "AdaptiveCard",
"$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
"version": "1.5",
"body": [
{
"type": "Container",
"items": [
{
"type": "TextBlock",
"wrap": true,
"text": "Open task module card"
}
]
},
{
"type": "Container",
"items": [
{
"type": "ActionSet",
"actions": [
{
"type": "Action.Submit",
"title": "Open Task",
"data": {
"msteams": {"type": "task/fetch"},
"mx": {
"type": "task/default"
}
}
}
]
}
]
}
]
}
\ No newline at end of file
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from .messaging_extension_action_preview_bot import (
TeamsMessagingExtensionsActionPreviewBot,
)
__all__ = ["TeamsMessagingExtensionsActionPreviewBot"]
""" Bot Exceptions """
class BotException(Exception):
""" Base Bot Exception """
def __init__(self, message="BotError"):
self.message = message
class ConversationNotFound(BotException):
""" Conversation Not found exception """
pass
class DataParsingError(BotException):
""" Data Parsing Error """
pass
""" Message extension bot """
import asyncio
from asyncio import Future
from typing import Optional
from urllib.parse import urlparse, parse_qsl, urlencode
from aiohttp.web_app import Application
from botbuilder.core import (TurnContext, CardFactory, BotFrameworkAdapter,
BotFrameworkAdapterSettings)
from botbuilder.schema import Activity, ActivityTypes, ConversationReference
from botbuilder.schema.teams import (TaskModuleContinueResponse,
TaskModuleTaskInfo, TaskModuleResponse,
TaskModuleRequest)
from botbuilder.core.teams import TeamsActivityHandler
from marshmallow import EXCLUDE
from bots.exceptions import ConversationNotFound
from config import TaskModuleConfig, AppConfig
from entities.json.acknowledge import Acknowledge
from entities.json.conversation_reference_schema import (
ConversationReferenceSchema
)
from entities.json.medx import MedX, MXTypes
from entities.json.notification import Notification, NotificationCosmos
from utils.card_helper import CardHelper
from utils.cosmos_client import CosmosClient, ItemNotFound
from utils.function import get_first_or_none
class TeamsMessagingExtensionsActionPreviewBot(TeamsActivityHandler):
""" The Bot """
settings: BotFrameworkAdapterSettings
adapter: BotFrameworkAdapter
app: Application
cosmos_client: CosmosClient
def __init__(self, settings: BotFrameworkAdapterSettings,
adapter: BotFrameworkAdapter):
self.settings = settings
self.adapter = adapter
def add_web_app(self, app):
""" Add web app instance """
self.app = app
def add_cosmos_client(self, cosmos_client: CosmosClient):
""" Add cosmos client to the bot """
self.cosmos_client = cosmos_client
def add_adapter(self, adapter):
""" Add bot adapter instance """
self.adapter = adapter
@staticmethod
def get_mx(turn_context: TurnContext) -> Optional[MedX]:
""" Get Medx data """
if isinstance(turn_context.activity.value, dict):
mx = turn_context.activity.value.get("mx", {})
return MedX.get_schema(unknown=EXCLUDE).load(mx)
@staticmethod
def get_mx_type(turn_context: TurnContext) -> Optional[str]:
""" Get message MX type or None """
# "mx": {
# "type": "task/notification",
# "notificationId": notification_id
# }
if isinstance(turn_context.activity.value, dict):
mx = turn_context.activity.value.get("mx", {})
return mx.get("type", None)
@staticmethod
def get_mx_notification_id(turn_context: TurnContext) -> Optional[str]:
""" Get message MX notification ID or None """
if isinstance(turn_context.activity.value, dict):
mx = turn_context.activity.value.get("mx", {})
return mx.get("notificationId", None)
def send_notification(self, notification: NotificationCosmos) -> Future[str]:
""" Notify conversation that there's a message waiting in portal """
io_loop = asyncio.get_event_loop()
future = Future()
# reset parameters
notification.id = None
notification.tenant_id = AppConfig.TENANT_ID
async def routine():
""" async routine """
_saved_notification = await self.cosmos_client.create_notification(
notification
)
destination = notification.destination
reference = await self.get_conversation_reference(destination)
async def callback(turn_context: TurnContext) -> None:
""" Turn Context callback. Kinda awful syntax, I know """
# TODO(s1z): Add exception handler
# (like conversation not found etc...)
card = CardHelper.create_notification_card(notification)
attachments = [CardFactory.adaptive_card(card)]
message = Activity(type=ActivityTypes.message,
attachments=attachments)
await turn_context.send_activity(message)
future.set_result(notification.id)
await self.adapter.continue_conversation(reference, callback,
self.settings.app_id)
io_loop.create_task(routine())
return future
@staticmethod
def generate_url(url: str, channel_id: str) -> str:
""" Generate URL for the task module """
parsed_url = urlparse(url)
params = dict(parse_qsl(parsed_url.query))
params.update(dict(channelId=channel_id))
# noinspection PyProtectedMember
return parsed_url._replace(query=urlencode(params)).geturl()
# noinspection SqlNoDataSourceInspection,SqlDialectInspection
async def get_conversation_reference(self, conversation_id: Optional[str])\
-> ConversationReference:
""" Get conversation reference from DB """
if conversation_id is None:
raise ConversationNotFound("conversation_id is None")
container = await self.cosmos_client.get_conversations_container()
tenant_id = AppConfig.TENANT_ID
item = get_first_or_none(list(container.query_items(
query=f"SELECT * FROM c WHERE c.id=@id",
partition_key=tenant_id,
parameters=[{"name": "@id",
"value": conversation_id}],
)))
if item is None:
raise ConversationNotFound("conversation_id is not Found")
return ConversationReferenceSchema(unknown=EXCLUDE).load(item)
async def on_conversation_update_activity(self, turn_context: TurnContext):
""" On update conversation """
await self.cosmos_client.create_conversation_reference(turn_context)
async def handle_submit_action(self, turn_context: TurnContext) -> None:
""" Handle card submit action """
mx = self.get_mx(turn_context)
if mx.type == MXTypes.ACKNOWLEDGE:
try:
account = turn_context.activity.from_property
acks = await self.cosmos_client.get_acknowledge_items(
mx.notification_id
)
if len(acks) > 0:
return
await self.cosmos_client.create_acknowledge(mx.notification_id,
account)
notification = await self.cosmos_client.get_notification(
mx.notification_id
)
card = CardHelper.create_notification_card(
notification,
turn_context.activity.from_property.name
)
attachments = [CardFactory.adaptive_card(card)]
message = Activity(id=turn_context.activity.reply_to_id,
type=ActivityTypes.message,
attachments=attachments)
await turn_context.update_activity(message)
except ItemNotFound:
# todo(s1z): Handle unknown notification ID
pass
return
await turn_context.send_activity("Unknown request!")
async def on_message_activity(self, turn_context: TurnContext) -> None:
""" on message activity """
if turn_context.activity.value is not None:
return await self.handle_submit_action(turn_context)
# try to save conversation reference,
# who knows maybe we didn't get the on_conversation_update!
await self.cosmos_client.create_conversation_reference(turn_context)
card = CardHelper.load_assets_card("default_card")
attachments = [CardFactory.adaptive_card(card)]
message = Activity(type=ActivityTypes.message, attachments=attachments)
await turn_context.send_activity(message)
async def on_mx_task_unsupported(self, turn_context: TurnContext) \
-> TaskModuleResponse:
""" On unsupported request """
return await self.on_mx_task_default(turn_context)
async def on_mx_task_notification_url(self, turn_context: TurnContext,
notification_id: str) \
-> TaskModuleResponse:
""" On MX Task fetch Notification URL """
try:
notification = await self.cosmos_client.get_notification(
notification_id=notification_id
)
link = notification.url.link
if link is not None:
task_info = TaskModuleTaskInfo(title=TaskModuleConfig.TITLE,
width=TaskModuleConfig.WIDTH,
height=TaskModuleConfig.HEIGHT,
url=link,
fallback_url=link)
return TaskModuleResponse(
task=TaskModuleContinueResponse(value=task_info)
)
except ItemNotFound:
print(f"item '{notification_id}' not found")
return await self.on_mx_task_default(turn_context)
async def on_mx_task_default(self, turn_context: TurnContext) \
-> TaskModuleResponse:
""" On MX Task default handler """
url = self.generate_url(TaskModuleConfig.URL,
turn_context.activity.conversation.id)
task_info = TaskModuleTaskInfo(title=TaskModuleConfig.TITLE,
width=TaskModuleConfig.WIDTH,
height=TaskModuleConfig.HEIGHT,
url=url,
fallback_url=url)
return TaskModuleResponse(
task=TaskModuleContinueResponse(value=task_info)
)
async def on_teams_task_module_fetch(
self, turn_context: TurnContext,
task_module_request: TaskModuleRequest
) -> TaskModuleResponse:
""" On task module fetch.
Requested when user clicks on "msteams": {"type": "task/fetch"} """
# "mx": {
# "type": "task/notification",
# "notificationId": notification_id
# }
mx = MedX.get_schema(unknown=EXCLUDE).load(
task_module_request.data.get("mx", dict())
)
if mx.type == MXTypes.Task.NOTIFICATION and mx.notification_id:
# 1. save action to DB
# 2. return URL
initiator = turn_context.activity.from_property.name
await self.cosmos_client.create_initiation(initiator,
mx.notification_id)
return await self.on_mx_task_notification_url(turn_context,
mx.notification_id)
return await self.on_mx_task_default(turn_context)
""" Config """
import os
from azure.cosmos import PartitionKey
from utils.cosmos_client import CosmosClient
PROJECT_ROOT_PATH = os.path.dirname(os.path.abspath("__file__"))
CARDS_PATH = os.path.join(PROJECT_ROOT_PATH, "assets/cards")
class TaskModuleConfig:
""" Task Module config """
TITLE = os.environ.get("TASK_MODULE_TITLE",
"Example portal")
URL = os.environ.get("TASK_MODULE_URL",
"https://fake.s1z.info/show-channel.html")
WIDTH = "large"
HEIGHT = "large"
class AppConfig:
""" Bot Configuration """
PORT = 3978
APP_ID = os.environ.get("MS_APP_ID",
"d472f12a-323b-4058-b89b-7a4b15c48ab7")
APP_PASSWORD = os.environ.get("MS_APP_PASSWORD",
"ZdL|zw:]Io_}goFfWs2{w70g+.FXwQ")
TENANT_ID = os.environ.get("TENANT_ID",
"5df91ebc-64fa-4aa1-862c-bdc0cba3c656")
class CosmosDBConfig:
""" Cosmos Databases """
HOST = os.environ.get('ACCOUNT_HOST',
'https://nancycosomsdb.documents.azure.com:443/')
KEY = os.environ.get('ACCOUNT_KEY',
'fNVRCesO1NAb9MYZNK2rKdAPkY9J4O5ntR8CRuKu6wVGhndiaXch'
'Q6fKwrTTnTbv4tPM8S74YjZsfcX4uAHgiw==')
class Conversations:
""" Conversation DB """
DATABASE = "bot"
CONTAINER = "conversations"
PK = "id"
PARTITION_KEY = PartitionKey(path="/conversation/tenantId")
class Notifications:
""" Notifications DB """
DATABASE = "bot"
CONTAINER = "notifications"
PK = "id"
PARTITION_KEY = PartitionKey(path="/tenantId")
class Acknowledges:
""" Acknowledges"""
DATABASE = "bot"
CONTAINER = "acknowledges"
PK = "id"
PARTITION_KEY = PartitionKey(path="/notificationId")
class Initiations:
""" Initiations """
DATABASE = "bot"
CONTAINER = "initiations"
PK = "id"
PARTITION_KEY = PartitionKey(path="/notificationId")
COSMOS_CLIENT = CosmosClient(CosmosDBConfig.HOST, CosmosDBConfig.KEY)
""" Acknowledge object """
from dataclasses import dataclass, field
from typing import Optional
from entities.json.camel_case_mixin import CamelCaseMixin, uuid_factory
@dataclass
class Acknowledge(CamelCaseMixin):
""" Acknowledge """
id: str = field(default_factory=uuid_factory)
notification_id: Optional[str] = field(default=None)
username: Optional[str] = field(default=None)
user_aad_id: Optional[str] = field(default=None)
timestamp: Optional[int] = field(default=None)
""" Notification Schema Implementation """
from marshmallow import fields
from .camel_case_schema import CamelCaseSchema
class AcknowledgeSchema(CamelCaseSchema):
""" Notification Schema """
id = fields.String(required=True, allow_none=True) # database message id
notification_id = fields.String(required=True)
username = fields.String(required=True)
user_aad_id = fields.String(required=True)
timestamp = fields.Integer(required=True)
""" marshmallow-dataclass need a mixin to work with camel - snake cases """
import uuid
from datetime import datetime
import marshmallow_dataclass
from marshmallow import pre_load, post_dump
from stringcase import snakecase, camelcase
def uuid_factory() -> str:
""" Set unique UUID """
return uuid.uuid4().__str__()
def timestamp_factory() -> int:
""" Set current unix timestamp """
return int(datetime.utcnow().timestamp() * 1000)
class CamelCaseMixin:
""" Camel Case mixin """
@pre_load
def to_snake_case(self, data, **_kwargs):
""" to snake case pre load method """
return {snakecase(key): value for key, value in data.items()}
@post_dump
def to_camel_case(self, data, **_kwargs):
""" to camel case post load method """
return {camelcase(key): value for key, value in data.items()}
@classmethod
def get_schema(cls, *args, **kwargs):
""" Get schema """
return marshmallow_dataclass.class_schema(cls)(*args, **kwargs)
""" Camel Case schema implementation """
from marshmallow import Schema, fields
def camelcase(s):
""" Convert camel case to snake case"""
parts = iter(s.split("_"))
return next(parts) + "".join(i.title() for i in parts)
class CamelCaseSchema(Schema):
"""Schema that uses camel-case for its external representation
and snake-case for its internal representation.
"""
def on_bind_field(self, field_name, field_obj):
""" On bind field callback """
field_obj.data_key = camelcase(field_obj.data_key or field_name)
""" Conversation Reference Implementation """
from typing import Dict
from botbuilder.schema import ConversationReference, ConversationAccount, \
ChannelAccount
from marshmallow import fields, EXCLUDE, validate, post_load
from .camel_case_schema import CamelCaseSchema
class UserSchema(CamelCaseSchema):
""" User Schema """
id = fields.String(required=True)
name = fields.String(required=True)
# may be null if it's a bot
aad_object_id = fields.String(required=True, allow_none=True)
role = fields.String(required=True, allow_none=True)
class ConversationSchema(CamelCaseSchema):
""" Conversation Schema """
is_group = fields.Boolean(required=True, allow_none=True)
conversation_type = fields.String(required=True)
id = fields.String(required=True)
name = fields.String(required=True, allow_none=True)
aad_object_id = fields.String(required=True, allow_none=True)
role = fields.String(required=True, allow_none=True)
tenant_id = fields.String(required=True)
# Microsoft does not describe this object at all
properties = fields.Dict(required=False, allow_none=True)
class ConversationReferenceSchema(CamelCaseSchema):
""" Conversation Reference schema """
activity_id = fields.String()
user = fields.Nested(UserSchema, required=False, unknown=EXCLUDE)
bot = fields.Nested(UserSchema, unknown=EXCLUDE)
conversation = fields.Nested(ConversationSchema, unknown=EXCLUDE)
channel_id = fields.String(required=True)
locale = fields.String(required=True)
service_url = fields.String()
@post_load
def create_conversation_reference(self, data, **_kwargs):
""" Create Conversation Reference """
data.update(dict(
user=ChannelAccount(**data.pop("user")),
bot=ChannelAccount(**data.pop("bot")),
conversation=ConversationAccount(**data.pop("conversation"))
))
return ConversationReference(**data)
""" Initiation object """
from dataclasses import dataclass, field
from typing import Optional
from entities.json.camel_case_mixin import CamelCaseMixin
@dataclass
class Initiation(CamelCaseMixin):
""" Notification Dataclass """
initiator: str # User name
notification_id: str # Notification ID
timestamp: Optional[int] = field(default=None)
id: Optional[str] = field(default=None) # Unique Initiation ID
""" Notification object """
from dataclasses import dataclass, field
from typing import Optional
from entities.json.camel_case_mixin import CamelCaseMixin
class MXTypes:
""" MedX Types """
UNKNOWN = "UNKNOWN"
ACKNOWLEDGE = "acknowledge"
class Task:
""" Task types """
DEFAULT = "task/default"
NOTIFICATION = "task/notification"
@dataclass
class MedX(CamelCaseMixin):
""" MedX data """
type: str
notification_id: Optional[str]
""" Notification object """
from dataclasses import dataclass, field
from typing import Optional
import marshmallow.validate
from entities.json.camel_case_mixin import CamelCaseMixin, timestamp_factory
@dataclass
class NotificationUrl(CamelCaseMixin):
""" Notifiction URL """
title: Optional[str]
link: Optional[str] = field(metadata=dict(
validate=marshmallow.validate.URL()
))
@dataclass
class Notification(CamelCaseMixin):
""" Notification Dataclass """
message_id: Optional[str]
destination: str
subject: Optional[str] = field(default=None)
message: Optional[str] = field(default=None)
title: Optional[str] = field(default=None)
url: Optional[NotificationUrl] = field(default_factory=NotificationUrl)
acknowledge: Optional[bool] = field(default=False)
def to_db(self) -> "NotificationCosmos":
""" Create NotificationCosmos """
return NotificationCosmos(message_id=self.message_id,
destination=self.destination,
subject=self.subject,
message=self.message,
title=self.title,
url=self.url,
acknowledge=self.acknowledge)
# noinspection PyDataclass
@dataclass
class NotificationCosmos(Notification):
""" Notification Dataclass """
# We have to add these fields
id: Optional[str] = field(default=None)
tenant_id: Optional[str] = field(default=None)
timestamp: Optional[int] = field(default_factory=timestamp_factory)
""" Notification Schema Implementation """
from marshmallow import fields, EXCLUDE
from .camel_case_schema import CamelCaseSchema
class NotificationURLSchema(CamelCaseSchema):
""" Notification URL Schema """
title = fields.String()
link = fields.String()
# TODO(s1z): Migrate to marshmallow-dataclass
class NotificationSchema(CamelCaseSchema):
""" Notification Schema """
id = fields.String(required=True, allow_none=True) # database message id
tenant_id = fields.String(required=True, allow_none=True)
destination = fields.String(required=True)
message_id = fields.String() # teams message id
subject = fields.String()
message = fields.String()
title = fields.String()
url = fields.Nested(NotificationURLSchema, unknown=EXCLUDE)
acknowledge = fields.Boolean(default=False)
import os
from typing import Dict, Any, Optional, Mapping
from config import CARDS_PATH
from entities.json.notification import NotificationCosmos
from utils.json_func import json_loads
FILE = "__file__"
class CardHelper:
""" Card Helper """
@staticmethod
def load_assets_card(name: str) -> Mapping[str, Any]:
""" 123 """
filename = name + ".json" if name.find(".json") < 0 else name
filename_path = os.path.join(CARDS_PATH, filename)
with open(filename_path, "r") as f:
card_data = f.read()
return json_loads(card_data)
@staticmethod
def create_notification_card(notification: NotificationCosmos,
acknowledged_by: Optional[str] = None)\
-> Dict[str, Any]:
""" Create notification card """
title = notification.title or "You got a new notification!"
notification_id = notification.id or None
subject = notification.subject or None
message = notification.message or None
url = notification.url or None
acknowledge = notification.acknowledge
card_body = []
# ======================= TITLE =======================================
if title is not None:
card_body.append({"type": "TextBlock",
"size": "Medium",
"weight": "Bolder",
"text": title})
# ======================= SUBJ and MESSAGE ============================
if subject is not None:
card_body.append({"type": "FactSet",
"facts": [{"title": "Subject:",
"value": subject}]})
# ======================= MESSAGE =================================== #
if message is not None:
card_body.append({"type": "FactSet",
"facts": [{"title": "Message:",
"value": message}]})
# ======================= URL =========================================
if url is not None and notification_id is not None:
url_title = url.title or "Open Notification"
card_body.append({
"type": "ActionSet",
"actions": [{"type": "Action.Submit",
"title": url_title,
"data": {
"msteams": {
"type": "task/fetch"
},
"mx": {
"type": "task/notification",
"notificationId": notification_id
}
}}]
})
# ======================= URL =========================================
if acknowledge is not None and acknowledge and acknowledged_by is None:
card_body.append({
"type": "ActionSet",
"actions": [{"type": "Action.Submit",
"title": "Acknowledge",
"data": {
"mx": {
"type": "acknowledge",
"notificationId": notification_id
}
}}]
})
if acknowledged_by is not None:
card_body.append({"type": "FactSet",
"facts": [{"title": "Acknowledged:",
"value": acknowledged_by}]})
return {
"type": "AdaptiveCard",
"$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
"version": "1.5",
"body": card_body
}
""" Cosmos Client implementation """
import asyncio
import uuid
from concurrent import futures
from datetime import datetime
from typing import Any, Dict, Optional, Union, Iterable, List
import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.exceptions as exceptions
from azure.cosmos import DatabaseProxy, ContainerProxy
from botbuilder.core import TurnContext
from botbuilder.schema import ChannelAccount
from marshmallow import EXCLUDE
from entities.json.acknowledge import Acknowledge
from entities.json.acknowledge_schema import AcknowledgeSchema
from entities.json.camel_case_mixin import timestamp_factory
from entities.json.conversation_reference_schema import \
ConversationReferenceSchema
from entities.json.initiation import Initiation
from entities.json.notification import NotificationCosmos
class CosmosClientException(Exception):
""" Cosmos Client base exception """
def __init__(self, message: str):
self.message = message
class ItemExists(CosmosClientException):
""" Item already exists in the DB """
pass
class SaveItemError(CosmosClientException):
""" Save Item Error """
pass
class SaveConversationError(CosmosClientException):
""" Save Conversation Error """
pass
class ItemNotFound(CosmosClientException):
""" Item not found """
pass
class CosmosClient:
""" Cosmos Client class """
def __init__(self, host: str, master_key: str):
self.executor = futures.ThreadPoolExecutor()
self.client = cosmos_client.CosmosClient(host,
dict(masterKey=master_key))
async def execute_blocking(self, bl, *args):
""" Execute blocking code """
return await asyncio.get_event_loop().run_in_executor(self.executor,
bl,
*args)
async def get_db(self, database_id: str) -> DatabaseProxy:
""" Get or create DB """
def bl() -> DatabaseProxy:
""" Get Notifications container blocking """
try:
return self.client.create_database(id=database_id)
except exceptions.CosmosResourceExistsError:
return self.client.get_database_client(database_id)
return await self.execute_blocking(bl)
async def get_container(self, database_id: str, container_id: str,
partition_key: Any, **kwargs) -> ContainerProxy:
""" Get or create container """
db = await self.get_db(database_id)
def bl() -> ContainerProxy:
""" Get Notifications container blocking """
try:
return db.create_container(container_id, partition_key,
**kwargs)
except exceptions.CosmosResourceExistsError:
return db.get_container_client(container_id)
return await self.execute_blocking(bl)
async def get_initiation_items(self, notification_id) -> List[Initiation]:
""" Get Initiation Items """
container = await self.get_initiation_container()
def bl() -> List[Dict[str, Any]]:
""" Potential blocking code """
# noinspection SqlDialectInspection,SqlNoDataSourceInspection
items = []
for item in container.query_items(
query="SELECT * FROM r "
"WHERE r.notificationId=@notification_id "
"ORDER BY r._ts",
parameters=[
{"name": "@notification_id", "value": notification_id},
],
partition_key=notification_id
):
items.append(item)
return Initiation.get_schema(unknown=EXCLUDE).load(items,
many=True)
return await self.execute_blocking(bl)
async def get_acknowledge_items(self, notification_id)\
-> List[Acknowledge]:
""" Get Acknowledge Items """
container = await self.get_acknowledges_container()
def bl() -> List[Dict[str, Any]]:
""" Potential blocking code """
# noinspection SqlDialectInspection,SqlNoDataSourceInspection
items = []
for item in container.query_items(
query="SELECT * FROM r "
"WHERE r.notificationId=@notification_id "
"ORDER BY r._ts",
parameters=[
{"name": "@notification_id", "value": notification_id},
],
partition_key=notification_id
):
items.append(item)
return Acknowledge.get_schema(unknown=EXCLUDE).load(items,
many=True)
return await self.execute_blocking(bl)
async def query_items(self, partition_key, **kwargs):
""" Query items """
# TODO(s1z): IMPL ME
async def get_item(self,
container: ContainerProxy,
item: Union[str, Dict[str, Any]],
partition_key: Any,
populate_query_metrics: Optional[bool] = None,
post_trigger_include: Optional[str] = None,
**kwargs: Any) -> Dict[str, str]:
""" Get Item """
def bl() -> Dict[str, str]:
""" Potential blocking code """
try:
return container.read_item(
item=item,
partition_key=partition_key,
populate_query_metrics=populate_query_metrics,
post_trigger_include=post_trigger_include,
**kwargs)
except exceptions.CosmosHttpResponseError as e:
# raise
raise ItemNotFound(e.http_error_message)
return await self.execute_blocking(bl)
async def create_item(self, container: ContainerProxy,
body: Dict[str, Any],
populate_query_metrics: Optional[bool] = None,
pre_trigger_include: Optional[str] = None,
post_trigger_include: Optional[str] = None,
indexing_directive: Optional[Any] = None,
**kwargs: Any) -> Dict[str, str]:
""" Create an item in DB """
def bl() -> Dict[str, str]:
""" Potential blocking code """
return container.create_item(
body=body,
populate_query_metrics=populate_query_metrics,
pre_trigger_include=pre_trigger_include,
post_trigger_include=post_trigger_include,
indexing_directive=indexing_directive,
**kwargs
)
tries = 0
max_tries = max(kwargs.pop("max_tries", 3), 1)
while tries < max_tries:
body.update(dict(id=uuid.uuid4().__str__()))
try:
return await self.execute_blocking(bl)
except exceptions.CosmosHttpResponseError as e:
tries += 1
if e.status_code == 409: # Already exists
# print(f"Item with {item_id} already exists")
if tries == max_tries:
raise ItemExists(e.http_error_message)
continue
raise SaveItemError(e.http_error_message)
raise SaveItemError("We've reached max_tries values!")
async def get_conversations_container(self) -> ContainerProxy:
""" Get Conversation container """
from config import CosmosDBConfig
return await self.get_container(
CosmosDBConfig.Conversations.DATABASE,
CosmosDBConfig.Conversations.CONTAINER,
CosmosDBConfig.Conversations.PARTITION_KEY
)
async def create_notification(self, notification: NotificationCosmos)\
-> NotificationCosmos:
""" Crete notification to the DB """
notification.id = uuid.uuid4().__str__()
schema = NotificationCosmos.get_schema(unknown=EXCLUDE)
container = await self.get_notifications_container()
saved_item = container.create_item(body=schema.dump(notification))
return schema.load(saved_item)
async def get_acknowledges_container(self) -> ContainerProxy:
""" get_acknowledges_container """
from config import CosmosDBConfig
return await self.get_container(
CosmosDBConfig.Acknowledges.DATABASE,
CosmosDBConfig.Acknowledges.CONTAINER,
CosmosDBConfig.Acknowledges.PARTITION_KEY
)
async def get_notifications_container(self) -> ContainerProxy:
""" Get Notifications container """
from config import CosmosDBConfig
return await self.get_container(
CosmosDBConfig.Notifications.DATABASE,
CosmosDBConfig.Notifications.CONTAINER,
CosmosDBConfig.Notifications.PARTITION_KEY
)
async def get_messages_container(self) -> ContainerProxy:
""" Get Messages container """
from config import CosmosDBConfig
return await self.get_container(
CosmosDBConfig.Notifications.DATABASE,
CosmosDBConfig.Notifications.CONTAINER,
CosmosDBConfig.Notifications.PARTITION_KEY
)
async def get_initiation_container(self) -> ContainerProxy:
""" Get Initiation container """
from config import CosmosDBConfig
return await self.get_container(
CosmosDBConfig.Initiations.DATABASE,
CosmosDBConfig.Initiations.CONTAINER,
CosmosDBConfig.Initiations.PARTITION_KEY
)
async def create_acknowledge(self, notification_id: str,
account: ChannelAccount) -> Dict[str, Any]:
""" Add acknowledge to the DB """
container = await self.get_acknowledges_container()
notification = AcknowledgeSchema().dump(dict(
notification_id=notification_id,
username=account.name,
user_aad_id=account.aad_object_id,
timestamp=timestamp_factory()
))
return await self.create_item(container, notification)
async def get_acknowledge(self, notification_id: str)\
-> Optional[Acknowledge]:
""" Get Acknowledge object """
try:
container = await self.get_acknowledges_container()
items = await self.query_items(container, notification_id)
return Acknowledge.get_schema(unknown=EXCLUDE).load(items)
except ItemNotFound:
return None
async def get_notification(self, notification_id: str)\
-> NotificationCosmos:
""" Get Notification """
from config import AppConfig
container = await self.get_notifications_container()
item = await self.get_item(container, notification_id,
AppConfig.TENANT_ID)
return NotificationCosmos.get_schema(unknown=EXCLUDE).load(item)
async def create_conversation_reference(self, turn_context: TurnContext)\
-> None:
""" Save Conversation Regerence """
from config import CosmosDBConfig
activity = turn_context.activity
reference = TurnContext.get_conversation_reference(activity)
reference_dict = ConversationReferenceSchema().dump(reference)
container = await self.get_conversations_container()
reference_dict.update({
CosmosDBConfig.Conversations.PK: reference.conversation.id
})
try:
await self.create_item(container, body=reference_dict, max_tries=1)
except ItemExists:
pass
async def create_initiation(self, initiator: str,
notification_id: str) -> None:
""" Save initiation """
container = await self.get_initiation_container()
timestamp = timestamp_factory()
initiation = Initiation(initiator=initiator,
timestamp=timestamp,
notification_id=notification_id)
data = Initiation.get_schema().dump(initiation)
await self.create_item(container, body=data)
""" Handy Functions """
from typing import List, Optional, Dict
def get_first_or_none(items: List) -> Optional[Dict[str, any]]:
""" Get first object from list or return None len < 1 """
if len(items) > 0:
return items[0]
return None
""" JSON helper module """
import sys
from typing import Any, Union, Optional, Mapping, Iterable
import simplejson as json
import simplejson.scanner as json_scanner
from utils.log import Log
TAG = __name__
def json_loads(data: Union[str, bytes], default: Optional[Any] = None) -> \
Union[Mapping[str, Any], Iterable[Mapping[str, Any]]]:
""" Json.loads wrapper, tries to load data and prints errors if any """
try:
j_data = json.loads(data, strict=False)
if isinstance(j_data, dict) or isinstance(j_data, list):
return j_data
except TypeError:
Log.e("json_loads", "TypeError:", sys.exc_info())
except json_scanner.JSONDecodeError:
Log.e("json_loads", "JSONDecodeError:", sys.exc_info())
return default
def json_dumps(*args: Any, **kwargs: Mapping[str, Any]) -> str:
""" Json.dumps wrapper, tries to dump data and prints errors if any """
try:
return json.dumps(*args, **kwargs)
except Exception:
Log.e("json_loads", "error:", exc_info=sys.exc_info())
raise
""" Log helper module """
import logging
import traceback
class Log:
""" Logger. This is a pretty useful 'Java style' logger """
@staticmethod
def log(level, source, message="", exc_info=None):
""" log method """
logger = logging.getLogger()
line = "{source}{message}{exc}"
exc = ''
if isinstance(exc_info, (list, tuple)):
ex_type, ex_value, ex_traceback = exc_info
exc = ": " + ''.join(
traceback.format_exception(ex_type, ex_value, ex_traceback)
)
message = "::{}".format(message) if message else ""
logger.log(level, line.format(source=source, message=message, exc=exc))
@staticmethod
def w(source, message="", exc_info=None):
""" warning level """
return Log.log(logging.WARN, source, message, exc_info)
@staticmethod
def d(source, message="", exc_info=None):
""" debug level """
return Log.log(logging.DEBUG, source, message, exc_info)
@staticmethod
def i(source, message="", exc_info=None):
""" info level """
return Log.log(logging.INFO, source, message, exc_info)
@staticmethod
def e(source, message="error", exc_info=None):
""" error level """
return Log.log(logging.ERROR, source, message, exc_info)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment