逆向工程Granola实现Obsidian笔记同步的技术实践

本文详细介绍了如何通过逆向工程分析Granola.ai桌面应用的API接口,获取认证令牌,并使用Python脚本将会议笔记自动同步到Obsidian笔记库中的技术实现过程。

逆向工程Granola实现Obsidian笔记同步

2025年5月8日hacking

我热爱使用granola.ai。我认识的每个人都在使用它进行会议转录。几个月来,我一直在用它转录我的通话和会议内容。

但我希望将所有笔记集中在一个地方,而我使用Obsidian。我喜欢它使用纯文本文件、自动渲染markdown并支持vim-motion功能。

我希望以某种方式将我所有的Granola笔记导入Obsidian。以下是我实现这一目标的过程(文末附有可供使用的脚本)。

初步探索

我首先尝试询问Granola是否有API或在磁盘上查找笔记的方法。他们已经提供了Notion同步功能。他们表示没有API,但我确实从其他用户那里收到了一些关于在磁盘上查找位置的信息。

实际上我从未在磁盘上找到笔记,但我找到了我想要的东西。

代理Granola

作为一名黑客,我决定代理Granola API,看看是否能逆向工程桌面应用获取笔记的方式。于是我将系统代理设置为指向Caido,并开始使用Granola。成功了:

get-documents端点就是保存笔记的地方!😊

认证机制分析

我知道API需要凭据。它使用Bearer令牌。于是我搜索文件系统,在Library/Application Support/Granola/supabase.json找到了supabase.json凭据文件。有了这些凭据,我知道可以从API拉取笔记了。

Python脚本实现

我让新的Gemini 2.5 pro生成了下面的脚本。它将笔记以markdown文件的形式拉取到你选择的文件夹中(自然,我选择了Obsidian库中的一个文件夹)。它只保存摘要笔记,不包含转录内容。如果你想拉取转录内容,我将这留给读者作为练习。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
import argparse
import logging
from pathlib import Path
import traceback
import json
import os
import requests
from datetime import datetime

# 配置日志
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('granola_sync.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

def load_credentials():
    """
    从supabase.json加载Granola凭据
    """
    creds_path = Path.home() / "Library/Application Support/Granola/supabase.json"
    if not creds_path.exists():
        logger.error(f"凭据文件未找到于: {creds_path}")
        return None
    
    try:
        with open(creds_path, 'r') as f:
            data = json.load(f)
            
        # 将cognito_tokens字符串解析为字典
        cognito_tokens = json.loads(data['cognito_tokens'])
        access_token = cognito_tokens.get('access_token')
        
        if not access_token:
            logger.error("在凭据文件中未找到访问令牌")
            return None
            
        logger.debug("成功加载凭据")
        return access_token
    except Exception as e:
        logger.error(f"读取凭据文件时出错: {str(e)}")
        return None

def fetch_granola_documents(token):
    """
    从Granola API获取文档
    """
    url = "https://api.granola.ai/v2/get-documents"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "Accept": "*/*",
        "User-Agent": "Granola/5.354.0",
        "X-Client-Version": "5.354.0"
    }
    data = {
        "limit": 100,
        "offset": 0,
        "include_last_viewed_panel": True
    }
    
    try:
        response = requests.post(url, headers=headers, json=data)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        logger.error(f"获取文档时出错: {str(e)}")
        return None

def convert_prosemirror_to_markdown(content):
    """
    将ProseMirror JSON转换为Markdown
    """
    if not content or not isinstance(content, dict) or 'content' not in content:
        return ""
        
    markdown = []
    
    def process_node(node):
        if not isinstance(node, dict):
            return ""
            
        node_type = node.get('type', '')
        content = node.get('content', [])
        text = node.get('text', '')
        
        if node_type == 'heading':
            level = node.get('attrs', {}).get('level', 1)
            heading_text = ''.join(process_node(child) for child in content)
            return f"{'#' * level} {heading_text}\n\n"
            
        elif node_type == 'paragraph':
            para_text = ''.join(process_node(child) for child in content)
            return f"{para_text}\n\n"
            
        elif node_type == 'bulletList':
            items = []
            for item in content:
                if item.get('type') == 'listItem':
                    item_content = ''.join(process_node(child) for child in item.get('content', []))
                    items.append(f"- {item_content.strip()}")
            return '\n'.join(items) + '\n\n'
            
        elif node_type == 'text':
            return text
            
        return ''.join(process_node(child) for child in content)
    
    return process_node(content)

def sanitize_filename(title):
    """
    将标题转换为有效的文件名
    """
    # 移除无效字符
    invalid_chars = '<>:"/\\|?*'
    filename = ''.join(c for c in title if c not in invalid_chars)
    # 将空格替换为下划线
    filename = filename.replace(' ', '_')
    return filename

def main():
    logger.info("开始Granola同步进程")
    parser = argparse.ArgumentParser(description="获取Granola笔记并将其保存为Markdown文件到Obsidian文件夹。")
    parser.add_argument("output_dir", type=str, help="保存笔记的Obsidian子文件夹的完整路径。")
    args = parser.parse_args()

    output_path = Path(args.output_dir)
    logger.info(f"输出目录设置为: {output_path}")
    
    if not output_path.is_dir():
        logger.error(f"输出目录'{output_path}'不存在或不是目录。")
        logger.error("请先创建该目录。")
        return

    logger.info("尝试加载凭据...")
    token = load_credentials()
    if not token:
        logger.error("加载凭据失败。退出。")
        return

    logger.info("凭据加载成功。从Granola API获取文档...")
    api_response = fetch_granola_documents(token)

    if not api_response:
        logger.error("获取文档失败 - API响应为空")
        return
        
    if "docs" not in api_response:
        logger.error("API响应格式意外 - 未找到'docs'键")
        logger.debug(f"API响应: {api_response}")
        return

    documents = api_response["docs"]
    logger.info(f"成功从Granola获取{len(documents)}个文档")

    synced_count = 0
    for doc in documents:
        title = doc.get("title", "未命名Granola笔记")
        doc_id = doc.get("id", "unknown_id")
        logger.info(f"处理文档: {title} (ID: {doc_id})")
        
        content_to_parse = None
        if doc.get("last_viewed_panel") and \
           isinstance(doc["last_viewed_panel"], dict) and \
           doc["last_viewed_panel"].get("content") and \
           isinstance(doc["last_viewed_panel"]["content"], dict) and \
           doc["last_viewed_panel"]["content"].get("type") == "doc":
            content_to_parse = doc["last_viewed_panel"]["content"]
            logger.debug(f"找到文档内容可解析: {title}")

        if not content_to_parse:
            logger.warning(f"跳过文档'{title}' (ID: {doc_id}) - 在'last_viewed_panel'中未找到合适的内容")
            continue
        
        try:
            logger.debug(f"将文档转换为markdown: {title}")
            markdown_content = convert_prosemirror_to_markdown(content_to_parse)
            
            # 为元数据添加frontmatter块
            frontmatter = f"---\n"
            frontmatter += f"granola_id: {doc_id}\n"
            escaped_title_for_yaml = title.replace('"', '\\"') 
            frontmatter += f'title: "{escaped_title_for_yaml}"\n'
            
            if doc.get("created_at"):
                frontmatter += f"created_at: {doc.get('created_at')}\n"
            if doc.get("updated_at"):
                frontmatter += f"updated_at: {doc.get('updated_at')}\n"
            frontmatter += f"---\n\n"
            
            final_markdown = frontmatter + markdown_content

            filename = sanitize_filename(title) + ".md"
            filepath = output_path / filename

            logger.debug(f"写入文件到: {filepath}")
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(final_markdown)
            logger.info(f"成功保存: {filepath}")
            synced_count += 1
        except Exception as e:
            logger.error(f"处理文档'{title}'时出错 (ID: {doc_id}): {str(e)}")
            logger.debug("完整回溯:", exc_info=True)

    logger.info(f"同步完成。{synced_count}个笔记已保存到'{output_path}'")

if __name__ == "__main__":
    main()

进一步研究建议

如果读者想要做些什么,我认为这篇文章可能带来两件事。第一,有人可以轻松地将其转换为Obsidian插件。第二,我认为Granola API中有很多有趣的东西值得研究。

  • Joseph

注册我的邮件列表,以便在我发布更多类似内容时获得通知。 我还在Twitter/X上发布我的想法。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计