其實這個東西跟restframework沒有卵關係,順便寫在這裏。
Django REST framework的各類技巧【目錄索引】
導入導出在cms中是一個非常常用的功能,考慮做成通用的東西,最終選擇django-import-export。雖然這玩意兒最初是想放在admin中用的,用起來也很繁瑣,但是可以做成通用的東西,而且用起來比較像rest的serializer。
django-import-export==0.4.2 文檔
需要看的源碼:cd 你的virtualenv/local/lib/python2.7/site-packages/import_export
resources.py instance_loaders.py
通過view可以看到,代碼在這裏是很乾淨的,跟一個正常的restframework的api沒有什麼區別。
class SchoolExportView(ExportMixin, GenericAPIView):
    """Export endpoint for active schools.

    Only configuration lives here; the GET handler is inherited from
    ``ExportMixin``, which reads ``resource_class`` and the (filtered)
    queryset declared below.
    """

    # What is exported: active schools, ordered by descending id.
    queryset = School.objects.filter(is_active=True).order_by('-id')
    resource_class = SchoolResource
    serializer_class = SchoolSerializer

    # Who may call the endpoint.
    permission_classes = (IsAuthenticated, ModulePermission)
    module_perms = ['school.school']

    # Filtering / searching options for the queryset.
    filter_backends = (filters.DjangoFilterBackend, filters.SearchFilter)
    filter_class = SchoolFilter
    search_fields = ('name', 'contact')


class SchoolImportView(ImportMixin, GenericAPIView):
    """Import endpoint for schools.

    Only configuration lives here; the POST handler is inherited from
    ``ImportMixin``, which instantiates ``resource_class`` to parse and
    import the uploaded file.
    """

    queryset = School.objects.filter(is_active=True).order_by('-id')
    resource_class = SchoolResource
    serializer_class = SchoolSerializer

    # Who may call the endpoint.
    permission_classes = (IsAuthenticated, ModulePermission)
    module_perms = ['school.school']
class ExportMixin(object):
    """View mixin adding a GET handler that downloads the resource's data.

    The host view must provide ``resource_class`` together with the usual
    ``get_queryset`` / ``filter_queryset`` methods.
    """

    @GET('filename', type='string', default='download.xls')
    @GET('format', type='string', default='xls', validators='in: xls,xlsx')
    @GET('empty', type='bool', default=False)
    def get(self, request, format, filename, empty):
        """Export the filtered queryset (or an empty template) as a download.

        When ``empty`` is true no queryset is passed, so the resource decides
        what an "empty" export contains.
        """
        queryset = None if empty else self.filter_queryset(self.get_queryset())
        resource = self.resource_class()
        dataset = resource.export(queryset, empty)
        # ``format`` names the attribute of the exported dataset that holds
        # the serialized payload (``xls`` or ``xlsx``).
        payload = getattr(dataset, format)
        return attachment_response(payload, filename=filename)


class ImportMixin(object):
    """View mixin adding a POST handler that imports an uploaded file.

    The host view must provide ``resource_class``. It may also define the
    optional hook ``get_resoucre_extra_data`` (the spelling is part of the
    existing interface and is kept as is) returning a dict that is handed to
    the resource before the import runs.
    """

    @POST('file', validators='required')
    def post(self, request, file):
        """Import the uploaded file inside a transaction; errors are raised."""
        import_file = request.FILES['file']
        resource = self.resource_class()

        if hasattr(self, 'get_resoucre_extra_data'):
            extra_data = self.get_resoucre_extra_data()
        else:
            extra_data = {}
        resource.set_extra_data(extra_data)

        dataset = resource.get_dataset(import_file)
        # The import result is not inspected: with raise_errors=True any
        # failure propagates as an exception instead.
        resource.import_data(dataset, use_transactions=True, raise_errors=True)
        return Response()
export非常簡單,因此先說export,先看demo(僅僅寫export)
# -*- coding: utf-8 -*-
from __future__ import absolute_import

from import_export import resources

from school.models import School


class SchoolResource(resources.ModelResource):
    """Export-only resource describing how a ``School`` becomes a sheet row."""

    def dehydrate_category(self, school):
        """Return the display label for the school's category.

        Categories other than middle school / college export as ''.
        """
        category = school.category
        if category == School.MIDDLE_SCHOOL:
            return u'中學'
        if category == School.COLLEGE:
            return u'高校'
        return ''

    def get_export_headers(self):
        """Return the column titles, in the same order as ``Meta.export_order``."""
        return [
            u'分類',
            u'省份',
            u'城市',
            u'學校',
            u'地址',
            u'聯繫人',
            u'職務',
            u'聯繫電話',
            u'郵箱',
        ]

    class Meta:
        model = School
        fields = (
            'category',
            'city__province__name',
            'city__name',
            'name',
            'address',
            'contact',
            'position',
            'phone',
            'email',
        )
        # Columns are exported in exactly the order they are declared above.
        export_order = fields
resource的寫法如下
由於import的複雜性,導致import的resource寫起來非常複雜,因為import的時候有各種需求,例如導入了某些列但是只更新某些列,導入了很多列只更新不新建,導入列的各種數據校驗……
首先,基礎的import_export中的InstanceLoader不能滿足一個非常重要的查詢需求:例如我們的model上面有is_active字段,然而又不能把這個東西導出去,導入的時候is_active又是get_instance的一個查詢條件;其次,ModelResource上面有些東西支持也非常不夠,例如我輸入一個文件就可以拿到dataset數據,例如我export的時候想傳一個可以迭代的東西而不是queryset,還有給出更人性化的錯誤提示等等。
class ModelExtraParamInstanceLoader(BaseInstanceLoader):
    """Instance loader whose lookup accepts extra fixed parameters.

    Besides the resource's import id fields, ``get_instance`` also filters on
    ``Meta.import_instanceloader_extra_params`` (e.g. ``is_active=True``), so
    a field can take part in the lookup without being an imported column.
    """

    def get_queryset(self):
        """Return the queryset in which existing instances are looked up."""
        return self.resource._meta.model.objects.all()

    def get_instance(self, row):
        """Return the instance matching ``row``, or None when there is none."""
        try:
            # Work on a copy: the extra params live on the resource's Meta
            # options and are shared by every import, so the per-row lookup
            # values must not be written into that dict.
            extra = getattr(
                self.resource._meta, 'import_instanceloader_extra_params', None)
            params = dict(extra or {})
            for key in self.resource.get_import_id_fields():
                field = self.resource.fields[key]
                params[field.attribute] = field.clean(row)
            return self.get_queryset().get(**params)
        except self.resource._meta.model.DoesNotExist:
            return None


class ModelResource(resources.ModelResource):
    """Base resource adding Excel upload parsing and friendlier errors.

    Subclasses are expected to provide ``get_export_headers`` (the titles of
    the sheet columns) and ``get_diff_headers`` (the matching model-side
    column names), and usually extend ``clean_dataset_data``.
    """

    def set_extra_data(self, extra_data):
        """Store caller-supplied context for use by the cleaning hooks."""
        self.extra_data = extra_data

    def get_clean_row(self, row):
        """Return the cells of ``row`` as stripped unicode strings.

        Whole-number floats are rendered without the trailing ``.0``
        (``2015.0`` -> ``u'2015'``). Floats with a fractional part are kept
        as they are instead of being silently truncated.
        """
        cleaned = []
        for cell in row:
            if isinstance(cell, float) and cell.is_integer():
                cell = int(cell)
            cleaned.append(unicode(cell).strip())
        return cleaned

    def get_dataset_data(self, file_obj):
        """Read the raw rows from the Excel file uploaded by the front end.

        The rows are cached on ``self._dataset_data`` and returned.

        Raises:
            Error: (``errors.ExcelFormatError``) when the file cannot be read
                or parsed; the underlying exception text is used as message.
        """
        headers = self.get_export_headers()
        try:
            self._dataset_data = get_data_from_excel(
                file_obj=file_obj, header=headers)
        except Exception as ex:
            # Boundary with user-supplied files: log the cause and convert
            # whatever went wrong into the API's own error type.
            logger.warning(ex)
            raise Error(
                errors.ExcelFormatError,
                err_message=unicode(ex),
                message=unicode(ex)
            )
        return self._dataset_data

    def get_printable_row(self, row):
        """Render ``row`` as ``(cell, cell, ...)`` for error messages."""
        return u'({})'.format(u', '.join(unicode(each) for each in row))

    def get_printable_error_message(self, error_type, index, row):
        """Build the user-facing message for a bad row.

        ``index`` is the row number shown to the user.
        """
        return u'excel格式錯誤:[{}]\n錯誤的行:{}行\n內容:{}'.format(
            error_type, index, self.get_printable_row(row)
        )

    def get_error(self, error_type, index, row):
        """Return (not raise) the ``Error`` describing a bad row."""
        return Error(
            errors.ExcelFormatError,
            # Unicode literal, like every other message in this class, so the
            # text is never mixed as a byte string with unicode strings.
            err_message=u'excel格式錯誤',
            message=self.get_printable_error_message(error_type, index, row)
        )

    def clean_dataset_data(self, data):
        """Clean the raw rows into the layout described by the diff headers.

        The diff headers are the model-side columns, because ``import_data``
        works directly on the model. Put validation logic here (parameter
        checks and the like) and raise ``Error`` directly to inform the user.

        This base implementation only checks that every row has as many cells
        as there are export headers.
        """
        header_length = len(self.get_export_headers())
        for index, row in enumerate(data):
            if len(row) != header_length:
                # index + 2: reported as a 1-based sheet row, assuming the
                # first sheet row is the header and was skipped when reading.
                raise self.get_error(u'列數錯誤', index + 2, row)
        return data

    def get_dataset(self, file_obj=None):
        """Return the dataset to import, built from the cleaned rows.

        Rows come from ``file_obj`` when given, otherwise from an earlier
        ``get_dataset_data`` call.
        """
        assert hasattr(self, '_dataset_data') or file_obj, \
            'You need call get_dataset_data first or pass file_obj'
        if file_obj:
            data = self.get_dataset_data(file_obj)
        else:
            data = self._dataset_data
        data = self.clean_dataset_data(data)
        headers = self.get_diff_headers()
        return get_dataset(data, headers)

    def export(self, queryset=None, empty=False):
        """Export a resource as a ``tablib.Dataset``.

        Args:
            queryset: a QuerySet or any other iterable of objects. When None,
                ``self.get_queryset()`` is used, or, if ``empty`` is true,
                ``Meta.empty_export_data`` (an empty list when undefined).
            empty: when true, iterable items are appended as ready-made rows
                instead of being passed through ``export_resource``.
        """
        if queryset is None:
            if empty:
                queryset = getattr(self._meta, 'empty_export_data', [])
            else:
                queryset = self.get_queryset()

        headers = self.get_export_headers()
        data = tablib.Dataset(headers=headers)

        if isinstance(queryset, QuerySet):
            # Iterate without the queryset cache, to avoid wasting memory when
            # exporting large datasets.
            iterable = queryset.iterator()
        else:
            iterable = queryset

        for obj in iterable:
            if empty and isinstance(obj, Iterable):
                data.append(obj)
            else:
                data.append(self.export_resource(obj))
        return data

    def init_instance(self, row=None):
        """Create a new model instance pre-filled from ``row``.

        Every key of ``row`` is set as an attribute on the new instance.
        """
        if not row:
            row = {}
        instance = self._meta.model()
        for attr, value in row.iteritems():
            setattr(instance, attr, value)
        return instance
先給出一個沒有複雜外鍵的model的導入Resource
class SchoolResource(ModelResource):
    """Import/export resource for ``School`` (a model without complex FKs)."""

    def dehydrate_category(self, school):
        """Return the display label for the school's category ('' if unknown)."""
        if school.category == School.MIDDLE_SCHOOL:
            return u'中學'
        elif school.category == School.COLLEGE:
            return u'高校'
        return ''

    def get_export_headers(self):
        """Return the titles of the sheet columns, in sheet order."""
        return [u'分類', u'省份', u'城市', u'學校', u'地址', u'聯繫人', u'職務',
                u'聯繫電話', u'郵箱']

    def get_diff_headers(self):
        """Return the model-side column names of the cleaned rows.

        One column fewer than the sheet: province and city are merged into a
        single ``city`` value by ``clean_dataset_data``.
        """
        return ['category', 'city', 'name', 'address', 'contact', 'position',
                'phone', 'email']

    def clean_dataset_data(self, data):
        """Validate the raw rows and convert them to the diff-header layout."""
        data = super(SchoolResource, self).clean_dataset_data(data)
        clean_data = []
        for index, row in enumerate(data):
            # Row number reported to the user (see the base class).
            _index = index + 2
            _row = self.get_clean_row(row)
            category = self.clean_dataset_category(_row[0], _index, row)
            # NOTE(review): clean_dataset_city is not defined in the visible
            # source; it must be provided elsewhere for imports to work.
            city = self.clean_dataset_city((_row[1], _row[2]), _index, row)
            clean_data.append([category, city] + _row[3:])
        return clean_data

    def clean_dataset_category(self, category, index, row):
        """Map a category label back to the model value.

        Returns the same ``School`` constants that ``dehydrate_category``
        compares against, so export and import stay in step.

        Raises:
            Error: when the label is neither of the two known categories.
        """
        if category == u'中學':
            return School.MIDDLE_SCHOOL
        if category == u'高校':
            return School.COLLEGE
        raise self.get_error(u'分類錯誤', index, row)

    class Meta:
        model = School
        import_id_fields = ['name',]
        # Extra lookup parameters used by the instance loader in addition to
        # the import id fields.
        import_instanceloader_extra_params = {'is_active': True}
        instance_loader_class = ModelExtraParamInstanceLoader
        # NOTE(review): the sample rows are elided ("[...]") in the source;
        # fill in real template rows before use.
        empty_export_data = [...]
        fields = ('category', 'city__province__name', 'city__name', 'name',
                  'address', 'contact', 'position', 'phone', 'email')
        export_order = ('category', 'city__province__name', 'city__name',
                        'name', 'address', 'contact', 'position', 'phone',
                        'email')
resource的寫法如下
稍微複雜的demo
class CourseResource(ModelResource):
    """Import/export resource for ``Course``, which references Term and School."""

    def dehydrate_is_authentication(self, course):
        """Return the display label for the teacher authentication flag."""
        if course.is_authentication:
            return u'已認證'
        else:
            return u'未認證'

    def get_export_headers(self):
        """Return the titles of the sheet columns, in sheet order."""
        return [
            u'年份', u'開課科目', u'學校', u'教師姓名', u'身份證號', u'聯繫方式',
            u'郵箱', u'教師認證狀態', u'班級規模'
        ]

    def get_diff_headers(self):
        """Return the model-side column names of the cleaned rows."""
        return ['term__name', 'name', 'school__name', 'teacher', 'ID_number',
                'phone', 'email', 'is_authentication', 'enrollment']

    def init_instance(self, row=None):
        """Create a new ``Course`` pre-filled from ``row``.

        After ``clean_dataset_data`` the ``term__name`` / ``school__name``
        columns hold the related objects themselves, so they are assigned to
        the ``term`` / ``school`` relations. An empty or missing row simply
        yields a blank instance instead of raising ``KeyError``.
        """
        instance = super(CourseResource, self).init_instance(row)
        if row:
            if 'term__name' in row:
                instance.term = row['term__name']
            if 'school__name' in row:
                instance.school = row['school__name']
        return instance

    def clean_dataset_data(self, data):
        """Validate the raw rows and convert them to the diff-header layout."""
        data = super(CourseResource, self).clean_dataset_data(data)
        clean_data = []
        for index, row in enumerate(data):
            # Row number reported to the user (see the base class).
            _index = index + 2
            _row = self.get_clean_row(row)
            term = self.clean_dataset_term(_row[0], _index, row)
            school = self.clean_dataset_school(_row[2], _index, row)
            is_authentication = self.clean_dataset_is_authentication(
                _row[7], _index, row)
            enrollment = self.clean_dataset_enrollment(_row[8], _index, row)
            clean_data.append([
                term, _row[1], school, _row[3], _row[4], _row[5], _row[6],
                is_authentication, enrollment
            ])
        return clean_data

    def clean_dataset_term(self, term, index, row):
        """Return the active ``Term`` named ``term``; raise Error if none."""
        try:
            return Term.objects.get(name=term, is_active=True)
        except Term.DoesNotExist:
            raise self.get_error(u'年份錯誤', index, row)

    def clean_dataset_school(self, school, index, row):
        """Return the active ``School`` named ``school``.

        Raises:
            Error: when no such school exists, or when the importing user
                (``extra_data['user']``) has no permission on it.
        """
        # Keep the try body to the lookup only, so DoesNotExist can only come
        # from this query.
        try:
            school = School.objects.get(name=school, is_active=True)
        except School.DoesNotExist:
            raise self.get_error(u'學校錯誤', index, row)

        user = self.extra_data['user']
        if not SchoolPermissionFilterBackend().has_school_permission(user, school):
            raise self.get_error(u'沒有對應的學校權限', index, row)
        return school

    def clean_dataset_is_authentication(self, is_authentication, index, row):
        """Map the authentication label back to a boolean; raise Error if unknown."""
        if is_authentication == u'已認證':
            return True
        if is_authentication == u'未認證':
            return False
        raise self.get_error(u'教師認證狀態錯誤', index, row)

    def clean_dataset_enrollment(self, enrollment, index, row):
        """Return the class size as an int; an empty cell counts as 0.

        Raises:
            Error: when the cell cannot be read as a number.
        """
        if not enrollment:
            return 0
        try:
            return int(float(enrollment))
        except (TypeError, ValueError, OverflowError):
            # Only conversion failures (OverflowError covers "inf") are
            # reported as a bad cell; anything else propagates unchanged.
            raise self.get_error(u'班級規模錯誤', index, row)

    class Meta:
        model = Course
        import_id_fields = ['term__name', 'name', 'school__name']
        # Extra lookup parameters used by the instance loader in addition to
        # the import id fields.
        import_instanceloader_extra_params = {
            'is_active': True, 'term__is_active': True,
            'school__is_active': True}
        instance_loader_class = ModelExtraParamInstanceLoader
        fields = ('term__name', 'name', 'school__name', 'teacher', 'ID_number',
                  'phone', 'email', 'is_authentication', 'enrollment')
        export_order = ('term__name', 'name', 'school__name', 'teacher',
                        'ID_number', 'phone', 'email', 'is_authentication',
                        'enrollment')
        # NOTE(review): the sample rows are elided ("[...]") in the source;
        # fill in real template rows before use.
        empty_export_data = [...]
def extract_data(sheet, header, skip_header=True, row_type='list'):
    """Return the rows of ``sheet`` as lists or as header-keyed dicts.

    Args:
        sheet: worksheet object exposing ``nrows`` and ``row_values(index)``.
        header: non-empty sequence of column names; every row must have
            exactly this many cells.
        skip_header: when true the first sheet row is not returned.
        row_type: ``'list'`` returns each row as read from the sheet; any
            other value returns a dict mapping header names to cells.

    Raises:
        ValueError: when a row's cell count differs from ``len(header)``.
            The message carries the 1-based row number as seen in Excel.
    """
    assert header
    first_row = 1 if skip_header else 0
    data = []
    for row_index in range(first_row, sheet.nrows):
        row = sheet.row_values(row_index)
        if len(header) != len(row):
            # Explicit raise instead of ``assert``: uploaded data must still
            # be validated when Python runs with -O. ``row_index`` is
            # 0-based, spreadsheet rows are numbered from 1.
            raise ValueError(
                u'excel 第{}行,列數對應數據不對'.format(row_index + 1))
        if row_type == 'list':
            data.append(row)
        else:
            data.append(dict(zip(header, row)))
    return data


def get_data_from_excel(file_path=None, file_obj=None, header=None,
                        sheet_index=0, skip_header=True):
    """Read one sheet of an Excel workbook and return its rows as lists.

    The workbook is opened from ``file_path`` when given, otherwise from
    ``file_obj``; at least one of them, and a non-empty ``header``, is
    required. See ``extract_data`` for the row checks.
    """
    assert header
    assert file_path or file_obj
    if file_path:
        with open_workbook(file_path) as wb:
            return extract_data(
                wb.sheet_by_index(sheet_index), header, skip_header)
    # NOTE(review): tempinput is a project helper; presumably it writes the
    # uploaded file to a temporary path and yields that path - confirm.
    with tempinput(file_obj) as tempfilename:
        with open_workbook(tempfilename) as wb:
            return extract_data(
                wb.sheet_by_index(sheet_index), header, skip_header)


def get_dataset(data, header):
    """Wrap ``data`` rows and ``header`` names into a ``tablib.Dataset``."""
    return tablib.Dataset(*data, headers=header)


def attachment_response(export_data, filename='download.xls',
                        content_type='application/vnd.ms-excel'):
    """Return an ``HttpResponse`` that downloads ``export_data`` as a file."""
    # Django 1.7 uses the content_type kwarg instead of mimetype
    try:
        response = HttpResponse(export_data, content_type=content_type)
    except TypeError:
        response = HttpResponse(export_data, mimetype=content_type)
    # NOTE(review): ``filename`` can come straight from a request parameter
    # and is inserted unquoted; names with spaces, quotes or non-ASCII
    # characters may need quoting/encoding here.
    response['Content-Disposition'] = 'attachment; filename={}'.format(filename)
    return response