Commit a2f91220 by Oleksandr Barabash

flows added

parent 4a9a5f1f
......@@ -307,6 +307,11 @@ async def init_db_containers():
CosmosDBConfig.Initiations.CONTAINER,
CosmosDBConfig.Initiations.PARTITION_KEY
)
await COSMOS_CLIENT.create_container(
CosmosDBConfig.Flows.DATABASE,
CosmosDBConfig.Flows.CONTAINER,
CosmosDBConfig.Flows.PARTITION_KEY
)
async def app_factory(bot):
......
""" Message extension bot """
import asyncio
import sys
import time
import uuid
from asyncio import Future
......@@ -238,6 +239,7 @@ class TeamsMessagingExtensionsActionPreviewBot(TeamsActivityHandler):
await turn_context.send_activity(i18n.t("unknown_request"))
async def on_message_activity(self, turn_context: TurnContext) -> None:
""" Fired when message is received """
i18n = get_i18n(turn_context)
# check tenant
......@@ -246,12 +248,9 @@ class TeamsMessagingExtensionsActionPreviewBot(TeamsActivityHandler):
return
# save conversation reference
start = time.time()
reference = await self.cosmos_client.create_conversation_reference(
turn_context
)
Log.d(TAG, "on_message_activity::create_conversation_reference::took:"
f"{time.time() - start}")
if turn_context.activity.value is not None:
return await self.handle_submit_action(turn_context)
......@@ -295,64 +294,95 @@ class TeamsMessagingExtensionsActionPreviewBot(TeamsActivityHandler):
"""
# send request to PA
async with aiohttp.ClientSession() as session:
# TODO(s1z): string bot's @mention if needed.
message = turn_context.activity.text.strip().lower()
data = dict(authorizartion=dict(token=reference),
reference=reference,
message=message)
async with session.post(AppConfig.PA_URL, json=data) as resp:
Log.e(TAG, f"on_message_activity::response.status:"
f"{resp.status}")
rest_text = await resp.text()
Log.e(TAG, f"on_message_activity::response.text: {rest_text}")
# # send request to PA
# async with aiohttp.ClientSession() as session:
# # TODO(s1z): string bot's @mention if needed.
# message = turn_context.activity.text.strip().lower()
# data = dict(authorizartion=dict(token=reference),
# reference=reference,
# message=message)
# async with session.post(AppConfig.PA_URL, json=data) as resp:
# Log.e(TAG, f"on_message_activity::response.status:"
# f"{resp.status}")
# rest_text = await resp.text()
# Log.e(TAG, f"on_message_activity::response.text: {rest_text}")
# return
i18n = get_i18n(turn_context)
if turn_context.activity.conversation.tenant_id != AppConfig.TENANT_ID:
await turn_context.send_activity(i18n.t("tenant_forbidden"))
return
message = turn_context.activity.text.strip().lower()
cmd_help = i18n.t("cmd_help")
cmd_portal = i18n.t("cmd_portal")
if message == cmd_help.lower():
tenant_id = turn_context.activity.conversation.tenant_id
conversation_id = turn_context.activity.conversation.id
response = await turn_context.send_activity(
i18n.t("response_help",
cmd_portal=cmd_portal,
cmd_help=cmd_help,
tenant_id=tenant_id,
conversation_id=conversation_id)
)
Log.d(TAG, "on_message_activity::help_resp: {}".format(response))
return
if message == cmd_portal.lower():
card = CardHelper.load_assets_card("default_card")
attachments = [CardFactory.adaptive_card(card)]
message = Activity(type=ActivityTypes.message,
attachments=attachments)
await turn_context.send_activity(message)
return
# TODO(s1z): Remove me when it's prod
if message.find("flow") == 0:
params = message.split(' ')
if len(params) != 3:
response = await turn_context.send_activity(
"Incorrect syntax. Please the syntax below:<br/>"
" flow 'cmd' 'url'<br/><br/>"
"Where:<br/>"
"'cmd' - command you want to assing,<br/>"
"'url' - link of the flow you want it to be handled with."
)
Log.d(TAG,
"on_message_activity::help_resp: {}".format(response))
return
_, cmd, url = params
# noinspection PyBroadException
try:
_ = await self.cosmos_client.create_flow(cmd, url)
await turn_context.send_activity("Flow cmd saved")
return
except Exception:
Log.e(TAG, f"on_message_activity::create_flow:error",
sys.exc_info())
await turn_context.send_activity("Error saving flow cmd")
return
# async def on_message_activity(self, turn_context: TurnContext) -> None:
# """ on message activity """
#
# i18n = get_i18n(turn_context)
#
# if turn_context.activity.conversation.tenant_id != AppConfig.TENANT_ID:
# await turn_context.send_activity(i18n.t("tenant_forbidden"))
# return
#
# # try to save conversation reference,
# # who knows maybe we didn't get the on_conversation_update!
# await self.cosmos_client.create_conversation_reference(turn_context)
#
# if turn_context.activity.value is not None:
# return await self.handle_submit_action(turn_context)
#
# message = turn_context.activity.text.strip().lower()
#
# cmd_help = i18n.t("cmd_help")
# cmd_portal = i18n.t("cmd_portal")
#
# if message == cmd_help.lower():
# tenant_id = turn_context.activity.conversation.tenant_id
# conversation_id = turn_context.activity.conversation.id
# response = await turn_context.send_activity(
# i18n.t("response_help",
# cmd_portal=cmd_portal,
# cmd_help=cmd_help,
# tenant_id=tenant_id,
# conversation_id=conversation_id)
# )
# Log.d(TAG, "on_message_activity::help_resp: {}".format(response))
# return
#
# if message == cmd_portal.lower():
# card = CardHelper.load_assets_card("default_card")
# attachments = [CardFactory.adaptive_card(card)]
# message = Activity(type=ActivityTypes.message,
# attachments=attachments)
# await turn_context.send_activity(message)
# return
#
# await turn_context.send_activity(i18n.t("response_unknown_cmd",
# cmd_help=cmd_help))
# try get flow link
# noinspection PyBroadException
try:
flow = await self.cosmos_client.get_flow(message)
async with aiohttp.ClientSession() as session:
# TODO(s1z): string bot's @mention if needed.
data = dict(reference=reference, message=message)
async with session.post(flow.url, json=data) as resp:
Log.e(TAG, f"on_message_activity::response.status:"
f"{resp.status}")
rest_text = await resp.text()
Log.e(TAG, f"on_message_activity::response.text: {rest_text}")
return
except Exception:
Log.e(TAG, f"on_message_activity::get_flow:error", sys.exc_info())
await turn_context.send_activity(i18n.t("response_unknown_cmd",
cmd_help=cmd_help))
async def on_mx_task_unsupported(self, turn_context: TurnContext) \
-> TaskModuleResponse:
......
......@@ -114,6 +114,13 @@ class CosmosDBConfig:
PK = "id"
PARTITION_KEY = PartitionKey(path="/notificationId")
class Flows:
""" Flows """
DATABASE = "bot"
CONTAINER = "flows"
PK = "id"
PARTITION_KEY = PartitionKey(path="/tenantId")
COSMOS_CLIENT = CosmosClient(CosmosDBConfig.HOST, CosmosDBConfig.KEY)
KEY_VAULT_CLIENT = AzureKeyVaultClient(AppConfig.CLIENT_ID,
......
""" Flow object """
from dataclasses import dataclass, field
from typing import Optional
from entities.json.camel_case_mixin import CamelCaseMixin
@dataclass
class Flow(CamelCaseMixin):
""" Notification Dataclass """
tenant_id: str
cmd: str
url: str
......@@ -17,6 +17,7 @@ from entities.json.acknowledge import Acknowledge
from entities.json.acknowledge_schema import AcknowledgeSchema
from entities.json.camel_case_mixin import timestamp_factory
from entities.json.conversation_reference import ConversationReference
from entities.json.flow import Flow
from entities.json.initiation import Initiation
from entities.json.notification import NotificationCosmos
from utils.functions import get_first_or_none
......@@ -305,6 +306,16 @@ class CosmosClient:
CosmosDBConfig.Initiations.PARTITION_KEY
)
async def get_flow_container(self) -> ContainerProxy:
""" Get Flow container """
from config import CosmosDBConfig
return await self.get_container(
CosmosDBConfig.Flows.DATABASE,
CosmosDBConfig.Flows.CONTAINER,
CosmosDBConfig.Flows.PARTITION_KEY
)
async def create_acknowledge(self, notification_id: str,
account: ChannelAccount) -> Dict[str, Any]:
""" Add acknowledge to the DB """
......@@ -384,3 +395,21 @@ class CosmosClient:
notification_id=notification_id)
data = Initiation.get_schema().dump(initiation)
await self.create_item(container, body=data)
async def create_flow(self, cmd, url, tenant_id=None):
""" Create Flow """
from config import AppConfig
container = await self.get_flow_container()
flow = Flow(tenant_id=tenant_id or AppConfig.TENANT_ID, cmd=cmd,
url=url)
data = Flow.get_schema().dump(flow)
return await self.create_item(container, body=data)
async def get_flow(self, cmd, tenant_id=None) -> Flow:
""" Get Flow """
from config import AppConfig
container = await self.get_flow_container()
item = await self.get_item(container, cmd,
tenant_id or AppConfig.TENANT_ID)
return Flow.get_schema(unknown=EXCLUDE).load(item)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment