国产无遮挡裸体免费直播视频,久久精品国产蜜臀av,动漫在线视频一区二区,欧亚日韩一区二区三区,久艹在线 免费视频,国产精品美女网站免费,正在播放 97超级视频在线观看,斗破苍穹年番在线观看免费,51最新乱码中文字幕

PostgreSQL高級特性與性能優(yōu)化的實(shí)戰(zhàn)指南

 更新時間:2026年02月09日 08:54:04   作者:閑人編程  
本文將深入探討PostgreSQL的高級特性與性能優(yōu)化技術(shù),結(jié)合Python實(shí)踐,幫助開發(fā)者充分發(fā)揮PostgreSQL的潛力,文中的示例代碼講解詳細(xì),需要的小伙伴可以了解下

1. 引言:PostgreSQL的技術(shù)演進(jìn)與現(xiàn)狀

PostgreSQL作為全球最先進(jìn)的開源關(guān)系型數(shù)據(jù)庫,自1986年誕生以來,歷經(jīng)30多年的持續(xù)發(fā)展,已成為企業(yè)級應(yīng)用的首選數(shù)據(jù)庫之一。根據(jù)2023年DB-Engines排名數(shù)據(jù)顯示,PostgreSQL在"流行度"和"功能完備性"兩方面均位列開源數(shù)據(jù)庫第一。其強(qiáng)大的擴(kuò)展性、嚴(yán)格的數(shù)據(jù)完整性和豐富的功能特性,使其在處理復(fù)雜查詢、海量數(shù)據(jù)和高并發(fā)場景時表現(xiàn)出色。

本文將深入探討PostgreSQL的高級特性與性能優(yōu)化技術(shù),結(jié)合Python實(shí)踐,幫助開發(fā)者充分發(fā)揮PostgreSQL的潛力。根據(jù)國際數(shù)據(jù)公司(IDC)的報告,使用PostgreSQL的企業(yè)在數(shù)據(jù)庫運(yùn)營成本上平均降低65%,同時查詢性能提升300% 以上。

2. PostgreSQL高級特性詳解

2.1 JSONB與半結(jié)構(gòu)化數(shù)據(jù)處理

PostgreSQL的JSONB類型提供了對JSON數(shù)據(jù)的二進(jìn)制存儲,支持索引、查詢和修改操作,實(shí)現(xiàn)了關(guān)系型數(shù)據(jù)庫與文檔數(shù)據(jù)庫的完美結(jié)合。

2.1.1 JSONB性能對比分析

操作類型JSONB性能JSON性能性能提升
數(shù)據(jù)插入O(log n)O(n)5-10倍
路徑查詢O(log n)O(n)20-100倍
索引構(gòu)建O(n log n)不支持無限
數(shù)據(jù)更新O(log n)O(n)10-50倍

JSONB的存儲格式優(yōu)勢體現(xiàn)在:

2.2 全文搜索與文本分析

PostgreSQL內(nèi)置了強(qiáng)大的全文搜索功能,支持多語言、詞干提取、相關(guān)性排序等高級特性。

"""
PostgreSQL全文搜索高級應(yīng)用示例
"""
import psycopg2
from psycopg2.extras import Json, DictCursor
import json
from typing import List, Dict, Any, Optional
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class PostgreSQLFullTextSearch:
    """PostgreSQL全文搜索高級功能封裝"""
    
    def __init__(self, dsn: str):
        """
        初始化數(shù)據(jù)庫連接
        
        Args:
            dsn: 數(shù)據(jù)庫連接字符串
        """
        self.conn = psycopg2.connect(dsn)
        self.conn.autocommit = False
        self.cursor = self.conn.cursor(cursor_factory=DictCursor)
        
    def create_fulltext_configuration(self, lang: str = 'english'):
        """
        創(chuàng)建自定義全文搜索配置
        
        Args:
            lang: 語言配置
        """
        config_sql = f"""
        -- 創(chuàng)建自定義文本搜索配置
        CREATE TEXT SEARCH CONFIGURATION {lang}_custom (
            COPY = {lang}
        );
        
        -- 添加同義詞字典
        CREATE TEXT SEARCH DICTIONARY synonym_dict (
            TEMPLATE = synonym,
            SYNONYMS = synonym_sample
        );
        
        -- 添加自定義字典到配置
        ALTER TEXT SEARCH CONFIGURATION {lang}_custom
            ALTER MAPPING FOR asciiword, asciihword, hword_asciipart
            WITH synonym_dict, english_stem;
        """
        
        try:
            self.cursor.execute(config_sql)
            self.conn.commit()
            logger.info(f"全文搜索配置創(chuàng)建成功: {lang}_custom")
        except Exception as e:
            self.conn.rollback()
            logger.error(f"創(chuàng)建配置失敗: {e}")
            
    def create_searchable_table(self):
        """
        創(chuàng)建支持全文搜索的表
        """
        create_table_sql = """
        -- 創(chuàng)建文檔表
        CREATE TABLE IF NOT EXISTS documents (
            id SERIAL PRIMARY KEY,
            title VARCHAR(500) NOT NULL,
            content TEXT NOT NULL,
            author VARCHAR(200),
            category VARCHAR(100),
            tags JSONB DEFAULT '[]',
            publication_date DATE DEFAULT CURRENT_DATE,
            
            -- 生成列:用于全文搜索
            content_search_vector TSVECTOR GENERATED ALWAYS AS (
                setweight(to_tsvector('english_custom', coalesce(title, '')), 'A') ||
                setweight(to_tsvector('english_custom', coalesce(content, '')), 'B')
            ) STORED,
            
            -- 生成列:用于元數(shù)據(jù)搜索
            metadata_search_vector TSVECTOR GENERATED ALWAYS AS (
                to_tsvector('english_custom', 
                    coalesce(author, '') || ' ' || 
                    coalesce(category, '') || ' ' ||
                    (tags::text)
                )
            ) STORED,
            
            -- 創(chuàng)建GIN索引優(yōu)化搜索性能
            CONSTRAINT valid_tags CHECK (jsonb_typeof(tags) = 'array')
        );
        
        -- 創(chuàng)建GIN索引
        CREATE INDEX IF NOT EXISTS idx_documents_content_search 
        ON documents USING GIN(content_search_vector);
        
        CREATE INDEX IF NOT EXISTS idx_documents_metadata_search 
        ON documents USING GIN(metadata_search_vector);
        
        -- 創(chuàng)建部分索引優(yōu)化特定查詢
        CREATE INDEX IF NOT EXISTS idx_documents_recent 
        ON documents(publication_date) 
        WHERE publication_date > CURRENT_DATE - INTERVAL '1 year';
        
        -- 創(chuàng)建BRIN索引用于時間范圍查詢
        CREATE INDEX IF NOT EXISTS idx_documents_date_brin 
        ON documents USING BRIN(publication_date);
        """
        
        try:
            self.cursor.execute(create_table_sql)
            self.conn.commit()
            logger.info("全文搜索表創(chuàng)建成功")
        except Exception as e:
            self.conn.rollback()
            logger.error(f"創(chuàng)建表失敗: {e}")
            
    def insert_document(self, document: Dict[str, Any]) -> Optional[int]:
        """
        插入文檔數(shù)據(jù)
        
        Args:
            document: 文檔數(shù)據(jù)
            
        Returns:
            插入的文檔ID
        """
        insert_sql = """
        INSERT INTO documents (title, content, author, category, tags, publication_date)
        VALUES (%s, %s, %s, %s, %s, %s)
        RETURNING id
        """
        
        try:
            self.cursor.execute(
                insert_sql,
                (
                    document.get('title'),
                    document.get('content'),
                    document.get('author'),
                    document.get('category'),
                    Json(document.get('tags', [])),
                    document.get('publication_date')
                )
            )
            doc_id = self.cursor.fetchone()['id']
            self.conn.commit()
            logger.info(f"文檔插入成功,ID: {doc_id}")
            return doc_id
        except Exception as e:
            self.conn.rollback()
            logger.error(f"插入文檔失敗: {e}")
            return None
            
    def search_documents(
        self, 
        query: str,
        categories: List[str] = None,
        start_date: str = None,
        end_date: str = None,
        min_relevance: float = 0.1,
        limit: int = 20,
        offset: int = 0
    ) -> List[Dict[str, Any]]:
        """
        高級全文搜索
        
        Args:
            query: 搜索查詢詞
            categories: 分類篩選
            start_date: 開始日期
            end_date: 結(jié)束日期
            min_relevance: 最小相關(guān)性閾值
            limit: 返回結(jié)果數(shù)量
            offset: 偏移量
            
        Returns:
            搜索結(jié)果列表
        """
        search_sql = """
        SELECT 
            id,
            title,
            author,
            category,
            publication_date,
            tags,
            
            -- 計算相關(guān)性得分
            ts_rank(
                content_search_vector, 
                plainto_tsquery('english_custom', %s)
            ) AS relevance_score,
            
            -- 高亮顯示匹配內(nèi)容
            ts_headline(
                'english_custom',
                content,
                plainto_tsquery('english_custom', %s),
                'StartSel=<mark>, StopSel=</mark>, MaxWords=50, MinWords=10'
            ) AS content_highlight,
            
            -- 提取匹配片段
            ts_headline(
                'english_custom',
                title,
                plainto_tsquery('english_custom', %s),
                'StartSel=<mark>, StopSel=</mark>'
            ) AS title_highlight
            
        FROM documents
        WHERE 
            -- 全文搜索條件
            content_search_vector @@ plainto_tsquery('english_custom', %s)
            
            -- 分類篩選
            {category_filter}
            
            -- 日期范圍篩選
            {date_filter}
            
            -- 相關(guān)性閾值篩選
            AND ts_rank(
                content_search_vector, 
                plainto_tsquery('english_custom', %s)
            ) > %s
            
        ORDER BY 
            -- 按相關(guān)性和時間加權(quán)排序
            (ts_rank(
                content_search_vector, 
                plainto_tsquery('english_custom', %s)
            ) * 0.7 + 
            (CASE WHEN publication_date > CURRENT_DATE - INTERVAL '30 days' 
                  THEN 0.3 ELSE 0 END)) DESC,
            publication_date DESC
            
        LIMIT %s OFFSET %s
        """
        
        # 構(gòu)建動態(tài)WHERE條件
        category_filter = ""
        date_filter = ""
        params = [query, query, query, query]
        
        if categories:
            placeholders = ', '.join(['%s'] * len(categories))
            category_filter = f"AND category IN ({placeholders})"
            params.extend(categories)
            
        if start_date and end_date:
            date_filter = "AND publication_date BETWEEN %s AND %s"
            params.extend([start_date, end_date])
        elif start_date:
            date_filter = "AND publication_date >= %s"
            params.append(start_date)
        elif end_date:
            date_filter = "AND publication_date <= %s"
            params.append(end_date)
            
        # 添加剩余參數(shù)
        params.extend([query, min_relevance, query, limit, offset])
        
        # 格式化SQL
        formatted_sql = search_sql.format(
            category_filter=category_filter,
            date_filter=date_filter
        )
        
        try:
            self.cursor.execute(formatted_sql, params)
            results = self.cursor.fetchall()
            
            # 轉(zhuǎn)換為字典列表
            return [
                {
                    'id': row['id'],
                    'title': row['title'],
                    'author': row['author'],
                    'category': row['category'],
                    'publication_date': row['publication_date'],
                    'tags': row['tags'],
                    'relevance_score': float(row['relevance_score']),
                    'content_highlight': row['content_highlight'],
                    'title_highlight': row['title_highlight']
                }
                for row in results
            ]
            
        except Exception as e:
            logger.error(f"搜索失敗: {e}")
            return []
            
    def search_similar_documents(self, doc_id: int, limit: int = 10) -> List[Dict[str, Any]]:
        """
        查找相似文檔(基于內(nèi)容相似度)
        
        Args:
            doc_id: 參考文檔ID
            limit: 返回結(jié)果數(shù)量
            
        Returns:
            相似文檔列表
        """
        similarity_sql = """
        WITH target_doc AS (
            SELECT content_search_vector 
            FROM documents 
            WHERE id = %s
        )
        SELECT 
            d.id,
            d.title,
            d.author,
            d.category,
            
            -- 計算余弦相似度
            (d.content_search_vector <=> td.content_search_vector) AS similarity,
            
            -- 提取共同標(biāo)簽
            (
                SELECT jsonb_agg(tag)
                FROM jsonb_array_elements_text(d.tags) AS tag
                WHERE tag IN (
                    SELECT jsonb_array_elements_text(td.tags)
                    FROM documents td 
                    WHERE td.id = %s
                )
            ) AS common_tags
            
        FROM documents d, target_doc td
        WHERE d.id != %s
        ORDER BY similarity DESC
        LIMIT %s
        """
        
        try:
            self.cursor.execute(similarity_sql, (doc_id, doc_id, doc_id, limit))
            results = self.cursor.fetchall()
            
            return [
                {
                    'id': row['id'],
                    'title': row['title'],
                    'author': row['author'],
                    'category': row['category'],
                    'similarity': float(row['similarity']),
                    'common_tags': row['common_tags']
                }
                for row in results
            ]
            
        except Exception as e:
            logger.error(f"查找相似文檔失敗: {e}")
            return []
            
    def get_search_statistics(self, time_period: str = '1 month') -> Dict[str, Any]:
        """
        獲取搜索統(tǒng)計信息
        
        Args:
            time_period: 統(tǒng)計時間周期
            
        Returns:
            統(tǒng)計信息字典
        """
        stats_sql = """
        -- 總文檔數(shù)
        SELECT COUNT(*) as total_documents FROM documents;
        
        -- 按分類統(tǒng)計
        SELECT 
            category,
            COUNT(*) as count,
            ROUND(COUNT(*) * 100.0 / (SELECT COUNT(*) FROM documents), 2) as percentage
        FROM documents 
        WHERE category IS NOT NULL
        GROUP BY category 
        ORDER BY count DESC;
        
        -- 時間分布統(tǒng)計
        SELECT 
            DATE_TRUNC('month', publication_date) as month,
            COUNT(*) as documents_count
        FROM documents
        WHERE publication_date > CURRENT_DATE - INTERVAL %s
        GROUP BY DATE_TRUNC('month', publication_date)
        ORDER BY month DESC;
        
        -- 標(biāo)簽使用統(tǒng)計
        SELECT 
            tag,
            COUNT(*) as usage_count
        FROM documents, jsonb_array_elements_text(tags) as tag
        GROUP BY tag
        ORDER BY usage_count DESC
        LIMIT 20;
        """
        
        try:
            stats = {}
            
            # 執(zhí)行多個統(tǒng)計查詢
            self.cursor.execute("SELECT COUNT(*) as total_documents FROM documents")
            stats['total_documents'] = self.cursor.fetchone()['total_documents']
            
            self.cursor.execute("""
                SELECT category, COUNT(*) as count
                FROM documents 
                WHERE category IS NOT NULL
                GROUP BY category 
                ORDER BY count DESC
            """)
            stats['category_distribution'] = self.cursor.fetchall()
            
            self.cursor.execute(f"""
                SELECT 
                    DATE_TRUNC('month', publication_date) as month,
                    COUNT(*) as documents_count
                FROM documents
                WHERE publication_date > CURRENT_DATE - INTERVAL '{time_period}'
                GROUP BY DATE_TRUNC('month', publication_date)
                ORDER BY month DESC
            """)
            stats['time_distribution'] = self.cursor.fetchall()
            
            self.cursor.execute("""
                SELECT 
                    tag,
                    COUNT(*) as usage_count
                FROM documents, jsonb_array_elements_text(tags) as tag
                GROUP BY tag
                ORDER BY usage_count DESC
                LIMIT 20
            """)
            stats['popular_tags'] = self.cursor.fetchall()
            
            return stats
            
        except Exception as e:
            logger.error(f"獲取統(tǒng)計信息失敗: {e}")
            return {}
            
    def optimize_search_indexes(self):
        """
        優(yōu)化全文搜索索引
        """
        optimize_sql = """
        -- 重新構(gòu)建GIN索引以提高搜索性能
        REINDEX INDEX CONCURRENTLY idx_documents_content_search;
        REINDEX INDEX CONCURRENTLY idx_documents_metadata_search;
        
        -- 更新表統(tǒng)計信息
        ANALYZE documents;
        
        -- 清理索引膨脹
        VACUUM ANALYZE documents;
        
        -- 更新全文搜索配置字典
        ALTER TEXT SEARCH CONFIGURATION english_custom 
        REFRESH VERSION;
        """
        
        try:
            # 分步執(zhí)行優(yōu)化操作
            self.cursor.execute("REINDEX INDEX CONCURRENTLY idx_documents_content_search")
            logger.info("內(nèi)容搜索索引重建完成")
            
            self.cursor.execute("REINDEX INDEX CONCURRENTLY idx_documents_metadata_search")
            logger.info("元數(shù)據(jù)搜索索引重建完成")
            
            self.cursor.execute("ANALYZE documents")
            logger.info("表統(tǒng)計信息更新完成")
            
            self.cursor.execute("VACUUM ANALYZE documents")
            logger.info("表清理完成")
            
            self.conn.commit()
            logger.info("全文搜索索引優(yōu)化完成")
            
        except Exception as e:
            self.conn.rollback()
            logger.error(f"索引優(yōu)化失敗: {e}")
            
    def close(self):
        """關(guān)閉數(shù)據(jù)庫連接"""
        if self.cursor:
            self.cursor.close()
        if self.conn:
            self.conn.close()
        logger.info("數(shù)據(jù)庫連接已關(guān)閉")


def example_usage():
    """使用示例"""
    # 數(shù)據(jù)庫連接字符串
    dsn = "dbname=testdb user=postgres password=password host=localhost port=5432"
    
    # 創(chuàng)建全文搜索實(shí)例
    search = PostgreSQLFullTextSearch(dsn)
    
    try:
        # 1. 創(chuàng)建全文搜索配置
        search.create_fulltext_configuration('english')
        
        # 2. 創(chuàng)建搜索表
        search.create_searchable_table()
        
        # 3. 插入示例文檔
        sample_documents = [
            {
                'title': 'PostgreSQL Performance Optimization',
                'content': 'PostgreSQL provides advanced optimization techniques including query planning, indexing strategies, and configuration tuning.',
                'author': 'John Doe',
                'category': 'Database',
                'tags': ['postgresql', 'optimization', 'performance'],
                'publication_date': '2024-01-15'
            },
            {
                'title': 'Full Text Search in Modern Applications',
                'content': 'Implementing efficient full-text search using PostgreSQL GIN indexes and relevance scoring algorithms.',
                'author': 'Jane Smith',
                'category': 'Search',
                'tags': ['search', 'full-text', 'postgresql'],
                'publication_date': '2024-02-01'
            }
        ]
        
        for doc in sample_documents:
            search.insert_document(doc)
        
        # 4. 執(zhí)行高級搜索
        print("執(zhí)行全文搜索...")
        results = search.search_documents(
            query='PostgreSQL optimization',
            categories=['Database'],
            min_relevance=0.05,
            limit=10
        )
        
        print(f"找到 {len(results)} 個結(jié)果:")
        for result in results:
            print(f"- {result['title']} (相關(guān)性: {result['relevance_score']:.3f})")
        
        # 5. 查找相似文檔
        if results:
            similar = search.search_similar_documents(results[0]['id'])
            print(f"\n相似文檔: {len(similar)} 個")
        
        # 6. 獲取統(tǒng)計信息
        stats = search.get_search_statistics()
        print(f"\n總文檔數(shù): {stats.get('total_documents', 0)}")
        
    finally:
        search.close()


if __name__ == "__main__":
    example_usage()

2.3 分區(qū)表與數(shù)據(jù)管理

PostgreSQL的分區(qū)表功能通過繼承和約束排除實(shí)現(xiàn),顯著提升大數(shù)據(jù)量查詢性能。

2.3.1 分區(qū)策略對比

分區(qū)類型適用場景優(yōu)勢限制
范圍分區(qū)時間序列數(shù)據(jù)支持自動分區(qū)創(chuàng)建分區(qū)鍵必須有序
列表分區(qū)離散值分類支持非連續(xù)值分區(qū)數(shù)量有限
哈希分區(qū)均勻分布數(shù)據(jù)分布均勻不支持范圍查詢

2.3.2 分區(qū)性能公式

分區(qū)表的查詢性能提升可以通過以下公式估算:

?

其中:

  • Tpartitioned?:分區(qū)表查詢時間
  • Tfull?:未分區(qū)表查詢時間
  • n:相關(guān)分區(qū)數(shù)量
  • Coverhead?:分區(qū)管理開銷

3. PostgreSQL性能優(yōu)化實(shí)戰(zhàn)

3.1 查詢性能分析與優(yōu)化

"""
PostgreSQL查詢性能分析與優(yōu)化工具
"""
import psycopg2
from psycopg2.extras import DictCursor
import time
from typing import Dict, List, Any, Optional, Tuple
import statistics
import json
from datetime import datetime, timedelta
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class QueryPerformanceAnalyzer:
    """查詢性能分析器"""
    
    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)
        self.cursor = self.conn.cursor(cursor_factory=DictCursor)
        self.query_cache = {}
        
    def analyze_query_plan(self, query: str, params: tuple = None) -> Dict[str, Any]:
        """
        分析查詢執(zhí)行計劃
        
        Args:
            query: SQL查詢語句
            params: 查詢參數(shù)
            
        Returns:
            執(zhí)行計劃分析結(jié)果
        """
        try:
            # 獲取詳細(xì)執(zhí)行計劃
            explain_query = f"EXPLAIN (ANALYZE, BUFFERS, VERBOSE, FORMAT JSON) {query}"
            self.cursor.execute(explain_query, params)
            plan_result = self.cursor.fetchone()[0]
            
            plan = plan_result[0]['Plan']
            return self._parse_execution_plan(plan)
            
        except Exception as e:
            logger.error(f"分析執(zhí)行計劃失敗: {e}")
            return {}
    
    def _parse_execution_plan(self, plan: Dict[str, Any]) -> Dict[str, Any]:
        """
        解析執(zhí)行計劃
        
        Args:
            plan: 執(zhí)行計劃字典
            
        Returns:
            解析后的分析結(jié)果
        """
        analysis = {
            'operation_type': plan.get('Node Type'),
            'relation_name': plan.get('Relation Name'),
            'alias': plan.get('Alias'),
            'startup_cost': plan.get('Startup Cost'),
            'total_cost': plan.get('Total Cost'),
            'plan_rows': plan.get('Plan Rows'),
            'plan_width': plan.get('Plan Width'),
            'actual_rows': plan.get('Actual Rows'),
            'actual_time': plan.get('Actual Total Time'),
            'shared_hit_blocks': 0,
            'shared_read_blocks': 0,
            'shared_dirtied_blocks': 0,
            'shared_written_blocks': 0,
            'local_hit_blocks': 0,
            'local_read_blocks': 0,
            'local_dirtied_blocks': 0,
            'local_written_blocks': 0,
            'temp_read_blocks': 0,
            'temp_written_blocks': 0,
            'buffers': plan.get('Shared Hit Blocks', 0) + plan.get('Shared Read Blocks', 0),
            'children': [],
            'issues': []
        }
        
        # 解析緩沖區(qū)使用情況
        if 'Shared Hit Blocks' in plan:
            analysis['shared_hit_blocks'] = plan['Shared Hit Blocks']
        if 'Shared Read Blocks' in plan:
            analysis['shared_read_blocks'] = plan['Shared Read Blocks']
        
        # 遞歸解析子節(jié)點(diǎn)
        if 'Plans' in plan:
            for child_plan in plan['Plans']:
                child_analysis = self._parse_execution_plan(child_plan)
                analysis['children'].append(child_analysis)
        
        # 識別潛在問題
        self._identify_issues(analysis, plan)
        
        return analysis
    
    def _identify_issues(self, analysis: Dict[str, Any], plan: Dict[str, Any]):
        """識別查詢計劃中的潛在問題"""
        
        # 檢查全表掃描
        if analysis['operation_type'] == 'Seq Scan' and analysis['actual_rows'] > 10000:
            analysis['issues'].append({
                'type': 'FULL_TABLE_SCAN',
                'severity': 'HIGH',
                'message': '檢測到大量行的全表掃描',
                'suggestion': '考慮添加合適的索引'
            })
        
        # 檢查嵌套循環(huán)連接
        if analysis['operation_type'] == 'Nested Loop' and analysis['actual_rows'] > 1000:
            analysis['issues'].append({
                'type': 'INEFFICIENT_JOIN',
                'severity': 'MEDIUM',
                'message': '嵌套循環(huán)連接可能效率較低',
                'suggestion': '考慮使用Hash Join或Merge Join'
            })
        
        # 檢查排序操作
        if analysis['operation_type'] == 'Sort' and analysis['actual_rows'] > 10000:
            analysis['issues'].append({
                'type': 'LARGE_SORT',
                'severity': 'MEDIUM',
                'message': '大規(guī)模排序操作',
                'suggestion': '考慮添加索引以避免排序'
            })
        
        # 檢查緩沖區(qū)命中率
        total_blocks = analysis['shared_hit_blocks'] + analysis['shared_read_blocks']
        if total_blocks > 0:
            hit_ratio = analysis['shared_hit_blocks'] / total_blocks
            if hit_ratio < 0.9:
                analysis['issues'].append({
                    'type': 'LOW_BUFFER_HIT',
                    'severity': 'MEDIUM',
                    'message': f'緩沖區(qū)命中率較低: {hit_ratio:.2%}',
                    'suggestion': '考慮增加shared_buffers或優(yōu)化查詢'
                })
    
    def benchmark_query(self, query: str, params: tuple = None, 
                       iterations: int = 10) -> Dict[str, Any]:
        """
        基準(zhǔn)測試查詢性能
        
        Args:
            query: SQL查詢語句
            params: 查詢參數(shù)
            iterations: 迭代次數(shù)
            
        Returns:
            性能基準(zhǔn)測試結(jié)果
        """
        execution_times = []
        row_counts = []
        
        try:
            # 預(yù)熱緩存
            self.cursor.execute(query, params)
            _ = self.cursor.fetchall()
            
            # 執(zhí)行基準(zhǔn)測試
            for i in range(iterations):
                start_time = time.perf_counter()
                self.cursor.execute(query, params)
                rows = self.cursor.fetchall()
                end_time = time.perf_counter()
                
                execution_times.append(end_time - start_time)
                row_counts.append(len(rows))
            
            # 分析執(zhí)行計劃
            plan_analysis = self.analyze_query_plan(query, params)
            
            # 計算統(tǒng)計信息
            stats = {
                'iterations': iterations,
                'total_time': sum(execution_times),
                'avg_time': statistics.mean(execution_times),
                'min_time': min(execution_times),
                'max_time': max(execution_times),
                'std_dev': statistics.stdev(execution_times) if len(execution_times) > 1 else 0,
                'avg_rows': statistics.mean(row_counts),
                'plan_analysis': plan_analysis,
                'percentiles': {
                    'p50': sorted(execution_times)[int(len(execution_times) * 0.5)],
                    'p90': sorted(execution_times)[int(len(execution_times) * 0.9)],
                    'p95': sorted(execution_times)[int(len(execution_times) * 0.95)],
                    'p99': sorted(execution_times)[int(len(execution_times) * 0.99)],
                }
            }
            
            return stats
            
        except Exception as e:
            logger.error(f"基準(zhǔn)測試失敗: {e}")
            return {}
    
    def generate_optimization_suggestions(self, query: str, 
                                        stats: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        生成優(yōu)化建議
        
        Args:
            query: SQL查詢語句
            stats: 性能統(tǒng)計信息
            
        Returns:
            優(yōu)化建議列表
        """
        suggestions = []
        plan_analysis = stats.get('plan_analysis', {})
        
        # 基于執(zhí)行時間建議
        avg_time = stats.get('avg_time', 0)
        if avg_time > 1.0:  # 超過1秒
            suggestions.append({
                'priority': 'HIGH',
                'area': 'PERFORMANCE',
                'suggestion': '查詢執(zhí)行時間較長,考慮優(yōu)化查詢或添加索引',
                'estimated_impact': 'HIGH'
            })
        
        # 基于執(zhí)行計劃建議
        for issue in plan_analysis.get('issues', []):
            suggestions.append({
                'priority': issue['severity'],
                'area': 'QUERY_PLAN',
                'suggestion': issue['suggestion'],
                'estimated_impact': 'MEDIUM'
            })
        
        # 基于統(tǒng)計信息建議
        if stats.get('std_dev', 0) / stats.get('avg_time', 1) > 0.5:
            suggestions.append({
                'priority': 'MEDIUM',
                'area': 'CONSISTENCY',
                'suggestion': '查詢執(zhí)行時間波動較大,可能存在并發(fā)或資源競爭問題',
                'estimated_impact': 'MEDIUM'
            })
        
        return suggestions


class IndexOptimizer:
    """索引優(yōu)化器"""
    
    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)
        self.cursor = self.conn.cursor(cursor_factory=DictCursor)
    
    def analyze_table_indexes(self, table_name: str) -> List[Dict[str, Any]]:
        """
        分析表索引
        
        Args:
            table_name: 表名
            
        Returns:
            索引分析結(jié)果
        """
        query = """
        SELECT 
            i.relname as index_name,
            am.amname as index_type,
            idx.indisunique as is_unique,
            idx.indisprimary as is_primary,
            idx.indisexclusion as is_exclusion,
            idx.indisclustered as is_clustered,
            idx.indisvalid as is_valid,
            idx.indpred as partial_index_predicate,
            pg_relation_size(i.oid) as index_size_bytes,
            pg_size_pretty(pg_relation_size(i.oid)) as index_size,
            pg_stat_get_numscans(i.oid) as scan_count,
            pg_stat_get_tuples_returned(i.oid) as tuples_returned,
            pg_stat_get_tuples_fetched(i.oid) as tuples_fetched,
            
            -- 索引定義
            pg_get_indexdef(idx.indexrelid) as index_definition,
            
            -- 索引列
            array_to_string(array_agg(a.attname), ', ') as index_columns
            
        FROM pg_index idx
        JOIN pg_class i ON i.oid = idx.indexrelid
        JOIN pg_class t ON t.oid = idx.indrelid
        JOIN pg_am am ON i.relam = am.oid
        JOIN pg_attribute a ON a.attrelid = t.oid AND a.attnum = ANY(idx.indkey)
        
        WHERE t.relname = %s
        GROUP BY i.relname, am.amname, idx.indisunique, idx.indisprimary,
                 idx.indisexclusion, idx.indisclustered, idx.indisvalid,
                 idx.indpred, i.oid, idx.indexrelid
        ORDER BY pg_relation_size(i.oid) DESC
        """
        
        try:
            self.cursor.execute(query, (table_name,))
            indexes = self.cursor.fetchall()
            
            analysis = []
            for idx in indexes:
                usage_ratio = 0
                if idx['tuples_returned'] > 0:
                    usage_ratio = idx['tuples_fetched'] / idx['tuples_returned']
                
                analysis.append({
                    'index_name': idx['index_name'],
                    'index_type': idx['index_type'],
                    'is_unique': idx['is_unique'],
                    'is_primary': idx['is_primary'],
                    'index_size_bytes': idx['index_size_bytes'],
                    'index_size': idx['index_size'],
                    'scan_count': idx['scan_count'],
                    'usage_ratio': usage_ratio,
                    'index_definition': idx['index_definition'],
                    'index_columns': idx['index_columns'],
                    'efficiency_score': self._calculate_index_efficiency(idx)
                })
            
            return analysis
            
        except Exception as e:
            logger.error(f"分析索引失敗: {e}")
            return []
    
    def _calculate_index_efficiency(self, index_info: Dict[str, Any]) -> float:
        """
        計算索引效率評分
        
        Args:
            index_info: 索引信息
            
        Returns:
            效率評分 (0-100)
        """
        score = 100.0
        
        # 基于使用率扣分
        if index_info['scan_count'] == 0:
            score -= 50  # 從未使用
        
        # 基于大小扣分
        size_mb = index_info['index_size_bytes'] / (1024 * 1024)
        if size_mb > 1000:  # 超過1GB
            score -= 30
        elif size_mb > 100:  # 超過100MB
            score -= 15
        
        # 基于唯一性加分
        if index_info['is_unique']:
            score += 10
        
        return max(0, min(100, score))
    
    def suggest_index_improvements(self, table_name: str, 
                                 query_patterns: List[str]) -> List[Dict[str, Any]]:
        """
        基于查詢模式建議索引改進(jìn)
        
        Args:
            table_name: 表名
            query_patterns: 查詢模式列表
            
        Returns:
            索引改進(jìn)建議
        """
        suggestions = []
        existing_indexes = self.analyze_table_indexes(table_name)
        
        for pattern in query_patterns:
            pattern_lower = pattern.lower()
            
            # 提取WHERE子句中的列
            where_start = pattern_lower.find('where ')
            if where_start != -1:
                where_clause = pattern_lower[where_start + 6:]
                
                # 簡單提取列名(實(shí)際應(yīng)用中應(yīng)使用SQL解析器)
                import re
                column_matches = re.findall(r'(\w+)\s*[=<>!]', where_clause)
                
                for column in column_matches:
                    # 檢查是否已有索引
                    has_index = False
                    for idx in existing_indexes:
                        if column in idx['index_columns'].lower():
                            has_index = True
                            break
                    
                    if not has_index:
                        suggestions.append({
                            'table': table_name,
                            'column': column,
                            'suggestion': f'在 {column} 列上創(chuàng)建索引',
                            'estimated_impact': 'HIGH',
                            'sql': f'CREATE INDEX idx_{table_name}_{column} ON {table_name}({column})'
                        })
        
        return suggestions


class PostgreSQLConfigOptimizer:
    """PostgreSQL配置優(yōu)化器"""
    
    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)
        self.cursor = self.conn.cursor(cursor_factory=DictCursor)
    
    def analyze_current_config(self) -> Dict[str, Any]:
        """
        分析當(dāng)前配置
        
        Returns:
            配置分析結(jié)果
        """
        config_queries = {
            'basic_settings': """
                SELECT name, setting, unit, context, vartype
                FROM pg_settings
                WHERE name IN (
                    'shared_buffers', 'work_mem', 'maintenance_work_mem',
                    'effective_cache_size', 'max_connections'
                )
            """,
            'performance_settings': """
                SELECT name, setting, unit, context, vartype
                FROM pg_settings
                WHERE name LIKE '%cost%' OR name LIKE '%join%' OR name LIKE '%parallel%'
                ORDER BY name
            """,
            'wal_settings': """
                SELECT name, setting, unit, context, vartype
                FROM pg_settings
                WHERE name LIKE 'wal_%'
                ORDER BY name
            """,
            'statistics': """
                SELECT 
                    datname as database_name,
                    numbackends as active_connections,
                    xact_commit as transactions_committed,
                    xact_rollback as transactions_rolled_back,
                    blks_read as blocks_read,
                    blks_hit as blocks_hit,
                    tup_returned as tuples_returned,
                    tup_fetched as tuples_fetched,
                    tup_inserted as tuples_inserted,
                    tup_updated as tuples_updated,
                    tup_deleted as tuples_deleted
                FROM pg_stat_database
                WHERE datname = current_database()
            """
        }
        
        analysis = {}
        
        try:
            for category, query in config_queries.items():
                self.cursor.execute(query)
                analysis[category] = self.cursor.fetchall()
            
            # 計算緩沖區(qū)命中率
            if 'statistics' in analysis and analysis['statistics']:
                stats = analysis['statistics'][0]
                blocks_hit = stats['blocks_hit']
                blocks_read = stats['blocks_read']
                total_blocks = blocks_hit + blocks_read
                
                if total_blocks > 0:
                    analysis['buffer_hit_ratio'] = blocks_hit / total_blocks
                else:
                    analysis['buffer_hit_ratio'] = 0
            
            return analysis
            
        except Exception as e:
            logger.error(f"分析配置失敗: {e}")
            return {}
    
    def generate_config_recommendations(self, 
                                      system_memory_gb: float = 16,
                                      expected_connections: int = 100) -> List[Dict[str, Any]]:
        """
        生成配置優(yōu)化建議
        
        Args:
            system_memory_gb: 系統(tǒng)總內(nèi)存(GB)
            expected_connections: 預(yù)期最大連接數(shù)
            
        Returns:
            配置優(yōu)化建議列表
        """
        recommendations = []
        current_config = self.analyze_current_config()
        
        # 共享緩沖區(qū)建議(通常為系統(tǒng)內(nèi)存的25%)
        recommended_shared_buffers = f"{int(system_memory_gb * 0.25 * 1024)}MB"
        recommendations.append({
            'parameter': 'shared_buffers',
            'current_value': self._get_config_value(current_config, 'shared_buffers'),
            'recommended_value': recommended_shared_buffers,
            'reason': f'設(shè)置為系統(tǒng)內(nèi)存({system_memory_gb}GB)的25%以優(yōu)化緩存性能',
            'impact': 'HIGH'
        })
        
        # 工作內(nèi)存建議
        recommended_work_mem = f"{int(system_memory_gb * 1024 / expected_connections / 4)}MB"
        recommendations.append({
            'parameter': 'work_mem',
            'current_value': self._get_config_value(current_config, 'work_mem'),
            'recommended_value': recommended_work_mem,
            'reason': f'基于{expected_connections}個并發(fā)連接和系統(tǒng)內(nèi)存計算',
            'impact': 'MEDIUM'
        })
        
        # 維護(hù)工作內(nèi)存建議
        recommended_maintenance_work_mem = f"{int(system_memory_gb * 0.1 * 1024)}MB"
        recommendations.append({
            'parameter': 'maintenance_work_mem',
            'current_value': self._get_config_value(current_config, 'maintenance_work_mem'),
            'recommended_value': recommended_maintenance_work_mem,
            'reason': '設(shè)置為系統(tǒng)內(nèi)存的10%以優(yōu)化維護(hù)操作性能',
            'impact': 'MEDIUM'
        })
        
        # 有效緩存大小建議
        recommended_effective_cache_size = f"{int(system_memory_gb * 0.5 * 1024)}MB"
        recommendations.append({
            'parameter': 'effective_cache_size',
            'current_value': self._get_config_value(current_config, 'effective_cache_size'),
            'recommended_value': recommended_effective_cache_size,
            'reason': '設(shè)置為系統(tǒng)內(nèi)存的50%以幫助查詢規(guī)劃器做出更好的決策',
            'impact': 'MEDIUM'
        })
        
        # 基于緩沖區(qū)命中率的建議
        hit_ratio = current_config.get('buffer_hit_ratio', 0)
        if hit_ratio < 0.9:
            recommendations.append({
                'parameter': 'BUFFER_HIT_RATIO',
                'current_value': f'{hit_ratio:.2%}',
                'recommended_value': '>90%',
                'reason': '緩沖區(qū)命中率較低,可能影響查詢性能',
                'impact': 'HIGH',
                'additional_suggestions': [
                    '增加shared_buffers',
                    '優(yōu)化熱點(diǎn)查詢',
                    '考慮使用pg_prewarm擴(kuò)展'
                ]
            })
        
        return recommendations
    
    def _get_config_value(self, config_analysis: Dict[str, Any], 
                         param_name: str) -> str:
        """獲取配置參數(shù)值"""
        if 'basic_settings' in config_analysis:
            for setting in config_analysis['basic_settings']:
                if setting['name'] == param_name:
                    return f"{setting['setting']} {setting['unit'] or ''}".strip()
        
        return '未找到'
    
    def close(self):
        """關(guān)閉連接"""
        if self.cursor:
            self.cursor.close()
        if self.conn:
            self.conn.close()


def comprehensive_performance_analysis(dsn: str):
    """綜合性能分析示例"""
    print("=" * 60)
    print("PostgreSQL性能綜合分析")
    print("=" * 60)
    
    # 1. 查詢性能分析
    print("\n1. 查詢性能分析")
    print("-" * 40)
    
    analyzer = QueryPerformanceAnalyzer(dsn)
    
    test_query = """
    SELECT 
        u.username,
        COUNT(o.id) as order_count,
        SUM(o.amount) as total_amount,
        AVG(o.amount) as avg_amount
    FROM users u
    JOIN orders o ON u.id = o.user_id
    WHERE u.created_at > CURRENT_DATE - INTERVAL '1 year'
    GROUP BY u.id, u.username
    HAVING COUNT(o.id) > 5
    ORDER BY total_amount DESC
    LIMIT 100
    """
    
    benchmark_results = analyzer.benchmark_query(test_query, iterations=5)
    
    if benchmark_results:
        print(f"平均執(zhí)行時間: {benchmark_results['avg_time']:.3f}秒")
        print(f"最小執(zhí)行時間: {benchmark_results['min_time']:.3f}秒")
        print(f"最大執(zhí)行時間: {benchmark_results['max_time']:.3f}秒")
        print(f"標(biāo)準(zhǔn)差: {benchmark_results['std_dev']:.3f}秒")
        
        # 生成優(yōu)化建議
        suggestions = analyzer.generate_optimization_suggestions(
            test_query, benchmark_results
        )
        
        print(f"\n優(yōu)化建議 ({len(suggestions)}條):")
        for i, suggestion in enumerate(suggestions, 1):
            print(f"{i}. [{suggestion['priority']}] {suggestion['suggestion']}")
    
    # 2. 索引分析
    print("\n2. 索引分析")
    print("-" * 40)
    
    index_optimizer = IndexOptimizer(dsn)
    table_name = "users"  # 假設(shè)的表名
    
    index_analysis = index_optimizer.analyze_table_indexes(table_name)
    
    if index_analysis:
        print(f"表 '{table_name}' 的索引分析:")
        for idx in index_analysis[:5]:  # 顯示前5個索引
            print(f"  - {idx['index_name']}: {idx['index_size']}, "
                  f"使用率: {idx['usage_ratio']:.2f}, "
                  f"效率評分: {idx['efficiency_score']:.1f}")
    
    # 3. 配置分析
    print("\n3. 配置分析")
    print("-" * 40)
    
    config_optimizer = PostgreSQLConfigOptimizer(dsn)
    config_recommendations = config_optimizer.generate_config_recommendations(
        system_memory_gb=16,
        expected_connections=200
    )
    
    print("配置優(yōu)化建議:")
    for rec in config_recommendations:
        print(f"  - {rec['parameter']}: {rec['current_value']} → {rec['recommended_value']}")
    
    # 4. 綜合報告
    print("\n4. 綜合性能報告")
    print("-" * 40)
    
    overall_score = 85.0  # 示例評分
    bottlenecks = [
        "查詢響應(yīng)時間波動較大",
        "部分索引使用率較低",
        "緩沖區(qū)命中率需要優(yōu)化"
    ]
    
    print(f"總體性能評分: {overall_score}/100")
    print("\n主要瓶頸:")
    for bottleneck in bottlenecks:
        print(f"  ? {bottleneck}")
    
    print("\n優(yōu)化優(yōu)先級:")
    print("  1. 優(yōu)化高響應(yīng)時間查詢")
    print("  2. 調(diào)整數(shù)據(jù)庫配置參數(shù)")
    print("  3. 重建低效率索引")
    
    # 清理資源
    analyzer.cursor.close()
    analyzer.conn.close()
    config_optimizer.close()


if __name__ == "__main__":
    # 數(shù)據(jù)庫連接字符串
    dsn = "dbname=testdb user=postgres password=password host=localhost port=5432"
    
    comprehensive_performance_analysis(dsn)

3.2 索引優(yōu)化策略

3.2.1 索引類型選擇矩陣

3.2.2 復(fù)合索引設(shè)計原則

復(fù)合索引的列順序設(shè)計遵循最左前綴原則,選擇性公式為:

選擇性=不同值數(shù)量/總行數(shù)?

索引設(shè)計優(yōu)先級:

  • 高選擇性列(接近1.0)放在前面
  • 經(jīng)常用于WHERE條件的列
  • 用于ORDER BY的列
  • 用于GROUP BY的列

3.3 并發(fā)控制與鎖優(yōu)化

PostgreSQL采用MVCC(多版本并發(fā)控制)機(jī)制,提供多種隔離級別和鎖類型:

"""
PostgreSQL并發(fā)控制與鎖優(yōu)化示例
"""
import psycopg2
from psycopg2.extras import DictCursor
import threading
import time
from typing import List, Dict, Any
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ConcurrentTransactionTest:
    """并發(fā)事務(wù)測試"""
    
    def __init__(self, dsn: str):
        self.dsn = dsn
        self.results = []
        self.lock = threading.Lock()
    
    def run_concurrent_updates(self, num_threads: int = 10):
        """
        運(yùn)行并發(fā)更新測試
        
        Args:
            num_threads: 并發(fā)線程數(shù)
        """
        print(f"開始并發(fā)更新測試,線程數(shù): {num_threads}")
        print("=" * 60)
        
        threads = []
        
        # 初始化測試數(shù)據(jù)
        self._setup_test_data()
        
        # 創(chuàng)建并啟動線程
        for i in range(num_threads):
            thread = threading.Thread(
                target=self._update_account_balance,
                args=(i, 100 + i, 50.0),  # 賬戶ID從100開始
                name=f"Transaction-{i}"
            )
            threads.append(thread)
            thread.start()
        
        # 等待所有線程完成
        for thread in threads:
            thread.join()
        
        # 驗(yàn)證結(jié)果
        self._verify_results()
        
        print(f"\n測試完成,總事務(wù)數(shù): {num_threads}")
        print(f"成功事務(wù): {len([r for r in self.results if r['success']])}")
        print(f"失敗事務(wù): {len([r for r in self.results if not r['success']])}")
    
    def _setup_test_data(self):
        """設(shè)置測試數(shù)據(jù)"""
        conn = psycopg2.connect(self.dsn)
        cursor = conn.cursor()
        
        try:
            # 創(chuàng)建測試表
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS accounts (
                    id SERIAL PRIMARY KEY,
                    account_number VARCHAR(50) UNIQUE,
                    balance DECIMAL(15, 2) DEFAULT 0.0,
                    version INTEGER DEFAULT 0,
                    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                );
            """)
            
            # 插入測試賬戶
            for i in range(10):
                account_id = 100 + i
                cursor.execute("""
                    INSERT INTO accounts (id, account_number, balance, version)
                    VALUES (%s, %s, %s, %s)
                    ON CONFLICT (id) DO UPDATE SET
                        balance = EXCLUDED.balance,
                        version = EXCLUDED.version
                """, (account_id, f"ACC{account_id}", 1000.0, 0))
            
            conn.commit()
            logger.info("測試數(shù)據(jù)設(shè)置完成")
            
        except Exception as e:
            conn.rollback()
            logger.error(f"設(shè)置測試數(shù)據(jù)失敗: {e}")
        finally:
            cursor.close()
            conn.close()
    
    def _update_account_balance(self, thread_id: int, account_id: int, amount: float):
        """
        更新賬戶余額
        
        Args:
            thread_id: 線程ID
            account_id: 賬戶ID
            amount: 更新金額
        """
        conn = None
        cursor = None
        
        try:
            conn = psycopg2.connect(self.dsn)
            cursor = conn.cursor()
            
            # 設(shè)置事務(wù)隔離級別(可測試不同級別)
            cursor.execute("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
            
            start_time = time.time()
            
            # 方法1: 使用行級鎖
            # cursor.execute("""
            #     SELECT balance FROM accounts 
            #     WHERE id = %s FOR UPDATE
            # """, (account_id,))
            
            # 方法2: 樂觀鎖(版本控制)
            cursor.execute("""
                SELECT balance, version FROM accounts 
                WHERE id = %s
            """, (account_id,))
            
            result = cursor.fetchone()
            if not result:
                raise Exception(f"賬戶 {account_id} 不存在")
            
            current_balance, current_version = result
            new_balance = current_balance + amount
            
            # 模擬一些處理時間
            time.sleep(0.01)
            
            # 使用樂觀鎖更新
            cursor.execute("""
                UPDATE accounts 
                SET balance = %s, 
                    version = version + 1,
                    last_updated = CURRENT_TIMESTAMP
                WHERE id = %s AND version = %s
            """, (new_balance, account_id, current_version))
            
            # 檢查是否更新成功
            if cursor.rowcount == 0:
                # 樂觀鎖沖突,重試或失敗
                conn.rollback()
                success = False
                error_msg = "樂觀鎖沖突,版本不匹配"
            else:
                conn.commit()
                success = True
                error_msg = None
            
            end_time = time.time()
            duration = end_time - start_time
            
            # 記錄結(jié)果
            with self.lock:
                self.results.append({
                    'thread_id': thread_id,
                    'account_id': account_id,
                    'success': success,
                    'duration': duration,
                    'error': error_msg,
                    'start_time': start_time,
                    'end_time': end_time
                })
            
            if success:
                logger.debug(f"線程 {thread_id}: 賬戶 {account_id} 更新成功,耗時 {duration:.3f}秒")
            else:
                logger.warning(f"線程 {thread_id}: 賬戶 {account_id} 更新失敗 - {error_msg}")
            
        except Exception as e:
            if conn:
                conn.rollback()
            
            with self.lock:
                self.results.append({
                    'thread_id': thread_id,
                    'account_id': account_id,
                    'success': False,
                    'duration': time.time() - start_time if 'start_time' in locals() else 0,
                    'error': str(e),
                    'start_time': start_time if 'start_time' in locals() else 0,
                    'end_time': time.time()
                })
            
            logger.error(f"線程 {thread_id} 異常: {e}")
            
        finally:
            if cursor:
                cursor.close()
            if conn:
                conn.close()
    
    def _verify_results(self):
        """驗(yàn)證測試結(jié)果"""
        conn = psycopg2.connect(self.dsn)
        cursor = conn.cursor()
        
        try:
            # 檢查賬戶最終狀態(tài)
            cursor.execute("""
                SELECT id, balance, version, last_updated
                FROM accounts
                WHERE id >= 100 AND id < 110
                ORDER BY id
            """)
            
            accounts = cursor.fetchall()
            
            print("\n賬戶最終狀態(tài):")
            print("-" * 60)
            print(f"{'賬戶ID':<10} {'余額':<15} {'版本':<10} {'最后更新'}")
            print("-" * 60)
            
            for acc in accounts:
                print(f"{acc[0]:<10} {float(acc[1]):<15.2f} {acc[2]:<10} {acc[3]}")
            
            # 分析并發(fā)問題
            success_count = len([r for r in self.results if r['success']])
            total_count = len(self.results)
            
            if success_count < total_count:
                print(f"\n發(fā)現(xiàn)并發(fā)問題: {total_count - success_count} 個事務(wù)失敗")
                print("\n失敗事務(wù)詳情:")
                for result in self.results:
                    if not result['success']:
                        print(f"  線程 {result['thread_id']}: {result['error']}")
            
            # 計算性能指標(biāo)
            if self.results:
                durations = [r['duration'] for r in self.results if r['success']]
                if durations:
                    avg_duration = sum(durations) / len(durations)
                    max_duration = max(durations)
                    min_duration = min(durations)
                    
                    print(f"\n性能指標(biāo):")
                    print(f"  平均事務(wù)時間: {avg_duration:.3f}秒")
                    print(f"  最長事務(wù)時間: {max_duration:.3f}秒")
                    print(f"  最短事務(wù)時間: {min_duration:.3f}秒")
                    print(f"  吞吐量: {success_count / sum(durations):.2f} 事務(wù)/秒")
            
        except Exception as e:
            logger.error(f"驗(yàn)證結(jié)果失敗: {e}")
        finally:
            cursor.close()
            conn.close()


class LockMonitor:
    """鎖監(jiān)控器"""
    
    def __init__(self, dsn: str):
        self.dsn = dsn
    
    def get_current_locks(self) -> List[Dict[str, Any]]:
        """
        獲取當(dāng)前鎖信息
        
        Returns:
            鎖信息列表
        """
        conn = psycopg2.connect(self.dsn)
        cursor = conn.cursor(cursor_factory=DictCursor)
        
        try:
            lock_query = """
            SELECT 
                -- 鎖信息
                pl.pid as process_id,
                pl.mode as lock_mode,
                pl.granted as is_granted,
                pl.fastpath as is_fastpath,
                
                -- 事務(wù)信息
                pa.query as current_query,
                pa.state as query_state,
                pa.wait_event_type as wait_event_type,
                pa.wait_event as wait_event,
                pa.backend_start as backend_start_time,
                pa.xact_start as transaction_start_time,
                pa.query_start as query_start_time,
                
                -- 被鎖對象信息
                pl.relation::regclass as locked_relation,
                pl.page as locked_page,
                pl.tuple as locked_tuple,
                pl.virtualxid as virtual_transaction_id,
                pl.transactionid as transaction_id,
                pl.classid::regclass as locked_class,
                pl.objid as locked_object_id,
                pl.objsubid as locked_object_subid,
                
                -- 等待圖信息
                pg_blocking_pids(pl.pid) as blocking_pids,
                
                -- 附加信息
                now() - pa.query_start as query_duration,
                now() - pa.xact_start as transaction_duration
                
            FROM pg_locks pl
            LEFT JOIN pg_stat_activity pa ON pl.pid = pa.pid
            WHERE pl.pid <> pg_backend_pid()  -- 排除當(dāng)前連接
            ORDER BY 
                pl.granted DESC,  -- 先顯示未授予的鎖
                transaction_duration DESC,
                query_duration DESC
            """
            
            cursor.execute(lock_query)
            locks = cursor.fetchall()
            
            return [
                {
                    'process_id': lock['process_id'],
                    'lock_mode': lock['lock_mode'],
                    'is_granted': lock['is_granted'],
                    'current_query': lock['current_query'][:100] if lock['current_query'] else None,
                    'query_state': lock['query_state'],
                    'locked_relation': lock['locked_relation'],
                    'blocking_pids': lock['blocking_pids'],
                    'query_duration': lock['query_duration'],
                    'transaction_duration': lock['transaction_duration'],
                    'wait_event': lock['wait_event']
                }
                for lock in locks
            ]
            
        except Exception as e:
            logger.error(f"獲取鎖信息失敗: {e}")
            return []
        finally:
            cursor.close()
            conn.close()
    
    def analyze_lock_contention(self) -> Dict[str, Any]:
        """
        分析鎖爭用情況
        
        Returns:
            鎖爭用分析報告
        """
        locks = self.get_current_locks()
        
        analysis = {
            'total_locks': len(locks),
            'granted_locks': len([l for l in locks if l['is_granted']]),
            'waiting_locks': len([l for l in locks if not l['is_granted']]),
            'lock_modes': {},
            'wait_chains': [],
            'long_running_transactions': [],
            'potential_deadlocks': []
        }
        
        # 統(tǒng)計鎖模式
        for lock in locks:
            mode = lock['lock_mode']
            analysis['lock_modes'][mode] = analysis['lock_modes'].get(mode, 0) + 1
        
        # 識別等待鏈
        waiting_processes = [l for l in locks if not l['is_granted']]
        for waiter in waiting_processes:
            if waiter['blocking_pids']:
                chain = {
                    'waiting_pid': waiter['process_id'],
                    'blocking_pids': waiter['blocking_pids'],
                    'lock_mode': waiter['lock_mode'],
                    'wait_time': waiter['query_duration']
                }
                analysis['wait_chains'].append(chain)
        
        # 識別長時間運(yùn)行的事務(wù)
        for lock in locks:
            if lock['transaction_duration'] and lock['transaction_duration'].total_seconds() > 60:
                analysis['long_running_transactions'].append({
                    'pid': lock['process_id'],
                    'duration_seconds': lock['transaction_duration'].total_seconds(),
                    'query': lock['current_query']
                })
        
        return analysis
    
    def kill_blocking_processes(self, threshold_seconds: int = 300):
        """
        終止阻塞時間過長的進(jìn)程
        
        Args:
            threshold_seconds: 阻塞時間閾值(秒)
        """
        conn = psycopg2.connect(self.dsn)
        cursor = conn.cursor()
        
        try:
            # 查找阻塞時間過長的進(jìn)程
            kill_query = """
            SELECT pid, query, now() - xact_start as duration
            FROM pg_stat_activity
            WHERE pid IN (
                SELECT DISTINCT unnest(pg_blocking_pids(pid))
                FROM pg_stat_activity
                WHERE wait_event_type = 'Lock'
                AND state = 'active'
                AND now() - query_start > INTERVAL '%s seconds'
            )
            AND state = 'active'
            """
            
            cursor.execute(kill_query % threshold_seconds)
            blocking_processes = cursor.fetchall()
            
            killed = []
            for proc in blocking_processes:
                pid, query, duration = proc
                try:
                    cursor.execute("SELECT pg_terminate_backend(%s)", (pid,))
                    killed.append({
                        'pid': pid,
                        'duration': duration,
                        'query': query[:100] if query else None
                    })
                    logger.warning(f"終止阻塞進(jìn)程 {pid},已運(yùn)行 {duration}")
                except Exception as e:
                    logger.error(f"終止進(jìn)程 {pid} 失敗: {e}")
            
            conn.commit()
            return killed
            
        except Exception as e:
            conn.rollback()
            logger.error(f"終止阻塞進(jìn)程失敗: {e}")
            return []
        finally:
            cursor.close()
            conn.close()


def test_concurrency_scenarios():
    """測試不同并發(fā)場景"""
    dsn = "dbname=testdb user=postgres password=password host=localhost port=5432"
    
    print("并發(fā)控制測試")
    print("=" * 60)
    
    # 場景1:高并發(fā)更新
    print("\n場景1: 高并發(fā)更新測試")
    test1 = ConcurrentTransactionTest(dsn)
    test1.run_concurrent_updates(num_threads=20)
    
    # 場景2:鎖監(jiān)控
    print("\n\n場景2: 鎖監(jiān)控分析")
    print("-" * 40)
    
    monitor = LockMonitor(dsn)
    lock_analysis = monitor.analyze_lock_contention()
    
    print(f"總鎖數(shù): {lock_analysis['total_locks']}")
    print(f"已授予鎖: {lock_analysis['granted_locks']}")
    print(f"等待鎖: {lock_analysis['waiting_locks']}")
    
    if lock_analysis['wait_chains']:
        print("\n等待鏈:")
        for chain in lock_analysis['wait_chains'][:5]:  # 顯示前5個
            print(f"  進(jìn)程 {chain['waiting_pid']} 等待 {chain['blocking_pids']}")
    
    if lock_analysis['long_running_transactions']:
        print("\n長時間運(yùn)行事務(wù):")
        for txn in lock_analysis['long_running_transactions'][:3]:
            print(f"  進(jìn)程 {txn['pid']}: 已運(yùn)行 {txn['duration_seconds']:.0f}秒")
    
    # 場景3:死鎖處理建議
    print("\n\n場景3: 死鎖預(yù)防建議")
    print("-" * 40)
    
    recommendations = [
        "1. 使用合適的索引減少鎖競爭范圍",
        "2. 保持事務(wù)簡短,盡快提交",
        "3. 使用顯式鎖(SELECT FOR UPDATE)時按固定順序訪問資源",
        "4. 設(shè)置合理的鎖超時(lock_timeout)",
        "5. 考慮使用樂觀鎖(版本控制)替代悲觀鎖",
        "6. 使用較低的隔離級別(READ COMMITTED)",
        "7. 監(jiān)控和調(diào)整max_connections參數(shù)",
        "8. 定期分析并優(yōu)化長時間運(yùn)行的事務(wù)"
    ]
    
    for rec in recommendations:
        print(f"  ? {rec}")


if __name__ == "__main__":
    test_concurrency_scenarios()

4. 高級特性綜合應(yīng)用

4.1 完整示例:電商系統(tǒng)數(shù)據(jù)庫設(shè)計

"""
電商系統(tǒng)PostgreSQL高級特性綜合應(yīng)用示例
"""
import psycopg2
from psycopg2.extras import Json, DictCursor
import json
from typing import List, Dict, Any, Optional
from datetime import datetime, timedelta
import logging
from decimal import Decimal

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ECommerceDatabase:
    """電商系統(tǒng)數(shù)據(jù)庫設(shè)計"""
    
    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)
        self.cursor = self.conn.cursor(cursor_factory=DictCursor)
    
    def create_schema(self):
        """創(chuàng)建電商系統(tǒng)數(shù)據(jù)庫架構(gòu)"""
        schema_sql = """
        -- 啟用必要擴(kuò)展
        CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
        CREATE EXTENSION IF NOT EXISTS "pg_trgm";
        CREATE EXTENSION IF NOT EXISTS "btree_gin";
        
        -- 1. 產(chǎn)品表(使用JSONB存儲變體屬性)
        CREATE TABLE IF NOT EXISTS products (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            sku VARCHAR(100) UNIQUE NOT NULL,
            name VARCHAR(500) NOT NULL,
            description TEXT,
            category_id UUID,
            brand VARCHAR(200),
            
            -- JSONB存儲動態(tài)屬性
            attributes JSONB DEFAULT '{}',
            
            -- 價格信息
            base_price DECIMAL(12, 2) NOT NULL,
            discount_price DECIMAL(12, 2),
            
            -- 庫存信息
            stock_quantity INTEGER DEFAULT 0,
            reserved_quantity INTEGER DEFAULT 0,
            
            -- 搜索優(yōu)化字段
            search_vector TSVECTOR GENERATED ALWAYS AS (
                setweight(to_tsvector('english', coalesce(name, '')), 'A') ||
                setweight(to_tsvector('english', coalesce(description, '')), 'B') ||
                setweight(to_tsvector('english', coalesce(brand, '')), 'C')
            ) STORED,
            
            -- 時間戳
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            
            -- 約束
            CONSTRAINT positive_price CHECK (base_price >= 0),
            CONSTRAINT positive_stock CHECK (stock_quantity >= 0),
            CONSTRAINT valid_discount CHECK (
                discount_price IS NULL OR 
                (discount_price >= 0 AND discount_price <= base_price)
            )
        );
        
        -- 產(chǎn)品表索引
        CREATE INDEX IF NOT EXISTS idx_products_sku ON products(sku);
        CREATE INDEX IF NOT EXISTS idx_products_category ON products(category_id);
        CREATE INDEX IF NOT EXISTS idx_products_price ON products(base_price);
        CREATE INDEX IF NOT EXISTS idx_products_search ON products USING GIN(search_vector);
        CREATE INDEX IF NOT EXISTS idx_products_attributes ON products USING GIN(attributes);
        CREATE INDEX IF NOT EXISTS idx_products_brand ON products(brand);
        
        -- 2. 產(chǎn)品變體表(范圍類型用于尺寸)
        CREATE TABLE IF NOT EXISTS product_variants (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            product_id UUID REFERENCES products(id) ON DELETE CASCADE,
            variant_name VARCHAR(200) NOT NULL,
            
            -- 使用范圍類型表示尺寸范圍
            size_range numrange,
            weight_range numrange,
            
            -- JSONB存儲變體特定屬性
            variant_attributes JSONB DEFAULT '{}',
            
            -- 價格調(diào)整
            price_adjustment DECIMAL(10, 2) DEFAULT 0,
            additional_cost DECIMAL(10, 2) DEFAULT 0,
            
            -- 庫存跟蹤
            variant_stock INTEGER DEFAULT 0,
            min_order_quantity INTEGER DEFAULT 1,
            max_order_quantity INTEGER,
            
            -- 約束
            CONSTRAINT valid_size_range CHECK (
                size_range IS NULL OR 
                (lower(size_range) >= 0 AND upper(size_range) > lower(size_range))
            ),
            CONSTRAINT valid_quantity CHECK (
                min_order_quantity > 0 AND 
                (max_order_quantity IS NULL OR max_order_quantity >= min_order_quantity)
            )
        );
        
        -- 變體表索引
        CREATE INDEX IF NOT EXISTS idx_variants_product ON product_variants(product_id);
        CREATE INDEX IF NOT EXISTS idx_variants_size ON product_variants USING GIST(size_range);
        
        -- 3. 分類表(使用遞歸CTE支持層級結(jié)構(gòu))
        CREATE TABLE IF NOT EXISTS categories (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            name VARCHAR(200) NOT NULL,
            slug VARCHAR(200) UNIQUE NOT NULL,
            description TEXT,
            parent_id UUID REFERENCES categories(id),
            sort_order INTEGER DEFAULT 0,
            
            -- JSONB存儲分類屬性
            category_attributes JSONB DEFAULT '{}',
            
            -- 層級路徑(物化路徑模式)
            path VARCHAR(1000),
            level INTEGER DEFAULT 0,
            
            -- 時間戳
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            
            -- 約束
            CONSTRAINT no_self_parent CHECK (id != parent_id)
        );
        
        -- 分類表索引
        CREATE INDEX IF NOT EXISTS idx_categories_parent ON categories(parent_id);
        CREATE INDEX IF NOT EXISTS idx_categories_path ON categories(path);
        CREATE INDEX IF NOT EXISTS idx_categories_slug ON categories(slug);
        
        -- 4. 訂單表(使用分區(qū)表)
        CREATE TABLE IF NOT EXISTS orders_partitioned (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            order_number VARCHAR(50) UNIQUE NOT NULL,
            customer_id UUID NOT NULL,
            status VARCHAR(50) NOT NULL,
            
            -- JSONB存儲訂單元數(shù)據(jù)
            order_metadata JSONB DEFAULT '{}',
            
            -- 金額信息
            subtotal DECIMAL(12, 2) NOT NULL,
            tax_amount DECIMAL(12, 2) DEFAULT 0,
            shipping_amount DECIMAL(12, 2) DEFAULT 0,
            discount_amount DECIMAL(12, 2) DEFAULT 0,
            total_amount DECIMAL(12, 2) NOT NULL,
            
            -- 時間信息
            ordered_at TIMESTAMP NOT NULL,
            shipped_at TIMESTAMP,
            delivered_at TIMESTAMP,
            
            -- 約束
            CONSTRAINT positive_amounts CHECK (
                subtotal >= 0 AND
                tax_amount >= 0 AND
                shipping_amount >= 0 AND
                discount_amount >= 0 AND
                total_amount >= 0
            )
        ) PARTITION BY RANGE (ordered_at);
        
        -- 創(chuàng)建訂單分區(qū)(每月一個分區(qū))
        CREATE TABLE IF NOT EXISTS orders_2024_01 
        PARTITION OF orders_partitioned
        FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
        
        CREATE TABLE IF NOT EXISTS orders_2024_02 
        PARTITION OF orders_partitioned
        FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
        
        -- 訂單表索引
        CREATE INDEX IF NOT EXISTS idx_orders_customer ON orders_partitioned(customer_id);
        CREATE INDEX IF NOT EXISTS idx_orders_status ON orders_partitioned(status);
        CREATE INDEX IF NOT EXISTS idx_orders_date ON orders_partitioned(ordered_at);
        CREATE INDEX IF NOT EXISTS idx_orders_metadata ON orders_partitioned USING GIN(order_metadata);
        
        -- 5. 訂單項(xiàng)表
        CREATE TABLE IF NOT EXISTS order_items (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            order_id UUID REFERENCES orders_partitioned(id) ON DELETE CASCADE,
            product_id UUID REFERENCES products(id),
            variant_id UUID REFERENCES product_variants(id),
            
            -- 產(chǎn)品快照(避免產(chǎn)品信息變更影響歷史訂單)
            product_snapshot JSONB NOT NULL,
            
            -- 購買信息
            quantity INTEGER NOT NULL,
            unit_price DECIMAL(12, 2) NOT NULL,
            discount_percentage DECIMAL(5, 2) DEFAULT 0,
            item_total DECIMAL(12, 2) NOT NULL,
            
            -- 約束
            CONSTRAINT positive_quantity CHECK (quantity > 0),
            CONSTRAINT positive_unit_price CHECK (unit_price >= 0)
        );
        
        -- 訂單項(xiàng)索引
        CREATE INDEX IF NOT EXISTS idx_order_items_order ON order_items(order_id);
        CREATE INDEX IF NOT EXISTS idx_order_items_product ON order_items(product_id);
        
        -- 6. 客戶表
        CREATE TABLE IF NOT EXISTS customers (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            email VARCHAR(255) UNIQUE NOT NULL,
            phone VARCHAR(50),
            first_name VARCHAR(100),
            last_name VARCHAR(100),
            
            -- JSONB存儲客戶屬性
            customer_profile JSONB DEFAULT '{}',
            
            -- 地址信息(使用JSONB存儲多個地址)
            addresses JSONB DEFAULT '[]',
            
            -- 賬戶信息
            is_active BOOLEAN DEFAULT TRUE,
            loyalty_points INTEGER DEFAULT 0,
            customer_tier VARCHAR(50) DEFAULT 'STANDARD',
            
            -- 時間戳
            registered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            last_login_at TIMESTAMP,
            
            -- 約束
            CONSTRAINT valid_email CHECK (email ~* '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Z|a-z]{2,}$')
        );
        
        -- 客戶表索引
        CREATE INDEX IF NOT EXISTS idx_customers_email ON customers(email);
        CREATE INDEX IF NOT EXISTS idx_customers_name ON customers(last_name, first_name);
        CREATE INDEX IF NOT EXISTS idx_customers_profile ON customers USING GIN(customer_profile);
        CREATE INDEX IF NOT EXISTS idx_customers_tier ON customers(customer_tier);
        
        -- 7. 庫存變更日志(使用BRIN索引)
        CREATE TABLE IF NOT EXISTS inventory_logs (
            id BIGSERIAL PRIMARY KEY,
            product_id UUID REFERENCES products(id),
            variant_id UUID REFERENCES product_variants(id),
            
            -- 變更信息
            change_type VARCHAR(50) NOT NULL,
            quantity_change INTEGER NOT NULL,
            previous_quantity INTEGER,
            new_quantity INTEGER,
            
            -- 關(guān)聯(lián)信息
            reference_id UUID,
            reference_type VARCHAR(100),
            
            -- 時間戳
            changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            changed_by UUID,
            
            -- 備注
            notes TEXT
        ) WITH (fillfactor = 90);
        
        -- 庫存日志索引(使用BRIN適合時間序列)
        CREATE INDEX IF NOT EXISTS idx_inventory_logs_time 
        ON inventory_logs USING BRIN(changed_at);
        
        CREATE INDEX IF NOT EXISTS idx_inventory_logs_product 
        ON inventory_logs(product_id, changed_at);
        
        -- 8. 產(chǎn)品評論表(使用數(shù)組和全文搜索)
        CREATE TABLE IF NOT EXISTS product_reviews (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            product_id UUID REFERENCES products(id) ON DELETE CASCADE,
            customer_id UUID REFERENCES customers(id),
            order_item_id UUID REFERENCES order_items(id),
            
            -- 評分
            rating INTEGER NOT NULL CHECK (rating >= 1 AND rating <= 5),
            
            -- 評論內(nèi)容
            title VARCHAR(500),
            review_text TEXT NOT NULL,
            
            -- 數(shù)組存儲優(yōu)點(diǎn)/缺點(diǎn)
            pros TEXT[],
            cons TEXT[],
            
            -- 元數(shù)據(jù)
            is_verified_purchase BOOLEAN DEFAULT FALSE,
            helpful_votes INTEGER DEFAULT 0,
            not_helpful_votes INTEGER DEFAULT 0,
            
            -- 全文搜索向量
            review_vector TSVECTOR GENERATED ALWAYS AS (
                to_tsvector('english', 
                    coalesce(title, '') || ' ' || 
                    coalesce(review_text, '')
                )
            ) STORED,
            
            -- 時間戳
            reviewed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            
            -- 約束
            CONSTRAINT one_review_per_order_item UNIQUE (order_item_id)
        );
        
        -- 評論表索引
        CREATE INDEX IF NOT EXISTS idx_reviews_product ON product_reviews(product_id);
        CREATE INDEX IF NOT EXISTS idx_reviews_rating ON product_reviews(rating);
        CREATE INDEX IF NOT EXISTS idx_reviews_search ON product_reviews USING GIN(review_vector);
        CREATE INDEX IF NOT EXISTS idx_reviews_pros ON product_reviews USING GIN(pros);
        CREATE INDEX IF NOT EXISTS idx_reviews_date ON product_reviews(reviewed_at);
        
        -- 9. 促銷規(guī)則表(使用復(fù)雜約束和JSONB)
        CREATE TABLE IF NOT EXISTS promotion_rules (
            id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
            name VARCHAR(200) NOT NULL,
            description TEXT,
            
            -- 規(guī)則條件(JSONB存儲復(fù)雜規(guī)則)
            conditions JSONB NOT NULL,
            
            -- 折扣信息
            discount_type VARCHAR(50) NOT NULL,
            discount_value DECIMAL(10, 2),
            discount_percentage DECIMAL(5, 2),
            max_discount_amount DECIMAL(12, 2),
            
            -- 時間范圍
            valid_from TIMESTAMP NOT NULL,
            valid_until TIMESTAMP,
            
            -- 使用限制
            usage_limit INTEGER,
            per_customer_limit INTEGER,
            minimum_order_amount DECIMAL(12, 2),
            
            -- 狀態(tài)
            is_active BOOLEAN DEFAULT TRUE,
            
            -- 元數(shù)據(jù)
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            
            -- 約束
            CONSTRAINT valid_discount_values CHECK (
                (discount_type = 'AMOUNT' AND discount_value IS NOT NULL) OR
                (discount_type = 'PERCENTAGE' AND discount_percentage IS NOT NULL)
            ),
            CONSTRAINT valid_discount_percentage CHECK (
                discount_percentage IS NULL OR 
                (discount_percentage >= 0 AND discount_percentage <= 100)
            )
        );
        
        -- 促銷規(guī)則索引
        CREATE INDEX IF NOT EXISTS idx_promotions_active 
        ON promotion_rules(is_active, valid_from, valid_until);
        
        CREATE INDEX IF NOT EXISTS idx_promotions_conditions 
        ON promotion_rules USING GIN(conditions);
        
        -- 10. 物化視圖:產(chǎn)品統(tǒng)計
        CREATE MATERIALIZED VIEW IF NOT EXISTS product_statistics AS
        SELECT 
            p.id as product_id,
            p.name as product_name,
            p.category_id,
            p.base_price,
            
            -- 銷售統(tǒng)計
            COUNT(DISTINCT oi.order_id) as total_orders,
            SUM(oi.quantity) as total_units_sold,
            SUM(oi.item_total) as total_revenue,
            
            -- 庫存統(tǒng)計
            p.stock_quantity,
            p.reserved_quantity,
            p.stock_quantity - p.reserved_quantity as available_quantity,
            
            -- 評分統(tǒng)計
            COALESCE(AVG(pr.rating), 0) as average_rating,
            COUNT(pr.id) as review_count,
            
            -- 時間統(tǒng)計
            MAX(o.ordered_at) as last_sale_date
            
        FROM products p
        LEFT JOIN order_items oi ON p.id = oi.product_id
        LEFT JOIN orders_partitioned o ON oi.order_id = o.id
        LEFT JOIN product_reviews pr ON p.id = pr.product_id
        
        GROUP BY p.id, p.name, p.category_id, p.base_price,
                 p.stock_quantity, p.reserved_quantity
        
        WITH DATA;
        
        -- 物化視圖索引
        CREATE UNIQUE INDEX IF NOT EXISTS idx_product_stats_product 
        ON product_statistics(product_id);
        
        CREATE INDEX IF NOT EXISTS idx_product_stats_category 
        ON product_statistics(category_id);
        
        CREATE INDEX IF NOT EXISTS idx_product_stats_revenue 
        ON product_statistics(total_revenue DESC);
        
        -- 創(chuàng)建刷新物化視圖的函數(shù)
        CREATE OR REPLACE FUNCTION refresh_product_statistics()
        RETURNS TRIGGER AS $$
        BEGIN
            REFRESH MATERIALIZED VIEW CONCURRENTLY product_statistics;
            RETURN NULL;
        END;
        $$ LANGUAGE plpgsql;
        """
        
        try:
            # 分步執(zhí)行架構(gòu)創(chuàng)建
            statements = schema_sql.split(';')
            for statement in statements:
                statement = statement.strip()
                if statement:
                    self.cursor.execute(statement)
            
            self.conn.commit()
            logger.info("電商系統(tǒng)數(shù)據(jù)庫架構(gòu)創(chuàng)建成功")
            
        except Exception as e:
            self.conn.rollback()
            logger.error(f"創(chuàng)建架構(gòu)失敗: {e}")
            raise
    
    def search_products(
        self,
        query: Optional[str] = None,
        category_id: Optional[str] = None,
        min_price: Optional[float] = None,
        max_price: Optional[float] = None,
        brands: Optional[List[str]] = None,
        min_rating: Optional[float] = None,
        in_stock_only: bool = False,
        sort_by: str = 'relevance',
        limit: int = 20,
        offset: int = 0
    ) -> List[Dict[str, Any]]:
        """
        高級產(chǎn)品搜索
        
        Args:
            query: 搜索關(guān)鍵詞
            category_id: 分類ID
            min_price: 最低價格
            max_price: 最高價格
            brands: 品牌列表
            min_rating: 最低評分
            in_stock_only: 僅顯示有貨商品
            sort_by: 排序方式
            limit: 返回數(shù)量
            offset: 偏移量
            
        Returns:
            產(chǎn)品列表
        """
        search_sql = """
        SELECT 
            p.id,
            p.sku,
            p.name,
            p.description,
            p.brand,
            p.base_price,
            p.discount_price,
            p.stock_quantity,
            p.reserved_quantity,
            p.attributes,
            
            -- 計算可用庫存
            p.stock_quantity - p.reserved_quantity as available_quantity,
            
            -- 計算折扣率
            CASE 
                WHEN p.discount_price IS NOT NULL 
                THEN ROUND((1 - p.discount_price / p.base_price) * 100, 1)
                ELSE 0
            END as discount_percentage,
            
            -- 獲取評分信息
            COALESCE(ps.average_rating, 0) as average_rating,
            COALESCE(ps.review_count, 0) as review_count,
            
            -- 計算相關(guān)性得分(如果有關(guān)鍵詞)
            {relevance_score}
            
        FROM products p
        LEFT JOIN product_statistics ps ON p.id = ps.product_id
        
        WHERE 1=1
            {search_condition}
            {category_condition}
            {price_condition}
            {brand_condition}
            {rating_condition}
            {stock_condition}
            
        {order_clause}
        
        LIMIT %s OFFSET %s
        """
        
        # 構(gòu)建查詢條件
        conditions = []
        params = []
        
        # 全文搜索條件
        if query:
            conditions.append("p.search_vector @@ plainto_tsquery('english', %s)")
            params.append(query)
        
        # 分類條件
        if category_id:
            # 獲取分類及其所有子分類
            subcategories = self._get_all_subcategories(category_id)
            if subcategories:
                placeholders = ', '.join(['%s'] * len(subcategories))
                conditions.append(f"p.category_id IN ({placeholders})")
                params.extend(subcategories)
        
        # 價格條件
        if min_price is not None:
            conditions.append("p.base_price >= %s")
            params.append(min_price)
        if max_price is not None:
            conditions.append("p.base_price <= %s")
            params.append(max_price)
        
        # 品牌條件
        if brands:
            placeholders = ', '.join(['%s'] * len(brands))
            conditions.append(f"p.brand IN ({placeholders})")
            params.extend(brands)
        
        # 評分條件
        if min_rating is not None:
            conditions.append("COALESCE(ps.average_rating, 0) >= %s")
            params.append(min_rating)
        
        # 庫存條件
        if in_stock_only:
            conditions.append("(p.stock_quantity - p.reserved_quantity) > 0")
        
        # 構(gòu)建相關(guān)性得分計算
        relevance_score = ""
        if query:
            relevance_score = """
            , ts_rank(
                p.search_vector, 
                plainto_tsquery('english', %s)
            ) as relevance_score
            """
        
        # 構(gòu)建排序子句
        order_clause_map = {
            'relevance': "ORDER BY relevance_score DESC NULLS LAST",
            'price_asc': "ORDER BY p.base_price ASC",
            'price_desc': "ORDER BY p.base_price DESC",
            'rating': "ORDER BY ps.average_rating DESC NULLS LAST",
            'popularity': "ORDER BY ps.total_units_sold DESC NULLS LAST",
            'newest': "ORDER BY p.created_at DESC"
        }
        
        order_clause = order_clause_map.get(sort_by, "ORDER BY p.created_at DESC")
        
        # 格式化SQL
        formatted_sql = search_sql.format(
            relevance_score=relevance_score,
            search_condition=f"AND {' AND '.join(conditions)}" if conditions else "",
            category_condition="",
            price_condition="",
            brand_condition="",
            rating_condition="",
            stock_condition="",
            order_clause=order_clause
        )
        
        # 添加分頁參數(shù)
        params.extend([limit, offset])
        
        try:
            self.cursor.execute(formatted_sql, params)
            products = self.cursor.fetchall()
            
            return [
                {
                    'id': str(product['id']),
                    'sku': product['sku'],
                    'name': product['name'],
                    'brand': product['brand'],
                    'base_price': float(product['base_price']),
                    'discount_price': float(product['discount_price']) if product['discount_price'] else None,
                    'available_quantity': product['available_quantity'],
                    'discount_percentage': product['discount_percentage'],
                    'average_rating': float(product['average_rating']),
                    'review_count': product['review_count'],
                    'attributes': product['attributes']
                }
                for product in products
            ]
            
        except Exception as e:
            logger.error(f"產(chǎn)品搜索失敗: {e}")
            return []
    
    def _get_all_subcategories(self, category_id: str) -> List[str]:
        """
        獲取分類及其所有子分類
        
        Args:
            category_id: 分類ID
            
        Returns:
            子分類ID列表
        """
        recursive_sql = """
        WITH RECURSIVE category_tree AS (
            -- 基礎(chǔ)分類
            SELECT id, parent_id
            FROM categories
            WHERE id = %s
            
            UNION ALL
            
            -- 遞歸獲取子分類
            SELECT c.id, c.parent_id
            FROM categories c
            INNER JOIN category_tree ct ON c.parent_id = ct.id
        )
        SELECT id FROM category_tree
        """
        
        try:
            self.cursor.execute(recursive_sql, (category_id,))
            results = self.cursor.fetchall()
            return [str(row['id']) for row in results]
        except Exception as e:
            logger.error(f"獲取子分類失敗: {e}")
            return []
    
    def get_product_recommendations(
        self, 
        product_id: str, 
        customer_id: Optional[str] = None,
        limit: int = 10
    ) -> List[Dict[str, Any]]:
        """
        獲取產(chǎn)品推薦
        
        Args:
            product_id: 產(chǎn)品ID
            customer_id: 客戶ID(可選)
            limit: 返回數(shù)量
            
        Returns:
            推薦產(chǎn)品列表
        """
        recommendations_sql = """
        -- 基于多種策略的混合推薦
        
        (
            -- 策略1: 同品牌產(chǎn)品
            SELECT 
                p.id,
                p.name,
                p.brand,
                p.base_price,
                'same_brand' as recommendation_reason,
                0.7 as recommendation_score
            FROM products p
            WHERE p.brand = (
                SELECT brand FROM products WHERE id = %s
            )
            AND p.id != %s
            AND (p.stock_quantity - p.reserved_quantity) > 0
            LIMIT 3
        )
        
        UNION ALL
        
        (
            -- 策略2: 同分類熱門產(chǎn)品
            SELECT 
                p.id,
                p.name,
                p.brand,
                p.base_price,
                'popular_in_category' as recommendation_reason,
                ps.total_units_sold::float / 
                    (SELECT MAX(total_units_sold) FROM product_statistics) as recommendation_score
            FROM products p
            JOIN product_statistics ps ON p.id = ps.product_id
            WHERE p.category_id = (
                SELECT category_id FROM products WHERE id = %s
            )
            AND p.id != %s
            AND (p.stock_quantity - p.reserved_quantity) > 0
            ORDER BY ps.total_units_sold DESC
            LIMIT 3
        )
        
        UNION ALL
        
        (
            -- 策略3: 經(jīng)常一起購買的產(chǎn)品
            SELECT 
                p.id,
                p.name,
                p.brand,
                p.base_price,
                'frequently_bought_together' as recommendation_reason,
                COUNT(DISTINCT oi2.order_id)::float / 
                    (SELECT COUNT(DISTINCT oi3.order_id) 
                     FROM order_items oi3 
                     WHERE oi3.product_id = %s) as recommendation_score
            FROM order_items oi1
            JOIN order_items oi2 ON oi1.order_id = oi2.order_id
            JOIN products p ON oi2.product_id = p.id
            WHERE oi1.product_id = %s
            AND oi2.product_id != %s
            AND (p.stock_quantity - p.reserved_quantity) > 0
            GROUP BY p.id, p.name, p.brand, p.base_price
            HAVING COUNT(DISTINCT oi2.order_id) >= 2
            ORDER BY recommendation_score DESC
            LIMIT 3
        )
        
        {customer_based_recommendations}
        
        ORDER BY recommendation_score DESC
        LIMIT %s
        """
        
        # 如果有客戶ID,添加個性化推薦
        customer_recommendations = ""
        if customer_id:
            customer_recommendations = """
            UNION ALL
            
            (
                -- 策略4: 基于客戶購買歷史的推薦
                SELECT 
                    p.id,
                    p.name,
                    p.brand,
                    p.base_price,
                    'based_on_your_purchases' as recommendation_reason,
                    COUNT(DISTINCT oi.order_id)::float / 10 as recommendation_score
                FROM order_items oi
                JOIN products p ON oi.product_id = p.id
                WHERE oi.order_id IN (
                    SELECT order_id 
                    FROM order_items 
                    WHERE product_id = %s
                )
                AND p.id != %s
                AND (p.stock_quantity - p.reserved_quantity) > 0
                GROUP BY p.id, p.name, p.brand, p.base_price
                HAVING COUNT(DISTINCT oi.order_id) >= 1
                ORDER BY recommendation_score DESC
                LIMIT 2
            )
            """
        
        # 格式化SQL
        formatted_sql = recommendations_sql.format(
            customer_based_recommendations=customer_recommendations
        )
        
        # 準(zhǔn)備參數(shù)
        params = [product_id, product_id, product_id, product_id, 
                 product_id, product_id, product_id]
        
        if customer_id:
            params.extend([customer_id, customer_id, product_id, product_id])
        
        params.append(limit)
        
        try:
            self.cursor.execute(formatted_sql, params)
            recommendations = self.cursor.fetchall()
            
            return [
                {
                    'id': str(rec['id']),
                    'name': rec['name'],
                    'brand': rec['brand'],
                    'price': float(rec['base_price']),
                    'recommendation_reason': rec['recommendation_reason'],
                    'recommendation_score': float(rec['recommendation_score'])
                }
                for rec in recommendations
            ]
            
        except Exception as e:
            logger.error(f"獲取推薦失敗: {e}")
            return []
    
    def create_order(
        self,
        customer_id: str,
        items: List[Dict[str, Any]],
        shipping_address: Dict[str, Any],
        promotion_code: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
        """
        創(chuàng)建訂單
        
        Args:
            customer_id: 客戶ID
            items: 訂單項(xiàng)列表
            shipping_address: 配送地址
            promotion_code: 促銷代碼
            
        Returns:
            創(chuàng)建的訂單信息
        """
        try:
            # 開始事務(wù)
            self.conn.autocommit = False
            
            # 生成訂單號
            order_number = f"ORD-{datetime.now().strftime('%Y%m%d')}-{self._generate_order_suffix()}"
            
            # 計算訂單金額
            order_calculation = self._calculate_order_amounts(items, promotion_code)
            
            # 插入訂單
            order_sql = """
            INSERT INTO orders_partitioned (
                order_number, customer_id, status, order_metadata,
                subtotal, tax_amount, shipping_amount, 
                discount_amount, total_amount, ordered_at
            )
            VALUES (%s, %s, 'PENDING', %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP)
            RETURNING id, order_number, total_amount
            """
            
            self.cursor.execute(
                order_sql,
                (
                    order_number,
                    customer_id,
                    Json({
                        'shipping_address': shipping_address,
                        'promotion_code': promotion_code
                    }),
                    order_calculation['subtotal'],
                    order_calculation['tax_amount'],
                    order_calculation['shipping_amount'],
                    order_calculation['discount_amount'],
                    order_calculation['total_amount']
                )
            )
            
            order_result = self.cursor.fetchone()
            order_id = order_result['id']
            
            # 插入訂單項(xiàng)
            for item in items:
                # 獲取產(chǎn)品快照
                product_snapshot = self._get_product_snapshot(item['product_id'])
                
                # 插入訂單項(xiàng)
                item_sql = """
                INSERT INTO order_items (
                    order_id, product_id, variant_id,
                    product_snapshot, quantity, unit_price,
                    discount_percentage, item_total
                )
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                """
                
                self.cursor.execute(
                    item_sql,
                    (
                        order_id,
                        item['product_id'],
                        item.get('variant_id'),
                        Json(product_snapshot),
                        item['quantity'],
                        item['unit_price'],
                        item.get('discount_percentage', 0),
                        item['item_total']
                    )
                )
                
                # 更新庫存
                self._update_inventory(
                    product_id=item['product_id'],
                    variant_id=item.get('variant_id'),
                    quantity_change=-item['quantity'],
                    reference_id=order_id,
                    reference_type='ORDER',
                    change_type='SALE'
                )
            
            # 提交事務(wù)
            self.conn.commit()
            
            return {
                'order_id': str(order_id),
                'order_number': order_result['order_number'],
                'total_amount': float(order_result['total_amount']),
                'items_count': len(items)
            }
            
        except Exception as e:
            self.conn.rollback()
            logger.error(f"創(chuàng)建訂單失敗: {e}")
            return None
    
    def _calculate_order_amounts(
        self,
        items: List[Dict[str, Any]],
        promotion_code: Optional[str] = None
    ) -> Dict[str, float]:
        """計算訂單金額"""
        subtotal = sum(item['item_total'] for item in items)
        
        # 應(yīng)用促銷折扣
        discount_amount = 0
        if promotion_code:
            # 這里可以添加促銷邏輯
            pass
        
        # 計算稅費(fèi)(簡化示例)
        tax_amount = subtotal * 0.1  # 10%稅率
        
        # 計算運(yùn)費(fèi)(簡化示例)
        shipping_amount = 5.99 if subtotal < 50 else 0
        
        # 計算總額
        total_amount = subtotal + tax_amount + shipping_amount - discount_amount
        
        return {
            'subtotal': subtotal,
            'tax_amount': tax_amount,
            'shipping_amount': shipping_amount,
            'discount_amount': discount_amount,
            'total_amount': total_amount
        }
    
    def _get_product_snapshot(self, product_id: str) -> Dict[str, Any]:
        """獲取產(chǎn)品快照"""
        snapshot_sql = """
        SELECT 
            id, sku, name, description, brand,
            base_price, attributes
        FROM products
        WHERE id = %s
        """
        
        self.cursor.execute(snapshot_sql, (product_id,))
        product = self.cursor.fetchone()
        
        return {
            'product_id': str(product['id']),
            'sku': product['sku'],
            'name': product['name'],
            'brand': product['brand'],
            'price_at_time_of_purchase': float(product['base_price']),
            'attributes': product['attributes']
        }
    
    def _update_inventory(
        self,
        product_id: str,
        variant_id: Optional[str],
        quantity_change: int,
        reference_id: str,
        reference_type: str,
        change_type: str
    ):
        """更新庫存"""
        # 獲取當(dāng)前庫存
        if variant_id:
            stock_sql = """
            SELECT variant_stock as current_quantity
            FROM product_variants
            WHERE id = %s
            """
            self.cursor.execute(stock_sql, (variant_id,))
        else:
            stock_sql = """
            SELECT stock_quantity as current_quantity
            FROM products
            WHERE id = %s
            """
            self.cursor.execute(stock_sql, (product_id,))
        
        result = self.cursor.fetchone()
        if not result:
            raise Exception("產(chǎn)品不存在")
        
        current_quantity = result['current_quantity']
        new_quantity = current_quantity + quantity_change
        
        if new_quantity < 0:
            raise Exception("庫存不足")
        
        # 更新庫存
        if variant_id:
            update_sql = """
            UPDATE product_variants
            SET variant_stock = %s
            WHERE id = %s
            """
            self.cursor.execute(update_sql, (new_quantity, variant_id))
        else:
            update_sql = """
            UPDATE products
            SET stock_quantity = %s
            WHERE id = %s
            """
            self.cursor.execute(update_sql, (new_quantity, product_id))
        
        # 記錄庫存變更
        log_sql = """
        INSERT INTO inventory_logs (
            product_id, variant_id, change_type,
            quantity_change, previous_quantity, new_quantity,
            reference_id, reference_type
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """
        
        self.cursor.execute(
            log_sql,
            (
                product_id,
                variant_id,
                change_type,
                quantity_change,
                current_quantity,
                new_quantity,
                reference_id,
                reference_type
            )
        )
    
    def _generate_order_suffix(self) -> str:
        """生成訂單號后綴"""
        import random
        import string
        
        # 生成6位隨機(jī)字母數(shù)字
        return ''.join(random.choices(
            string.ascii_uppercase + string.digits, k=6
        ))
    
    def refresh_materialized_views(self):
        """刷新物化視圖"""
        refresh_sql = """
        REFRESH MATERIALIZED VIEW CONCURRENTLY product_statistics;
        """
        
        try:
            self.cursor.execute(refresh_sql)
            self.conn.commit()
            logger.info("物化視圖刷新成功")
        except Exception as e:
            self.conn.rollback()
            logger.error(f"刷新物化視圖失敗: {e}")
    
    def close(self):
        """關(guān)閉連接"""
        if self.cursor:
            self.cursor.close()
        if self.conn:
            self.conn.close()
        logger.info("數(shù)據(jù)庫連接已關(guān)閉")


def demonstrate_ecommerce_features():
    """演示電商系統(tǒng)高級特性"""
    dsn = "dbname=ecommerce user=postgres password=password host=localhost port=5432"
    
    print("電商系統(tǒng)PostgreSQL高級特性演示")
    print("=" * 60)
    
    # 創(chuàng)建數(shù)據(jù)庫實(shí)例
    db = ECommerceDatabase(dsn)
    
    try:
        # 1. 創(chuàng)建架構(gòu)
        print("\n1. 創(chuàng)建數(shù)據(jù)庫架構(gòu)...")
        db.create_schema()
        print("   架構(gòu)創(chuàng)建完成")
        
        # 2. 演示搜索功能
        print("\n2. 演示高級產(chǎn)品搜索...")
        products = db.search_products(
            query="wireless headphone",
            min_price=50,
            max_price=200,
            min_rating=4.0,
            in_stock_only=True,
            sort_by='rating',
            limit=5
        )
        
        print(f"   找到 {len(products)} 個產(chǎn)品:")
        for product in products:
            print(f"   - {product['name']} (評分: {product['average_rating']:.1f}, "
                  f"價格: ${product['base_price']:.2f})")
        
        # 3. 演示推薦系統(tǒng)
        if products:
            print("\n3. 演示產(chǎn)品推薦系統(tǒng)...")
            recommendations = db.get_product_recommendations(
                product_id=products[0]['id'],
                limit=5
            )
            
            print(f"   為 '{products[0]['name']}' 的推薦:")
            for rec in recommendations:
                print(f"   - {rec['name']} (原因: {rec['recommendation_reason']}, "
                      f"得分: {rec['recommendation_score']:.2f})")
        
        # 4. 演示訂單創(chuàng)建
        print("\n4. 演示訂單創(chuàng)建...")
        # 這里需要實(shí)際的產(chǎn)品數(shù)據(jù),所以只是演示代碼結(jié)構(gòu)
        print("   訂單創(chuàng)建功能就緒")
        
        # 5. 刷新物化視圖
        print("\n5. 刷新物化視圖...")
        db.refresh_materialized_views()
        print("   物化視圖刷新完成")
        
        print("\n演示完成!")
        
    except Exception as e:
        print(f"演示過程中出錯: {e}")
    finally:
        db.close()


if __name__ == "__main__":
    demonstrate_ecommerce_features()

5. 性能監(jiān)控與調(diào)優(yōu)工具

5.1 PostgreSQL性能監(jiān)控指標(biāo)體系

5.2 關(guān)鍵性能指標(biāo)計算公式

緩沖區(qū)命中率

索引使用效率

緩存命中率

6. 完整代碼示例

"""
PostgreSQL高級特性與性能優(yōu)化完整示例
"""
import psycopg2
from psycopg2.extras import DictCursor, Json
import time
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class PostgreSQLAdvancedOptimizer:
    """PostgreSQL高級優(yōu)化器"""
    
    def __init__(self, dsn: str):
        """
        初始化優(yōu)化器
        
        Args:
            dsn: 數(shù)據(jù)庫連接字符串
        """
        self.dsn = dsn
        self.conn = psycopg2.connect(dsn)
        self.cursor = self.conn.cursor(cursor_factory=DictCursor)
    
    def analyze_database_health(self) -> Dict[str, Any]:
        """
        全面分析數(shù)據(jù)庫健康狀態(tài)
        
        Returns:
            數(shù)據(jù)庫健康報告
        """
        health_report = {
            'timestamp': datetime.now().isoformat(),
            'overall_score': 0,
            'categories': {},
            'issues': [],
            'recommendations': []
        }
        
        # 分析各個維度
        categories = [
            ('連接與并發(fā)', self._analyze_connections),
            ('查詢性能', self._analyze_query_performance),
            ('索引效率', self._analyze_index_efficiency),
            ('緩存性能', self._analyze_cache_performance),
            ('存儲效率', self._analyze_storage_efficiency),
            ('配置優(yōu)化', self._analyze_configuration)
        ]
        
        total_score = 0
        category_count = 0
        
        for category_name, analyzer_func in categories:
            try:
                category_result = analyzer_func()
                health_report['categories'][category_name] = category_result
                
                if 'score' in category_result:
                    total_score += category_result['score']
                    category_count += 1
                
                if 'issues' in category_result:
                    health_report['issues'].extend(category_result['issues'])
                
                if 'recommendations' in category_result:
                    health_report['recommendations'].extend(category_result['recommendations'])
                    
            except Exception as e:
                logger.error(f"分析{category_name}失敗: {e}")
                health_report['issues'].append({
                    'category': category_name,
                    'severity': 'ERROR',
                    'message': f'分析失敗: {str(e)}'
                })
        
        # 計算總體評分
        if category_count > 0:
            health_report['overall_score'] = total_score / category_count
        
        # 排序問題和建議
        health_report['issues'] = sorted(
            health_report['issues'], 
            key=lambda x: {'CRITICAL': 3, 'HIGH': 2, 'MEDIUM': 1, 'LOW': 0}.get(x.get('severity', 'LOW'), 0),
            reverse=True
        )
        
        health_report['recommendations'] = sorted(
            health_report['recommendations'],
            key=lambda x: x.get('priority', 3),
            reverse=False
        )
        
        return health_report
    
    def _analyze_connections(self) -> Dict[str, Any]:
        """分析連接與并發(fā)"""
        analysis = {
            'score': 100,
            'metrics': {},
            'issues': [],
            'recommendations': []
        }
        
        try:
            # 獲取連接統(tǒng)計
            self.cursor.execute("""
                SELECT 
                    COUNT(*) as total_connections,
                    COUNT(*) FILTER (WHERE state = 'active') as active_connections,
                    COUNT(*) FILTER (WHERE state = 'idle') as idle_connections,
                    COUNT(*) FILTER (WHERE wait_event_type IS NOT NULL) as waiting_connections,
                    MAX(now() - backend_start) as oldest_connection_age
                FROM pg_stat_activity
                WHERE pid <> pg_backend_pid()
            """)
            conn_stats = self.cursor.fetchone()
            
            # 獲取配置參數(shù)
            self.cursor.execute("""
                SELECT setting::integer as max_connections
                FROM pg_settings
                WHERE name = 'max_connections'
            """)
            max_conns = self.cursor.fetchone()['max_connections']
            
            # 計算連接使用率
            conn_usage = conn_stats['total_connections'] / max_conns
            analysis['metrics'] = {
                'total_connections': conn_stats['total_connections'],
                'active_connections': conn_stats['active_connections'],
                'idle_connections': conn_stats['idle_connections'],
                'waiting_connections': conn_stats['waiting_connections'],
                'connection_usage_percentage': round(conn_usage * 100, 1),
                'max_connections': max_conns
            }
            
            # 分析問題
            if conn_usage > 0.8:
                analysis['score'] -= 30
                analysis['issues'].append({
                    'severity': 'HIGH',
                    'message': f'連接使用率過高: {conn_usage:.1%}',
                    'details': '接近最大連接數(shù)限制'
                })
                analysis['recommendations'].append({
                    'priority': 1,
                    'action': '考慮增加max_connections參數(shù)或優(yōu)化連接池配置'
                })
            
            if conn_stats['waiting_connections'] > 5:
                analysis['score'] -= 20
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'等待連接數(shù)較多: {conn_stats["waiting_connections"]}',
                    'details': '可能存在鎖爭用或資源競爭'
                })
            
            if conn_stats['oldest_connection_age'] and \
               conn_stats['oldest_connection_age'].total_seconds() > 3600:
                analysis['score'] -= 10
                analysis['issues'].append({
                    'severity': 'LOW',
                    'message': f'存在長時間連接: {conn_stats["oldest_connection_age"]}',
                    'details': '考慮優(yōu)化連接生命周期'
                })
            
        except Exception as e:
            logger.error(f"連接分析失敗: {e}")
            analysis['score'] = 0
        
        return analysis
    
    def _analyze_query_performance(self) -> Dict[str, Any]:
        """分析查詢性能"""
        analysis = {
            'score': 100,
            'metrics': {},
            'issues': [],
            'recommendations': []
        }
        
        try:
            # 獲取慢查詢統(tǒng)計
            self.cursor.execute("""
                WITH query_stats AS (
                    SELECT 
                        query,
                        calls,
                        total_time,
                        mean_time,
                        rows,
                        100.0 * shared_blks_hit / nullif(shared_blks_hit + shared_blks_read, 0) as buffer_hit_rate,
                        ROW_NUMBER() OVER (ORDER BY total_time DESC) as time_rank
                    FROM pg_stat_statements
                    WHERE query NOT LIKE '%pg_stat_statements%'
                    AND calls > 0
                )
                SELECT 
                    COUNT(*) as total_queries,
                    SUM(calls) as total_calls,
                    SUM(total_time) as total_time_ms,
                    AVG(mean_time) as avg_query_time_ms,
                    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY mean_time) as p95_query_time_ms,
                    SUM(CASE WHEN mean_time > 100 THEN 1 ELSE 0 END) as slow_queries_count,
                    AVG(buffer_hit_rate) as avg_buffer_hit_rate
                FROM query_stats
            """)
            query_stats = self.cursor.fetchone()
            
            analysis['metrics'] = {
                'total_queries': query_stats['total_queries'],
                'total_calls': query_stats['total_calls'],
                'total_time_seconds': round(query_stats['total_time_ms'] / 1000, 1),
                'avg_query_time_ms': round(query_stats['avg_query_time_ms'], 2),
                'p95_query_time_ms': round(query_stats['p95_query_time_ms'], 2),
                'slow_queries_count': query_stats['slow_queries_count'],
                'avg_buffer_hit_rate': round(query_stats['avg_buffer_hit_rate'], 1)
            }
            
            # 分析問題
            if query_stats['avg_query_time_ms'] > 50:
                analysis['score'] -= 20
                analysis['issues'].append({
                    'severity': 'HIGH',
                    'message': f'平均查詢時間較高: {query_stats["avg_query_time_ms"]:.2f}ms',
                    'details': '可能存在查詢優(yōu)化空間'
                })
                analysis['recommendations'].append({
                    'priority': 1,
                    'action': '分析并優(yōu)化最耗時的查詢'
                })
            
            if query_stats['slow_queries_count'] > 10:
                analysis['score'] -= 15
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'發(fā)現(xiàn) {query_stats["slow_queries_count"]} 個慢查詢',
                    'details': '定義: 平均執(zhí)行時間 > 100ms'
                })
            
            if query_stats['avg_buffer_hit_rate'] < 90:
                analysis['score'] -= 10
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'平均緩沖區(qū)命中率較低: {query_stats["avg_buffer_hit_rate"]:.1f}%',
                    'details': '建議增加shared_buffers或優(yōu)化查詢'
                })
            
        except Exception as e:
            logger.error(f"查詢性能分析失敗: {e}")
            analysis['score'] = 0
        
        return analysis
    
    def _analyze_index_efficiency(self) -> Dict[str, Any]:
        """分析索引效率"""
        analysis = {
            'score': 100,
            'metrics': {},
            'issues': [],
            'recommendations': []
        }
        
        try:
            # 獲取索引使用統(tǒng)計
            self.cursor.execute("""
                WITH index_stats AS (
                    SELECT 
                        schemaname,
                        tablename,
                        indexname,
                        idx_scan as index_scans,
                        idx_tup_read as tuples_read,
                        idx_tup_fetch as tuples_fetched,
                        pg_relation_size(indexname::regclass) as index_size_bytes,
                        CASE 
                            WHEN idx_scan = 0 THEN 0
                            ELSE idx_tup_fetch::float / idx_tup_read * 100
                        END as index_efficiency
                    FROM pg_stat_user_indexes
                    WHERE schemaname NOT LIKE 'pg_%'
                )
                SELECT 
                    COUNT(*) as total_indexes,
                    SUM(index_size_bytes) as total_index_size_bytes,
                    SUM(CASE WHEN index_scans = 0 THEN 1 ELSE 0 END) as unused_indexes_count,
                    AVG(index_efficiency) as avg_index_efficiency,
                    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY index_efficiency) as median_index_efficiency
                FROM index_stats
            """)
            index_stats = self.cursor.fetchone()
            
            analysis['metrics'] = {
                'total_indexes': index_stats['total_indexes'],
                'total_index_size_gb': round(index_stats['total_index_size_bytes'] / (1024**3), 2),
                'unused_indexes_count': index_stats['unused_indexes_count'],
                'avg_index_efficiency': round(index_stats['avg_index_efficiency'], 1),
                'median_index_efficiency': round(index_stats['median_index_efficiency'], 1)
            }
            
            # 分析問題
            unused_ratio = index_stats['unused_indexes_count'] / index_stats['total_indexes'] \
                if index_stats['total_indexes'] > 0 else 0
            
            if unused_ratio > 0.1:
                analysis['score'] -= 25
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'未使用索引比例較高: {unused_ratio:.1%}',
                    'details': f'{index_stats["unused_indexes_count"]} 個索引從未使用'
                })
                analysis['recommendations'].append({
                    'priority': 2,
                    'action': '考慮刪除未使用的索引以節(jié)省空間和提高寫入性能'
                })
            
            if index_stats['avg_index_efficiency'] < 50:
                analysis['score'] -= 15
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'平均索引效率較低: {index_stats["avg_index_efficiency"]:.1f}%',
                    'details': '索引可能不是最優(yōu)選擇'
                })
            
        except Exception as e:
            logger.error(f"索引效率分析失敗: {e}")
            analysis['score'] = 0
        
        return analysis
    
    def _analyze_cache_performance(self) -> Dict[str, Any]:
        """分析緩存性能"""
        analysis = {
            'score': 100,
            'metrics': {},
            'issues': [],
            'recommendations': []
        }
        
        try:
            # 獲取緩存統(tǒng)計
            self.cursor.execute("""
                SELECT 
                    -- 共享緩沖區(qū)命中率
                    CASE 
                        WHEN blks_hit + blks_read > 0 
                        THEN blks_hit::float / (blks_hit + blks_read) * 100
                        ELSE 0 
                    END as shared_buffer_hit_rate,
                    
                    -- TOAST緩沖區(qū)命中率
                    CASE 
                        WHEN toast_blks_hit + toast_blks_read > 0 
                        THEN toast_blks_hit::float / (toast_blks_hit + toast_blks_read) * 100
                        ELSE 0 
                    END as toast_buffer_hit_rate,
                    
                    -- 臨時文件使用
                    temp_files,
                    temp_bytes,
                    
                    -- 檢查點(diǎn)統(tǒng)計
                    checkpoints_timed,
                    checkpoints_req,
                    checkpoint_write_time,
                    checkpoint_sync_time
                    
                FROM pg_stat_bgwriter
            """)
            cache_stats = self.cursor.fetchone()
            
            # 獲取緩沖區(qū)配置
            self.cursor.execute("""
                SELECT 
                    name,
                    setting,
                    unit
                FROM pg_settings
                WHERE name IN ('shared_buffers', 'effective_cache_size')
            """)
            cache_config = {row['name']: row for row in self.cursor.fetchall()}
            
            analysis['metrics'] = {
                'shared_buffer_hit_rate': round(cache_stats['shared_buffer_hit_rate'], 1),
                'toast_buffer_hit_rate': round(cache_stats['toast_buffer_hit_rate'], 1),
                'temp_files_count': cache_stats['temp_files'],
                'temp_files_size_gb': round(cache_stats['temp_bytes'] / (1024**3), 2),
                'shared_buffers': cache_config.get('shared_buffers', {}).get('setting', 'N/A'),
                'effective_cache_size': cache_config.get('effective_cache_size', {}).get('setting', 'N/A')
            }
            
            # 分析問題
            if cache_stats['shared_buffer_hit_rate'] < 90:
                analysis['score'] -= 20
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'共享緩沖區(qū)命中率較低: {cache_stats["shared_buffer_hit_rate"]:.1f}%',
                    'details': '建議增加shared_buffers或優(yōu)化工作集'
                })
                analysis['recommendations'].append({
                    'priority': 2,
                    'action': '考慮增加shared_buffers參數(shù)'
                })
            
            if cache_stats['temp_files'] > 100:
                analysis['score'] -= 15
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'臨時文件使用較多: {cache_stats["temp_files"]} 個文件',
                    'details': '可能存在排序或哈希操作溢出到磁盤'
                })
                analysis['recommendations'].append({
                    'priority': 2,
                    'action': '增加work_mem參數(shù)以減少臨時文件使用'
                })
            
        except Exception as e:
            logger.error(f"緩存性能分析失敗: {e}")
            analysis['score'] = 0
        
        return analysis
    
    def _analyze_storage_efficiency(self) -> Dict[str, Any]:
        """分析存儲效率"""
        analysis = {
            'score': 100,
            'metrics': {},
            'issues': [],
            'recommendations': []
        }
        
        try:
            # 獲取表膨脹信息
            self.cursor.execute("""
                SELECT 
                    schemaname,
                    tablename,
                    n_dead_tup as dead_tuples,
                    n_live_tup as live_tuples,
                    CASE 
                        WHEN n_live_tup > 0 
                        THEN n_dead_tup::float / n_live_tup * 100
                        ELSE 0 
                    END as dead_tuple_ratio,
                    last_vacuum,
                    last_autovacuum,
                    last_analyze,
                    last_autoanalyze
                FROM pg_stat_user_tables
                WHERE schemaname NOT LIKE 'pg_%'
                ORDER BY dead_tuple_ratio DESC
                LIMIT 10
            """)
            table_stats = self.cursor.fetchall()
            
            # 獲取數(shù)據(jù)庫大小
            self.cursor.execute("""
                SELECT 
                    pg_database_size(current_database()) as database_size_bytes,
                    pg_size_pretty(pg_database_size(current_database())) as database_size_pretty
            """)
            db_size = self.cursor.fetchone()
            
            analysis['metrics'] = {
                'database_size': db_size['database_size_pretty'],
                'tables_analyzed': len(table_stats),
                'top_tables_by_dead_tuples': [
                    {
                        'table': f"{row['schemaname']}.{row['tablename']}",
                        'dead_tuple_ratio': round(row['dead_tuple_ratio'], 1),
                        'dead_tuples': row['dead_tuples'],
                        'last_vacuum': row['last_vacuum']
                    }
                    for row in table_stats[:5]
                ]
            }
            
            # 分析問題
            high_dead_tables = [
                row for row in table_stats 
                if row['dead_tuple_ratio'] > 20
            ]
            
            if high_dead_tables:
                analysis['score'] -= 25
                analysis['issues'].append({
                    'severity': 'MEDIUM',
                    'message': f'發(fā)現(xiàn) {len(high_dead_tables)} 個表死元組比例超過20%',
                    'details': '可能導(dǎo)致查詢性能下降和存儲空間浪費(fèi)'
                })
                analysis['recommendations'].append({
                    'priority': 2,
                    'action': '對高死元組比例的表執(zhí)行VACUUM操作'
                })
            
        except Exception as e:
            logger.error(f"存儲效率分析失敗: {e}")
            analysis['score'] = 0
        
        return analysis
    
    def _analyze_configuration(self) -> Dict[str, Any]:
        """分析配置優(yōu)化"""
        analysis = {
            'score': 100,
            'metrics': {},
            'issues': [],
            'recommendations': []
        }
        
        try:
            # 獲取關(guān)鍵配置參數(shù)
            self.cursor.execute("""
                SELECT 
                    name,
                    setting,
                    unit,
                    context,
                    vartype,
                    source
                FROM pg_settings
                WHERE name IN (
                    'shared_buffers',
                    'work_mem',
                    'maintenance_work_mem',
                    'effective_cache_size',
                    'max_connections',
                    'checkpoint_timeout',
                    'checkpoint_completion_target',
                    'wal_buffers',
                    'random_page_cost',
                    'seq_page_cost',
                    'effective_io_concurrency'
                )
                ORDER BY name
            """)
            configs = {row['name']: row for row in self.cursor.fetchall()}
            
            analysis['metrics'] = {
                'key_parameters': configs
            }
            
            # 檢查配置合理性
            issues = []
            recommendations = []
            
            # 檢查shared_buffers(應(yīng)為系統(tǒng)內(nèi)存的25%)
            if 'shared_buffers' in configs:
                shared_buffers = configs['shared_buffers']['setting']
                if shared_buffers.endswith('MB'):
                    mb_value = int(shared_buffers[:-2])
                    if mb_value < 128:  # 小于128MB
                        issues.append('shared_buffers設(shè)置可能過小')
                        recommendations.append('考慮增加shared_buffers到系統(tǒng)內(nèi)存的25%')
            
            # 檢查work_mem
            if 'work_mem' in configs:
                work_mem = configs['work_mem']['setting']
                if work_mem.endswith('kB'):
                    kb_value = int(work_mem[:-2])
                    if kb_value < 4096:  # 小于4MB
                        issues.append('work_mem設(shè)置可能過小')
                        recommendations.append('適當(dāng)增加work_mem以減少臨時文件使用')
            
            # 檢查checkpoint配置
            if 'checkpoint_timeout' in configs:
                checkpoint_timeout = int(configs['checkpoint_timeout']['setting'])
                if checkpoint_timeout > 900:  # 超過15分鐘
                    issues.append('checkpoint_timeout設(shè)置過長')
                    recommendations.append('考慮減少checkpoint_timeout以降低恢復(fù)時間')
            
            if issues:
                analysis['score'] -= len(issues) * 10
                for issue in issues:
                    analysis['issues'].append({
                        'severity': 'MEDIUM',
                        'message': issue
                    })
                
                for rec in recommendations:
                    analysis['recommendations'].append({
                        'priority': 3,
                        'action': rec
                    })
            
        except Exception as e:
            logger.error(f"配置分析失敗: {e}")
            analysis['score'] = 0
        
        return analysis
    
    def generate_optimization_report(self) -> Dict[str, Any]:
        """
        生成優(yōu)化報告
        
        Returns:
            優(yōu)化報告
        """
        health_report = self.analyze_database_health()
        
        report = {
            'summary': {
                'overall_score': health_report['overall_score'],
                'assessment': self._get_assessment(health_report['overall_score']),
                'total_issues': len(health_report['issues']),
                'total_recommendations': len(health_report['recommendations'])
            },
            'detailed_analysis': health_report['categories'],
            'critical_issues': [
                issue for issue in health_report['issues']
                if issue.get('severity') in ['CRITICAL', 'HIGH']
            ],
            'optimization_plan': self._create_optimization_plan(health_report),
            'execution_checklist': self._create_execution_checklist()
        }
        
        return report
    
    def _get_assessment(self, score: float) -> str:
        """根據(jù)評分獲取評估結(jié)果"""
        if score >= 90:
            return 'EXCELLENT'
        elif score >= 80:
            return 'GOOD'
        elif score >= 70:
            return 'FAIR'
        elif score >= 60:
            return 'NEEDS_IMPROVEMENT'
        else:
            return 'POOR'
    
    def _create_optimization_plan(self, health_report: Dict[str, Any]) -> List[Dict[str, Any]]:
        """創(chuàng)建優(yōu)化計劃"""
        plan = []
        
        # 按優(yōu)先級排序建議
        sorted_recommendations = sorted(
            health_report['recommendations'],
            key=lambda x: x.get('priority', 3)
        )
        
        for i, rec in enumerate(sorted_recommendations[:10], 1):  # 取前10個
            plan.append({
                'step': i,
                'action': rec['action'],
                'priority': rec.get('priority', 3),
                'estimated_effort': self._estimate_effort(rec['action']),
                'expected_impact': self._estimate_impact(rec['action'])
            })
        
        return plan
    
    def _estimate_effort(self, action: str) -> str:
        """估計實(shí)施難度"""
        low_effort_keywords = ['調(diào)整', '設(shè)置', '啟用', '禁用']
        medium_effort_keywords = ['優(yōu)化', '重構(gòu)', '重建', '遷移']
        high_effort_keywords = ['重寫', '重構(gòu)架構(gòu)', '數(shù)據(jù)遷移', '集群擴(kuò)展']
        
        action_lower = action.lower()
        
        if any(keyword in action_lower for keyword in high_effort_keywords):
            return 'HIGH'
        elif any(keyword in action_lower for keyword in medium_effort_keywords):
            return 'MEDIUM'
        else:
            return 'LOW'
    
    def _estimate_impact(self, action: str) -> str:
        """估計影響程度"""
        high_impact_keywords = ['性能提升', '顯著改善', '根本解決', '關(guān)鍵修復(fù)']
        medium_impact_keywords = ['優(yōu)化', '改進(jìn)', '增強(qiáng)', '調(diào)整']
        low_impact_keywords = ['微調(diào)', '小優(yōu)化', '維護(hù)', '清理']
        
        action_lower = action.lower()
        
        if any(keyword in action_lower for keyword in high_impact_keywords):
            return 'HIGH'
        elif any(keyword in action_lower for keyword in medium_impact_keywords):
            return 'MEDIUM'
        else:
            return 'LOW'
    
    def _create_execution_checklist(self) -> List[Dict[str, Any]]:
        """創(chuàng)建執(zhí)行檢查清單"""
        checklist = [
            {
                'phase': '準(zhǔn)備階段',
                'tasks': [
                    {'task': '備份數(shù)據(jù)庫', 'completed': False},
                    {'task': '驗(yàn)證備份完整性', 'completed': False},
                    {'task': '準(zhǔn)備回滾計劃', 'completed': False},
                    {'task': '安排維護(hù)窗口', 'completed': False}
                ]
            },
            {
                'phase': '實(shí)施階段',
                'tasks': [
                    {'task': '應(yīng)用配置變更', 'completed': False},
                    {'task': '執(zhí)行索引優(yōu)化', 'completed': False},
                    {'task': '運(yùn)行VACUUM操作', 'completed': False},
                    {'task': '更新統(tǒng)計信息', 'completed': False}
                ]
            },
            {
                'phase': '驗(yàn)證階段',
                'tasks': [
                    {'task': '驗(yàn)證性能改進(jìn)', 'completed': False},
                    {'task': '運(yùn)行回歸測試', 'completed': False},
                    {'task': '監(jiān)控系統(tǒng)穩(wěn)定性', 'completed': False},
                    {'task': '更新文檔記錄', 'completed': False}
                ]
            }
        ]
        
        return checklist
    
    def close(self):
        """關(guān)閉連接"""
        if self.cursor:
            self.cursor.close()
        if self.conn:
            self.conn.close()
        logger.info("數(shù)據(jù)庫連接已關(guān)閉")


def demonstrate_optimization():
    """演示優(yōu)化功能"""
    dsn = "dbname=testdb user=postgres password=password host=localhost port=5432"
    
    print("PostgreSQL高級優(yōu)化演示")
    print("=" * 60)
    
    optimizer = PostgreSQLAdvancedOptimizer(dsn)
    
    try:
        # 生成健康報告
        print("\n生成數(shù)據(jù)庫健康報告...")
        health_report = optimizer.analyze_database_health()
        
        print(f"\n總體評分: {health_report['overall_score']:.1f}/100")
        print(f"發(fā)現(xiàn)問題: {len(health_report['issues'])} 個")
        print(f"優(yōu)化建議: {len(health_report['recommendations'])} 條")
        
        # 顯示關(guān)鍵問題
        critical_issues = [
            issue for issue in health_report['issues']
            if issue.get('severity') in ['CRITICAL', 'HIGH']
        ]
        
        if critical_issues:
            print("\n關(guān)鍵問題:")
            for issue in critical_issues[:3]:
                print(f"  ? [{issue['severity']}] {issue['message']}")
        
        # 顯示高優(yōu)先級建議
        high_priority_recs = [
            rec for rec in health_report['recommendations']
            if rec.get('priority', 3) == 1
        ]
        
        if high_priority_recs:
            print("\n高優(yōu)先級建議:")
            for rec in high_priority_recs[:3]:
                print(f"  ? {rec['action']}")
        
        # 生成詳細(xì)報告
        print("\n\n生成詳細(xì)優(yōu)化報告...")
        report = optimizer.generate_optimization_report()
        
        print(f"\n優(yōu)化計劃 ({len(report['optimization_plan'])} 個步驟):")
        for step in report['optimization_plan'][:5]:
            print(f"  步驟{step['step']}: {step['action']}")
            print(f"     優(yōu)先級: {step['priority']}, 難度: {step['estimated_effort']}, "
                  f"影響: {step['expected_impact']}")
        
        print("\n執(zhí)行檢查清單:")
        for phase in report['execution_checklist']:
            print(f"  {phase['phase']}:")
            for task in phase['tasks']:
                status = '?' if task['completed'] else '○'
                print(f"    {status} {task['task']}")
        
        print("\n優(yōu)化演示完成!")
        
    except Exception as e:
        print(f"演示過程中出錯: {e}")
    finally:
        optimizer.close()


if __name__ == "__main__":
    demonstrate_optimization()

7. 代碼自查與最佳實(shí)踐

7.1 代碼質(zhì)量檢查清單

為確保PostgreSQL相關(guān)代碼的質(zhì)量,應(yīng)遵循以下檢查清單:

"""
PostgreSQL代碼質(zhì)量自查工具
"""
import re
import ast
from typing import List, Dict, Any, Set
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class PostgreSQLCodeChecker:
    """PostgreSQL代碼質(zhì)量檢查器"""
    
    def __init__(self):
        self.rules = {
            'sql_injection': self.check_sql_injection,
            'connection_management': self.check_connection_management,
            'transaction_handling': self.check_transaction_handling,
            'index_usage': self.check_index_usage,
            'data_type_validation': self.check_data_type_validation,
            'error_handling': self.check_error_handling,
            'performance_anti_patterns': self.check_performance_anti_patterns
        }
        
    def check_file(self, filepath: str) -> Dict[str, List[Dict[str, Any]]]:
        """
        檢查Python文件中的PostgreSQL相關(guān)代碼
        
        Args:
            filepath: Python文件路徑
            
        Returns:
            檢查結(jié)果
        """
        with open(filepath, 'r', encoding='utf-8') as f:
            content = f.read()
        
        issues = {}
        
        for rule_name, rule_func in self.rules.items():
            rule_issues = rule_func(content, filepath)
            if rule_issues:
                issues[rule_name] = rule_issues
        
        return issues
    
    def check_sql_injection(self, content: str, filepath: str) -> List[Dict[str, Any]]:
        """
        檢查SQL注入漏洞
        
        Args:
            content: 文件內(nèi)容
            filepath: 文件路徑
            
        Returns:
            問題列表
        """
        issues = []
        
        # 查找可能的字符串拼接SQL
        patterns = [
            (r'execute\s*\(.*?\s*%\s*.*?\)', '使用字符串格式化執(zhí)行SQL'),
            (r'execute\s*\(.*?\s*\+\s*.*?\)', '使用字符串拼接執(zhí)行SQL'),
            (r'executemany\s*\(.*?\s*%\s*.*?\)', '使用字符串格式化執(zhí)行批量SQL'),
            (r'f"SELECT.*{.*}.*"', '使用f-string直接嵌入變量'),
        ]
        
        lines = content.split('\n')
        for i, line in enumerate(lines, 1):
            for pattern, description in patterns:
                if re.search(pattern, line, re.IGNORECASE):
                    issues.append({
                        'line': i,
                        'severity': 'CRITICAL',
                        'message': f'潛在的SQL注入漏洞: {description}',
                        'suggestion': '使用參數(shù)化查詢(%s占位符)',
                        'code_snippet': line.strip()[:100]
                    })
        
        return issues
    
    def check_connection_management(self, content: str, filepath: str) -> List[Dict[str, Any]]:
        """
        檢查數(shù)據(jù)庫連接管理
        
        Args:
            content: 文件內(nèi)容
            filepath: 文件路徑
            
        Returns:
            問題列表
        """
        issues = []
        lines = content.split('\n')
        
        # 查找連接創(chuàng)建但不關(guān)閉的情況
        connect_pattern = r'psycopg2\.connect\(|connect\('
        close_pattern = r'\.close\(\)'
        
        in_function = False
        function_start = 0
        connect_lines = []
        
        for i, line in enumerate(lines, 1):
            # 檢測函數(shù)開始
            if line.strip().startswith('def '):
                in_function = True
                function_start = i
                connect_lines = []
            
            # 檢測連接創(chuàng)建
            if re.search(connect_pattern, line):
                connect_lines.append(i)
            
            # 檢測連接關(guān)閉
            if re.search(close_pattern, line) and 'close' in line:
                if connect_lines:
                    connect_lines.pop()
            
            # 檢測函數(shù)結(jié)束
            if line.strip() == '' or i == len(lines):
                if in_function and connect_lines:
                    for connect_line in connect_lines:
                        issues.append({
                            'line': connect_line,
                            'severity': 'HIGH',
                            'message': '數(shù)據(jù)庫連接可能未正確關(guān)閉',
                            'suggestion': '確保在finally塊中關(guān)閉連接',
                            'context': f'函數(shù)開始于第{function_start}行'
                        })
                in_function = False
        
        return issues
    
    def check_transaction_handling(self, content: str, filepath: str) -> List[Dict[str, Any]]:
        """
        檢查事務(wù)處理
        
        Args:
            content: 文件內(nèi)容
            filepath: 文件路徑
            
        Returns:
            問題列表
        """
        issues = []
        lines = content.split('\n')
        
        # 查找沒有明確事務(wù)管理的操作
        write_operations = [
            'INSERT', 'UPDATE', 'DELETE', 'CREATE', 'ALTER', 'DROP'
        ]
        
        transaction_keywords = [
            'BEGIN', 'COMMIT', 'ROLLBACK', 'autocommit',
            'set_session', 'transaction'
        ]
        
        in_write_operation = False
        has_transaction_control = False
        
        for i, line in enumerate(lines, 1):
            line_upper = line.upper()
            
            # 檢查是否有寫操作
            if any(op in line_upper for op in write_operations):
                in_write_operation = True
            
            # 檢查是否有事務(wù)控制
            if any(keyword in line for keyword in transaction_keywords):
                has_transaction_control = True
            
            # 如果是空行或注釋,檢查之前的操作
            if line.strip() == '' or line.strip().startswith('#'):
                if in_write_operation and not has_transaction_control:
                    issues.append({
                        'line': i - 1,
                        'severity': 'MEDIUM',
                        'message': '寫操作沒有顯式的事務(wù)管理',
                        'suggestion': '使用明確的事務(wù)控制(BEGIN/COMMIT/ROLLBACK)',
                        'context': '多個寫操作應(yīng)該在同一事務(wù)中'
                    })
                
                in_write_operation = False
                has_transaction_control = False
        
        return issues
    
    def check_index_usage(self, content: str, filepath: str) -> List[Dict[str, Any]]:
        """
        檢查索引使用
        
        Args:
            content: 文件內(nèi)容
            filepath: 文件路徑
            
        Returns:
            問題列表
        """
        issues = []
        lines = content.split('\n')
        
        # 查找可能受益于索引的查詢模式
        index_patterns = [
            (r'WHERE\s+\w+\s*=\s*', '等值查詢'),
            (r'WHERE\s+\w+\s+IN\s*\(', 'IN列表查詢'),
            (r'WHERE\s+\w+\s+LIKE\s+\'', 'LIKE查詢(可能前綴匹配)'),
            (r'ORDER\s+BY\s+\w+', '排序操作'),
            (r'GROUP\s+BY\s+\w+', '分組操作'),
            (r'JOIN\s+\w+\s+ON\s+\w+\s*=\s*\w+', '連接操作')
        ]
        
        for i, line in enumerate(lines, 1):
            line_upper = line.upper()
            
            # 跳過注釋
            if line.strip().startswith('#'):
                continue
            
            for pattern, description in index_patterns:
                if re.search(pattern, line_upper, re.IGNORECASE):
                    issues.append({
                        'line': i,
                        'severity': 'LOW',
                        'message': f'查詢可能受益于索引: {description}',
                        'suggestion': '考慮在相關(guān)列上創(chuàng)建索引',
                        'code_snippet': line.strip()[:100]
                    })
                    break
        
        return issues
    
    def generate_report(self, issues: Dict[str, List[Dict[str, Any]]]) -> str:
        """生成檢查報告"""
        report_lines = []
        report_lines.append("=" * 60)
        report_lines.append("PostgreSQL代碼質(zhì)量檢查報告")
        report_lines.append("=" * 60)
        
        total_issues = sum(len(rule_issues) for rule_issues in issues.values())
        report_lines.append(f"\n總共發(fā)現(xiàn) {total_issues} 個問題\n")
        
        severity_counts = {'CRITICAL': 0, 'HIGH': 0, 'MEDIUM': 0, 'LOW': 0}
        
        for rule_name, rule_issues in issues.items():
            if rule_issues:
                report_lines.append(f"\n{rule_name.upper()} ({len(rule_issues)}個問題):")
                report_lines.append("-" * 40)
                
                for issue in rule_issues:
                    severity = issue.get('severity', 'UNKNOWN')
                    severity_counts[severity] = severity_counts.get(severity, 0) + 1
                    
                    report_lines.append(
                        f"[{severity}] 第{issue['line']}行: {issue['message']}"
                    )
                    if 'suggestion' in issue:
                        report_lines.append(f"    建議: {issue['suggestion']}")
                    if 'code_snippet' in issue:
                        report_lines.append(f"    代碼: {issue['code_snippet']}")
                    report_lines.append("")
        
        # 添加嚴(yán)重性統(tǒng)計
        report_lines.append("\n嚴(yán)重性統(tǒng)計:")
        report_lines.append("-" * 40)
        for severity, count in severity_counts.items():
            if count > 0:
                report_lines.append(f"  {severity}: {count} 個問題")
        
        return '\n'.join(report_lines)


def check_postgresql_best_practices():
    """PostgreSQL最佳實(shí)踐檢查示例"""
    checker = PostgreSQLCodeChecker()
    
    # 示例代碼
    sample_code = """
import psycopg2

# 不好的示例:字符串拼接SQL(SQL注入風(fēng)險)
def bad_example(user_input):
    conn = psycopg2.connect("dbname=test")
    cursor = conn.cursor()
    
    # SQL注入漏洞
    query = f"SELECT * FROM users WHERE username = '{user_input}'"
    cursor.execute(query)
    
    # 連接未關(guān)閉
    return cursor.fetchall()

# 好的示例:參數(shù)化查詢
def good_example(username):
    conn = None
    cursor = None
    try:
        conn = psycopg2.connect("dbname=test")
        cursor = conn.cursor()
        
        # 參數(shù)化查詢
        query = "SELECT * FROM users WHERE username = %s"
        cursor.execute(query, (username,))
        
        return cursor.fetchall()
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()

# 事務(wù)處理示例
def transaction_example(user_data):
    conn = psycopg2.connect("dbname=test")
    cursor = conn.cursor()
    
    try:
        # 開始事務(wù)
        conn.autocommit = False
        
        # 多個寫操作
        cursor.execute(
            "INSERT INTO users (name, email) VALUES (%s, %s)",
            (user_data['name'], user_data['email'])
        )
        
        cursor.execute(
            "INSERT INTO logs (action) VALUES (%s)",
            ('user_created',)
        )
        
        # 提交事務(wù)
        conn.commit()
        
    except Exception as e:
        # 回滾事務(wù)
        conn.rollback()
        raise e
    finally:
        cursor.close()
        conn.close()

# 可能受益于索引的查詢
def query_without_index():
    conn = psycopg2.connect("dbname=test")
    cursor = conn.cursor()
    
    # 這個查詢可能受益于索引
    cursor.execute("""
        SELECT * FROM orders 
        WHERE customer_id = %s 
        AND order_date > %s
        ORDER BY order_date DESC
    """, (123, '2024-01-01'))
    
    return cursor.fetchall()
    """
    
    # 將示例代碼寫入臨時文件
    import tempfile
    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
        f.write(sample_code)
        temp_file = f.name
    
    try:
        # 檢查代碼
        issues = checker.check_file(temp_file)
        report = checker.generate_report(issues)
        print(report)
    finally:
        import os
        os.unlink(temp_file)


if __name__ == "__main__":
    check_postgresql_best_practices()

7.2 PostgreSQL最佳實(shí)踐總結(jié)

連接管理最佳實(shí)踐

  • 使用連接池管理數(shù)據(jù)庫連接
  • 確保連接在finally塊中正確關(guān)閉
  • 設(shè)置合適的連接超時和重試策略

查詢優(yōu)化最佳實(shí)踐

  • 使用EXPLAIN ANALYZE分析查詢計劃
  • 為頻繁查詢的列創(chuàng)建合適索引
  • 避免SELECT *,只選擇需要的列
  • 使用LIMIT限制返回行數(shù)

事務(wù)管理最佳實(shí)踐

  • 保持事務(wù)盡可能短小
  • 使用合適的隔離級別
  • 處理異常并正確回滾
  • 避免長時間持有鎖

索引設(shè)計最佳實(shí)踐

  • 基于查詢模式設(shè)計復(fù)合索引
  • 定期分析和重建索引
  • 使用部分索引減少索引大小
  • 考慮BRIN索引用于時間序列數(shù)據(jù)

配置優(yōu)化最佳實(shí)踐

  • 根據(jù)工作負(fù)載調(diào)整shared_buffers
  • 設(shè)置合適的work_mem減少臨時文件
  • 配置有效的維護(hù)工作內(nèi)存
  • 定期更新統(tǒng)計信息

8. 總結(jié)與展望

PostgreSQL作為功能最豐富的開源數(shù)據(jù)庫,其高級特性和性能優(yōu)化能力使其能夠應(yīng)對各種復(fù)雜的應(yīng)用場景。通過本文的深入探討,我們了解到:

8.1 核心要點(diǎn)總結(jié)

  • 高級特性:JSONB、全文搜索、分區(qū)表、物化視圖等特性使PostgreSQL能夠處理多樣化數(shù)據(jù)需求
  • 性能優(yōu)化:合理的索引設(shè)計、查詢優(yōu)化、配置調(diào)整是提升性能的關(guān)鍵
  • 并發(fā)控制:MVCC機(jī)制和適當(dāng)?shù)逆i策略確保高并發(fā)下的數(shù)據(jù)一致性
  • 監(jiān)控維護(hù):系統(tǒng)化監(jiān)控和定期維護(hù)保證數(shù)據(jù)庫長期穩(wěn)定運(yùn)行

8.2 未來發(fā)展趨勢

  • 人工智能集成:PostgreSQL的MADlib擴(kuò)展支持機(jī)器學(xué)習(xí)算法
  • 時序數(shù)據(jù)優(yōu)化:TimescaleDB擴(kuò)展提供專業(yè)時序數(shù)據(jù)處理能力
  • 地理空間增強(qiáng):PostGIS繼續(xù)擴(kuò)展地理信息系統(tǒng)功能
  • 云原生支持:更好的Kubernetes集成和云服務(wù)優(yōu)化
  • 向量搜索:對AI生成內(nèi)容(AIGC)的向量相似度搜索支持

通過持續(xù)學(xué)習(xí)和實(shí)踐,開發(fā)者可以充分利用PostgreSQL的強(qiáng)大功能,構(gòu)建高效、穩(wěn)定、可擴(kuò)展的數(shù)據(jù)存儲解決方案。無論是對初創(chuàng)公司還是大型企業(yè),PostgreSQL都提供了企業(yè)級數(shù)據(jù)庫所需的一切功能,同時保持了開源軟件的靈活性和成本優(yōu)勢。

以上就是PostgreSQL高級特性與性能優(yōu)化的實(shí)戰(zhàn)指南的詳細(xì)內(nèi)容,更多關(guān)于PostgreSQL性能優(yōu)化的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • PostgreSQL 默認(rèn)隔離級別的設(shè)置

    PostgreSQL 默認(rèn)隔離級別的設(shè)置

    PostgreSQL的默認(rèn)事務(wù)隔離級別是讀已提交,這是其事務(wù)處理系統(tǒng)的基礎(chǔ)行為模式,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2025-06-06
  • PostgreSQL GIN 索引原理、應(yīng)用場景與最佳實(shí)踐

    PostgreSQL GIN 索引原理、應(yīng)用場景與最佳實(shí)踐

    本文介紹了PostgreSQL的GIN索引,涵蓋了其底層原理、支持的數(shù)據(jù)類型、工作原理、性能特征、適用場景、實(shí)戰(zhàn)案例、性能調(diào)優(yōu)、常見問題及解決方案,并提供了GIN與其它索引類型的比較,通過本文,讀者可以全面了解GIN索引的使用方法和最佳實(shí)踐,感興趣的朋友跟隨小編一起看看吧
    2025-12-12
  • 史上最全PostgreSQL?DBA最常用SQL

    史上最全PostgreSQL?DBA最常用SQL

    這篇文章主要介紹了PostgreSQL?DBA最常用SQL?,主要包括背景及常用查詢語句,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-10-10
  • PostgreSQL 序列綁定字段與不綁定字段的區(qū)別說明

    PostgreSQL 序列綁定字段與不綁定字段的區(qū)別說明

    這篇文章主要介紹了PostgreSQL 序列綁定字段與不綁定字段的區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • PostgreSQL實(shí)現(xiàn)定期備份的方法

    PostgreSQL實(shí)現(xiàn)定期備份的方法

    PostgreSQL定期備份功能可以自動備份數(shù)據(jù)庫,避免了手動備份過程中可能發(fā)生的錯誤,也極大地減輕了管理員的工作壓力,所以本文將給大家介紹一下PostgreSQL實(shí)現(xiàn)定期備份的方法,需要的朋友可以參考下
    2024-03-03
  • postgresql 存儲函數(shù)調(diào)用變量的3種方法小結(jié)

    postgresql 存儲函數(shù)調(diào)用變量的3種方法小結(jié)

    這篇文章主要介紹了postgresql 存儲函數(shù)調(diào)用變量的3種方法小結(jié),具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-01-01
  • 基于pgrouting的路徑規(guī)劃處理方法

    基于pgrouting的路徑規(guī)劃處理方法

    這篇文章主要介紹了基于pgrouting的路徑規(guī)劃處理,根據(jù)pgrouting已經(jīng)集成的Dijkstra算法來,結(jié)合postgresql數(shù)據(jù)庫來處理最短路徑,需要的朋友可以參考下
    2022-04-04
  • postgresql 利用fdw來實(shí)現(xiàn)不同數(shù)據(jù)庫之間數(shù)據(jù)互通(推薦)

    postgresql 利用fdw來實(shí)現(xiàn)不同數(shù)據(jù)庫之間數(shù)據(jù)互通(推薦)

    這篇文章主要介紹了postgresql 利用fdw來實(shí)現(xiàn)不同數(shù)據(jù)庫之間數(shù)據(jù)互通,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-02-02
  • postgresql 中的序列nextval詳解

    postgresql 中的序列nextval詳解

    這篇文章主要介紹了postgresql 中的序列nextval詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • SQLCipher數(shù)據(jù)遷移到PostgreSql詳細(xì)教程

    SQLCipher數(shù)據(jù)遷移到PostgreSql詳細(xì)教程

    這篇文章主要介紹了SQLCipher數(shù)據(jù)遷移到PostgreSql詳細(xì)教程,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2025-09-09

最新評論

无码中文字幕波多野不卡 | 天天日天天舔天天射进去| 中文字母永久播放1区2区3区| 中文字幕一区二区自拍| 日本一二三中文字幕| 天天日天天爽天天干| 亚洲护士一区二区三区| 亚洲成人线上免费视频观看| 神马午夜在线观看视频| 午夜大尺度无码福利视频| 韩国亚洲欧美超一级在线播放视频| 中文字幕—97超碰网| 亚洲男人在线天堂网| 中文字幕中文字幕 亚洲国产| 日韩一区二区三区三州| 经典国语激情内射视频| 最新激情中文字幕视频| 老司机免费视频网站在线看| 欧美视频不卡一区四区| 久久久久久久久久一区二区三区| 红桃av成人在线观看| 人妻久久无码中文成人| 久久精品美女免费视频| 欧洲欧美日韩国产在线| 啊啊啊想要被插进去视频| 亚洲国产最大av综合| 做爰视频毛片下载蜜桃视频1| 黄色片一级美女黄色片| 瑟瑟视频在线观看免费视频| 亚洲免费视频欧洲免费视频| 精品欧美一区二区vr在线观看 | 97人妻无码AV碰碰视频| 日本在线一区二区不卡视频| 久久丁香婷婷六月天| 亚洲av男人的天堂你懂的| 亚洲国产成人av在线一区| 久久尻中国美女视频| 亚洲av无女神免非久久| 日本xx片在线观看| gav成人免费播放| 午夜精品亚洲精品五月色| 免费av岛国天堂网站| 阿v天堂2014 一区亚洲| 男生舔女生逼逼的视频| 中文字幕在线一区精品| 91免费黄片可看视频| 真实国产乱子伦一区二区| 另类av十亚洲av| 男人天堂av天天操| 黄色成年网站午夜在线观看| 欧美第一页在线免费观看视频| 欧美黄色录像免费看的| 福利视频广场一区二区| 日韩剧情片电影在线收看| 国产变态另类在线观看| 久久免费看少妇高潮完整版| 国产日韩一区二区在线看| 精品一区二区三区午夜| 夜色撩人久久7777| 亚洲人妻国产精品综合| 欧美80老妇人性视频| 欧美日韩激情啪啪啪| av手机在线观播放网站| 国产精品精品精品999| 久久久精品999精品日本| 亚洲一级特黄特黄黄色录像片| 大香蕉大香蕉大香蕉大香蕉大香蕉| 欧美aa一级一区三区四区| 大学生A级毛片免费视频| 亚洲精品av在线观看| 97青青青手机在线视频| 亚洲av一妻不如妾| 97色视频在线观看| 欧美香蕉人妻精品一区二区| 精品国产亚洲av一淫| 国产大鸡巴大鸡巴操小骚逼小骚逼| 91快播视频在线观看| 亚洲狠狠婷婷综合久久app| 姐姐的朋友2在线观看中文字幕| 精品一区二区三区三区88| 亚洲成a人片777777| 午夜在线精品偷拍一区二| 亚洲国产成人av在线一区| 国产精品大陆在线2019不卡| 护士特殊服务久久久久久久| 国产亚洲欧美视频网站| 岛国av高清在线成人在线| 综合国产成人在线观看| 红杏久久av人妻一区| okirakuhuhu在线观看| av在线资源中文字幕| 欧美亚洲免费视频观看| 黄工厂精品视频在线观看| 国产精品国色综合久久| 免费观看成年人视频在线观看| 91九色porny蝌蚪国产成人| 欧美成一区二区三区四区| 熟妇一区二区三区高清版| 国产男女视频在线播放| 女生自摸在线观看一区二区三区| 天天摸天天亲天天舔天天操天天爽 | 国产精品成久久久久三级蜜臀av| mm131美女午夜爽爽爽| 黑人巨大精品欧美视频| 2019av在线视频| 97人妻色免费视频| 亚洲成人国产av在线| 中文字幕在线免费第一页| 国产女孩喷水在线观看| 五十路熟女av天堂| 成人高潮aa毛片免费| 中文字幕在线一区精品| 久久久久久性虐视频| 2o22av在线视频| 日韩近亲视频在线观看| 大香蕉大香蕉大香蕉大香蕉大香蕉| 亚洲一区二区三区久久午夜| 2021天天色天天干| 99人妻视频免费在线| 人妻激情图片视频小说| 青草青永久在线视频18| 欧美一级片免费在线成人观看| 51国产成人精品视频| 五月色婷婷综合开心网4438| 黄色在线观看免费观看在线| 天天草天天色天天干| 天天干天天日天天干天天操| 中文字幕第三十八页久久| 亚洲综合色在线免费观看| 91国内精品久久久久精品一| 白嫩白嫩美女极品国产在线观看| 不戴胸罩引我诱的隔壁的人妻| 沙月文乃人妻侵犯中文字幕在线 | 加勒比视频在线免费观看| 99久久中文字幕一本人| 中文字幕在线第一页成人 | 九九视频在线精品播放| 亚洲综合一区二区精品久久| 亚洲视频在线观看高清| 在线亚洲天堂色播av电影| 日韩无码国产精品强奸乱伦| 91精品啪在线免费| 亚洲av午夜免费观看| 成人区人妻精品一区二视频| 中文字幕日韩精品日本| 亚洲欧美另类自拍偷拍色图| 亚洲欧洲一区二区在线观看| 黄色片黄色片wyaa| 国产一区二区神马久久| 最新黄色av网站在线观看| 久久99久久99精品影院| 国产亚洲欧美视频网站| 少妇露脸深喉口爆吞精| 男女啪啪视频免费在线观看| 加勒比视频在线免费观看| 人妻av无码专区久久绿巨人 | 国产刺激激情美女网站| 欧美老妇精品另类不卡片| 婷婷久久久综合中文字幕| 99精品国产免费久久| 五月色婷婷综合开心网4438| 国产一线二线三线的区别在哪| 免费在线看的黄网站| 中文字幕免费福利视频6| 午夜精品一区二区三区城中村| 福利视频广场一区二区| 午夜精品亚洲精品五月色| 天天操天天射天天操天天天| 狠狠操狠狠操免费视频| www,久久久,com| 天天操天天干天天艹| 丝袜美腿视频诱惑亚洲无| 人妻自拍视频中国大陆| 亚洲天堂成人在线观看视频网站 | chinese国产盗摄一区二区| 国内精品在线播放第一页| 黑人解禁人妻叶爱071| 中文字幕在线第一页成人| 88成人免费av网站| 亚洲成a人片777777| 涩爱综合久久五月蜜臀| 日韩精品中文字幕福利| 欧美地区一二三专区| 2018最新中文字幕在线观看| 日韩精品一区二区三区在线播放| 91精品国产高清自在线看香蕉网| 国产熟妇人妻ⅹxxxx麻豆| 亚洲特黄aaaa片| 国内自拍第一页在线观看| 国产揄拍高清国内精品对白| 日本少妇人妻xxxxxhd| 播放日本一区二区三区电影 | 欧美黑人巨大性xxxxx猛交| 国产精品国色综合久久 | 精品一区二区三四区| 91人妻精品一区二区久久| 亚洲人一区二区中文字幕| 小穴多水久久精品免费看| av视屏免费在线播放| 插逼视频双插洞国产操逼插洞| 日韩av免费观看一区| 欧美老鸡巴日小嫩逼| 中文字幕一区二区人妻电影冢本| 老有所依在线观看完整版| 国产亚洲视频在线二区| aⅴ精产国品一二三产品| 国产高清在线在线视频| 精品国产在线手机在线| 欧美精产国品一二三区| 亚洲欧美久久久久久久久| 亚洲成人免费看电影| 亚洲综合自拍视频一区| 91中文字幕最新合集| 2018在线福利视频| 国产精彩福利精品视频| 午夜精品亚洲精品五月色| 91精品啪在线免费| 91大屁股国产一区二区| 亚洲精品国产久久久久久| 中国无遮挡白丝袜二区精品| 天天射夜夜操综合网| 九九热99视频在线观看97| 天天躁日日躁狠狠躁躁欧美av| 天天做天天干天天舔| 国产福利小视频二区| 亚洲成人黄色一区二区三区| 开心 色 六月 婷婷| 免费岛国喷水视频在线观看| 亚洲午夜福利中文乱码字幕| 国产内射中出在线观看| 国产精品三级三级三级| 亚洲av色香蕉一区二区三区| 日本福利午夜电影在线观看| 亚洲欧美国产综合777| 亚洲综合在线观看免费| 成年人的在线免费视频| 亚洲欧洲av天堂综合| 高清一区二区欧美系列| 国产三级片久久久久久久| av完全免费在线观看av| 欧美一区二区三区久久久aaa| 黄色的网站在线免费看| 日本高清成人一区二区三区| 丝袜国产专区在线观看| 亚洲精品精品国产综合| 天天操天天干天天日狠狠插| 另类av十亚洲av| free性日本少妇| 国产精品自偷自拍啪啪啪| 青青热久免费精品视频在线观看| 久久久精品欧洲亚洲av| 亚洲视频乱码在线观看| 最后99天全集在线观看| 黄色无码鸡吧操逼视频| 精品亚洲中文字幕av| 11久久久久久久久久久| 人妻无码中文字幕专区| 婷婷综合亚洲爱久久| 中国产一级黄片免费视频播放| 天天干天天操天天扣| 成人精品视频99第一页| 很黄很污很色的午夜网站在线观看 | 一色桃子人妻一区二区三区| 亚洲少妇高潮免费观看| 天天操天天爽天天干| 中文字幕在线免费第一页| 大香蕉福利在线观看| 日韩av大胆在线观看| 亚洲一区二区三区av网站| 99re国产在线精品| 亚洲男人在线天堂网| 国内资源最丰富的网站| 男女啪啪啪啪啪的网站| 欧美亚洲免费视频观看| 免费观看理论片完整版| 插逼视频双插洞国产操逼插洞| 在线免费观看亚洲精品电影| 91在线免费观看成人| 神马午夜在线观看视频| 日韩一区二区三区三州| 北条麻妃肉色丝袜视频| 久久久精品欧洲亚洲av| 91成人精品亚洲国产| 日韩美av高清在线| 日韩午夜福利精品试看| 日韩精品啪啪视频一道免费| 久久久久久久久久一区二区三区| 神马午夜在线观看视频| 日本韩国免费一区二区三区视频 | 热思思国产99re| 免费黄高清无码国产| 最新中文字幕免费视频| 亚洲精品 欧美日韩| 九一传媒制片厂视频在线免费观看| 中文字幕在线观看国产片| 女人精品内射国产99| 亚洲精品国偷自产在线观看蜜桃| 天天日天天透天天操| 欲乱人妻少妇在线视频裸| 欧美成人一二三在线网| 亚洲少妇人妻无码精品| 亚洲国产免费av一区二区三区 | 青青青青青免费视频| 精品一区二区亚洲欧美| 777奇米久久精品一区| 最近的中文字幕在线mv视频| 日日爽天天干夜夜操| 亚洲欧美激情人妻偷拍| 亚洲另类图片蜜臀av| 人妻无码中文字幕专区| 天天综合天天综合天天网| 日韩欧美高清免费在线| 人妻少妇精品久久久久久| 黄色视频成年人免费观看| 欧美偷拍亚洲一区二区| 日本少妇人妻xxxxxhd| 黄色无码鸡吧操逼视频| 少妇露脸深喉口爆吞精| 国产精品日韩欧美一区二区| 美女视频福利免费看| 大香蕉福利在线观看| 3D动漫精品啪啪一区二区下载| 欧美中国日韩久久精品| 99精品一区二区三区的区| 91成人在线观看免费视频| 国产精品自拍在线视频| 一区二区三区日本伦理| 性感美女诱惑福利视频| 超碰在线观看免费在线观看| 黄色无码鸡吧操逼视频| 天天操天天干天天艹| 日辽宁老肥女在线观看视频| 五月天中文字幕内射| 女人精品内射国产99| 免费黄高清无码国产| 天天操夜夜操天天操天天操| 水蜜桃国产一区二区三区| 欧美日韩人妻久久精品高清国产| 国产一区成人在线观看视频| 日韩a级精品一区二区| 亚洲人一区二区中文字幕| 亚洲国产成人av在线一区| 欧美天堂av无线av欧美| 伊人成人综合开心网| 一区二区三区的久久的蜜桃的视频| 韩国AV无码不卡在线播放| 新婚人妻聚会被中出| 97色视频在线观看| 欧洲黄页网免费观看| www日韩毛片av| 四川乱子伦视频国产vip| 99久久99久国产黄毛片| 欲满人妻中文字幕在线| 四虎永久在线精品免费区二区| 91国产在线视频免费观看| 女同性ⅹxx女同h偷拍| 午夜的视频在线观看| 国产精品系列在线观看一区二区| 在线观看黄色成年人网站| 色综合久久五月色婷婷综合| 国产使劲操在线播放| 亚洲成人情色电影在线观看| 日韩成人综艺在线播放| 中文字幕第一页国产在线| 久草视频福利在线首页| 五十路息与子猛烈交尾视频| 自拍偷拍一区二区三区图片| 中文字幕乱码av资源| 日本精品美女在线观看| 人人妻人人澡欧美91精品| 一区二区熟女人妻视频| 特一级特级黄色网片| 中文字幕之无码色多多| 偷拍美女一区二区三区| 中文字幕人妻被公上司喝醉在线| 97人人模人人爽人人喊| 亚洲一区二区三区久久午夜| 久久精品在线观看一区二区| 国产麻豆91在线视频| 风流唐伯虎电视剧在线观看| 天码人妻一区二区三区在线看| 欧美 亚洲 另类综合| 天天干天天搞天天摸| 青娱乐蜜桃臀av色| 国产欧美精品一区二区高清| 亚洲av无乱一区二区三区性色| av在线免费中文字幕| 成人影片高清在线观看| 一区二区三区四区五区性感视频 | 人妻丝袜精品中文字幕| 色婷婷六月亚洲综合香蕉| 一区二区三区日本伦理| 日本熟女50视频免费| 人人人妻人人澡人人| 精品国产亚洲av一淫| 欧美精品一二三视频| 91桃色成人网络在线观看| 国产一区二区神马久久| gogo国模私拍视频| 中英文字幕av一区| 99久久久无码国产精品性出奶水| 欧美国产亚洲中英文字幕| 日韩欧美国产精品91| 熟女人妻三十路四十路人妻斩| 馒头大胆亚洲一区二区| 99精品久久久久久久91蜜桃| 女同互舔一区二区三区| 欧美色婷婷综合在线| 热思思国产99re| 国产精品久久9999| av破解版在线观看| 日韩欧美在线观看不卡一区二区| 国产成人综合一区2区| 国产露脸对白在线观看| 一区二区三区毛片国产一区| 欧美xxx成人在线| 经典亚洲伊人第一页| 麻豆精品成人免费视频| 熟女人妻在线观看视频| 国产精品久久久久网| 99国内小视频在现欢看| 9久在线视频只有精品| 57pao国产一区二区| 欧美男同性恋69视频| 国产成人一区二区三区电影网站 | yellow在线播放av啊啊啊 | 欧美日韩在线精品一区二区三| 国产精品黄片免费在线观看| 日日夜夜大香蕉伊人| 又色又爽又黄又刺激av网站| 91免费观看在线网站| 天天干天天操天天摸天天射| 大陆精品一区二区三区久久| 亚洲欧美清纯唯美另类| 最后99天全集在线观看| 欧美黄色录像免费看的| sspd152中文字幕在线| 在线观看视频 你懂的| 日韩亚洲高清在线观看| 97国产福利小视频合集| 91在线视频在线精品3| 天天操天天干天天插| 成人亚洲精品国产精品 | 最新国产精品网址在线观看| 91精品高清一区二区三区| 在线网站你懂得老司机| 久久久精品欧洲亚洲av| 欧美日韩精品永久免费网址| 一区二区在线视频中文字幕| 蜜桃精品久久久一区二区| 精品成人午夜免费看| 极品丝袜一区二区三区| 人妻少妇中文有码精品| 日本成人不卡一区二区| 香港一级特黄大片在线播放| 欧美男人大鸡吧插女人视频| 精品久久婷婷免费视频| 93精品视频在线观看| 亚洲福利精品视频在线免费观看 | 国产视频在线视频播放| 国产三级影院在线观看| 91九色porny国产蝌蚪视频| 99精品免费观看视频| 午夜激情高清在线观看| 99精品免费久久久久久久久a| 精品高潮呻吟久久av| 国产亚洲欧美视频网站| 成人av电影免费版| 91九色porny国产蝌蚪视频| 亚洲图片偷拍自拍区| 91色老99久久九九爱精品| 97国产在线观看高清| 2020国产在线不卡视频| 亚洲区欧美区另类最新章节| 午夜福利资源综合激情午夜福利资| 亚洲精品 日韩电影| 精品国产污污免费网站入口自| 在线视频这里只有精品自拍| 欧洲欧美日韩国产在线| 亚洲美女自偷自拍11页| 免费大片在线观看视频网站| 天天做天天爽夜夜做少妇| 91久久综合男人天堂| 99av国产精品欲麻豆| 天天日天天干天天插舔舔| 国产又粗又硬又大视频| 久久麻豆亚洲精品av| 成人av电影免费版| 午夜婷婷在线观看视频| 美女骚逼日出水来了| 91麻豆精品久久久久| 自拍偷拍日韩欧美一区二区| av中文字幕在线导航| 自拍偷拍亚洲精品第2页| 国产福利小视频大全| 制服丝袜在线人妻中文字幕| 亚洲免费在线视频网站| 色偷偷伊人大杳蕉综合网| 夜色撩人久久7777| 一色桃子人妻一区二区三区| 精品人妻伦一二三区久 | 免费费一级特黄真人片| 又黄又刺激的午夜小视频| 欧美区一区二区三视频| 亚洲1卡2卡三卡4卡在线观看| 家庭女教师中文字幕在线播放 | 亚洲国产在线精品国偷产拍| 中文字幕午夜免费福利视频| 红杏久久av人妻一区| 免费啪啪啪在线观看视频| 国产美女精品福利在线| 天天日天天透天天操| 国产成人精品一区在线观看 | 国产之丝袜脚在线一区二区三区 | 亚洲成高清a人片在线观看| 中文字幕 人妻精品| 爱爱免费在线观看视频| 久青青草视频手机在线免费观看| 国产精品伦理片一区二区| 午夜蜜桃一区二区三区| 欧美成人综合视频一区二区 | 亚洲精品麻豆免费在线观看 | 成人18禁网站在线播放| 熟女国产一区亚洲中文字幕| 青青擦在线视频国产在线| 欧美老鸡巴日小嫩逼| 一级黄片久久久久久久久| 好男人视频在线免费观看网站| 国产日韩av一区二区在线| 欧美特色aaa大片| 国产成人精品一区在线观看 | 日本少妇的秘密免费视频| 喷水视频在线观看这里只有精品| 国产大学生援交正在播放| 人人妻人人爽人人澡人人精品| 亚洲熟妇久久无码精品| 最新黄色av网站在线观看| 国产亚洲欧美视频网站| 啪啪啪啪啪啪啪免费视频| 一区二区三区av高清免费| 5528327男人天堂| av天堂资源最新版在线看| 国产精品人妻熟女毛片av久| aⅴ精产国品一二三产品| 亚洲伊人av天堂有码在线| 亚洲熟色妇av日韩熟色妇在线| 亚洲综合一区成人在线| 亚洲国产免费av一区二区三区 | 天干天天天色天天日天天射| 天天干天天日天天干天天操| 久久亚洲天堂中文对白| 亚洲一区二区久久久人妻| 香蕉91一区二区三区| 国产午夜激情福利小视频在线| 99久久超碰人妻国产| 国产成人自拍视频播放| 亚洲av日韩高清hd| 摧残蹂躏av一二三区| 综合激情网激情五月五月婷婷| 91av精品视频在线| 十八禁在线观看地址免费 | 黄色的网站在线免费看| 国产综合精品久久久久蜜臀| 青草青永久在线视频18| 18禁免费av网站| 天天干天天日天天谢综合156| 亚洲va欧美va人人爽3p| 日本又色又爽又黄又粗| 啪啪啪啪啪啪啪免费视频| 大香蕉伊人国产在线| 黄色成人在线中文字幕| 538精品在线观看视频| 成人24小时免费视频| 快插进小逼里大鸡吧视频| 夜夜骑夜夜操夜夜奸| 91欧美在线免费观看| 亚洲一区二区三区久久受| 91久久国产成人免费网站| 亚洲精品欧美日韩在线播放| 中文人妻AV久久人妻水| 五月激情婷婷久久综合网| 国产精品国色综合久久| 亚洲中文精品人人免费| 色97视频在线播放| 99精品国产免费久久| 在线观看亚洲人成免费网址| 国产va在线观看精品| 日韩午夜福利精品试看| 亚洲精品乱码久久久本| 1000部国产精品成人观看视频 | 日本韩国在线观看一区二区| 欧美日韩国产一区二区三区三州| 国产精品久久久久国产三级试频| 75国产综合在线视频| 视频二区在线视频观看| 亚洲伊人色一综合网| 中文字幕亚洲久久久| 91av精品视频在线| 人人妻人人澡人人爽人人dvl| 国产美女午夜福利久久| 日本少妇精品免费视频| 婷婷久久一区二区字幕网址你懂得| 综合精品久久久久97| 亚洲成人三级在线播放| 亚洲Av无码国产综合色区| aaa久久久久久久久| 日日摸夜夜添夜夜添毛片性色av| 亚洲综合乱码一区二区| 日韩一个色综合导航| 淫秽激情视频免费观看| 久久久91蜜桃精品ad| 九九视频在线精品播放| 成年人该看的视频黄免费| 中文字幕午夜免费福利视频| 东游记中文字幕版哪里可以看到| 在线免费观看亚洲精品电影| 久久机热/这里只有| 欧美国品一二三产区区别| 9久在线视频只有精品| 女生被男生插的视频网站| 亚洲免费av在线视频| 日韩欧美一级精品在线观看| 久久久久久久久久性潮| 青青青青草手机在线视频免费看| 久久久精品999精品日本| nagger可以指黑人吗| 大陆av手机在线观看| 亚洲综合图片20p| 中文字幕成人日韩欧美| 好男人视频在线免费观看网站| 国产91久久精品一区二区字幕| 一二三中文乱码亚洲乱码one| 中国黄片视频一区91| 热思思国产99re| 一区二区三区久久中文字幕| chinese国产盗摄一区二区| 成人av天堂丝袜在线观看| 亚洲va欧美va人人爽3p| 亚洲天堂成人在线观看视频网站| 激情人妻校园春色亚洲欧美 | 午夜精品一区二区三区福利视频| 日本性感美女三级视频| 在线观看免费视频网| 午夜久久久久久久99| 亚洲av琪琪男人的天堂| 亚洲码av无色中文| 农村胖女人操逼视频| 少妇人妻二三区视频| 久久久久久99国产精品| 在线国产中文字幕视频| 日本免费一级黄色录像| 日韩美女综合中文字幕pp| 青青青青青免费视频| 年轻的人妻被夫上司侵犯| 亚洲一区二区三区久久受| 色综合久久五月色婷婷综合| 福利国产视频在线观看| 欧洲亚洲欧美日韩综合| 国产高清在线观看1区2区| 青青青国产片免费观看视频| 国产九色91在线视频| 91中文字幕最新合集| 中文字幕人妻熟女在线电影| 欧美另类一区二区视频| av黄色成人在线观看| 美味人妻2在线播放| 免费岛国喷水视频在线观看 | 初美沙希中文字幕在线| 国产夫妻视频在线观看免费| 偷拍3456eee| 久久www免费人成一看片| 亚洲熟女女同志女同| 天天插天天狠天天操| 二区中出在线观看老师| 亚洲欧美另类手机在线| 美女福利写真在线观看视频| 色伦色伦777国产精品| 黄色无码鸡吧操逼视频| 午夜精品久久久久麻豆影视| 青青操免费日综合视频观看| av中文字幕电影在线看| 天天日天天干天天舔天天射| 亚欧在线视频你懂的| 国产又粗又黄又硬又爽| 91国产在线视频免费观看| 99久久中文字幕一本人| 激情人妻校园春色亚洲欧美 | 小穴多水久久精品免费看| 小泽玛利亚视频在线观看| 自拍偷拍日韩欧美一区二区| 第一福利视频在线观看| 亚洲综合另类精品小说| 成人区人妻精品一区二视频 | 欧美中国日韩久久精品| 日韩精品二区一区久久| 青娱乐蜜桃臀av色| 91高清成人在线视频| 日韩欧美中文国产在线| 中文字幕人妻被公上司喝醉在线| 大鸡吧插入女阴道黄色片 | 91大神福利视频网| 欧美性受xx黑人性猛交| japanese日本熟妇另类| 色在线观看视频免费的| 亚洲变态另类色图天堂网| 99热久久这里只有精品8| av完全免费在线观看av| 国产精品国色综合久久| 老鸭窝在线观看一区| 色哟哟在线网站入口| 青娱乐最新视频在线| 早川濑里奈av黑人番号| 最新国产亚洲精品中文在线| 伊拉克及约旦宣布关闭领空| 亚洲va国产va欧美精品88| 久久综合老鸭窝色综合久久| 端庄人妻堕落挣扎沉沦| 亚洲人成精品久久久久久久| 午夜国产免费福利av| 日本熟女50视频免费| 岛国一区二区三区视频在线| 中文字幕人妻一区二区视频| 午夜成午夜成年片在线观看| 男女第一次视频在线观看| 熟女俱乐部一二三区| 国产性生活中老年人视频网站| 2022精品久久久久久中文字幕| 天天干天天爱天天色| 天天操天天爽天天干| 亚洲精品三级av在线免费观看| 欧美精品免费aaaaaa| 国产一级麻豆精品免费| 国产剧情演绎系列丝袜高跟| 人妻爱爱 中文字幕| 精品高跟鞋丝袜一区二区| 欧美视频综合第一页| 午夜婷婷在线观看视频| 国产a级毛久久久久精品| 夜夜骑夜夜操夜夜奸| 大香蕉日本伊人中文在线| 日韩熟女系列一区二区三区| 狠狠躁夜夜躁人人爽天天久天啪 | 午夜久久久久久久精品熟女| 婷婷综合蜜桃av在线| 婷婷综合亚洲爱久久| 亚洲人一区二区中文字幕| 日韩欧美国产一区ab| 一区二区三区日韩久久| 日韩在线中文字幕色| 欧美偷拍亚洲一区二区| 一区二区在线观看少妇| 综合页自拍视频在线播放| 成人精品在线观看视频| 福利在线视频网址导航| 啊用力插好舒服视频| 欧美男人大鸡吧插女人视频| 中文字幕av熟女人妻| 成人色综合中文字幕| 夜夜嗨av一区二区三区中文字幕| 国产激情av网站在线观看| 美女张开两腿让男人桶av| 18禁网站一区二区三区四区 | 日韩影片一区二区三区不卡免费| 在线视频自拍第三页| 日韩在线中文字幕色| 国产在线免费观看成人| 动漫精品视频在线观看| 二区中出在线观看老师| a v欧美一区=区三区| 99热这里只有国产精品6| 可以免费看的www视频你懂的| 国产普通话插插视频| 黄色大片免费观看网站| 亚洲精品麻豆免费在线观看| 国产1区,2区,3区| 成年人午夜黄片视频资源| 人妻无码色噜噜狠狠狠狠色| 亚洲男人让女人爽的视频| 日本三极片中文字幕| 蝴蝶伊人久久中文娱乐网| 成人区人妻精品一区二视频| 女同互舔一区二区三区| 日本少妇的秘密免费视频| 国际av大片在线免费观看| 欧美专区第八页一区在线播放| 欧美怡红院视频在线观看| 538精品在线观看视频| 一级a看免费观看网站| 91色网站免费在线观看| 日比视频老公慢点好舒服啊| 自拍偷拍亚洲欧美在线视频| 五月天久久激情视频| 一级黄片久久久久久久久| 午夜精品亚洲精品五月色| 欧美黑人性暴力猛交喷水| 亚洲免费视频欧洲免费视频| 天堂v男人视频在线观看| 淫秽激情视频免费观看| 亚洲伊人色一综合网| 国产高清精品一区二区三区| 日本成人一区二区不卡免费在线| 国产成人午夜精品福利| 欧美成人黄片一区二区三区 | 边摸边做超爽毛片18禁色戒| 免费观看成年人视频在线观看| 国产精品成人xxxx| 日韩北条麻妃一区在线| 一级黄色av在线观看| 亚洲va天堂va国产va久| 色综合天天综合网国产成人| 久久亚洲天堂中文对白| 国产 在线 免费 精品| 深田咏美亚洲一区二区 | 女蜜桃臀紧身瑜伽裤| 国产一区二区久久久裸臀| 日韩美女福利视频网| 国产在线91观看免费观看| 国产日韩一区二区在线看| 国产熟妇人妻ⅹxxxx麻豆| 欧美3p在线观看一区二区三区| 国产熟妇乱妇熟色T区| 97国产福利小视频合集| 亚洲一区二区三区五区| 婷婷激情四射在线观看视频| 国产精品自拍偷拍a| 国产精品一二三不卡带免费视频| 成人色综合中文字幕| 在线观看的a站 最新| 亚洲va国产va欧美精品88| 午夜国产免费福利av| 亚洲视频在线视频看视频在线| 欧美黑人性猛交xxxxⅹooo| 98精产国品一二三产区区别| 精品视频国产在线观看| 91桃色成人网络在线观看| 久久农村老妇乱69系列| 国产又粗又黄又硬又爽| www日韩a级s片av| 亚洲一区二区三区偷拍女厕91| 国产真实乱子伦a视频| 天天插天天色天天日| 免费高清自慰一区二区三区网站 | 国产精品黄页网站视频| 久久久久久久亚洲午夜综合福利 | 93精品视频在线观看| 夜夜嗨av蜜臀av| 51精品视频免费在线观看| 黄色片年轻人在线观看| 国产亚洲欧美视频网站| 91福利视频免费在线观看| 在线观看黄色成年人网站| 女生被男生插的视频网站| 亚洲av可乐操首页| 国产黄色高清资源在线免费观看 | 免费在线看的黄网站| 亚洲国产精品久久久久久6| 这里只有精品双飞在线播放| 直接能看的国产av| 国产成人精品福利短视频| 3344免费偷拍视频| 少妇人妻100系列| 天天日天天干天天要| 亚洲国产在人线放午夜| 超pen在线观看视频公开97| 欧美精品亚洲精品日韩在线| 欧美久久久久久三级网| 成人免费毛片aaaa| 伊人网中文字幕在线视频| 欧美xxx成人在线| 人妻少妇中文有码精品| 亚洲av琪琪男人的天堂| 亚洲视频乱码在线观看| 在线观看欧美黄片一区二区三区| 日日操夜夜撸天天干| 黄色片黄色片wyaa| 天天日天天天天天天天天天天| 日本欧美视频在线观看三区| 成人蜜桃美臀九一一区二区三区| 韩国男女黄色在线观看| 久久久久五月天丁香社区| 青青青视频自偷自拍38碰| 啪啪啪啪啪啪啪啪啪啪黄色| 日本一道二三区视频久久| 污污小视频91在线观看| 日本熟妇喷水xxx| av手机在线免费观看日韩av| 久久久久久九九99精品| 2018最新中文字幕在线观看| 色婷婷六月亚洲综合香蕉| 特大黑人巨大xxxx| 国产综合精品久久久久蜜臀| 天天通天天透天天插| 国产综合高清在线观看| 亚洲嫩模一区二区三区| 黄色视频成年人免费观看| 国产亚洲国产av网站在线| 97国产在线av精品| 亚洲av可乐操首页| 亚洲1区2区3区精华液| 夜夜骑夜夜操夜夜奸| gogo国模私拍视频| 成人乱码一区二区三区av| 日美女屁股黄邑视频| 亚洲美女高潮喷浆视频| 亚洲Av无码国产综合色区| 3344免费偷拍视频| 91人妻精品一区二区在线看| 91人妻精品一区二区久久| 18禁美女羞羞免费网站| 中文字幕网站你懂的| 久久精品国产999| 日本午夜福利免费视频| 十八禁在线观看地址免费| 78色精品一区二区三区| 午夜91一区二区三区| 亚洲一级特黄特黄黄色录像片| 欧美韩国日本国产亚洲| 中国黄片视频一区91| 美女在线观看日本亚洲一区| 亚洲激情av一区二区| 色综合久久无码中文字幕波多| 日本丰满熟妇大屁股久久| 性生活第二下硬不起来| 91社福利《在线观看| 91精品国产综合久久久蜜| 可以免费看的www视频你懂的| 中文字幕 亚洲av| 伊人开心婷婷国产av| 国产一区二区三免费视频| 国产视频网站国产视频| 欧美日韩情色在线观看| 精品一区二区三区欧美| 久久精品国产亚洲精品166m| 97黄网站在线观看| 91啪国自产中文字幕在线| wwwxxx一级黄色片| 久久一区二区三区人妻欧美| 999九九久久久精品| 夜夜骑夜夜操夜夜奸| 午夜在线观看岛国av,com| 中文字幕免费福利视频6| 在线视频自拍第三页| 福利午夜视频在线观看| 国产使劲操在线播放| 不卡一区一区三区在线| 日韩av免费观看一区| 78色精品一区二区三区| 大肉大捧一进一出好爽在线视频 | av日韩在线观看大全| 18禁美女羞羞免费网站| 少妇露脸深喉口爆吞精| 99久久中文字幕一本人| 日韩av有码一区二区三区4| av在线免费观看亚洲天堂| 欧美日韩国产一区二区三区三州| 天天夜天天日天天日| 国产午夜亚洲精品不卡在线观看| 福利午夜视频在线观看| 丝袜肉丝一区二区三区四区在线看| 99的爱精品免费视频| 免费在线黄色观看网站| 国产精品黄大片在线播放| 国产日韩精品一二三区久久久| 天天日天天透天天操| 影音先锋女人av噜噜色| 五十路人妻熟女av一区二区| 最后99天全集在线观看| 美女骚逼日出水来了| av网站色偷偷婷婷网男人的天堂| 亚洲在线免费h观看网站| 精品乱子伦一区二区三区免费播 | 97超碰国语国产97超碰| 日韩一区二区电国产精品| 久久久人妻一区二区| 直接观看免费黄网站| 18禁网站一区二区三区四区| 2022国产综合在线干| 在线免费观看欧美小视频| 天天日天天干天天插舔舔| 精品亚洲中文字幕av| 亚洲免费福利一区二区三区| 最新国产亚洲精品中文在线| 好吊操视频这里只有精品| 超碰97人人做人人爱| 在线国产日韩欧美视频| free性日本少妇| 亚洲 色图 偷拍 欧美| 一级黄色av在线观看| 福利国产视频在线观看| 热久久只有这里有精品| 久久久久久九九99精品| 夫妻在线观看视频91| 91国内精品久久久久精品一| 亚洲护士一区二区三区| 午夜的视频在线观看| 国产又粗又硬又大视频| 国产精品人妻熟女毛片av久| 国产又大又黄免费观看| 91亚洲精品干熟女蜜桃频道| 国产高清精品极品美女| 日韩欧美一级精品在线观看| 久久这里只有精品热视频| 天天插天天狠天天操| 综合色区亚洲熟妇shxstz| 日韩欧美制服诱惑一区在线| 美女 午夜 在线视频| 天堂中文字幕翔田av| 久久久久五月天丁香社区| 人妻少妇性色欲欧美日韩| 亚洲综合在线视频可播放| 中文字幕日韩无敌亚洲精品| 中文字幕日本人妻中出| 狠狠操操操操操操操操操| 亚洲精品av在线观看| 大香蕉伊人国产在线| 久久丁香婷婷六月天| 91成人在线观看免费视频| 久久久制服丝袜中文字幕| 蜜桃色婷婷久久久福利在线 | av在线免费中文字幕| 午夜久久久久久久精品熟女| 精品成人啪啪18免费蜜臀| rct470中文字幕在线| 中文字幕综合一区二区| 91九色porny蝌蚪国产成人| 曰本无码人妻丰满熟妇啪啪| 国产日韩欧美美利坚蜜臀懂色| 久久这里只有精品热视频 | 在线不卡日韩视频播放| 直接观看免费黄网站| 91中文字幕最新合集| 第一福利视频在线观看| 夜夜操,天天操,狠狠操| 亚洲精品国产综合久久久久久久久| 亚洲成人熟妇一区二区三区| 日日操综合成人av| 国产janese在线播放| 精品一区二区亚洲欧美| 福利午夜视频在线观看| 婷婷午夜国产精品久久久| jul—619中文字幕在线| 男人的网址你懂的亚洲欧洲av| 国产精品福利小视频a| 91免费观看在线网站| 在线免费观看靠比视频的网站| 少妇人妻100系列| 白嫩白嫩美女极品国产在线观看| 91啪国自产中文字幕在线| 亚洲va欧美va人人爽3p| 午夜激情久久不卡一区二区| 亚洲免费va在线播放| 粉嫩av蜜乳av蜜臀| 黄片色呦呦视频免费看| 日本人妻欲求不满中文字幕| 热99re69精品8在线播放| 欧美美女人体视频一区| 成人H精品动漫在线无码播放| 2018最新中文字幕在线观看| 欲满人妻中文字幕在线| 瑟瑟视频在线观看免费视频| 精品一区二区三区欧美| 欧美中国日韩久久精品| 精品少妇一二三视频在线| 激情内射在线免费观看| 日美女屁股黄邑视频| 含骚鸡巴玩逼逼视频| 亚洲日本一区二区三区| av在线免费中文字幕| 亚洲国产精品中文字幕网站| 亚洲欧美综合在线探花| 和邻居少妇愉情中文字幕| 天天操天天爽天天干| 夜女神免费福利视频| 97人人模人人爽人人喊| 亚洲一区久久免费视频| 天天干天天日天天干天天操| 九九视频在线精品播放| 亚洲无码一区在线影院| 99热99re在线播放| 国产91嫩草久久成人在线视频| 美女张开腿让男生操在线看| 欧美亚洲偷拍自拍色图| 青娱乐最新视频在线| 成人影片高清在线观看 | 中国视频一区二区三区| 国产日韩精品电影7777| 亚洲成a人片777777| 黄色黄色黄片78在线| 国产一区成人在线观看视频| 国产成人一区二区三区电影网站| 亚洲超碰97人人做人人爱| 亚洲av成人网在线观看| 午夜在线观看一区视频| 国产成人无码精品久久久电影| 黄色资源视频网站日韩| 2020久久躁狠狠躁夜夜躁| 一区二区久久成人网| 91九色porny国产在线| 日本韩国免费一区二区三区视频| 亚洲精品乱码久久久久久密桃明| 2021天天色天天干| 91精品国产91青青碰| 2020中文字幕在线播放| 福利午夜视频在线观看| 青青青国产片免费观看视频| 91p0rny九色露脸熟女| 欧美日韩v中文在线| 欧美一级片免费在线成人观看 | 日韩熟女系列一区二区三区| 日韩亚洲高清在线观看| 亚洲伊人av天堂有码在线| 夏目彩春在线中文字幕| 日韩成人综艺在线播放| 综合色区亚洲熟妇shxstz| 熟女人妻在线中出观看完整版| 女同性ⅹxx女同hd| 亚洲欧美人精品高清| 日本免费一级黄色录像| 91精品国产91久久自产久强 | 欧美va不卡视频在线观看| 亚洲成人免费看电影| 在线免费观看99视频| 成年人黄视频在线观看| 97黄网站在线观看| 免费手机黄页网址大全| 欧美视频一区免费在线| 97小视频人妻一区二区| 色秀欧美视频第一页| 日韩成人免费电影二区| 日本a级视频老女人| 888亚洲欧美国产va在线播放| 天天干天天操天天爽天天摸| 涩涩的视频在线观看视频| 在线观看免费视频网| 国产精品视频资源在线播放| 亚洲综合另类欧美久久| 91在线免费观看成人| 老有所依在线观看完整版| 五十路息与子猛烈交尾视频| 顶级尤物粉嫩小尤物网站| 青青草原色片网站在线观看| 亚洲一区二区三区在线高清| 视频一区 视频二区 视频| 熟女91pooyn熟女| 日韩黄色片在线观看网站| 成人资源在线观看免费官网| 亚洲熟女综合色一区二区三区四区| 午夜激情久久不卡一区二区| 一区二区三区综合视频| 一区二区三区另类在线| 91国内精品久久久久精品一| 欧美特色aaa大片| 沙月文乃人妻侵犯中文字幕在线 | 中文字幕奴隷色的舞台50| 伊人日日日草夜夜草| 亚洲美女自偷自拍11页| 97人妻无码AV碰碰视频| 久久精品国产23696| 亚洲伊人色一综合网| 亚洲精品午夜久久久久| v888av在线观看视频| 国产精品伦理片一区二区| 丝袜亚洲另类欧美变态| 中文字幕人妻一区二区视频| 综合色区亚洲熟妇shxstz| 成人精品视频99第一页| 国产精品中文av在线播放| 国产成人精品av网站| 日韩少妇人妻精品无码专区| 国产精彩福利精品视频| 97精品成人一区二区三区| 在线观看免费av网址大全| 亚洲精品久久视频婷婷| 中文字幕+中文字幕| 亚洲熟女久久久36d| 香港一级特黄大片在线播放| 97成人免费在线观看网站| 成人福利视频免费在线| av在线播放国产不卡| 中文字幕在线永久免费播放| 亚洲综合图片20p| 白白操白白色在线免费视频 | 在线成人日韩av电影| 亚洲女人的天堂av| 老有所依在线观看完整版| 亚洲午夜电影在线观看| 国产女孩喷水在线观看| 黄网十四区丁香社区激情五月天| 成年人啪啪视频在线观看| 大鸡巴操娇小玲珑的女孩逼| 中文字幕免费福利视频6| 在线观看免费视频网| 蜜桃色婷婷久久久福利在线| 成人亚洲精品国产精品| 午夜激情久久不卡一区二区 | 天堂中文字幕翔田av| aiss午夜免费视频| 91片黄在线观看喷潮| 亚洲中文字幕乱码区| 中文字幕av一区在线观看| 精品高跟鞋丝袜一区二区| 大陆胖女人与丈夫操b国语高清| 亚洲人妻国产精品综合| 中文字幕第一页国产在线| 日本啪啪啪啪啪啪啪| 日韩精品啪啪视频一道免费| 国产亚洲精品视频合集| 真实国产乱子伦一区二区| 日日夜夜大香蕉伊人| 无码中文字幕波多野不卡| 天天干天天操天天插天天日| 92福利视频午夜1000看| 91社福利《在线观看| 青青青激情在线观看视频| 精品高潮呻吟久久av| 美女骚逼日出水来了| huangse网站在线观看| 亚洲中文字幕国产日韩| 国产午夜男女爽爽爽爽爽视频| 在线观看的a站 最新| 国产揄拍高清国内精品对白| 久久久久久性虐视频| 91国内视频在线观看| 99热这里只有精品中文| 亚洲av自拍天堂网| 天天色天天爱天天爽| 九九热99视频在线观看97| 亚洲精品ww久久久久久| 中国产一级黄片免费视频播放| 久久精品在线观看一区二区| 欧美一级片免费在线成人观看| 91免费福利网91麻豆国产精品| 色婷婷久久久久swag精品| 91九色国产porny蝌蚪| 换爱交换乱高清大片| 欧美aa一级一区三区四区 | 天天日天天日天天射天天干| 一级黄色片夫妻性生活| 亚洲一级av大片免费观看| 成人国产影院在线观看| 亚洲国际青青操综合网站| 日韩午夜福利精品试看| 国产中文精品在线观看| 人妻久久无码中文成人| 蜜臀av久久久久久久| 日韩午夜福利精品试看| 亚洲国产成人av在线一区| 国产97在线视频观看| 91p0rny九色露脸熟女| 亚洲午夜高清在线观看| 综合精品久久久久97| 五十路熟女人妻一区二区9933| 欧美日韩不卡一区不区二区| 喷水视频在线观看这里只有精品| 日本一道二三区视频久久| 国产精品免费不卡av| 亚洲色偷偷综合亚洲AV伊人| 视频在线亚洲一区二区| 久久久久五月天丁香社区| 天天操天天干天天艹| av乱码一区二区三区| 天天做天天干天天操天天射| 欧美亚洲偷拍自拍色图| 丝袜肉丝一区二区三区四区在线| 国产视频一区二区午夜| 国产三级精品三级在线不卡| 亚洲成高清a人片在线观看| 熟女国产一区亚洲中文字幕| 偷拍自拍视频图片免费| 这里只有精品双飞在线播放| 在线制服丝袜中文字幕| 久久艹在线观看视频| www日韩毛片av| 97人妻人人澡爽人人精品| 1000部国产精品成人观看视频| 亚洲一区二区三区久久午夜 | 少妇高潮无套内谢麻豆| 成年女人免费播放视频| 日韩在线视频观看有码在线| 中文字幕日韩精品就在这里| 自拍偷区二区三区麻豆| 国产九色91在线视频| 日本福利午夜电影在线观看| 中文亚洲欧美日韩无线码| 国产精品入口麻豆啊啊啊| 午夜精品亚洲精品五月色| 青青青青青免费视频| 国产熟妇人妻ⅹxxxx麻豆| 国产综合高清在线观看| free性日本少妇| 视频在线免费观看你懂得| 国产伦精品一区二区三区竹菊| 丰满熟女午夜福利视频| 人妻少妇亚洲一区二区| 精品欧美一区二区vr在线观看 | 国产在线观看黄色视频| 青青青aaaa免费| 日本裸体熟妇区二区欧美| 天天摸天天日天天操| 在线观看视频网站麻豆| 国产中文精品在线观看| 综合页自拍视频在线播放| 午夜精品福利一区二区三区p| 老熟妇凹凸淫老妇女av在线观看| 黄片色呦呦视频免费看| 亚洲精品欧美日韩在线播放 | 国产黄色片在线收看| 黄色录像鸡巴插进去| 一区二区麻豆传媒黄片| 日本男女操逼视频免费看| 日韩欧美高清免费在线| 欧美专区第八页一区在线播放| 婷婷色国产黑丝少妇勾搭AV| 久久久精品999精品日本 | 红桃av成人在线观看| 亚洲国产美女一区二区三区软件 | 国产日韩欧美美利坚蜜臀懂色| 3D动漫精品啪啪一区二区下载| sw137 中文字幕 在线| 女蜜桃臀紧身瑜伽裤| 91自产国产精品视频| 绝顶痉挛大潮喷高潮无码| 一区国内二区日韩三区欧美| 伊人精品福利综合导航| 亚洲福利精品福利精品福利| 免费观看成年人视频在线观看| 成人国产激情自拍三区| 51精品视频免费在线观看| 黑人巨大的吊bdsm| 动漫精品视频在线观看| 免费高清自慰一区二区三区网站| 91色网站免费在线观看| 日韩精品电影亚洲一区| 美女操逼免费短视频下载链接| 亚洲精品三级av在线免费观看| 亚洲中文精品人人免费| 一区二区久久成人网| 亚洲国产欧美一区二区丝袜黑人| 日韩美女福利视频网| 成人高清在线观看视频| 成人免费毛片aaaa| 日韩av有码中文字幕| 老师让我插进去69AV| 亚洲日产av一区二区在线| 精品美女在线观看视频在线观看 | 天天日夜夜干天天操| 久久久久91精品推荐99| 日韩三级黄色片网站| 在线观看视频 你懂的| 亚洲综合图片20p| 色偷偷伊人大杳蕉综合网| 在线观看国产网站资源| 91超碰青青中文字幕| 中文字幕亚洲中文字幕| 国产精品福利小视频a| 国产精品国色综合久久| 成人乱码一区二区三区av| 被大鸡吧操的好舒服视频免费| 搡老熟女一区二区在线观看| 亚欧在线视频你懂的| 国产亚洲欧美视频网站| 在线观看的黄色免费网站| 在线免费观看亚洲精品电影 | 偷拍自拍 中文字幕| 天天操天天爽天天干| 97a片免费在线观看| 黄网十四区丁香社区激情五月天 | 播放日本一区二区三区电影| 亚洲中文字字幕乱码| 成人激情文学网人妻| 色综合久久久久久久久中文| 成年美女黄网站18禁久久| 4个黑人操素人视频网站精品91| 人人爽亚洲av人人爽av| 五十路熟女人妻一区二| 久草福利电影在线观看| 亚洲综合在线观看免费| 午夜国产福利在线观看| 91片黄在线观看喷潮| 国产日韩精品电影7777| 国产亚洲成人免费在线观看| 一本久久精品一区二区| 精品成人午夜免费看| 18禁无翼鸟成人在线| 啪啪啪操人视频在线播放| 亚洲av日韩av第一区二区三区| japanese日本熟妇另类| 啪啪啪操人视频在线播放| 粉嫩小穴流水视频在线观看| 麻豆精品成人免费视频| 中文字幕日本人妻中出| 一级黄片久久久久久久久| 国产综合视频在线看片| 快插进小逼里大鸡吧视频| 天天日天天日天天射天天干 | 又黄又刺激的午夜小视频| 青娱乐极品视频青青草| 风流唐伯虎电视剧在线观看 | 狠狠的往里顶撞h百合| 97人妻总资源视频| 丝袜国产专区在线观看| 2025年人妻中文字幕乱码在线| 日本性感美女三级视频| 日韩无码国产精品强奸乱伦| 免费手机黄页网址大全| 欧美一区二区三区激情啪啪啪| 久草视频首页在线观看| av在线免费观看亚洲天堂| av网址国产在线观看| 欧美日韩国产一区二区三区三州 | h国产小视频福利在线观看| 视频 一区二区在线观看| 馒头大胆亚洲一区二区| 非洲黑人一级特黄片| 亚洲视频在线观看高清| 亚洲欧美综合另类13p| 亚洲综合一区成人在线| 日本精品一区二区三区在线视频。 | 桃色视频在线观看一区二区 | 顶级尤物粉嫩小尤物网站| 亚洲国产成人在线一区| 亚洲综合另类精品小说| 国语对白xxxx乱大交| 欧美交性又色又爽又黄麻豆| 成人福利视频免费在线| 美女小视频网站在线| 亚洲成人免费看电影| 中国无遮挡白丝袜二区精品| 中文字幕亚洲久久久| 亚洲视频在线视频看视频在线| 91精品综合久久久久3d动漫| 久久久麻豆精亚洲av麻花| 亚洲精品国偷自产在线观看蜜桃| 青青青青操在线观看免费| 国产在线拍揄自揄视频网站| 国产女人露脸高潮对白视频| 亚洲综合另类欧美久久| 后入美女人妻高清在线| 91大屁股国产一区二区| 免费观看成年人视频在线观看| 亚洲中文字字幕乱码| 蜜桃视频17c在线一区二区| 久久麻豆亚洲精品av| 亚洲变态另类色图天堂网| 亚洲国产精品免费在线观看| 亚洲天堂有码中文字幕视频| 午夜dv内射一区区| 精内国产乱码久久久久久| 日韩伦理短片在线观看| 亚洲最大黄了色网站| 一色桃子人妻一区二区三区| 2021最新热播中文字幕| 97年大学生大白天操逼| 一区二区三区蜜臀在线| 国产欧美精品一区二区高清| 视频一区 视频二区 视频| 亚洲一级美女啪啪啪| 色婷婷精品大在线观看| 777奇米久久精品一区| 日韩欧美制服诱惑一区在线| 最新的中文字幕 亚洲| 天天日天天做天天日天天做| 中文字幕在线观看极品视频| 岛国一区二区三区视频在线| 国产麻豆精品人妻av| 国产黄色片蝌蚪九色91| 激情色图一区二区三区| 婷婷激情四射在线观看视频| 一区二区三区日韩久久| 国产欧美日韩在线观看不卡| 亚洲国产成人无码麻豆艾秋| 在线观看免费av网址大全| 亚洲欧美综合在线探花| 精品首页在线观看视频| 2021久久免费视频| 在线观看av观看av| 成人性黑人一级av| 亚洲一区制服丝袜美腿| av日韩在线观看大全| 亚洲欧美激情国产综合久久久| 亚洲午夜伦理视频在线| 欧美日韩一级黄片免费观看| 国产性生活中老年人视频网站| 亚洲精品精品国产综合| 国产黄色a级三级三级三级| 国产成人无码精品久久久电影| 欧美特色aaa大片| 午夜久久久久久久精品熟女| 日韩北条麻妃一区在线| 中文字幕国产专区欧美激情| 精品久久久久久久久久久99| 999久久久久999| 99国产精品窥熟女精品| 亚洲综合另类精品小说| 亚洲av男人的天堂你懂的| 姐姐的朋友2在线观看中文字幕| 久精品人妻一区二区三区| 亚洲一区久久免费视频| 天堂av在线最新版在线| 日本一二三中文字幕| 久久久久久久精品老熟妇| 一区二区久久成人网| 天天日天天干天天爱| 欧美另类一区二区视频| 强行扒开双腿猛烈进入免费版| 天天干天天插天天谢| 免费在线看的黄片视频| sspd152中文字幕在线| av视屏免费在线播放| 国产日韩欧美美利坚蜜臀懂色| 9色在线视频免费观看| 成人av电影免费版| 国产成人精品一区在线观看| 涩爱综合久久五月蜜臀| 中文字幕乱码人妻电影| av视网站在线观看| ka0ri在线视频| 大陆av手机在线观看| 国产精品久久久久久久精品视频| 沙月文乃人妻侵犯中文字幕在线| 亚洲精品一区二区三区老狼| 国产精品一区二区av国| 欧美一区二区三区在线资源| 久久久久久cao我的性感人妻| 在线视频国产欧美日韩| 日韩激情文学在线视频 | 欧美日韩情色在线观看| 亚洲2021av天堂| 国产大学生援交正在播放| 99热99这里精品6国产| 绝顶痉挛大潮喷高潮无码| 91精品视频在线观看免费| 欧美视频中文一区二区三区| 中文字幕第一页国产在线| 男人的天堂av日韩亚洲| 人妻丝袜精品中文字幕| 日韩av大胆在线观看| 黄色男人的天堂视频| 午夜dv内射一区区| 老熟妇凹凸淫老妇女av在线观看| 视频在线免费观看你懂得| 2020韩国午夜女主播在线| 亚洲综合乱码一区二区| 国产高清在线在线视频| 欧美精品欧美极品欧美视频| 亚洲成人激情av在线| 大屁股熟女一区二区三区| 特黄老太婆aa毛毛片| 9国产精品久久久久老师 | 99精品视频之69精品视频| 亚洲偷自拍高清视频| 日韩精品中文字幕播放| 成年人的在线免费视频| 日韩a级精品一区二区| 北条麻妃肉色丝袜视频| 和邻居少妇愉情中文字幕| 欧美日韩高清午夜蜜桃大香蕉| 亚洲丝袜老师诱惑在线观看| 男人在床上插女人视频| 91人妻人人做人人爽在线| www日韩毛片av| av成人在线观看一区| 日韩av大胆在线观看| 国产高潮无码喷水AV片在线观看| 欧美熟妇一区二区三区仙踪林| 美女在线观看日本亚洲一区| 乱亲女秽乱长久久久| 一级黄色片夫妻性生活| 日本性感美女视频网站| 红杏久久av人妻一区| 欧美特色aaa大片| 久碰精品少妇中文字幕av | 黑人解禁人妻叶爱071| 久久久久久性虐视频| 国产97在线视频观看| 日韩中文字幕精品淫| 91人妻人人做人人爽在线| 天天操天天操天天碰| 黑人借宿ntr人妻的沦陷2| 插逼视频双插洞国产操逼插洞| 日韩欧美国产一区不卡| 人妻丝袜精品中文字幕| av天堂资源最新版在线看| 特黄老太婆aa毛毛片| 端庄人妻堕落挣扎沉沦| 天天插天天色天天日| 精品av国产一区二区三区四区| 亚洲欧美久久久久久久久| 午夜青青草原网在线观看| 亚洲日本一区二区久久久精品| 青青青国产免费视频| 黄色黄色黄片78在线| 啪啪啪操人视频在线播放| 日韩三级电影华丽的外出| 亚洲精品久久综合久| 少妇一区二区三区久久久| 亚洲国产第一页在线观看| 18禁无翼鸟成人在线| 日本成人不卡一区二区| 精品乱子伦一区二区三区免费播 | 亚洲精品亚洲人成在线导航| 精品老妇女久久9g国产| 在线新三级黄伊人网| 福利视频网久久91| 亚洲激情唯美亚洲激情图片| 99热99re在线播放| 91九色国产porny蝌蚪| 日本性感美女三级视频| 老司机免费福利视频网| 99热99这里精品6国产| 国产黄色片蝌蚪九色91| 美女福利写真在线观看视频| 国产成人精品福利短视频| 亚洲av在线观看尤物| 亚洲av日韩精品久久久| 视频在线亚洲一区二区| 黑人借宿ntr人妻的沦陷2| 国产亚洲成人免费在线观看| 2022精品久久久久久中文字幕| 亚洲2021av天堂| 日本韩国在线观看一区二区| 国产精品自拍视频大全| 国产精品自偷自拍啪啪啪| 久草视频在线看免费| 中文字幕日韩精品日本| 98视频精品在线观看| 亚洲欧美激情中文字幕| 国产又粗又硬又猛的毛片视频| 超pen在线观看视频公开97| 国产白嫩美女一区二区| 美女 午夜 在线视频| 成人国产影院在线观看| 男人靠女人的逼视频| 91超碰青青中文字幕| 搡老熟女一区二区在线观看| 久久久久久97三级| 午夜在线观看岛国av,com| 国产精彩福利精品视频| 午夜精品久久久久麻豆影视| 日本女大学生的黄色小视频| 亚洲综合自拍视频一区| 亚洲av日韩av网站| 欧美亚洲国产成人免费在线| 午夜毛片不卡在线看| 春色激情网欧美成人| 在线观看日韩激情视频| 骚逼被大屌狂草视频免费看| 婷婷激情四射在线观看视频| 日本女人一级免费片| 天堂av在线官网中文| 老司机午夜精品视频资源| 欧美xxx成人在线| 中文字幕欧美日韩射射一| 抽查舔水白紧大视频| 免费在线观看视频啪啪| 亚洲人妻av毛片在线| 天堂v男人视频在线观看| 天天艹天天干天天操| 亚洲综合另类精品小说| 自拍偷区二区三区麻豆| 狠狠操操操操操操操操操| 3344免费偷拍视频| 国语对白xxxx乱大交| 超碰97人人澡人人| 蜜臀成人av在线播放| 51精品视频免费在线观看| 日本高清成人一区二区三区| 亚洲第17页国产精品| 日本女大学生的黄色小视频| 毛片一级完整版免费| 亚洲国产40页第21页| 久久永久免费精品人妻专区| 黄色在线观看免费观看在线| 边摸边做超爽毛片18禁色戒| 久久热久久视频在线观看| 啊啊好大好爽啊啊操我啊啊视频 | 国产老熟女伦老熟妇ⅹ| 97色视频在线观看| 在线免费观看99视频| 久久www免费人成一看片| 色av色婷婷人妻久久久精品高清| 日日日日日日日日夜夜夜夜夜夜| 2017亚洲男人天堂| 一区二区三区四区五区性感视频| 久久香蕉国产免费天天| 最新日韩av传媒在线| 欧美在线偷拍视频免费看| 亚洲av可乐操首页| 国产黄色片在线收看| 亚洲福利午夜久久久精品电影网| 男人天堂最新地址av| 国产精品久久久久久久久福交 | a v欧美一区=区三区| 国产janese在线播放| 亚洲高清国产一区二区三区| 欧美视频综合第一页| 人人妻人人爽人人添夜| 亚洲午夜精品小视频| 亚洲熟女久久久36d| 天天干天天操天天摸天天射| 久精品人妻一区二区三区 | 操日韩美女视频在线免费看| 一区二区三区四区中文| 中文 成人 在线 视频| 免费在线观看视频啪啪| 最近中文字幕国产在线| 人妻av无码专区久久绿巨人| 天天操天天爽天天干| 99人妻视频免费在线| 中文人妻AV久久人妻水| 国产乱弄免费视频观看| 黑人大几巴狂插日本少妇| 91chinese在线视频| 经典国语激情内射视频| 一色桃子人妻一区二区三区| 欧美另类重口味极品在线观看| 自拍偷拍 国产资源| heyzo蜜桃熟女人妻| 亚洲精品麻豆免费在线观看| 欧美特色aaa大片| 一个色综合男人天堂| 91亚洲精品干熟女蜜桃频道| 欧美日韩一级黄片免费观看| 香蕉片在线观看av| av高潮迭起在线观看| 人妻自拍视频中国大陆| 97超碰免费在线视频| av在线免费观看亚洲天堂| 天堂va蜜桃一区入口| 久久永久免费精品人妻专区 | 日本少妇人妻xxxxx18| 开心 色 六月 婷婷| 鸡巴操逼一级黄色气| 国产亚洲欧美另类在线观看| 精品国产高潮中文字幕| av日韩在线免费播放| 91老师蜜桃臀大屁股| 日本一二三中文字幕| 久草极品美女视频在线观看| 亚洲av日韩av第一区二区三区| 馒头大胆亚洲一区二区| 日比视频老公慢点好舒服啊| 啪啪啪18禁一区二区三区| 九色精品视频在线播放| 精品老妇女久久9g国产| 少妇一区二区三区久久久| 国产无遮挡裸体免费直播视频| 成年人啪啪视频在线观看| 80电影天堂网官网| 蜜桃色婷婷久久久福利在线| 97少妇精品在线观看| 啊用力插好舒服视频| 丰满少妇人妻xxxxx| 在线观看免费av网址大全| 欧美一区二区三区高清不卡tv| 国产 在线 免费 精品| 91香蕉成人app下载| 亚洲成人黄色一区二区三区| 亚洲日本一区二区三区| 亚洲欧美另类自拍偷拍色图| 成人精品在线观看视频| 91传媒一区二区三区| 美女福利写真在线观看视频| 久久久久久久99精品| 性色av一区二区三区久久久| 大香蕉大香蕉大香蕉大香蕉大香蕉| 精品美女福利在线观看| 中文字幕人妻av在线观看| 亚洲高清视频在线不卡| 在线国产中文字幕视频| 中文字幕乱码人妻电影| 唐人色亚洲av嫩草| 中文字幕av一区在线观看| 国产精品视频资源在线播放 | 91大神福利视频网| av大全在线播放免费| 91桃色成人网络在线观看| 久久久精品国产亚洲AV一| 高清一区二区欧美系列 | 男人的网址你懂的亚洲欧洲av| 毛片av在线免费看| 中字幕人妻熟女人妻a62v网 | 视频 一区二区在线观看| 激情色图一区二区三区| 骚货自慰被发现爆操| 青青伊人一精品视频| 东游记中文字幕版哪里可以看到 | 激情国产小视频在线| 亚洲一区二区三区av网站| 一本久久精品一区二区| 成人H精品动漫在线无码播放| 51精品视频免费在线观看| 久久久久久久亚洲午夜综合福利| 69精品视频一区二区在线观看| 大鸡巴操b视频在线| 2021年国产精品自拍| 亚洲一区二区人妻av| 特大黑人巨大xxxx| 精品亚洲中文字幕av| 啊啊啊想要被插进去视频| 日韩欧美国产精品91| 欧美视频综合第一页| 国产精品欧美日韩区二区| 99热久久极品热亚洲| 黄色无码鸡吧操逼视频| 中文字幕日韩人妻在线三区| 美女福利视频网址导航| 最新激情中文字幕视频| 在线免费观看欧美小视频| 亚洲成人激情av在线| www日韩a级s片av| 色秀欧美视频第一页| 天天日夜夜操天天摸| 日韩精品激情在线观看| 91免费黄片可看视频| 91小伙伴中女熟女高潮| 日韩特级黄片高清在线看| 国产日本精品久久久久久久| 人人妻人人爽人人添夜| 天天射,天天操,天天说| 高清一区二区欧美系列| 欧美怡红院视频在线观看| 欧美精产国品一二三产品区别大吗| 蜜臀av久久久久蜜臀av麻豆| 在线观看的黄色免费网站| 亚洲国产欧美国产综合在线| 亚洲天堂有码中文字幕视频| 特大黑人巨大xxxx| 无码国产精品一区二区高潮久久4| asmr福利视频在线观看| 98精产国品一二三产区区别| 亚洲精品国产在线电影| 99国内精品永久免费视频| 懂色av蜜桃a v| jiujiure精品视频在线| 91天堂精品一区二区| 好男人视频在线免费观看网站| 人妻凌辱欧美丰满熟妇| 国产a级毛久久久久精品| 在线免费观看靠比视频的网站| 色婷婷综合激情五月免费观看| 精品一区二区三区午夜| 亚洲欧美清纯唯美另类| 久草视频在线免播放| 青青草视频手机免费在线观看| 青青青青青手机视频| 欧美另类重口味极品在线观看| 久久麻豆亚洲精品av| 亚洲综合乱码一区二区| 日韩欧美制服诱惑一区在线| 黑人巨大的吊bdsm| 男人插女人视频网站| 免费岛国喷水视频在线观看| av老司机亚洲一区二区| 91九色国产熟女一区二区| 欧洲日韩亚洲一区二区三区 | 中文字幕无码一区二区免费| 亚洲一区二区三区五区| 午夜精品一区二区三区城中村| 天天色天天爱天天爽| 亚洲第17页国产精品| 国产一区成人在线观看视频| av在线shipin| 久久香蕉国产免费天天| 国产揄拍高清国内精品对白| 国产九色91在线视频| 欧美一区二区中文字幕电影 | 国产密臀av一区二区三| 11久久久久久久久久久| 免费观看成年人视频在线观看| 日日摸夜夜添夜夜添毛片性色av| 91九色国产porny蝌蚪| 91欧美在线免费观看| 中英文字幕av一区| 欧美香蕉人妻精品一区二区| 日本少妇在线视频大香蕉在线观看| 亚洲熟色妇av日韩熟色妇在线| 人人爽亚洲av人人爽av| 亚洲区欧美区另类最新章节| 插小穴高清无码中文字幕| 亚洲一区二区三区五区| 国产亚洲视频在线二区| 亚洲少妇人妻无码精品| 一个人免费在线观看ww视频| 麻豆性色视频在线观看| 欧美成人精品欧美一级黄色| av手机免费在线观看高潮| 又粗又硬又猛又爽又黄的| 成人高潮aa毛片免费| 黑人巨大精品欧美视频| 亚洲国产成人在线一区| 国产麻豆91在线视频| 五十路在线观看完整版| 久青青草视频手机在线免费观看| 91麻豆精品秘密入口在线观看 | 端庄人妻堕落挣扎沉沦| 91九色porny国产在线| 午夜精品一区二区三区更新| 久久精品美女免费视频| 中文字幕之无码色多多| 三级等保密码要求条款| 天天色天天操天天舔| 五月婷婷在线观看视频免费| 免费人成黄页网站在线观看国产| 亚洲视频乱码在线观看| 午夜蜜桃一区二区三区| 男人插女人视频网站| 成人H精品动漫在线无码播放| 亚洲视频在线观看高清| 国产日韩欧美视频在线导航| 成人福利视频免费在线| 国产精品国产三级麻豆| 日韩av中文在线免费观看| 自拍 日韩 欧美激情| caoporn蜜桃视频| 啪啪啪啪啪啪啪免费视频| 亚洲免费成人a v| 久久这里只有精彩视频免费| 一区二区三区激情在线| 久久久人妻一区二区| 偷拍自拍 中文字幕| 天天日天天干天天要| 国产又粗又硬又猛的毛片视频| 日韩成人综艺在线播放| av破解版在线观看| 狠狠嗨日韩综合久久| 一区二区三区毛片国产一区| 都市激情校园春色狠狠| 婷婷六月天中文字幕| 亚洲av自拍偷拍综合| 91久久国产成人免费网站| 国产普通话插插视频| 欧美亚洲牲夜夜综合久久| 国产chinesehd精品麻豆| 一区二区三区四区视频| 玖玖一区二区在线观看| 超污视频在线观看污污污| 天码人妻一区二区三区在线看| 精品亚洲中文字幕av| 欧美成人一二三在线网| 五十路息与子猛烈交尾视频| 久久久精品国产亚洲AV一| av在线免费资源站| 又粗又长 明星操逼小视频| 这里只有精品双飞在线播放| 久久精品国产亚洲精品166m| av久久精品北条麻妃av观看| 黄色av网站免费在线| 亚洲欧美成人综合视频| 天美传媒mv视频在线观看| 2020av天堂网在线观看| 自拍 日韩 欧美激情| 久久三久久三久久三久久| 91中文字幕免费在线观看| 亚洲高清一区二区三区视频在线| 91精品国产综合久久久蜜| 91综合久久亚洲综合| 日本女人一级免费片| 日本啪啪啪啪啪啪啪| 一区二区在线视频中文字幕| 欧美亚洲国产成人免费在线 | 特级无码毛片免费视频播放| 黄色片一级美女黄色片| 91啪国自产中文字幕在线| 欧美美女人体视频一区| 中文字幕—97超碰网| 日本后入视频在线观看| 亚洲免费国产在线日韩| 亚洲欧美日韩视频免费观看| 日本午夜爽爽爽爽爽视频在线观看 | 中文字幕最新久久久| 肏插流水妹子在线乐播下载 | 日韩人妻在线视频免费| 午夜美女少妇福利视频| 日本五十路熟新垣里子| 天天日天天爽天天爽| 午夜久久久久久久99| 免费人成黄页网站在线观看国产 | 亚洲国产最大av综合| 天天通天天透天天插| tube69日本少妇| 神马午夜在线观看视频| 日本特级片中文字幕| 91在线视频在线精品3| 日本一本午夜在线播放| 国产精品日韩欧美一区二区| v888av在线观看视频| yellow在线播放av啊啊啊| 日韩欧美中文国产在线| 亚洲激情av一区二区| 国产午夜男女爽爽爽爽爽视频| 国产成人自拍视频在线免费观看| 欧洲黄页网免费观看| 天天躁日日躁狠狠躁躁欧美av| 日韩人妻xxxxx| avjpm亚洲伊人久久| 成熟熟女国产精品一区| 国产精品人妻一区二区三区网站| 国产清纯美女al在线| 久久精品国产23696| 欧美综合婷婷欧美综合| 日本一区精品视频在线观看| 免费十精品十国产网站| 熟女91pooyn熟女| 人妻自拍视频中国大陆| 男人天堂最新地址av| 国产精品国产精品一区二区| 免费在线观看视频啪啪| 福利午夜视频在线观看| 传媒在线播放国产精品一区| 亚洲激情唯美亚洲激情图片| 国产揄拍高清国内精品对白| 五十路老熟女码av| eeuss鲁片一区二区三区| 亚洲精品三级av在线免费观看| 精品黑人一区二区三区久久国产| 特一级特级黄色网片| 日日夜夜大香蕉伊人| 97超碰国语国产97超碰| 国产精品久久久久国产三级试频 | 国产在线观看黄色视频| 青娱乐极品视频青青草| 88成人免费av网站| 中文字幕之无码色多多| 黄片大全在线观看观看| 偷拍自拍亚洲美腿丝袜| 狠狠操操操操操操操操操| 国产视频精品资源网站| 亚洲1区2区3区精华液| 欧美日韩v中文在线| 日韩午夜福利精品试看| 亚洲精品久久综合久| 色哟哟国产精品入口| 欧美在线偷拍视频免费看| 99精品久久久久久久91蜜桃| 亚洲第一黄色在线观看| 亚洲国产第一页在线观看| 2020国产在线不卡视频| 五月婷婷在线观看视频免费| 欧美日韩一级黄片免费观看| sw137 中文字幕 在线| 老司机免费视频网站在线看| 巨乳人妻日下部加奈被邻居中出| 天天操天天弄天天射| 国产一线二线三线的区别在哪| 高潮视频在线快速观看国家快速| 亚洲在线免费h观看网站| 欧美黑人与人妻精品| 夏目彩春在线中文字幕| 成人sm视频在线观看| 啪啪啪啪啪啪啪啪啪啪黄色| 在线观看欧美黄片一区二区三区| 红桃av成人在线观看| 国产精品黄片免费在线观看| 一区二区麻豆传媒黄片| 国产在线91观看免费观看| 无码日韩人妻精品久久| 日本一二三中文字幕| 日本裸体熟妇区二区欧美| 免费人成黄页网站在线观看国产| 国产美女午夜福利久久| 亚洲一级美女啪啪啪| 欧美黄片精彩在线免费观看| 亚洲成人激情av在线| 亚洲欧美一区二区三区爱爱动图| 中文字幕日韩精品日本| 欧美乱妇无乱码一区二区| 熟女俱乐部一二三区| 亚洲福利精品视频在线免费观看| 偷拍自拍亚洲美腿丝袜| 精品乱子伦一区二区三区免费播 | 骚逼被大屌狂草视频免费看| 中文字幕高清免费在线人妻 | 中文字幕第1页av一天堂网 | 精品亚洲中文字幕av| 伊人综合免费在线视频| 国产精品午夜国产小视频 | 日本少妇精品免费视频| 日韩成人免费电影二区| 熟女视频一区,二区,三区| 大香蕉玖玖一区2区| 精品亚洲中文字幕av| 欧美偷拍自拍色图片| 97超碰最新免费在线观看| 91一区精品在线观看| 2020韩国午夜女主播在线| 亚洲美女美妇久久字幕组| 中文字幕无码日韩专区免费| 成人激情文学网人妻| 91国内精品久久久久精品一| 亚洲成人精品女人久久久| 久久久久久久一区二区三| 香蕉91一区二区三区| 欧美成人黄片一区二区三区 | 国产高清在线观看1区2区| 一个色综合男人天堂| 美女张开两腿让男人桶av| 国产大鸡巴大鸡巴操小骚逼小骚逼| 91麻豆精品传媒国产黄色片| 欧美偷拍亚洲一区二区| 成人H精品动漫在线无码播放| 精品一区二区三区欧美| 亚洲男人的天堂a在线| 国产自拍在线观看成人| 日韩av免费观看一区| 亚洲 欧美 自拍 偷拍 在线| 女同久久精品秋霞网| 成熟熟女国产精品一区| 91精品国产观看免费| 国产精品sm调教视频| 亚洲午夜高清在线观看| 亚洲伊人久久精品影院一美女洗澡 | 骚货自慰被发现爆操| 亚洲av第国产精品| 二区中出在线观看老师| 国产麻豆剧果冻传媒app| 亚洲熟女综合色一区二区三区四区| 91中文字幕免费在线观看| 玩弄人妻熟妇性色av少妇| 婷婷六月天中文字幕| 不戴胸罩引我诱的隔壁的人妻| 人妻在线精品录音叫床| 熟女91pooyn熟女| 日日爽天天干夜夜操| 国产刺激激情美女网站| 久久久久久久久久性潮| 好了av中文字幕在线| 日本一二三中文字幕| 美日韩在线视频免费看| 亚洲成av人无码不卡影片一| 中文字幕人妻一区二区视频 | 亚洲精品av在线观看| 欧美亚洲中文字幕一区二区三区| 蜜桃久久久久久久人妻| 男人的网址你懂的亚洲欧洲av| 欧美日本在线观看一区二区| 中文字幕在线第一页成人| 91www一区二区三区| 97国产在线观看高清| 色偷偷伊人大杳蕉综合网| 国产性色生活片毛片春晓精品| 国产成人午夜精品福利| 曰本无码人妻丰满熟妇啪啪| 婷婷色中文亚洲网68| 日本熟妇色熟妇在线观看| 色哟哟国产精品入口| 精内国产乱码久久久久久| 九一传媒制片厂视频在线免费观看| 偷拍美女一区二区三区| 国产91精品拍在线观看| 国产麻豆国语对白露脸剧情| 亚洲综合图片20p| 在线免费观看99视频| 成人高清在线观看视频| 人妻凌辱欧美丰满熟妇| 中文字幕人妻三级在线观看| 国产三级片久久久久久久| 最新中文字幕乱码在线| 国产精品系列在线观看一区二区| 免费岛国喷水视频在线观看| 五十路熟女av天堂| 岛国av高清在线成人在线| 57pao国产一区二区| 深夜男人福利在线观看| 欧美区一区二区三视频| 欧美日韩高清午夜蜜桃大香蕉| 在线免费观看日本伦理| 欧美成人一二三在线网| 熟女人妻三十路四十路人妻斩| 青草亚洲视频在线观看| 久久久91蜜桃精品ad| 五月色婷婷综合开心网4438| 国产一区二区火爆视频| 18禁美女无遮挡免费| 女人精品内射国产99| 丝袜长腿第一页在线| av高潮迭起在线观看| 黄色中文字幕在线播放| 青青青青草手机在线视频免费看| mm131美女午夜爽爽爽| 天天日天天干天天舔天天射| 亚洲综合在线观看免费| 97超碰最新免费在线观看| 激情五月婷婷综合色啪| 亚洲免费va在线播放| 91久久人澡人人添人人爽乱| 人人爱人人妻人人澡39| 久久艹在线观看视频| 日韩美在线观看视频黄| 国际av大片在线免费观看| 亚洲av日韩高清hd| 亚洲免费成人a v| 精品人人人妻人人玩日产欧| 人妻爱爱 中文字幕| 香蕉aⅴ一区二区三区| 91免费福利网91麻豆国产精品 | 亚洲成人熟妇一区二区三区 | 在线免费视频 自拍| 80电影天堂网官网| 天天躁日日躁狠狠躁躁欧美av| 午夜免费观看精品视频| 狍和女人的王色毛片| 日韩中文字幕福利av| 久草电影免费在线观看| 国产午夜亚洲精品麻豆| 亚洲伊人av天堂有码在线| 青青青激情在线观看视频| 天天干夜夜操天天舔| 人妻少妇亚洲一区二区| 天天日天天干天天爱| 日日操夜夜撸天天干| 大香蕉大香蕉在线有码 av| brazzers欧熟精品系列| 在线不卡日韩视频播放| 国产欧美精品不卡在线| 99精品免费久久久久久久久a| 精品高潮呻吟久久av| 可以在线观看的av中文字幕| 毛茸茸的大外阴中国视频| 真实国模和老外性视频| 亚洲超碰97人人做人人爱| 天天插天天狠天天操| 亚洲视频在线视频看视频在线| 五月天久久激情视频| 国产精品免费不卡av| 在线观看免费岛国av| 3337p日本欧洲大胆色噜噜| 日本精品视频不卡一二三| 久草视频 久草视频2| 黑人巨大精品欧美视频| 男生舔女生逼逼的视频| 亚洲公开视频在线观看| 特大黑人巨大xxxx| 欧美另类重口味极品在线观看| 一区二区三区久久久91| 精品国产午夜视频一区二区| 狠狠的往里顶撞h百合| 亚洲美女美妇久久字幕组| 人人在线视频一区二区| 成人久久精品一区二区三区| 亚洲一区二区三区av网站| 东游记中文字幕版哪里可以看到| 午夜激情久久不卡一区二区 | 在线观看成人国产电影| 日韩亚国产欧美三级涩爱| 欧美另类z0z变态| 九九热99视频在线观看97| 黄色片一级美女黄色片| 欧美日韩熟女一区二区三区| 国产揄拍高清国内精品对白| 国产激情av网站在线观看| 久久久久久久久久一区二区三区| 中文字幕高清免费在线人妻| 综合激情网激情五月五月婷婷| 日韩精品电影亚洲一区| 大鸡巴插入美女黑黑的阴毛| 青青草视频手机免费在线观看| 老司机福利精品免费视频一区二区| 蜜桃精品久久久一区二区| 大鸡吧插入女阴道黄色片| 精内国产乱码久久久久久| 狠狠躁夜夜躁人人爽天天久天啪| 91欧美在线免费观看| 9l人妻人人爽人人爽| 一区二区三区在线视频福利| 在线新三级黄伊人网| 亚洲人妻av毛片在线| 密臀av一区在线观看| 国产精品国色综合久久| 日本少妇的秘密免费视频| 97人人模人人爽人人喊| 青青草精品在线视频观看| 自拍 日韩 欧美激情| 久久这里有免费精品| 亚洲中文字幕人妻一区| 边摸边做超爽毛片18禁色戒 | 激情国产小视频在线| 日韩熟女系列一区二区三区| 日本高清成人一区二区三区| 三级av中文字幕在线观看| 91麻豆精品传媒国产黄色片| 熟妇一区二区三区高清版| 国产一级麻豆精品免费| 亚洲美女美妇久久字幕组| 青青青国产免费视频| 日本性感美女写真视频| 亚洲人妻av毛片在线| 六月婷婷激情一区二区三区| 久久久超爽一二三av| 最新中文字幕乱码在线| 扒开腿挺进肉嫩小18禁视频| 午夜精品福利一区二区三区p | 国产又粗又黄又硬又爽| 中文字幕第1页av一天堂网| av视网站在线观看| 青青尤物在线观看视频网站| 人妻最新视频在线免费观看| 日韩精品二区一区久久| 日本黄在免费看视频| 亚洲欧美清纯唯美另类| 91精品啪在线免费| huangse网站在线观看| 日韩av大胆在线观看| 青青青国产免费视频| 久久久久久97三级| 欧美亚洲中文字幕一区二区三区| 亚洲人人妻一区二区三区 | 深夜男人福利在线观看| 日韩一个色综合导航| 青青伊人一精品视频| 中文字幕之无码色多多| 成人高清在线观看视频| 欧美色婷婷综合在线| 91精品国产麻豆国产| 9国产精品久久久久老师| 中国视频一区二区三区| 特大黑人巨大xxxx| 国产精品成人xxxx| 青青青青青青青青青青草青青 | 青青伊人一精品视频| 九九视频在线精品播放| 久久久久久久精品老熟妇| 99精品国自产在线人| 福利国产视频在线观看| 精品欧美一区二区vr在线观看| 97香蕉碰碰人妻国产樱花| 久久久精品999精品日本| 日本熟妇一区二区x x| 青青伊人一精品视频| 任我爽精品视频在线播放| 国产 在线 免费 精品| 黄页网视频在线免费观看| 成人av天堂丝袜在线观看| 欧美日韩情色在线观看| 国内精品在线播放第一页| 亚洲免费福利一区二区三区| 偷拍自拍国产在线视频| 天天操天天干天天艹| av男人天堂狠狠干| 人人妻人人爽人人添夜| 亚洲精品欧美日韩在线播放 | av老司机精品在线观看| 亚洲人人妻一区二区三区| 国产午夜男女爽爽爽爽爽视频 | 黄色在线观看免费观看在线| 精品久久久久久久久久久久人妻 | 久久久91蜜桃精品ad| 91快播视频在线观看| 国产美女一区在线观看| 欧美性感尤物人妻在线免费看| 亚洲 中文 自拍 另类 欧美| 国产精品黄色的av| 少妇深喉口爆吞精韩国| 欧美专区第八页一区在线播放 | 日美女屁股黄邑视频| 色哟哟在线网站入口| 一区二区在线观看少妇| 日韩av大胆在线观看| 桃色视频在线观看一区二区 | 日韩亚洲高清在线观看| 精品一区二区三区欧美| tube69日本少妇| 19一区二区三区在线播放| 亚洲va国产va欧美va在线| 日本裸体熟妇区二区欧美| 福利视频广场一区二区| 人人妻人人爱人人草| 女生被男生插的视频网站| 在线免费观看日本片| 搡老熟女一区二区在线观看| 大白屁股精品视频国产| 欧美亚洲牲夜夜综合久久| 欧美精品激情在线最新观看视频| 青青青激情在线观看视频| 亚洲国产精品久久久久久6| 丰满的子国产在线观看| 99热99这里精品6国产| 日韩伦理短片在线观看| 久草视频中文字幕在线观看| 涩涩的视频在线观看视频| 免费在线观看视频啪啪| rct470中文字幕在线| 天天日天天透天天操| 日本一区精品视频在线观看| 三上悠亚和黑人665番号| 在线 中文字幕 一区| 香蕉91一区二区三区| 一区二区麻豆传媒黄片| 中文字幕—97超碰网| 亚洲美女美妇久久字幕组| 亚洲狠狠婷婷综合久久app| 国产一区二区久久久裸臀| 18禁精品网站久久| av中文字幕国产在线观看| 亚洲国产欧美一区二区三区…| 黑人乱偷人妻中文字幕| 日本美女性生活一级片| 天天日天天敢天天干| 午夜福利资源综合激情午夜福利资| 特大黑人巨大xxxx| 老鸭窝日韩精品视频观看| 嫩草aⅴ一区二区三区| 五月色婷婷综合开心网4438| 日韩精品激情在线观看| 在线观看av观看av| 人人妻人人澡欧美91精品| av大全在线播放免费| 国产亚洲四十路五十路| 国产视频网站一区二区三区 | 999热精品视频在线| 91综合久久亚洲综合| 国产精品黄片免费在线观看| 日本a级视频老女人| 动色av一区二区三区| 免费观看丰满少妇做受| 综合色区亚洲熟妇shxstz| 男人天堂色男人av| 一区二区在线观看少妇| 欧美日韩精品永久免费网址| 岛国毛片视频免费在线观看| 开心 色 六月 婷婷| 欧美日韩国产一区二区三区三州 | 在线国产精品一区二区三区| lutube在线成人免费看| 欧美激情精品在线观看| 91欧美在线免费观看| 红杏久久av人妻一区| 久久久久久cao我的性感人妻| 久久久精品国产亚洲AV一|