在平常開發中,咱們有時會須要對數據的插入操做進行定製。好比,若是表裏已有某某記錄就不寫入新紀錄,或者表裏沒該記錄就插入,不然就更新。前者咱們稱爲TryInsert
,後者爲InsertOrUpdate
(也叫作upsert
)。通常來講,不少orm
框架都會附帶這樣的函數,可是若是你要批量插入數據,orm
自帶的函數就不太夠用了。下面咱們從手動拼SQL的角度來實現TryInsert
和InsertOrUpdate
。python
考慮到如今流行的兩大開源RDBMS
對SQL標準支持比較落後,而早期的標準並無這方面的標準語法,因此咱們分紅MySQL
篇和Postgres
篇來分別使用它們各自的方言解決上面提到的兩個問題。mysql
插入若是報錯(主鍵或者Unique
鍵重複),會把錯誤轉成警告,此時返回的影響行數爲0,能夠用來實現TryInsert()
。golang
replace
跟insert
語法基本一致,是Mysql
的擴展語法,官方的InsertOrUpdate
,replace
語句的基本邏輯以下:sql
ok:=Insert() if !ok { if duplicate-key { // key重複就刪掉從新插入 Delete() Insert() } }
從這裏咱們能夠看出replace
語句的影響行數,若是是插入,影響行數爲1;若是是更新,刪除再插入,影響行數爲2。數據庫
也是MySQL擴展語法。... on duplicate key update
的邏輯與replace
差很少,惟一的區別就是若是插入的新值與舊值同樣,默認返回的影響行數爲0,因此這裏的邏輯是若是新值和舊值相同就不做處理。app
下面是以golang
爲例,給出示例:框架
type User struct { UserID int64 `gorm:"user_id"` Username string `gorm:"username"` Password string `gorm:"password"` Address string `gorm:"address"` } func BulkTryInsert(data []*User) error{ str:=make([]string, 0, len(data)) param:=make([]interface{},0,len(data)*4) // 4個屬性 for _,d:=range data { str=append(str,"(?,?,?,?)") param=append(d.UserID) param=append(d.Username) param=append(d.Password) param=append(d.Address) } stmt:=fmt.Sprintf("INSERT IGNORE INTO table_name(user_id,username,password,address) VALUES %s",strings.Join(str,",") ) return DB.Exec(stmt, param...).Error } func BulkUpsert(data []*User) error{ str:=make([]string, 0, len(data)) param:=make([]interface{},0,len(data)*4) // 4個屬性 for _,d:=range data { str=append(str,"(?,?,?,?)") param=append(d.UserID) param=append(d.Username) param=append(d.Password) param=append(d.Address) } stmt:=fmt.Sprintf("REPLACE INTO table_name(user_id,username,password,address) VALUES %s",strings.Join(str,",") ) // 與上面的區別僅在這行的SQL return DB.Exec(stmt, param...).Error }
on conflict
後面須要帶上衝突的鍵,好比主鍵或者Unique
約束。這條SQL的意思就如字面所示,當某某鍵存在重複衝突的時候,什麼也不作,即TryInsert
。函數
這條SQL就比較複雜了,Postgres
這個語法表面上看比MySQL
自由度更高,實際上很是繁瑣笨重,不如MySQL
務實。set
的意思是,衝突時須要指定更新哪些屬性,這是強制的,必須具體地說明每一個字段,真是不友好啊。大概是要寫成這樣,其中EXCLUDED指代要插入的那條記錄:post
INSERT INTO ... on conflict (user_id, address) do update set password=EXCLUDED.password and username=EXCLUDED.username
此次咱們設想一種實用的場景,python
常常被用做科學計算,pandas
是你們偏心的計算包,pandas
的io
部分提供了傻瓜式的讀寫文件和數據庫裏數據的函數,好比寫數據庫的to_sql
,可是這個函數有侷限性,它只能作到TryInsert
和清空表數據再插入,對於upsert
則無能爲力。目前來講,咱們只能手動實現它。code
按照上面的解析,咱們須要給每張表設置好UniqueConstraint
才能使用這個語法。下面給出一個例子:
# 使用的是sqlalchemy Base = declarative_base() # 將一個list分割成m個大小爲n的list def chunks(a, n): return [a[i:i + n] for i in range(0, len(a), n)] class DBUser(Base): __tablename__ = 'user' # UniqueConstraint和PrimaryKey至少要有一個 __table_args__ = (UniqueConstraint('user_id', 'address'), {'schema': 'db'}) user_id = Column(BigInteger) username = Column(String(200)) password = Column(String(200)) address = Column(String(200)) def dtype(self): # pandas須要的dtype d = {c.name: c.type for c in self.__table__.c} if 'id' in d: el d['id'] # 通常id都是自動生成的,提供給pandas的dtype應該剔除id return d def fullname(self): return self.__table_args__[-1]['schema'] + '.' + self.__tablename__ # 只要DBUser再提供一個Unique Constraint的屬性列表,下面這兩個函數就能夠寫成通用的函數 # 這裏只是給出例子,點到爲止 def bulk_try_insert(self, engine, data): col = self.dtype().keys() col_str = ','.join(col) col_str = '(' + col_str + ')' update_col = [] for c in col: update_str = '{0}=EXCLUDED.{1}'.format(c, c) update_col.append(update_str) value_str = [] value_args = [] for d in data: tmp_str = '(' + col.__len__() * '%s,' tmp_str = tmp_str[:-1] + ')' value_str.append(tmp_str) for k in col: value_args.append(d[k]) stmt= 'insert into ' + self.fullname() + col_str + 'values ' + ','.join( value_str) + 'on conflict (user_id, address) do update set ' + ",".join(update_col) engine.execute(stmt, value_args) def bulk_insert_chunk(self, engine, data, n=1000): d_list = chunks(data, n) for a in d_list: self.bulk_insert(engine, a)