__init__.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. # Copyright Materialize, Inc. and contributors. All rights reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License in the LICENSE file at the
  6. # root of this repository, or online at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. """
  16. Materialize MCP Server
  17. A server that exposes Materialize indexes as "tools" over the Model Context
  18. Protocol (MCP). Each Materialize index that the connected role is allowed to
  19. `SELECT` from (and whose cluster it can `USAGE`) is surfaced as a tool whose
  20. inputs correspond to the indexed columns and whose output is the remaining
  21. columns of the underlying view.
  22. The server supports two transports:
  23. * stdio – lines of JSON over stdin/stdout (handy for local CLIs)
  24. * sse – server‑sent events suitable for web browsers
  25. ---------------
  26. 1. ``list_tools`` executes a catalog query to derive the list of exposable
  27. indexes; the result is translated into MCP ``Tool`` objects.
  28. 2. ``call_tool`` validates the requested tool, switches the session to the
  29. appropriate cluster, executes a parameterised ``SELECT`` against the
  30. indexed view, and returns the first matching row (minus any columns whose
  31. values were supplied as inputs).
  32. """
  33. import asyncio
  34. import logging
  35. from collections.abc import AsyncIterator
  36. from contextlib import asynccontextmanager
  37. from typing import Any
  38. from mcp.server import NotificationOptions, Server
  39. from mcp.types import Tool
  40. from psycopg.rows import dict_row
  41. from psycopg_pool import AsyncConnectionPool
  42. from mcp_materialize.transports import http_transport, sse_transport, stdio_transport
  43. from .config import Config, load_config
  44. from .mz_client import MzClient
  45. logger = logging.getLogger("mz_mcp_server")
  46. logging.basicConfig(
  47. level=logging.DEBUG,
  48. format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
  49. )
  50. @asynccontextmanager
  51. async def create_client(cfg: Config) -> AsyncIterator[MzClient]:
  52. logger.info(
  53. "Initializing connection pool with min_size=%s, max_size=%s",
  54. cfg.pool_min_size,
  55. cfg.pool_max_size,
  56. )
  57. async def configure(conn):
  58. await conn.set_autocommit(True)
  59. logger.debug("Configured new database connection")
  60. try:
  61. async with AsyncConnectionPool(
  62. conninfo=cfg.dsn,
  63. min_size=cfg.pool_min_size,
  64. max_size=cfg.pool_max_size,
  65. kwargs={"application_name": "mcp_materialize"},
  66. configure=configure,
  67. ) as pool:
  68. try:
  69. logger.debug("Testing database connection...")
  70. async with pool.connection() as conn:
  71. await conn.set_autocommit(True)
  72. async with conn.cursor(row_factory=dict_row) as cur:
  73. await cur.execute(
  74. "SELECT"
  75. " mz_environment_id() AS env,"
  76. " current_role AS role;"
  77. )
  78. meta = await cur.fetchone()
  79. logger.info(
  80. "Connected to Materialize environment %s as user %s",
  81. meta["env"],
  82. meta["role"],
  83. )
  84. logger.debug("Connection pool initialized successfully")
  85. async with MzClient(pool=pool) as client:
  86. yield client
  87. except Exception as e:
  88. logger.error(f"Failed to initialize connection pool: {str(e)}")
  89. raise
  90. finally:
  91. logger.info("Closing connection pool...")
  92. await pool.close()
  93. except Exception as e:
  94. logger.error(f"Failed to create connection pool: {str(e)}")
  95. raise
  96. async def run():
  97. cfg = load_config()
  98. async with create_client(cfg) as mz:
  99. @asynccontextmanager
  100. async def lifespan(_server):
  101. yield mz
  102. server = Server("mcp_materialize", lifespan=lifespan)
  103. @server.list_tools()
  104. async def list_tools() -> list[Tool]:
  105. logger.debug("Listing available tools...")
  106. tools = await server.request_context.lifespan_context.list_tools()
  107. return tools
  108. @server.call_tool()
  109. async def call_tool(name: str, arguments: dict[str, Any]):
  110. logger.debug(f"Calling tool '{name}' with arguments: {arguments}")
  111. try:
  112. result = await server.request_context.lifespan_context.call_tool(
  113. name, arguments
  114. )
  115. logger.debug(f"Tool '{name}' executed successfully")
  116. return result
  117. except Exception as e:
  118. logger.error(f"Error executing tool '{name}': {str(e)}")
  119. await server.request_context.session.send_tool_list_changed()
  120. raise
  121. options = server.create_initialization_options(
  122. notification_options=NotificationOptions(tools_changed=True)
  123. )
  124. match cfg.transport:
  125. case "stdio":
  126. logger.info("Starting server in stdio mode...")
  127. await stdio_transport(server, options)
  128. case "http":
  129. logger.info("Starting server in HTTP mode...")
  130. await http_transport(server, cfg)
  131. case "sse":
  132. logger.info(f"Starting SSE server on {cfg.host}:{cfg.port}...")
  133. await sse_transport(server, options, cfg)
  134. case t:
  135. raise ValueError(f"Unknown transport: {t}")
  136. def main():
  137. """Synchronous wrapper for the async main function."""
  138. try:
  139. logger.info("Starting Materialize MCP Server...")
  140. asyncio.run(run())
  141. except KeyboardInterrupt:
  142. logger.info("Shutting down …")
  143. if __name__ == "__main__":
  144. main()