Build a Telegram Bot Using Google Gemini API
Build a Telegram bot using Google Gemini API. This code walkthrough covers configuration, functions, and Docker deployment for seamless conversational AI integration.
1. Main Code
1.1 Import necessary libraries and modules
import argparse
import traceback
import asyncio
import google.generativeai as genai
import re
import telebot
from telebot.async_telebot import AsyncTeleBot
from pathlib import Path
from telebot import TeleBot
from telebot.types import BotCommand, Message
1.2 Configuration for text generation
# Sampling parameters handed to genai.GenerativeModel as `generation_config`
# when a new chat is created.
generation_config = {
    "temperature": 0.9,
    "top_p": 1,
    "top_k": 1,
    "max_output_tokens": 8192,
}
# Passed to genai.GenerativeModel as `safety_settings`.
# NOTE(review): an empty list presumably leaves the library's default safety
# thresholds in effect — confirm against the SDK documentation.
safety_settings = []
1.3 Function to find all indices of a pattern in a given string
def find_all_index(str, pattern):
    """Return the boundaries of every group-1 match of *pattern* in *str*.

    The result always starts with 0 and ends with ``len(str)``; in between
    come the ``start, end`` offsets of capture group 1 for each match, in
    order of appearance. Matches in which group 1 did not participate
    (e.g. another alternative of the pattern matched) are skipped.

    Args:
        str: Text to scan. The name shadows the builtin; it is kept so that
            existing keyword callers keep working.
        pattern: Regular expression containing at least one capture group.
            It is applied with ``re.MULTILINE``.

    Returns:
        A flat list ``[0, s1, e1, s2, e2, ..., len(str)]``.
    """
    index_list = [0]
    for match in re.finditer(pattern, str, re.MULTILINE):
        # Identity check: group(1) is None when the group did not take part.
        if match.group(1) is not None:
            index_list.extend(match.span(1))
    index_list.append(len(str))
    return index_list
1.4 Function to replace all occurrences of a pattern with a given function's result
def replace_all(text, pattern, function):
    """Rewrite every group-1 match of *pattern* in *text* via *function*.

    Only the span captured by group 1 is replaced. Matches in which group 1
    did not participate (another alternative of the pattern matched) are
    copied through untouched, which lets a pattern "protect" regions such as
    code spans by listing them as non-capturing alternatives.

    Args:
        text: Input string.
        pattern: Regular expression with at least one capture group; applied
            with ``re.MULTILINE``.
        function: Callable taking the captured substring and returning its
            replacement string.

    Returns:
        The text with each captured span replaced by ``function(span)``.
    """
    pieces = []
    cursor = 0  # end of the last replaced span
    for match in re.finditer(pattern, text, re.MULTILINE):
        if match.group(1) is None:
            continue
        start, end = match.span(1)
        pieces.append(text[cursor:start])           # untouched text before the span
        pieces.append(function(text[start:end]))    # rewritten span
        cursor = end
    pieces.append(text[cursor:])                    # untouched tail
    return "".join(pieces)
1.4 Various functions to escape specific characters in the text
def escapeshape(text):
    """Turn a Markdown heading line into a bold line prefixed with a bar.

    The leading run of ``#`` characters is dropped and the whole remainder of
    the heading is kept (not just its first word). A heading with no words
    after the marker yields an empty bold span instead of raising.

    Args:
        text: Heading line such as ``"## Some title"``.

    Returns:
        ``"▎*<heading text>*"``.
    """
    # maxsplit=1 separates the "#..." marker from the rest of the line.
    parts = text.split(maxsplit=1)
    title = parts[1].strip() if len(parts) > 1 else ""
    return "▎*" + title + "*"
def escapeminus(text):
    """Return *text* with a single backslash placed in front of it."""
    return "".join(("\\", text))
def escapebackquote(text):
    """Return two backslash-escaped backticks.

    *text* is ignored; the parameter exists so the function fits the
    one-argument callback shape that replace_all expects.
    """
    escaped_pair = r"\`\`"
    return escaped_pair
def escapeplus(text):
    """Prefix *text* with one backslash and return the result."""
    backslash = "\\"
    return backslash + text
1.5 Function to apply multiple escaping rules to the input text
def escape(text, flag=0):
    r"""Convert Markdown-style text into a string escaped for MarkdownV2.

    The steps run in a fixed order and rely on temporary placeholders
    (``@->@``, ``@@@``, ``^^^`` ...) to shield constructs that must survive a
    later blanket escaping step: already-escaped brackets and parentheses,
    ``**bold**`` spans, ``[text](url)`` links and fenced code blocks.
    Do not reorder the substitutions.

    Args:
        text: Input text.
        flag: When truthy, each pair of consecutive backslashes is shielded
            from the backslash-doubling step and put back as a pair
            afterwards; when falsy every backslash is doubled.

    Returns:
        The escaped text.
    """
    # In all other places characters
    # _ * [ ] ( ) ~ ` > # + - = | { } . !
    # must be escaped with the preceding character '\'.

    # Shield brackets/parentheses that are already backslash-escaped.
    text = re.sub(r"\\\[", "@->@", text)
    text = re.sub(r"\\\]", "@<-@", text)
    text = re.sub(r"\\\(", "@-->@", text)
    text = re.sub(r"\\\)", "@<--@", text)
    if flag:
        text = re.sub(r"\\\\", "@@@", text)
    # Double every remaining backslash.
    text = re.sub(r"\\", r"\\\\", text)
    if flag:
        text = re.sub(r"\@{3}", r"\\\\", text)
    text = re.sub(r"_", r"\_", text)
    # **bold** -> *bold*, via a placeholder so the blanket '*' escape skips it.
    text = re.sub(r"\*{2}(.*?)\*{2}", "@@@\\1@@@", text)
    # "* item" list markers become bullets.
    text = re.sub(r"\n{1,2}\*\s", "\n\n• ", text)
    text = re.sub(r"\*", r"\*", text)
    text = re.sub(r"\@{3}(.*?)\@{3}", "*\\1*", text)
    # Shield [text](url) / ![text](url) before escaping brackets and parens.
    text = re.sub(r"\!?\[(.*?)\]\((.*?)\)", "@@@\\1@@@^^^\\2^^^", text)
    text = re.sub(r"\[", r"\[", text)
    text = re.sub(r"\]", r"\]", text)
    text = re.sub(r"\(", r"\(", text)
    text = re.sub(r"\)", r"\)", text)
    # Restore the brackets/parentheses shielded at the top, now escaped.
    text = re.sub(r"\@\-\>\@", r"\[", text)
    text = re.sub(r"\@\<\-\@", r"\]", text)
    text = re.sub(r"\@\-\-\>\@", r"\(", text)
    text = re.sub(r"\@\<\-\-\@", r"\)", text)
    # Restore links as [text](url).
    text = re.sub(r"\@{3}(.*?)\@{3}\^{3}(.*?)\^{3}", "[\\1](\\2)", text)
    text = re.sub(r"~", r"\~", text)
    text = re.sub(r">", r"\>", text)
    # Heading lines are rewritten by escapeshape; fenced code is left alone.
    text = replace_all(text, r"(^#+\s.+?$)|```[\D\d\s]+?```", escapeshape)
    text = re.sub(r"#", r"\#", text)
    # Escape '+' outside list markers, fenced code and inline code.
    text = replace_all(
        text, r"(\+)|\n[\s]*-\s|```[\D\d\s]+?```|`[\D\d\s]*?`", escapeplus
    )
    # "- item" list markers become bullets; numbered items get a blank line.
    text = re.sub(r"\n{1,2}(\s*)-\s", "\n\n\\1• ", text)
    text = re.sub(r"\n{1,2}(\s*\d{1,2}\.\s)", "\n\n\\1", text)
    # Escape '-' outside list markers, fenced code and inline code.
    text = replace_all(
        text, r"(-)|\n[\s]*-\s|```[\D\d\s]+?```|`[\D\d\s]*?`", escapeminus
    )
    # Shield fenced code, escape stray double backticks, then restore fences.
    text = re.sub(r"```([\D\d\s]+?)```", "@@@\\1@@@", text)
    text = replace_all(text, r"(``)", escapebackquote)
    text = re.sub(r"\@{3}([\D\d\s]+?)\@{3}", "```\\1```", text)
    text = re.sub(r"=", r"\=", text)
    text = re.sub(r"\|", r"\|", text)
    text = re.sub(r"{", r"\{", text)
    text = re.sub(r"}", r"\}", text)
    text = re.sub(r"\.", r"\.", text)
    text = re.sub(r"!", r"\!", text)
    return text
1.6 Asynchronous function to create a new Gemini conversation using the generative model
async def make_new_gemini_convo():
    """Create a "gemini-pro" model with the module-level generation and
    safety settings and return a newly started chat session on it."""
    model_options = {
        "model_name": "gemini-pro",
        "generation_config": generation_config,
        "safety_settings": safety_settings,
    }
    return genai.GenerativeModel(**model_options).start_chat()
1.7 Asynchronous main function to initialize and run the Telegram bot
async def main():
    """Parse the command line, set up the Telegram bot and poll for updates.

    Positional command-line arguments: the Telegram bot token and the Google
    Gemini API key. Registers handlers for ``/gemini``, ``/clear``, private
    text messages and photos, then polls until interrupted.
    """
    # Argument parsing
    parser = argparse.ArgumentParser()
    parser.add_argument("tg_token", help="telegram token")
    parser.add_argument("GOOGLE_GEMINI_KEY", help="Google Gemini API key")
    options = parser.parse_args()
    print("Arg parse done.")

    # Ongoing Gemini conversations, keyed by the stringified Telegram user id.
    # TODO: kept in memory only, so a restart loses every conversation.
    gemini_player_dict = {}

    # Configure the Google Gemini API with the provided key
    genai.configure(api_key=options.GOOGLE_GEMINI_KEY)

    # Initialize the Telegram bot
    bot = AsyncTeleBot(options.tg_token)
    await bot.delete_my_commands(scope=None, language_code=None)
    await bot.set_my_commands(
        commands=[
            telebot.types.BotCommand("gemini", "call bot in a group"),
            telebot.types.BotCommand("clear", "delete history"),
        ],
    )
    print("Bot init done.")

    generation_error = "Something wrong, possibly due to security, please try changing your prompt or contact admin"

    async def _reply_with_chat(message: Message, prompt: str) -> None:
        """Send *prompt* to the sender's conversation and reply with the answer."""
        user_key = str(message.from_user.id)
        player = gemini_player_dict.get(user_key)
        if player is None:
            player = await make_new_gemini_convo()
            gemini_player_dict[user_key] = player
        # Control the length of the history record: drop the two oldest entries.
        if len(player.history) > 10:
            player.history = player.history[2:]
        try:
            # Async variant so the event loop keeps serving other users
            # while the model is generating.
            response = await player.send_message_async(prompt)
            answer = response.text
            try:
                await bot.reply_to(message, escape(answer), parse_mode="MarkdownV2")
            except Exception:
                # MarkdownV2 rendering was rejected: fall back to the raw,
                # unescaped text so no stray backslashes are shown.
                await bot.reply_to(message, answer)
        except Exception:
            traceback.print_exc()
            await bot.reply_to(message, generation_error)

    async def _reply_to_photo(message: Message, prompt: str) -> None:
        """Download the largest size of the photo and ask the vision model about it."""
        try:
            file_info = await bot.get_file(message.photo[-1].file_id)
            image_data = await bot.download_file(file_info.file_path)
        except Exception:
            traceback.print_exc()
            await bot.reply_to(message, "Something is wrong reading your photo or prompt")
            # Without the image there is nothing to send to the model.
            return
        model = genai.GenerativeModel("gemini-pro-vision")
        # The downloaded bytes are used directly; no temporary file is needed.
        contents = {
            "parts": [{"mime_type": "image/jpeg", "data": image_data}, {"text": prompt}]
        }
        try:
            response = await model.generate_content_async(contents=contents)
            await bot.reply_to(message, response.text)
        except Exception:
            traceback.print_exc()
            await bot.reply_to(message, generation_error)

    # Define command handlers for the bot

    # Handler for the /gemini command
    @bot.message_handler(commands=["gemini"])
    async def gemini_handler(message: Message):
        try:
            m = message.text.strip().split(maxsplit=1)[1].strip()
        except IndexError:
            # The command was sent without any prompt text.
            await bot.reply_to(
                message,
                escape("Please type /gemini followed by what you want to say.For example:\n`/gemini Hello!`"),
                parse_mode="MarkdownV2",
            )
            return
        await _reply_with_chat(message, m)

    # Handler for the /clear command
    @bot.message_handler(commands=["clear"])
    async def clear_handler(message: Message):
        # Check if the player is already in gemini_player_dict.
        if str(message.from_user.id) in gemini_player_dict:
            del gemini_player_dict[str(message.from_user.id)]
            await bot.reply_to(message, "Your history has been cleared")
        else:
            await bot.reply_to(message, "your history is empty already")

    # Handler for private messages with text content
    @bot.message_handler(func=lambda message: message.chat.type == "private", content_types=['text'])
    async def gemini_private_handler(message: Message):
        await _reply_with_chat(message, message.text.strip())

    # Handler for incoming photos
    @bot.message_handler(content_types=["photo"])
    async def gemini_photo_handler(message: Message) -> None:
        caption = message.caption
        if message.chat.type != "private":
            # Outside private chats only react to captions starting with /gemini.
            if not caption or not caption.startswith("/gemini"):
                return
            parts = caption.strip().split(maxsplit=1)
            prompt = parts[1].strip() if len(parts) > 1 else "no prompt"
        else:
            prompt = (caption if caption else "no prompt").strip()
        await _reply_to_photo(message, prompt)

    # Start the bot and keep it running
    print("Starting Gemini_Telegram_Bot.")
    await bot.polling(none_stop=True)
1.8 Entry point for the script
if __name__ == '__main__':
# Run the asynchronous main function using the asyncio library
asyncio.run(main())
2. requirements.txt
# Telegram Bot API client (imported as `telebot`); pinned to an exact version.
pyTelegramBotAPI==4.14.0
# Google Gemini SDK (imported as `google.generativeai`); unpinned.
google-generativeai
# NOTE(review): not imported directly by the bot code — presumably required by
# telebot's async client; confirm before removing.
aiohttp
3. Obtain API keys
- Obtain a Telegram Bot API token from @BotFather
- Get a Gemini API key from Google AI Studio
4. Dockerfile
# Image for the Gemini Telegram bot.
FROM python:3.9.18-slim-bullseye
WORKDIR /app

# Install dependencies before copying the sources so this layer stays cached
# until requirements.txt itself changes.
COPY requirements.txt /app/
RUN pip install --no-cache-dir -r requirements.txt

COPY ./ /app/

# Empty defaults; the real values are supplied when the container is started.
ENV TELEGRAM_BOT_API_KEY=""
ENV GEMINI_API_KEYS=""

# `exec` replaces the shell with python so it receives stop signals directly.
# Quoting keeps each value a single argument, and `:?` aborts with a clear
# message when a variable is unset or empty.
CMD ["sh", "-c", "exec python main.py \"${TELEGRAM_BOT_API_KEY:?TELEGRAM_BOT_API_KEY is not set}\" \"${GEMINI_API_KEYS:?GEMINI_API_KEYS is not set}\""]
5. Build Docker Image
docker build -t gemini-tg-bot:alpha .
6. Compose file
nano docker-compose.yml
version: '3.9'
services:
  gemini-tg-bot:
    # Image built locally with the tag `gemini-tg-bot:alpha`.
    image: gemini-tg-bot:alpha
    environment:
      # Replace each {...} placeholder (braces included) with the real value.
      - TELEGRAM_BOT_API_KEY={Telegram Bot API Token}
      - GEMINI_API_KEYS={Google Gemini API key}
7. Bring up the container
docker compose up -d
DEMO
Copyright statement: Unless otherwise stated, all articles on this blog adopt the CC BY-NC-SA 4.0 license agreement. For non-commercial reprints and citations, please indicate the author: Henry, and original article URL. For commercial reprints, please contact the author for authorization.