# SPDX-License-Identifier: AGPL-3.0-or-later
"""Engine for Azure resources. This engine mimics the standard search bar in Azure
Portal (for resources and resource groups).
Configuration
=============
You must `register an application in Microsoft Entra ID`_ and assign it the
'Reader' role in your subscription.
To use this engine, add an entry similar to the following to your engine list in
``settings.yml``:
.. code:: yaml
- name: azure
engine: azure
...
azure_tenant_id: "your_tenant_id"
azure_client_id: "your_client_id"
azure_client_secret: "your_client_secret"
azure_token_expiration_seconds: 5000
.. _register an application in Microsoft Entra ID:
https://learn.microsoft.com/en-us/entra/identity-platform/quickstart-register-app
"""
import hashlib
import typing as t

from searx.enginelib import EngineCache
from searx.network import post as http_post
from searx.result_types import EngineResults

if t.TYPE_CHECKING:
    from searx.extended_types import SXNG_Response
    from searx.search.processors import OnlineParams
engine_type = "online"
categories = ["it", "cloud"]

# Credentials of the registered application.  The empty strings are only
# placeholders: real values are expected to come from settings.yml.
azure_tenant_id = azure_client_id = azure_client_secret = ""

azure_token_expiration_seconds = 5000
"""Time for which an auth token is valid (sec.)"""

# Endpoint that receives the batched Resource Graph queries.
azure_batch_endpoint = "https://management.azure.com/batch?api-version=2020-06-01"

about = {
    "website": "https://www.portal.azure.com",
    "wikidata_id": "Q725967",
    "official_api_documentation": (
        "https://learn.microsoft.com/en-us/"
        "rest/api/azure-resourcegraph/?view=rest-azureresourcegraph-resourcegraph-2024-04-01"
    ),
    "use_official_api": True,
    "require_api_key": True,
    "results": "JSON",
    "language": "en",
}
# Only declared here; setup() binds it to an EngineCache instance and
# get_auth_token() reads and writes access tokens through it.
CACHE: EngineCache
"""Persistent (SQLite) key/value cache that deletes its values after ``expire``
seconds."""
[docs]
def setup(engine_settings: dict[str, t.Any]) -> bool:
    """Prepare the engine for use.

    - Creates the engine's cache (:py:obj:`CACHE`), named after the engine's
      ``name`` setting.
    - Verifies that ``azure_tenant_id``, ``azure_client_id`` and
      ``azure_client_secret`` all have a non-empty value.

    :param engine_settings: the engine's settings; must contain ``name``.
    :return: ``True`` when all three credentials are set.  Otherwise the
        missing option names are logged as an error and ``False`` is returned,
        which leaves the engine inactive.
    """
    global CACHE  # pylint: disable=global-statement
    CACHE = EngineCache(engine_settings["name"])

    required = ("azure_tenant_id", "azure_client_id", "azure_client_secret")
    # An absent key and an empty value are treated alike.
    unset = [name for name in required if not engine_settings.get(name, "")]

    if not unset:
        return True

    logger.error("missing values for options: %s", ", ".join(unset))
    return False
[docs]
def authenticate(t_id: str, c_id: str, c_secret: str) -> str:
    """Request an access token from Microsoft Entra ID.

    Uses the OAuth2 client credentials grant with the
    ``https://management.azure.com/.default`` scope.

    :param t_id: tenant ID; becomes part of the token endpoint URL.
    :param c_id: client (application) ID.
    :param c_secret: client secret.
    :return: the ``access_token`` field of the JSON reply.
    :raises RuntimeError: if the endpoint answers with a status other than 200;
        the message carries the status code and the response text.
    """
    token_endpoint = f"https://login.microsoftonline.com/{t_id}/oauth2/v2.0/token"
    form = {
        "client_id": c_id,
        "client_secret": c_secret,
        "grant_type": "client_credentials",
        "scope": "https://management.azure.com/.default",
    }

    resp: SXNG_Response = http_post(token_endpoint, form)

    if resp.status_code == 200:
        return resp.json()["access_token"]

    raise RuntimeError(f"Azure authentication failed (status {resp.status_code}): {resp.text}")
def get_auth_token(t_id: str, c_id: str, c_secret: str) -> str:
    """Return an access token for the given credentials, using the cache.

    A truthy value found in :py:obj:`CACHE` is returned as is.  Otherwise a new
    token is obtained with :py:func:`authenticate` and stored for
    ``azure_token_expiration_seconds`` seconds.

    :param t_id: tenant ID.
    :param c_id: client (application) ID.
    :param c_secret: client secret.
    :return: the access token.
    :raises RuntimeError: propagated from :py:func:`authenticate`.
    """
    identity = f"azure_tenant_id: {t_id}, azure_client_id: {c_id}, azure_client_secret: {c_secret}"
    # The cache is persistent, so the key must not contain the client secret in
    # clear text.  A digest still gives each credential set its own entry.
    key = hashlib.sha256(identity.encode("utf-8")).hexdigest()

    token: str | None = CACHE.get(key)
    if token:
        return token

    token = authenticate(t_id, c_id, c_secret)
    CACHE.set(key=key, value=token, expire=azure_token_expiration_seconds)
    return token
def _kql_escape(value: str) -> str:
    """Escape ``value`` for use inside a single-quoted KQL string literal."""
    # Backslashes go first; otherwise the backslash added in front of a quote
    # would itself be doubled.
    return value.replace("\\", "\\\\").replace("'", "\\'")


def request(query: str, params: "OnlineParams") -> None:
    """Fill ``params`` with a batch request that searches Azure for ``query``.

    The batch holds two Resource Graph sub-requests:

    - ``resourceGroups``: resource groups whose name contains the search term,
      names starting with the term first, then sorted by lower-cased name.
    - ``resources``: resources whose name contains the search term.

    Each sub-request is limited to 30 rows.

    :param query: the user's search term.
    :param params: request parameters, modified in place: ``url``, ``method``,
        ``json`` and the ``Authorization`` / ``Content-Type`` headers are set.
    """
    token = get_auth_token(azure_tenant_id, azure_client_id, azure_client_secret)

    # The search term is user input that ends up inside quoted KQL literals.
    # Without escaping, a quote in the term would end the literal early and
    # the rest of the term would be parsed as query text.
    term = _kql_escape(query)

    resource_graph_url = "/providers/Microsoft.ResourceGraph/resources?api-version=2024-04-01"
    header_details = {"commandName": "Microsoft.ResourceGraph"}

    resource_groups_query = (
        "ResourceContainers"
        f" | where (name contains ('{term}'))"
        " | where (type =~ ('Microsoft.Resources/subscriptions/resourcegroups'))"
        " | project id,name,type,kind,subscriptionId,resourceGroup"
        f" | extend matchscore = name startswith '{term}'"
        " | extend normalizedName = tolower(tostring(name))"
        " | sort by matchscore desc, normalizedName asc"
        " | take 30"
    )
    resources_query = f"Resources | where name contains '{term}' | take 30"

    params["url"] = azure_batch_endpoint
    params["method"] = "POST"
    params["headers"]["Authorization"] = f"Bearer {token}"
    params["headers"]["Content-Type"] = "application/json"
    params["json"] = {
        "requests": [
            {
                "url": resource_graph_url,
                "httpMethod": "POST",
                "name": "resourceGroups",
                "requestHeaderDetails": dict(header_details),
                "content": {"query": resource_groups_query},
            },
            {
                "url": resource_graph_url,
                "httpMethod": "POST",
                "name": "resources",
                "requestHeaderDetails": dict(header_details),
                "content": {"query": resources_query},
            },
        ]
    }
def response(resp: "SXNG_Response") -> EngineResults:
    """Convert the batch reply into results that link to the Azure Portal.

    The JSON body is read from ``resp``.  Every entry of its ``responses`` list
    is dispatched on its ``name``:

    - ``resourceGroups``: one result per row, linking to the resource group.
    - ``resources``: one result per row, linking to the resource.

    Entries with any other name are ignored.

    :param resp: the HTTP response of the batch request.
    :return: the collected results.
    """
    results = EngineResults()

    for sub_response in resp.json()["responses"]:
        kind = sub_response["name"]

        if kind == "resourceGroups":
            for row in sub_response["content"]["data"]:
                subscription = row["subscriptionId"]
                group_name = row["name"]
                results.add(
                    results.types.MainResult(
                        url=(
                            "https://portal.azure.com/#@/resource"
                            f"/subscriptions/{subscription}/resourceGroups/{group_name}/overview"
                        ),
                        title=group_name,
                        content=f"Resource Group in Subscription: {subscription}",
                    )
                )
            continue

        if kind == "resources":
            for row in sub_response["content"]["data"]:
                subscription = row["subscriptionId"]
                group_name = row["resourceGroup"]
                resource_type = row["type"]
                resource_name = row["name"]
                results.add(
                    results.types.MainResult(
                        url=(
                            "https://portal.azure.com/#@/resource"
                            f"/subscriptions/{subscription}/resourceGroups/{group_name}"
                            f"/providers/{resource_type}/{resource_name}/overview"
                        ),
                        title=resource_name,
                        content=(
                            f"Resource of type {resource_type} in Subscription:"
                            f" {subscription}, Resource Group: {group_name}"
                        ),
                    )
                )

    return results