视障人士语音拨号助手
一、实际应用场景与痛点
应用场景
视障用户张先生需要经常联系家人和朋友。传统的通讯录操作对视障人士极不友好:触摸屏无法提供有效反馈,输入号码容易出错,查找联系人需要别人帮助。一次紧急情况下,他急需拨打急救电话,但无法快速找到联系人,耽误了宝贵时间。他急需一款能够通过语音指令快速拨打电话的智能助手。
核心痛点
1. 屏幕触摸困难:触摸屏缺乏触觉反馈,无法准确点击
2. 联系人查找慢:滚动查找联系人效率低下
3. 号码输入易错:语音输入号码容易识别错误
4. 紧急情况无助:紧急时无法快速拨打关键联系人
5. 隐私泄露风险:让他人帮助拨号泄露个人隐私
6. 操作步骤复杂:多步操作对视障用户不友好
二、核心逻辑设计
1. 启动语音监听,等待唤醒词
2. 识别用户语音指令
3. 解析指令意图(拨号/搜索/紧急呼叫)
4. 搜索通讯录匹配联系人
5. 语音确认联系人信息
6. 自动拨打确认的联系人
7. 通话状态监控和提示
8. 通话记录和常用联系人管理
三、模块化代码实现
主程序文件:voice_dialer_assistant.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
视障人士语音拨号助手
通过语音指令搜索通讯录并自动拨号
版本:2.0.0
作者:无障碍智能助手
"""
import os
import sys
import json
import time
import sqlite3
import threading
import queue
import re
from datetime import datetime
from typing import Dict, List, Tuple, Optional, Any, Union
from dataclasses import dataclass, asdict, field
from enum import Enum
import warnings
warnings.filterwarnings('ignore')
# 语音识别
try:
import speech_recognition as sr
SPEECH_RECOGNITION_AVAILABLE = True
except ImportError:
SPEECH_RECOGNITION_AVAILABLE = False
print("警告: speech_recognition未安装,语音识别功能受限")
# 语音合成
try:
import pyttsx3
TTS_AVAILABLE = True
except ImportError:
TTS_AVAILABLE = False
print("警告: pyttsx3未安装,语音合成功能受限")
# 音频处理
try:
import pyaudio
import wave
import simpleaudio as sa
AUDIO_AVAILABLE = True
except ImportError:
AUDIO_AVAILABLE = False
# Android相关(如果运行在Android上)
try:
from android import Android
ANDROID_AVAILABLE = True
except ImportError:
ANDROID_AVAILABLE = False
# 系统调用(用于拨号)
import subprocess
import webbrowser
class CallType(Enum):
"""通话类型枚举"""
OUTGOING = "outgoing" # 拨出电话
INCOMING = "incoming" # 来电
MISSED = "missed" # 未接来电
REJECTED = "rejected" # 拒接
EMERGENCY = "emergency" # 紧急呼叫
class CommandType(Enum):
"""语音指令类型枚举"""
DIAL = "dial" # 拨打电话
SEARCH = "search" # 搜索联系人
EMERGENCY = "emergency" # 紧急呼叫
REDIAL = "redial" # 重拨
CANCEL = "cancel" # 取消操作
HELP = "help" # 帮助
LIST_CONTACTS = "list" # 列出联系人
ADD_CONTACT = "add" # 添加联系人
DELETE_CONTACT = "delete" # 删除联系人
UNKNOWN = "unknown" # 未知指令
@dataclass
class Contact:
"""联系人信息类"""
id: int
name: str
phone_numbers: List[str] # 可能有多个号码
groups: List[str] # 分组标签
notes: Optional[str] = None
frequency: int = 0 # 呼叫频率
last_called: Optional[float] = None
created_at: float = field(default_factory=time.time)
def to_dict(self) -> Dict:
"""转换为字典"""
return {
'id': self.id,
'name': self.name,
'phone_numbers': self.phone_numbers,
'groups': self.groups,
'notes': self.notes,
'frequency': self.frequency,
'last_called': datetime.fromtimestamp(self.last_called).isoformat()
if self.last_called else None,
'created_at': datetime.fromtimestamp(self.created_at).isoformat()
}
@dataclass
class CallRecord:
"""通话记录类"""
id: int
contact_id: Optional[int]
contact_name: str
phone_number: str
call_type: CallType
timestamp: float
duration: float = 0.0 # 通话时长(秒)
successful: bool = True
def to_dict(self) -> Dict:
"""转换为字典"""
return {
'id': self.id,
'contact_id': self.contact_id,
'contact_name': self.contact_name,
'phone_number': self.phone_number,
'call_type': self.call_type.value,
'timestamp': datetime.fromtimestamp(self.timestamp).isoformat(),
'duration': self.duration,
'successful': self.successful
}
@dataclass
class VoiceCommand:
"""语音指令类"""
raw_text: str
command_type: CommandType
parameters: Dict[str, Any]
confidence: float
timestamp: float = field(default_factory=time.time)
def to_dict(self) -> Dict:
"""转换为字典"""
return {
'raw_text': self.raw_text,
'command_type': self.command_type.value,
'parameters': self.parameters,
'confidence': self.confidence,
'timestamp': datetime.fromtimestamp(self.timestamp).isoformat()
}
class ContactDatabase:
"""联系人数据库管理器"""
def __init__(self, db_path: str = "data/contacts.db"):
"""
初始化数据库
Args:
db_path: 数据库文件路径
"""
self.db_path = db_path
self.connection = None
self.setup_database()
def setup_database(self):
"""设置数据库"""
# 确保数据目录存在
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
# 连接数据库
self.connection = sqlite3.connect(self.db_path)
self.connection.row_factory = sqlite3.Row
# 创建表
self._create_tables()
# 插入示例数据(如果数据库为空)
self._insert_sample_data()
def _create_tables(self):
"""创建数据表"""
cursor = self.connection.cursor()
# 联系人表
cursor.execute('''
CREATE TABLE IF NOT EXISTS contacts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
phone_numbers TEXT NOT NULL, -- JSON数组
groups TEXT NOT NULL, -- JSON数组
notes TEXT,
frequency INTEGER DEFAULT 0,
last_called REAL,
created_at REAL NOT NULL
)
''')
# 通话记录表
cursor.execute('''
CREATE TABLE IF NOT EXISTS call_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
contact_id INTEGER,
contact_name TEXT NOT NULL,
phone_number TEXT NOT NULL,
call_type TEXT NOT NULL,
timestamp REAL NOT NULL,
duration REAL DEFAULT 0,
successful BOOLEAN DEFAULT TRUE,
FOREIGN KEY (contact_id) REFERENCES contacts (id)
)
''')
# 语音指令记录表
cursor.execute('''
CREATE TABLE IF NOT EXISTS voice_commands (
id INTEGER PRIMARY KEY AUTOINCREMENT,
raw_text TEXT NOT NULL,
command_type TEXT NOT NULL,
parameters TEXT NOT NULL, -- JSON对象
confidence REAL NOT NULL,
timestamp REAL NOT NULL
)
''')
# 紧急联系人表
cursor.execute('''
CREATE TABLE IF NOT EXISTS emergency_contacts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
contact_id INTEGER NOT NULL,
emergency_type TEXT NOT NULL, -- police/fire/ambulance/family/doctor
priority INTEGER DEFAULT 1,
FOREIGN KEY (contact_id) REFERENCES contacts (id)
)
''')
self.connection.commit()
def _insert_sample_data(self):
"""插入示例数据"""
cursor = self.connection.cursor()
# 检查是否有数据
cursor.execute("SELECT COUNT(*) FROM contacts")
count = cursor.fetchone()[0]
if count == 0:
# 添加示例联系人
sample_contacts = [
("张三", ["13800138000"], ["家人"], "父亲", 10),
("李四", ["13900139000"], ["朋友"], "好友", 5),
("王五", ["13600136000"], ["同事"], "同事", 3),
("急救中心", ["120"], ["紧急"], "医疗急救", 0),
("报警电话", ["110"], ["紧急"], "警察", 0),
("火警电话", ["119"], ["紧急"], "消防", 0)
]
for name, phones, groups, notes, freq in sample_contacts:
self.add_contact(
name=name,
phone_numbers=phones,
groups=groups,
notes=notes,
frequency=freq
)
# 添加紧急联系人
emergency_mapping = [
("急救中心", "ambulance", 1),
("报警电话", "police", 1),
("火警电话", "fire", 1),
("张三", "family", 2)
]
for contact_name, etype, priority in emergency_mapping:
cursor.execute("SELECT id FROM contacts WHERE name = ?", (contact_name,))
result = cursor.fetchone()
if result:
self.add_emergency_contact(result[0], etype, priority)
self.connection.commit()
print("已添加示例联系人数据")
def add_contact(self, name: str, phone_numbers: List[str],
groups: List[str] = None, notes: str = None,
frequency: int = 0) -> int:
"""
添加联系人
Args:
name: 联系人姓名
phone_numbers: 电话号码列表
groups: 分组列表
notes: 备注
frequency: 呼叫频率
Returns:
联系人ID
"""
if groups is None:
groups = []
cursor = self.connection.cursor()
cursor.execute('''
INSERT INTO contacts
(name, phone_numbers, groups, notes, frequency, created_at)
VALUES (?, ?, ?, ?, ?, ?)
''', (
name,
json.dumps(phone_numbers, ensure_ascii=False),
json.dumps(groups, ensure_ascii=False),
notes,
frequency,
time.time()
))
contact_id = cursor.lastrowid
self.connection.commit()
return contact_id
def get_contact(self, contact_id: int) -> Optional[Contact]:
"""获取联系人"""
cursor = self.connection.cursor()
cursor.execute("SELECT * FROM contacts WHERE id = ?", (contact_id,))
row = cursor.fetchone()
if row:
return self._row_to_contact(row)
return None
def search_contacts(self, keyword: str, limit: int = 10) -> List[Contact]:
"""
搜索联系人
Args:
keyword: 搜索关键词
limit: 返回结果数量限制
Returns:
匹配的联系人列表
"""
cursor = self.connection.cursor()
# 使用SQLite的全文搜索(如果支持)或LIKE搜索
if keyword:
query = '''
SELECT * FROM contacts
WHERE name LIKE ? OR phone_numbers LIKE ? OR groups LIKE ?
ORDER BY frequency DESC, last_called DESC
LIMIT ?
'''
like_keyword = f"%{keyword}%"
cursor.execute(query, (like_keyword, like_keyword, like_keyword, limit))
else:
# 如果没有关键词,返回最常联系的人
query = '''
SELECT * FROM contacts
ORDER BY frequency DESC, last_called DESC
LIMIT ?
'''
cursor.execute(query, (limit,))
rows = cursor.fetchall()
return [self._row_to_contact(row) for row in rows]
def _row_to_contact(self, row) -> Contact:
"""将数据库行转换为Contact对象"""
return Contact(
id=row['id'],
name=row['name'],
phone_numbers=json.loads(row['phone_numbers']),
groups=json.loads(row['groups']),
notes=row['notes'],
frequency=row['frequency'],
last_called=row['last_called'],
created_at=row['created_at']
)
def update_contact_frequency(self, contact_id: int):
"""更新联系人呼叫频率"""
cursor = self.connection.cursor()
cursor.execute('''
UPDATE contacts
SET frequency = frequency + 1, last_called = ?
WHERE id = ?
''', (time.time(), contact_id))
self.connection.commit()
def add_call_record(self, contact_id: Optional[int], contact_name: str,
phone_number: str, call_type: CallType,
duration: float = 0.0, successful: bool = True) -> int:
"""
添加通话记录
Returns:
记录ID
"""
cursor = self.connection.cursor()
cursor.execute('''
INSERT INTO call_records
(contact_id, contact_name, phone_number, call_type,
timestamp, duration, successful)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
contact_id,
contact_name,
phone_number,
call_type.value,
time.time(),
duration,
successful
))
record_id = cursor.lastrowid
self.connection.commit()
return record_id
def get_recent_calls(self, limit: int = 20) -> List[CallRecord]:
"""获取最近通话记录"""
cursor = self.connection.cursor()
cursor.execute('''
SELECT * FROM call_records
ORDER BY timestamp DESC
LIMIT ?
''', (limit,))
rows = cursor.fetchall()
records = []
for row in rows:
records.append(CallRecord(
id=row['id'],
contact_id=row['contact_id'],
contact_name=row['contact_name'],
phone_number=row['phone_number'],
call_type=CallType(row['call_type']),
timestamp=row['timestamp'],
duration=row['duration'],
successful=bool(row['successful'])
))
return records
def add_emergency_contact(self, contact_id: int,
emergency_type: str, priority: int = 1):
"""添加紧急联系人"""
cursor = self.connection.cursor()
cursor.execute('''
INSERT OR REPLACE INTO emergency_contacts
(contact_id, emergency_type, priority)
VALUES (?, ?, ?)
''', (contact_id, emergency_type, priority))
self.connection.commit()
def get_emergency_contacts(self, emergency_type: str = None) -> List[Contact]:
"""获取紧急联系人"""
cursor = self.connection.cursor()
if emergency_type:
query = '''
SELECT c.* FROM contacts c
JOIN emergency_contacts ec ON c.id = ec.contact_id
WHERE ec.emergency_type = ?
ORDER BY ec.priority ASC
'''
cursor.execute(query, (emergency_type,))
else:
query = '''
SELECT c.* FROM contacts c
JOIN emergency_contacts ec ON c.id = ec.contact_id
ORDER BY ec.emergency_type, ec.priority ASC
'''
cursor.execute(query)
rows = cursor.fetchall()
return [self._row_to_contact(row) for row in rows]
def log_voice_command(self, command: VoiceCommand):
"""记录语音指令"""
cursor = self.connection.cursor()
cursor.execute('''
INSERT INTO voice_commands
(raw_text, command_type, parameters, confidence, timestamp)
VALUES (?, ?, ?, ?, ?)
''', (
command.raw_text,
command.command_type.value,
json.dumps(command.parameters, ensure_ascii=False),
command.confidence,
command.timestamp
))
self.connection.commit()
def close(self):
"""关闭数据库连接"""
if self.connection:
self.connection.close()
class SpeechRecognizer:
"""语音识别器"""
def __init__(self, config: Dict):
"""
初始化语音识别器
Args:
config: 识别器配置
"""
self.config = config
self.recognizer = None
self.microphone = None
if SPEECH_RECOGNITION_AVAILABLE:
self._initialize_recognizer()
def _initialize_recognizer(self):
"""初始化语音识别组件"""
try:
self.recognizer = sr.Recognizer()
self.microphone = sr.Microphone()
# 调整环境噪声
with self.microphone as source:
print("正在调整环境噪声,请保持安静...")
self.recognizer.adjust_for_ambient_noise(source, duration=1)
print("语音识别器初始化成功")
except Exception as e:
print(f"语音识别器初始化失败: {e}")
self.recognizer = None
self.microphone = None
def listen_for_wake_word(self, wake_word: str = "小助手",
timeout: float = None, phrase_time_limit: float = 2) -> bool:
"""
监听唤醒词
Args:
wake_word: 唤醒词
timeout: 超时时间(秒)
phrase_time_limit: 短语时长限制
Returns:
是否检测到唤醒词
"""
if not self.recognizer or not self.microphone:
return False
try:
with self.microphone as source:
print(f"正在监听唤醒词: '{wake_word}'...")
audio = self.recognizer.listen(source, timeout=timeout,
phrase_time_limit=phrase_time_limit)
# 识别语音
try:
text = self.recognizer.recognize_google(audio, language='zh-CN')
print(f"识别到语音: {text}")
# 检查是否包含唤醒词
if wake_word.lower() in text.lower():
print(f"检测到唤醒词: {wake_word}")
return True
except sr.UnknownValueError:
print("无法识别语音")
except sr.RequestError as e:
print(f"语音识别服务错误: {e}")
except sr.WaitTimeoutError:
pass # 超时是正常的
return False
def listen_for_command(self, timeout: float = 5,
phrase_time_limit: float = 5) -> Optional[str]:
"""
监听语音指令
Args:
timeout: 超时时间
phrase_time_limit: 短语时长限制
Returns:
识别的文本,失败返回None
"""
if not self.recognizer or not self.microphone:
return None
try:
print("请说出指令...")
with self.microphone as source:
audio = self.recognizer.listen(source, timeout=timeout,
phrase_time_limit=phrase_time_limit)
# 识别语音
try:
text = self.recognizer.recognize_google(audio, language='zh-CN')
print(f"识别到指令: {text}")
return text
except sr.UnknownValueError:
print("无法识别语音指令")
return None
except sr.RequestError as e:
print(f"语音识别服务错误: {e}")
return None
except sr.WaitTimeoutError:
print("等待指令超时")
return None
def continuous_listen(self, callback, stop_event: threading.Event):
"""
持续监听语音
Args:
callback: 识别到语音时的回调函数
stop_event: 停止事件
"""
if not self.recognizer or not self.microphone:
return
print("开始持续语音监听...")
while not stop_event.is_set():
try:
text = self.listen_for_command(timeout=1)
if text and callback:
callback(text)
except Exception as e:
print(f"监听错误: {e}")
time.sleep(0.1)
class VoiceSynthesizer:
"""语音合成器"""
def __init__(self, config: Dict):
"""
初始化语音合成器
Args:
config: 合成器配置
"""
self.config = config
self.tts_engine = None
if TTS_AVAILABLE:
self._initialize_tts()
def _initialize_tts(self):
"""初始化TTS引擎"""
try:
self.tts_engine = pyttsx3.init()
# 设置语音属性
rate = self.config.get('speech_r
如果你觉得这个工具好用,欢迎关注我!