生成モデルってDGPを理解できるの?(前編)

はじめに

5月の記事で拡散モデルに関する覚書を書いて以降、地道にこの辺りのプログラムを書いたりして理解を深めている。

拡散モデルの論文だったりを見ていると、計量経済学というのか統計学というのか分からないが、data generating processを拡散モデルが表現できるのか?というところについてはあまり論点になっていないように思う。例えば$y=0.1 + 1.5x + \epsilon$(ここで$\epsilon$は標準正規分布に従うとする)というデータ生成プロセスに従う確率変数があったときに、この$0.1+1.5x$を再現できるのか?ということが気になっている。 

一方で既存の研究は画像生成タスクが中心であったり、テーブルデータでもデータ生成プロセスのことを意識していない(1変数のみ)ものが多い。

そこで試験的に今回はこうしたデータ生成プロセスが再現できるのか?に取り組んで見たいと思う。

と言いつつ、どうやって既存のモデルに載せるかというところも結構難しいポイントなので、まずはモデルに食わせるためのデータを作ろうと思う。

学習に使うデータを作るのだ

じゃあどういったデータなら面白いのかな?と思ったときに、表現できると嬉しいものはやはりパネルデータの構造だろうと思う。

そこでまずはDIDで効果が測定できるような状況を再現できるのかやってみる。 具体的には下記が要件

  • 一定人数をランダムに処置群に割り振る
  • 処置効果は特定の時点以降とする
  • 個人効果、時間トレンドが存在する
  • データ生成過程は適当に決める

今後モデルに食わせるところも含めて作業したいので、ここではpytorchを利用したデータセットとして定義していきたいと思う。

具体的な実装

class DID_Dataset(Dataset):
    def __init__(self,
                 num_ind,
                 num_period,
                 treatment_period,
                 treatment_effect,
                 time_trend=0.01,
                 random_state=12,
                 mode = 'train'):
        self.num_ind = num_ind
        self.num_period = num_period
        self.treatment_period = treatment_period
        self.treatment_effect = treatment_effect
        self.time_trend = time_trend
        self.random_state = random_state

        self.rg =np.random.RandomState(self.random_state)
        self.mode = mode
        self.ind_list = [i for i in range(self.num_ind)]
        self.period_list = [i for i in range(self.num_period)]




        self.df = self.prepare_dataset()

        if self.mode == 'train':
            self.df = self.df.query('treatment_period == 0').reset_index(drop=True)
        elif self.mode == 'test':
            self.df = self.df.query('treatment_period == 1').reset_index(drop=True)
        elif self.mode == 'debug':
            pass


    def generate_effect(self):

        # 個人固定効果
        ind_fix_effect_dict = {i: self.rg.randn() for i in self.ind_list}
        df_ind_fix_effect = (
            pd.DataFrame.from_dict(
                ind_fix_effect_dict, orient="index", columns=["ind_fix_effect"]
            )
            .reset_index()
            .rename(columns={"index": "user_id"})
        )

        # 線形なトレンド
        time_fix_effect_dict = {i: self.time_trend * i for i in self.period_list}
        df_time_fix_effect = (
            pd.DataFrame.from_dict(
                time_fix_effect_dict, orient="index", columns=["time_fix_effect"]
            )
            .reset_index()
            .rename(columns={"index": "period"})
        )

        # 処置の相手(50%ずつ)
        treatment_flag_dict = {i: self.rg.randint(2) for i in self.ind_list}
        df_treatment_flag = (
            pd.DataFrame.from_dict(treatment_flag_dict, orient="index")
            .reset_index()
            .rename(columns={"index": "user_id", 0: "treatment_flag"})
        )

        return df_ind_fix_effect, df_time_fix_effect, df_treatment_flag

    def prepare_dataset(self):

        df_ind_fix_effect, df_time_fix_effect, df_treatment_flag = self.generate_effect()
        df = (
    pd.DataFrame(itertools.product(self.ind_list, self.period_list), columns=["user_id", "period"])
    .merge(df_ind_fix_effect, on=["user_id"], how="inner")
    .merge(df_time_fix_effect, on=["period"], how="inner")
    .merge(df_treatment_flag, on=["user_id"], how="inner")
)

        # 特徴量
        df["x"] = self.rg.randn(len(df))


        # treatment以降のフラグ
        df["treatment_period"] = df["period"].apply(lambda x: 1 if x >= self.treatment_period else 0)

        # 評価したい指標
        df["y"] = (
            df["ind_fix_effect"]
            + df["time_fix_effect"]
            + 1.1 * df["x"]
            + df["treatment_flag"] * df["treatment_period"] * self.treatment_effect
        )

        # この係数が効果
        df["cross_term"] = df["treatment_flag"] * df["treatment_period"]


        return df


    def __getitem__(self, idx):

        temp = self.df.iloc[idx, :]

        return_dict = {
            'user_id' : temp['user_id'],
            'period': temp['period'],
            'x': temp['x'],
            'y': temp['y'],
            'treatment_period': temp['treatment_period'],
            'treatment_flag': temp['treatment_flag']
        }

        return return_dict

    def __len__(self):
        return len(self.df)

具体的にはこんな感じで作ってみた。ある程度柔軟性をもたせたかったので、次のようなインプットを受け入れることにした。

  • 対象となる個人の数、期間
  • いつ介入が起きるのか
  • 介入の効果はどの程度か
  • 時間トレンドはどの程度インパクトがあるか?

逆に入力としなかったのは次のようなもの

  • 個人効果の状況
    • ここは乱数という整理にした
  • データ生成プロセス
    • 一旦まずは決め打ちの構造ができるかどうかがポイントなので、このようにした。

それぞれポイントの部分を説明していくと重要なのはgenerate_effectprepare_datasetメソッドの2つになる。

  • generate_effectでは時間トレンド、個人固定効果、誰が介入群に入るのかを決定する
  • prepare_datasetではgenerate_effectで作成した結果を元にDIDで効果が推定できるような構造を作成する。
    • 特徴量は乱数で決定する。
    • 処置が起きた以降を1とするダミー変数
    • DIDで効果が測定できるようなデータをつくる。
      • 個人固定効果 + 時間固定効果 + 1.1×特徴量 + 処置群フラグ×介入以後フラグ×介入効果

これであればDIDで処置効果が推定できるようなデータセットが作れている。

処置が起きる前のデータで学習→処置後で予測したときに介入がなかったときの結果と近しくなっていれば、データ生成プロセスがわかっているのでは?というのが今回の仮説。

このデータセット正しいの?

とはいえこのデータセットが正しいのかを確認する必要があるので、まずはそこを確認する。

dummy_dataset = DID_Dataset(num_ind=100, num_period=100, treatment_period=50, treatment_effect=0.5, mode= 'debug')
exog = sm.add_constant(dummy_dataset.df[["x", "treatment_flag", "treatment_period", "cross_term"]])
fe_te_res = sm.OLS(dummy_dataset.df["y"], exog).fit()

このとき、cross_termの係数が0.5になっていれば、DIDで適切に介入効果が測れていることになる。

実際の結果がこんな感じ

coefstd errtP>|t|[0.0250.975]
const0.22860.02210.44400.1860.271
x1.09980.011104.14301.0791.121
treatment_flag-0.23680.03-7.9510-0.295-0.178
treatment_period0.50.03116.15500.4390.561
cross_term0.50.04211.87200.4170.583

きちんとcross_termの係数が0.5になっているので想定通り処置効果がDIDで推定できることがわかった。

次回以降はどういう形でモデルに乗せるかや実装について書いていこうと思う。