服饰流行周期计算程序,输入单品上线数据,预判款式衰退清仓最佳时间点。

📅 2026/7/3 10:06:53 👁️ 阅读次数 📝 编程学习
服饰流行周期计算程序,输入单品上线数据,预判款式衰退清仓最佳时间点。

服饰流行周期计算程序 —— 预判款式衰退与清仓最佳时间点

一、实际应用场景描述

在《时尚产业与品牌创新》课程中,服饰流行周期(Fashion Cycle) 是一个核心理论框架。从 "引入期 → 成长期 → 成熟期 → 衰退期 → 滞销期" 的完整生命周期,决定了品牌每一个关键决策的时间窗口:

阶段 典型表现 品牌动作

引入期 (Introduction) 首批 KOL 穿搭曝光,小红书/抖音种草开始 小批量试水,观察数据

成长期 (Growth) 搜索量攀升,竞品跟进,销量周环比 > 15% 追单补货,加大投放

成熟期 (Maturity) 销量 plateau,周环比 ±5% 波动 维持曝光,准备新品衔接

衰退期 (Decline) 销量周环比持续下滑 > 3 周,退货率上升 启动折扣,减少新流量投入

滞销期 (Obsolescence) 库存周转率 < 0.5,仓储成本 > 边际贡献 清仓退出,回收现金流

核心决策问题:一款衣服上架后,什么时候开始打折?什么时候必须清仓?晚一周可能多赚,但也多压一周库存;早一周可能损失利润。这个"最佳清仓时间点"就是本程序要解决的核心问题。

二、引入痛点

2.1 行业现状的"三拍决策"

痛点 真实表现 后果

拍脑袋定折扣 "卖了一个月了,该打折了吧" 要么打折太早损失毛利,要么太晚变死库存

看总量不看趋势 只看累计销量,不看周环比斜率 成熟期误判为衰退,或衰退期误判为季节性波动

统一折扣节奏 所有款一起 618 打折、双 11 打折 不同款的生命周期阶段完全不同,一刀切效率低

缺乏量化预警 等发现"卖不动了"已经晚了 库存积压 → 折价 70% 才能清掉 → 吞噬前期利润

不懂品类差异 连衣裙和羽绒服用同一套节奏 连衣裙衰退以周计,羽绒服以月计

2.2 一个典型损失场景

某独立品牌,一款 ¥899 的碎花连衣裙:

第 1~3 周:周销量 120 → 150 → 180(成长期,✅ 不打折)

第 4~6 周:周销量 185 → 178 → 170(成熟期,🟡 临界区)

第 7~9 周:周销量 152 → 130 → 105(衰退期,❌ 应该打 8 折)

但实际决策:等到第 10 周才打 7 折

结果:库存 200 件 × 折价损失 ¥180/件 = ¥36,000 利润蒸发

核心矛盾:不是"要不要打折",而是"在哪一周打折,能使总利润最大化"——这就是流行周期量化的价值。

三、核心逻辑讲解

3.1 流行周期曲线模型

本程序采用 修正的 Bass 扩散模型 + 指数衰减叠加,将单款销量拟合为:

周销量(t) = 基础扩散曲线(t) × 季节衰减因子(t) × 随机噪声(t)

其中:

基础扩散曲线(Bass 模型):

S(t) = p + (q − p) × F(t) − p × F(t)²

p = 创新系数(KOL/种草驱动,约 0.03~0.08)

q = 模仿系数(大众跟风驱动,约 0.15~0.40)

F(t) = 累积采纳比例

季节衰减因子(品类差异):

连衣裙:快衰减 λ = 0.12(生命周期 8~12 周)

羽绒服:慢衰减 λ = 0.05(生命周期 16~24 周)

T恤: 中衰减 λ = 0.08(生命周期 12~16 周)

衰减公式:e^(−λ × t)

3.2 衰退判定算法

程序通过三重判定确认衰退起点:

判定条件(需同时满足):

① 连续 N 周(默认 3 周)周环比 < 0

② 最近一周销量 < 峰值 × 衰退阈值(默认 70%)

③ 斜率(线性回归)显著为负(p < 0.05)

3.3 最佳清仓时间点算法

核心思路:比较"继续持有"vs"立即清仓"的期望利润

持有策略(再等 w 周):

E[利润(w)] = Σ[P(t) × (1−d(t)) × M] − C_hold × w − C_risk

其中:

P(t) = 第 t 周预测销量

d(t) = 第 t 周自然折扣率(季节性加深)

M = 每件边际贡献(售价 × 毛利率 − 单件可变成本)

C_hold = 周库存持有成本(仓储 + 资金占用 + 贬值)

C_risk = 滞销风险成本(概率 × 最终折价损失)

清仓策略(第 T 周立即打折):

E[利润(T)] = P(T) × (1−d_clear) × M − C_clear_cost

其中:

d_clear = 清仓折扣率(如 0.30 = 7 折)

C_clear_cost = 清仓执行成本(额外推广/直播坑位费)

最优清仓周 = argmax E[利润(w)] over all w

3.4 折扣策略建议

衰退期折扣阶梯(动态定价):

衰退早期(销量降至峰值 70%): 9 折(轻微刺激)

衰退中期(销量降至峰值 50%): 8 折(主动去库存)

衰退晚期(销量降至峰值 30%): 6.5~7 折(清仓价)

滞销期 (库存周转率 < 0.3): 5 折以下(回收现金流)

四、项目结构

fashion_cycle_tracker/

├── config.py # 品类参数配置(衰减率/周期阶段阈值)

├── data_models.py # 数据模型(单品/周销量/生命周期事件)

├── curve_fitter.py # 曲线拟合器(Bass模型 + 季节衰减)

├── decline_detector.py # 衰退检测器(三重判定算法)

├── clearance_optimizer.py # 清仓优化器(利润最大化)

├── discount_strategy.py # 折扣策略生成器

├── report.py # 报告生成(表格 + 可视化)

├── main.py # 主程序入口(含完整示例数据)

├── README.md # 项目说明

└── requirements.txt # 依赖声明

五、代码模块化实现

"requirements.txt"

numpy>=1.24.0

scipy>=1.10.0

matplotlib>=3.7.0

"config.py"

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

"""

config.py

品类参数配置中心:不同服饰品类的生命周期特征参数

"""

from typing import Dict

import numpy as np

# ========== 品类生命周期参数 ==========

# 基于时尚产业研究的典型值

CATEGORY_PARAMS = {

# 连衣裙:春夏核心品类,生命周期短,衰减快

"dress": {

"bass_p": 0.06, # 创新系数(种草敏感度)

"bass_q": 0.35, # 模仿系数(跟风效应强)

"decay_lambda": 0.12, # 衰减速率(快)

"typical_weeks": 10, # 典型总生命周期(周)

"peak_week_range": (3, 6), # 峰值通常出现在第几周

"seasonal_strength": 0.8, # 季节性强度(高)

},

# 羽绒服:秋冬核心品类,生命周期长,衰减慢

"down_jacket": {

"bass_p": 0.04,

"bass_q": 0.25,

"decay_lambda": 0.05,

"typical_weeks": 20,

"peak_week_range": (5, 10),

"seasonal_strength": 0.9,

},

# T恤/基础款:全年可穿,生命周期中等

"tshirt": {

"bass_p": 0.05,

"bass_q": 0.30,

"decay_lambda": 0.08,

"typical_weeks": 14,

"peak_week_range": (4, 8),

"seasonal_strength": 0.4,

},

# 牛仔裤:经典款,生命周期最长

"jeans": {

"bass_p": 0.03,

"bass_q": 0.20,

"decay_lambda": 0.04,

"typical_weeks": 24,

"peak_week_range": (6, 12),

"seasonal_strength": 0.3,

},

# 运动服/瑜伽裤:功能驱动,衰减中等偏快

"activewear": {

"bass_p": 0.07,

"bass_q": 0.38,

"decay_lambda": 0.10,

"typical_weeks": 12,

"peak_week_range": (3, 7),

"seasonal_strength": 0.5,

},

}

# ========== 衰退判定阈值 ==========

DECLINE_THRESHOLDS = {

"min_consecutive_weeks": 3, # 至少连续 N 周下滑

"peak_ratio_threshold": 0.70, # 销量降至峰值的 X% 以下

"slope_p_value": 0.10, # 斜率显著性阈值

"min_weeks_data": 5, # 最少需要多少周数据才判定

}

# ========== 清仓策略参数 ==========

CLEARANCE_PARAMS = {

"holding_cost_per_unit_week": 8.0, # 每件每周持有成本(仓储+资金)

"risk_discount_rate": 0.15, # 滞销风险折价率

"clearance_execution_cost": 500, # 清仓执行成本(直播/推广)

"discount_tiers": { # 折扣阶梯

0.70: "衰退早期(温和刺激)",

0.50: "衰退中期(主动去库存)",

0.30: "衰退晚期(清仓价)",

0.15: "滞销期(回收现金流)",

},

}

# ========== 可视化配色 ==========

COLORS = {

"introduction": "#4CAF50", # 绿 - 引入期

"growth": "#2196F3", # 蓝 - 成长期

"maturity": "#FF9800", # 橙 - 成熟期

"decline": "#F44336", # 红 - 衰退期

"obsolescence": "#9C27B0", # 紫 - 滞销期

"predicted": "#BDBDBD", # 灰 - 预测区间

"clearance": "#FF5722", # 深橙 - 清仓点

"peak": "#E91E63", # 粉 - 峰值

}

"data_models.py"

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

"""

data_models.py

数据模型层:单品定义、周销量数据、生命周期事件记录

"""

from dataclasses import dataclass, field

from typing import Dict, List, Optional, Tuple

import numpy as np

from datetime import date, timedelta

@dataclass

class Product:

"""

单品定义

"""

product_id: str # 商品编号,如 "DR-2024-001"

name: str # 商品名称

category: str # 品类(dress/down_jacket/tshirt/jeans/activewear)

price: float # 售价(元)

gross_margin: float # 毛利率(0~1)

launch_date: date # 上架日期

initial_stock: int # 初始库存(件)

variable_cost_per_unit: float = 0.0 # 单件可变成本(包装/物流等)

def __post_init__(self):

if self.category not in CATEGORY_PARAMS:

raise ValueError(f"未注册的品类: {self.category}")

if self.price <= 0:

raise ValueError("售价必须大于零")

if not (0 < self.gross_margin <= 1):

raise ValueError("毛利率必须在 0~1 之间")

def marginal_contribution(self) -> float:

"""每件边际贡献 = 售价 × 毛利率 − 单件可变成本"""

return self.price * self.gross_margin - self.variable_cost_per_unit

@dataclass

class WeeklySales:

"""

周销量数据(支持逐步录入,模拟真实追踪场景)

"""

product_id: str

sales: Dict[int, int] = field(default_factory=dict) # {周序号: 销量}

weeks_recorded: int = 0

def add_week(self, week_num: int, units_sold: int) -> None:

"""录入一周销量"""

if units_sold < 0:

raise ValueError("销量不能为负")

self.sales[week_num] = units_sold

self.weeks_recorded = max(self.weeks_recorded, week_num + 1)

def get_array(self, max_weeks: Optional[int] = None) -> np.ndarray:

"""转为连续数组,缺失周填 0"""

if max_weeks is None:

max_weeks = self.weeks_recorded

arr = np.zeros(max_weeks)

for w, v in self.sales.items():

if w < max_weeks:

arr[w] = v

return arr

def get_peak(self) -> Tuple[int, int]:

"""返回 (峰值周序号, 峰值销量)"""

if not self.sales:

return (-1, 0)

peak_w = max(self.sales, key=self.sales.get)

return (peak_w, self.sales[peak_w])

def week_over_week(self) -> np.ndarray:

"""计算周环比增长率"""

arr = self.get_array()

if len(arr) < 2:

return np.array([])

return (arr[1:] - arr[:-1]) / np.where(arr[:-1] > 0, arr[:-1], 1)

@dataclass

class LifecycleEvent:

"""

生命周期事件记录

"""

event_type: str # introduction/growth/maturity/decline/obsolescence/clearance

week: int # 发生周次

description: str = ""

metadata: Dict = field(default_factory=dict)

"curve_fitter.py"

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

"""

curve_fitter.py

曲线拟合器:用 Bass 扩散模型 + 指数衰减拟合/预测销量曲线

"""

import numpy as np

from scipy.optimize import curve_fit

from typing import Dict, Tuple, Optional

from config import CATEGORY_PARAMS

from data_models import Product, WeeklySales

class BassCurveFitter:

"""

Bass 扩散模型拟合器

核心公式:

S(t) = (p + q × F(t)) × (1 − F(t))

F(t) = 1 − e^(−(p+q)×t) / (1 + (q/p) × e^(−(p+q)×t))

叠加季节衰减:

S_final(t) = S(t) × e^(−λ×t)

"""

def __init__(self, product: Product, weekly_sales: WeeklySales):

self.product = product

self.weekly_sales = weekly_sales

self.category_params = CATEGORY_PARAMS[product.category]

# 拟合结果缓存

self.fitted_params: Optional[Tuple[float, float, float]] = None

self.fitted_curve: Optional[np.ndarray] = None

@staticmethod

def bass_function(t: np.ndarray, p: float, q: float, m: float) -> np.ndarray:

"""

Bass 模型核心函数

t: 时间数组

p: 创新系数

q: 模仿系数

m: 市场总潜力(最大累计采纳数)

"""

# 累计采纳比例 F(t)

if p + q == 0:

f_t = np.zeros_like(t, dtype=float)

else:

exp_term = np.exp(-(p + q) * t)

f_t = (1 - exp_term) / (1 + (q / p) * exp_term) if p > 0 else np.zeros_like(t)

# 当期采纳数(即销量)

s_t = m * (p + q * f_t) * (1 - f_t)

return s_t

def seasonal_decay(self, t: np.ndarray) -> np.ndarray:

"""季节衰减因子"""

lam = self.category_params["decay_lambda"]

strength = self.category_params["seasonal_strength"]

# 基础衰减

decay = np.exp(-lam * t)

# 叠加周期性波动(模拟时尚周期的"长尾")

seasonal = 1 + 0.15 * strength * np.sin(2 * np.pi * t / 52)

return decay * seasonal

def full_model(self, t: np.ndarray, p: float, q: float, m: float) -> np.ndarray:

"""完整模型 = Bass + 季节衰减"""

bass = self.bass_function(t, p, q, m)

decay = self.seasonal_decay(t)

return bass * decay

def fit(self, max_weeks: int = 30) -> Dict:

"""

用已有数据拟合模型参数

Returns:

拟合结果字典,包含参数、拟合曲线、预测曲线

"""

observed = self.weekly_sales.get_array(max_weeks)

t = np.arange(len(observed))

if len(observed) < 4:

raise ValueError("数据不足 4 周,无法拟合")

# 初始参数猜测

p0 = self.category_params["bass_p"]

q0 = self.category_params["bass_q"]

m0 = observed.max() * 3 # 市场潜力约为峰值的 3 倍

try:

popt, pcov = curve_fit(

self.full_model,

t,

observed,

p0=[p0, q0, m0],

bounds=([0, 0, 0], [1, 1, 1e6]),

maxfev=5000,

)

self.fitted_params = tuple(popt)

# 生成拟合曲线(已有数据部分)

self.fitted_curve = self.full_model(t, *popt)

# 预测未来(再预测 max_weeks 周)

t_future = np.arange(max_weeks)

future_curve = self.full_model(t_future, *popt)

return {

"status": "success",

"params": {

"p (创新系数)": round(popt[0], 4),

"q (模仿系数)": round(popt[1], 4),

"m (市场潜力)": round(popt[2], 1),

},

"fitted_weeks": len(observed),

"fitted_curve": self.fitted_curve,

"predicted_curve": future_curve,

"predicted_weeks": max_weeks,

"rss": float(np.sum((observed - self.fitted_curve) ** 2)),

}

except Exception as e:

return {

"status": "failed",

"error": str(e),

"params": {"p": p0, "q": q0, "m": m0},

}

def predict_future(self, total_weeks: int = 20) -> np.ndarray:

"""预测未来销量"""

if self.fitted_params is None:

raise ValueError("请先调用 fit() 拟合模型")

t = np.arange(total_weeks)

return self.full_model(t, *self.fitted_params)

def identify_stage(self, week: int) -> str:

"""

根据拟合曲线判断某周所处的生命周期阶段

"""

if self.fitted_params is None:

return "unknown"

t = np.arange(max(week + 1, 10))

curve = self.full_model(t, *self.fitted_params)

if week >= len(curve):

return "obsolescence"

# 找到峰值位置

peak_idx = int(np.argmax(curve))

peak_val = curve[peak_idx]

current_val = curve[week]

peak_ratio = current_val / peak_val if peak_val > 0 else 0

if week < peak_idx * 0.5:

return "introduction"

elif week < peak_idx:

return "growth"

elif peak_ratio > 0.70:

return "maturity"

elif peak_ratio > 0.35:

return "decline"

else:

return "obsolescence"

"decline_detector.py"

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

"""

decline_detector.py

衰退检测器:三重判定算法识别款式进入衰退期的精确时间点

"""

import numpy as np

from typing import Dict, List, Tuple

from scipy import stats

from config import DECLINE_THRESHOLDS

from data_models import WeeklySales

class DeclineDetector:

"""

衰退检测算法

三重判定:

① 连续 N 周销量环比下滑

② 销量降至峰值 X% 以下

③ 线性回归斜率显著为负

"""

def __init__(self, weekly_sales: WeeklySales):

self.sales = weekly_sales

self.thresholds = DECLINE_THRESHOLDS

def check_consecutive_decline(self, window: int = 3) -> Dict:

"""

判定①:检查是否存在连续 N 周下滑

Returns:

{detected: bool, start_week: int, decline_weeks: int}

"""

arr = self.sales.get_array()

if len(arr) < window:

return {"detected": False, "reason": "数据不足"}

# 滑动窗口检测

max_consecutive = 0

current_consecutive = 0

decline_start = -1

for i in range(1, len(arr)):

if arr[i] < arr[i - 1]:

current_consecutive += 1

if current_consecutive == 1:

decline_start = i - 1

max_consecutive = max(max_consecutive, current_consecutive)

else:

current_consecutive = 0

min_required = self.thresholds["min_consecutive_weeks"]

detected = max_consecutive >= min_required

return {

"detected": detected,

"max_consecutive_decline": max_consecutive,

"decline_start_week": decline_start if detected else -1,

"reason": f"最长连续下滑 {max_consecutive} 周" if detected else "未检测到连续下滑",

}

def check_peak_ratio(self) -> Dict:

"""

判定②:当前销量是否降至峰值的 X% 以下

Returns:

{detected: bool, current_ratio: float, peak_week: int, peak_value: int}

"""

peak_week, peak_val = self.sales.get_peak()

if peak_week < 0:

return {"detected": False, "reason": "无销量数据"}

arr = self.sales.get_array()

threshold = self.thresholds["peak_ratio_threshold"]

# 检查最近几周是否低于阈值

recent_weeks = 3

detected = False

for i in range(max(0, len(arr) - recent_weeks), len(arr)):

ratio = arr[i] / peak_val if peak_val > 0 else 0

if ratio < threshold:

detected = True

break

current_ratio = arr[-1] / peak_val if peak_val > 0 and len(arr) > 0 else 0

return {

"detected": detected,

"current_ratio": round(current_ratio, 3),

"peak_week": peak_week,

"peak_value": peak_val,

"threshold": threshold,

"reason": f"当前销量是峰值的 {current_ratio*100:.1f}%" if detected else "仍在峰值以上",

}

def check_slope_significance(self, window: int = 5) -> Dict:

"""

判定③:线性回归斜率是否显著为负

Returns:

{detected: bool, slope: float, p_value: float}

"""

arr = self.sales.get_array()

if len(arr) < window:

return {"detected": False, "reason": "数据不足"}

# 取最近 window 周做线性回归

recent = arr[-window:]

t = np.arange(window)

slope, intercept, r_value, p_value, std_err = stats.linregress(t, recent)

alpha = self.thresholds["slope_p_value"]

detected = slope < 0 and p_value < alpha

return {

"detected": detected,

"slope": round(slope, 2),

"r_squared": round(r_value ** 2, 3),

"p_value": round(p_value, 4),

"reason": f"斜率={slope:.1f}, p={p_value:.3f}" if detected else "斜率不显著",

}

def detect_decline(self) -> Dict:

"""

三重判定汇总

Returns:

综合判定结果,包含各子判定详情和最终结论

"""

if self.sales.weeks_recorded < self.thresholds["min_weeks_data"]:

return {

"status": "insufficient_data",

"weeks_recorded": self.sales.weeks_recorded,

"min_required": self.thresholds["min_weeks_data"],

"decline_detected": False,

}

result_1 = self.check_consecutive_decline()

result_2 = self.check_peak_ratio()

result_3 = self.check_slope_significance()

# 三重判定汇总

count = sum([

result_1["detected"],

result_2["detected"],

result_3["detected"],

])

# 至少满足 2/3 才判定为衰退

decline_detected = count >= 2

# 衰退起点 = 最严格的判定(连续下滑起点)

decline_week = -1

if decline_detected:

candidates = []

if result_1["detected"]:

candidates.append(result_1["decline_start_week"])

if result_2["detected"]:

candidates.append(result_2.get("peak_week", -1))

if result_3["detected"]:

candidates.append(self.sales.weeks_recorded - 1)

decline_week = min(c for c in candidates if c >= 0)

return {

"status": "analyzed",

"decline_detected": decline_detected,

"decline_week": decline_week,

"confidence": f"{count}/3 条件满足",

"checks": {

"连续下滑检测": result_1,

"峰值比例检测": result_2,

"斜率显著性检测": result_3,

},

"recommendation": self._get_recommendation(

decline_detected, count, decline_week

),

}

def _get_recommendation(

self, detected: bool, count: int, week: int

) -> str:

"""根据判定结果给出建议"""

if not detected:

if count == 0:

return "款式仍处于健康销售期,维持正常运营"

elif count == 1:

return "出现早期衰退信号,建议密切关注后续 1~2 周数据"

else:

return "数据矛盾,建议人工复核"

else:

return f"⚠️ 衰退已确认(第 {week} 周起),建议启动折扣/清仓评估"

class LifecycleTracker:

"""

生命周期阶段追踪器

基于销量曲线的形态特征,自动标注每个阶段

"""

@staticmethod

def identify_stages(sales_array: np.ndarray) -> List[Tuple[int, str]]:

"""

基于销量曲线的二阶导数识别阶段转换点

Returns:

[(week, stage_name), ...] 按时间排序的阶段列表

"""

if len(sales_array) < 4:

return [(0, "introduction")]

stages = []

arr = sales_array.copy()

# 一阶导数(变化率)

diff1 = np.diff(arr)

# 二阶导数(加速度)

diff2 = np.diff(diff1)

# 找峰值

peak_idx = int(np.argmax(arr))

# 找拐点(二阶导数过零)

inflection_points = []

for i in range(1, len(diff2)):

if diff2[i] * diff2[i - 1] < 0:

inflection_points.append(i + 1)

# 阶段划分逻辑

# 引入期:0 ~ 第一个拐点(或峰值的 1/3)

intro_end = inflection_points[0] if inflection_points else peak_idx // 3

stages.append((0, "introduction"))

# 成长期:拐点 ~ 峰值

if intro_end < peak_idx:

stages.append((intro_end, "growth"))

# 成熟期:峰值 ~ 峰值后第一个拐点

decline_start = peak_idx + 1

for ip in inflection_points:

if ip > peak_idx:

decline_start = ip

break

stages.append((peak_idx, "maturity"))

# 衰退期:拐点 ~ 低谷

if decline_start < len(arr) - 1:

stages.append((decline_start, "decline"))

利用AI解决实际问题,如果你觉得这个工具好用,欢迎关注长安牧笛!